diff --git a/pom.xml b/pom.xml
index 45aaedf8b..15267cb1a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
org.apache.mesos
chronos
- 3.0.1-SNAPSHOT
+ 3.0.3-SNAPSHOT
2012
diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/config/Features.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/config/Features.scala
new file mode 100644
index 000000000..467be17ee
--- /dev/null
+++ b/src/main/scala/org/apache/mesos/chronos/scheduler/config/Features.scala
@@ -0,0 +1,17 @@
+package org.apache.mesos.chronos.scheduler.config
+
+import javax.ws.rs.core.{MediaType => CoreMediaType}
+
+object Features {
+
+ //enable GPUs
+ lazy val GPU_RESOURCES = "gpu_resources"
+
+ lazy val availableFeatures = Map(
+ GPU_RESOURCES -> "Enable support for GPU in Chronos (experimental)"
+ )
+
+ def description: String = {
+ availableFeatures.map { case (name, description) => s"$name - $description" }.mkString(", ")
+ }
+}
diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/config/SchedulerConfiguration.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/config/SchedulerConfiguration.scala
index fad554397..4df531817 100644
--- a/src/main/scala/org/apache/mesos/chronos/scheduler/config/SchedulerConfiguration.scala
+++ b/src/main/scala/org/apache/mesos/chronos/scheduler/config/SchedulerConfiguration.scala
@@ -94,6 +94,9 @@ trait SchedulerConfiguration extends ScallopConf {
lazy val mesosTaskDisk = opt[Double]("mesos_task_disk",
descr = "Amount of disk capacity to request from Mesos for each task (MB)",
default = Some(256.0))
+ lazy val mesosTaskGpu = opt[Int]("mesos_task_gpu",
+ descr = "Number of GPUs to request from Mesos for each task",
+ default = Some(0))
lazy val mesosCheckpoint = opt[Boolean]("mesos_checkpoint",
descr = "Enable checkpointing in Mesos",
default = Some(true))
@@ -132,7 +135,13 @@ trait SchedulerConfiguration extends ScallopConf {
lazy val minReviveOffersInterval = opt[Long]("min_revive_offers_interval",
descr = "Do not ask for all offers (also already seen ones) more often than this interval (ms). (Default: 5000)",
default = Some(5000))
-
+ lazy val features = opt[String]("enable_features",
+ descr = s"A comma-separated list of features. Available features are: ${Features.description}",
+ required = false,
+ default = None,
+ noshort = true,
+ validate = validateFeatures
+ )
def zooKeeperHostAddresses: Seq[InetSocketAddress] =
for (s <- zookeeperServers().split(",")) yield {
@@ -151,4 +160,23 @@ trait SchedulerConfiguration extends ScallopConf {
def zooKeeperStatePath = "%s/state".format(zooKeeperPath())
def zooKeeperCandidatePath = "%s/candidate".format(zooKeeperPath())
+
+ lazy val availableFeatures: Set[String] = features.get.map(parseFeatures).getOrElse(Set.empty)
+
+ private[this] def parseFeatures(str: String): Set[String] =
+ str.split(',').map(_.trim).filter(_.nonEmpty).toSet
+
+ private[this] def validateFeatures(str: String): Boolean = {
+ val parsed = parseFeatures(str)
+ // throw exceptions for better error messages
+ val unknownFeatures = parsed.filter(!Features.availableFeatures.contains(_))
+ lazy val unknownFeaturesString = unknownFeatures.mkString(", ")
+ require(
+ unknownFeatures.isEmpty,
+ s"Unknown features specified: $unknownFeaturesString. Available features are: ${Features.description}"
+ )
+ true
+ }
+
+ def isFeatureSet(name: String): Boolean = availableFeatures.contains(name)
}
diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/Jobs.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/Jobs.scala
index 0234496ce..f957ea95f 100644
--- a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/Jobs.scala
+++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/Jobs.scala
@@ -55,6 +55,8 @@ trait BaseJob {
def mem: Double = 0
+ def gpus: Int = 0
+
def disabled: Boolean = false
def errorsSinceLastSuccess: Long = 0L
@@ -104,6 +106,7 @@ case class ScheduleBasedJob(
@JsonProperty override val cpus: Double = 0,
@JsonProperty override val disk: Double = 0,
@JsonProperty override val mem: Double = 0,
+ @JsonProperty override val gpus: Int = 0,
@JsonProperty override val disabled: Boolean = false,
@JsonProperty override val errorsSinceLastSuccess: Long = 0L,
@Deprecated @JsonProperty override val uris: Seq[String] = List(),
@@ -141,6 +144,7 @@ case class DependencyBasedJob(
@JsonProperty override val cpus: Double = 0,
@JsonProperty override val disk: Double = 0,
@JsonProperty override val mem: Double = 0,
+ @JsonProperty override val gpus: Int = 0,
@JsonProperty override val disabled: Boolean = false,
@JsonProperty override val errorsSinceLastSuccess: Long = 0L,
@Deprecated @JsonProperty override val uris: Seq[String] = List(),
diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala
index b24cf5897..e91fb0480 100644
--- a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala
+++ b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala
@@ -71,12 +71,13 @@ class MesosJobFramework @Inject()(
log.warning("Disconnected")
}
- def getReservedResources(offer: Offer): (Double, Double) = {
+ def getReservedResources(offer: Offer): (Double, Double, Int) = {
val resources = offer.getResourcesList.asScala
val reservedResources = resources.filter({ x => x.hasRole && x.getRole != "*" })
(
getScalarValueOrElse(reservedResources.find(x => x.getName == "cpus"), 0),
- getScalarValueOrElse(reservedResources.find(x => x.getName == "mem"), 0)
+ getScalarValueOrElse(reservedResources.find(x => x.getName == "mem"), 0),
+ getScalarValueOrElse(reservedResources.find(x => x.getName == "gpus"), 0)
)
}
@@ -84,6 +85,10 @@ class MesosJobFramework @Inject()(
opt.map(x => x.getScalar.getValue).getOrElse(value)
}
+ def getScalarValueOrElse(opt: Option[Resource], value: Int): Int = {
+ opt.map(x => x.getScalar.getValue.intValue()).getOrElse(value)
+ }
+
//TODO(FL): Persist the UPDATED task or job into ZK such that on failover / reload, we don't have to step through the
// entire task stream.
@Override
@@ -281,30 +286,34 @@ class MesosJobFramework @Inject()(
class Resources(
var cpus: Double,
var mem: Double,
- var disk: Double
+ var disk: Double,
+ var gpus: Int
) {
def this(job: BaseJob) {
this(
if (job.cpus > 0) job.cpus else config.mesosTaskCpu(),
if (job.mem > 0) job.mem else config.mesosTaskMem(),
- if (job.disk > 0) job.disk else config.mesosTaskDisk()
+ if (job.disk > 0) job.disk else config.mesosTaskDisk(),
+ if (job.gpus > 0) job.gpus else config.mesosTaskGpu()
)
}
def canSatisfy(needed: Resources): Boolean = {
(this.cpus >= needed.cpus) &&
(this.mem >= needed.mem) &&
- (this.disk >= needed.disk)
+ (this.disk >= needed.disk)&&
+ (this.gpus >= needed.gpus)
}
def -=(that: Resources) {
this.cpus -= that.cpus
this.mem -= that.mem
this.disk -= that.disk
+ this.gpus -= that.gpus
}
override def toString: String = {
- "cpus: " + this.cpus + " mem: " + this.mem + " disk: " + this.disk
+ "cpus: " + this.cpus + " mem: " + this.mem + " disk: " + this.disk + " gpus: " + this.gpus
}
}
@@ -314,7 +323,8 @@ class MesosJobFramework @Inject()(
new Resources(
getScalarValueOrElse(resources.find(_.getName == "cpus"), 0),
getScalarValueOrElse(resources.find(_.getName == "mem"), 0),
- getScalarValueOrElse(resources.find(_.getName == "disk"), 0)
+ getScalarValueOrElse(resources.find(_.getName == "disk"), 0),
+ getScalarValueOrElse(resources.find(_.getName == "gpus"), 0)
)
}
}
diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosTaskBuilder.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosTaskBuilder.scala
index 3a54e0c95..75f5bdc65 100644
--- a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosTaskBuilder.scala
+++ b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosTaskBuilder.scala
@@ -24,6 +24,7 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
final val cpusResourceName = "cpus"
final val memResourceName = "mem"
final val diskResourceName = "disk"
+ final val gpusResourceName = "gpus"
val taskNameTemplate = "ChronosTask:%s"
//args|command.
// e.g. args: -av (async job), verbose mode
@@ -124,10 +125,12 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
val mem = if (job.mem > 0) job.mem else conf.mesosTaskMem()
val cpus = if (job.cpus > 0) job.cpus else conf.mesosTaskCpu()
val disk = if (job.disk > 0) job.disk else conf.mesosTaskDisk()
+ val gpus : Int = if (job.gpus > 0) job.gpus else conf.mesosTaskGpu()
taskInfo
.addResources(scalarResource(cpusResourceName, cpus, offer))
.addResources(scalarResource(memResourceName, mem, offer))
.addResources(scalarResource(diskResourceName, disk, offer))
+ .addResources(scalarResource(gpusResourceName, gpus, offer))
taskInfo
}
@@ -145,6 +148,7 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
"CHRONOS_RESOURCE_MEM" -> job.mem.toString,
"CHRONOS_RESOURCE_CPU" -> job.cpus.toString,
"CHRONOS_RESOURCE_DISK" -> job.disk.toString,
+ "CHRONOS_RESOURCE_GPU" -> job.gpus.toString,
"CHRONOS_JOB_RUN_TIME" -> start.toString,
"CHRONOS_JOB_RUN_ATTEMPT" -> attempt.toString
)
diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/SchedulerDriverBuilder.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/SchedulerDriverBuilder.scala
index ecf39c87a..2ece29aff 100644
--- a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/SchedulerDriverBuilder.scala
+++ b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/SchedulerDriverBuilder.scala
@@ -8,13 +8,14 @@ import java.util.logging.Logger
import com.google.protobuf.ByteString
import mesosphere.chaos.http.HttpConf
import org.apache.mesos.Protos.{Credential, FrameworkID, FrameworkInfo}
-import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
+import org.apache.mesos.chronos.scheduler.config.{Features, SchedulerConfiguration}
import org.apache.mesos.{
MesosSchedulerDriver,
Protos,
Scheduler,
SchedulerDriver
}
+import FrameworkInfo.Capability
import scala.collection.JavaConverters.asScalaSetConverter
@@ -82,6 +83,11 @@ class SchedulerDriverBuilder {
config.mesosAuthenticationPrincipal.get
.foreach(frameworkInfoBuilder.setPrincipal)
+ if (config.isFeatureSet(Features.GPU_RESOURCES)) {
+ frameworkInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
+ log.info("GPU_RESOURCES feature enabled.")
+ }
+
frameworkInfoBuilder.build()
}
diff --git a/src/main/scala/org/apache/mesos/chronos/utils/JobDeserializer.scala b/src/main/scala/org/apache/mesos/chronos/utils/JobDeserializer.scala
index 5104f11bb..2922095d8 100644
--- a/src/main/scala/org/apache/mesos/chronos/utils/JobDeserializer.scala
+++ b/src/main/scala/org/apache/mesos/chronos/utils/JobDeserializer.scala
@@ -144,6 +144,14 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
JobDeserializer.config.mesosTaskMem()
else 0
+ val gpus =
+ if (node.has("gpus") && node.get("gpus") != null && node
+ .get("gpus")
+ .asInt != 0) node.get("gpus").asInt
+ else if (JobDeserializer.config != null)
+ JobDeserializer.config.mesosTaskGpu()
+ else 0
+
val errorsSinceLastSuccess =
if (node.has("errorsSinceLastSuccess") && node.get(
"errorsSinceLastSuccess") != null)
@@ -396,6 +404,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
cpus = cpus,
disk = disk,
mem = mem,
+ gpus = gpus,
disabled = disabled,
concurrent = concurrent,
errorsSinceLastSuccess = errorsSinceLastSuccess,
@@ -433,6 +442,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
cpus = cpus,
disk = disk,
mem = mem,
+ gpus = gpus,
disabled = disabled,
concurrent = concurrent,
errorsSinceLastSuccess = errorsSinceLastSuccess,
@@ -469,6 +479,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
cpus = cpus,
disk = disk,
mem = mem,
+ gpus = gpus,
disabled = disabled,
errorsSinceLastSuccess = errorsSinceLastSuccess,
fetch = fetch,
diff --git a/src/main/scala/org/apache/mesos/chronos/utils/JobSerializer.scala b/src/main/scala/org/apache/mesos/chronos/utils/JobSerializer.scala
index c8c4eb15d..34a66a88b 100644
--- a/src/main/scala/org/apache/mesos/chronos/utils/JobSerializer.scala
+++ b/src/main/scala/org/apache/mesos/chronos/utils/JobSerializer.scala
@@ -75,6 +75,9 @@ class JobSerializer extends JsonSerializer[BaseJob] {
json.writeFieldName("mem")
json.writeNumber(baseJob.mem)
+ json.writeFieldName("gpus")
+ json.writeNumber(baseJob.gpus)
+
json.writeFieldName("disabled")
json.writeBoolean(baseJob.disabled)
diff --git a/src/test/scala/org/apache/mesos/chronos/scheduler/mesos/MesosTaskBuilderSpec.scala b/src/test/scala/org/apache/mesos/chronos/scheduler/mesos/MesosTaskBuilderSpec.scala
index 623331e59..a095f01f9 100644
--- a/src/test/scala/org/apache/mesos/chronos/scheduler/mesos/MesosTaskBuilderSpec.scala
+++ b/src/test/scala/org/apache/mesos/chronos/scheduler/mesos/MesosTaskBuilderSpec.scala
@@ -45,7 +45,7 @@ class MesosTaskBuilderSpec extends SpecificationWithJUnit with Mockito {
ScheduleBasedJob("FOO/BAR/BAM", "AJob", "noop", 10L, 20L,
"fooexec", "fooflags", "none", 7, "foo@bar.com", "Foo", "Test schedule based job", "TODAY",
- "YESTERDAY", cpus = 2, disk = 3, mem = 5, container = container, environmentVariables = Seq(),
+ "YESTERDAY", cpus = 2, disk = 3, mem = 5, gpus = 0, container = container, environmentVariables = Seq(),
shell = true, arguments = Seq(), softError = true, constraints = constraints)
}
@@ -58,6 +58,7 @@ class MesosTaskBuilderSpec extends SpecificationWithJUnit with Mockito {
"CHRONOS_RESOURCE_MEM" -> job.mem.toString,
"CHRONOS_RESOURCE_CPU" -> job.cpus.toString,
"CHRONOS_RESOURCE_DISK" -> job.disk.toString,
+ "CHRONOS_RESOURCE_GPU" -> job.gpus.toString,
"CHRONOS_JOB_RUN_TIME" -> start.toString,
"CHRONOS_JOB_RUN_ATTEMPT" -> attempt.toString
)