diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala index dfb83fbf4..dd691ccb3 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala @@ -5,7 +5,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.logging.{Level, Logger} import com.datastax.driver.core._ -import com.datastax.driver.core.exceptions.{DriverException, NoHostAvailableException, QueryExecutionException, QueryValidationException} +import com.datastax.driver.core.exceptions.{DriverException, NoHostAvailableException, QueryExecutionException, QueryValidationException, InvalidQueryException} import com.datastax.driver.core.querybuilder.{Insert, QueryBuilder} import com.google.inject.Inject import org.apache.mesos.Protos.{TaskState, TaskStatus} @@ -164,15 +164,30 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan case Some(c) => try { val session = c.build.connect() - session.execute(new SimpleStatement( - s"CREATE KEYSPACE IF NOT EXISTS ${config.cassandraKeyspace()} WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" - )) - session.execute(new SimpleStatement( - s"USE ${config.cassandraKeyspace()};" - )) - - session.execute(new SimpleStatement( - s"CREATE TABLE IF NOT EXISTS ${config.cassandraTable()}" + + + val createKeyspaceStatement = s"CREATE KEYSPACE IF NOT EXISTS ${config.cassandraKeyspace()} WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" + try { + session.execute(new SimpleStatement( + createKeyspaceStatement + )) + } catch { + case e: Exception => + log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createKeyspaceStatement)) + throw e + } + + val useKeyspaceStatement = s"USE ${config.cassandraKeyspace()};" + try { + session.execute(new SimpleStatement( + useKeyspaceStatement + )) + } catch { + case e: Exception => + log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(useKeyspaceStatement)) + throw e + } + + val createTableStatement = s"CREATE TABLE IF NOT EXISTS ${config.cassandraTable()}" + """ |( | id VARCHAR, @@ -190,17 +205,34 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan | WITH bloom_filter_fp_chance=0.100000 AND | compaction = {'class':'LeveledCompactionStrategy'} """.stripMargin - )) - session.execute(new SimpleStatement( - s"CREATE INDEX IF NOT EXISTS ON ${config.cassandraTable()} ($JOB_NAME);" - )) + try { + session.execute(new SimpleStatement( + createTableStatement + )) + } catch { + case e: Exception => + log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createTableStatement)) + throw e + } + + val createIndexStatement = s"CREATE INDEX IF NOT EXISTS ON ${config.cassandraTable()} ($JOB_NAME);" + try { + session.execute(new SimpleStatement( + createIndexStatement + )) + } catch { + case e: InvalidQueryException => + log.log(Level.WARNING, "Caught InvalidQueryException when creating Cassandra JobStats session (%s), probably not fatal".format(createIndexStatement), e) + case e: Exception => + log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createIndexStatement)) + throw e + } /* * highest bloom filter to reduce memory consumption and reducing * false positives */ - session.execute(new SimpleStatement( - s"CREATE TABLE IF NOT EXISTS ${config.cassandraStatCountTable()}" + + val createStatTableStatement = s"CREATE TABLE IF NOT EXISTS ${config.cassandraStatCountTable()}" + """ |( | task_id VARCHAR, @@ -210,7 +242,15 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan | WITH bloom_filter_fp_chance=0.100000 AND | compaction = {'class':'LeveledCompactionStrategy'} """.stripMargin - )) + try { + session.execute(new SimpleStatement( + createStatTableStatement + )) + } catch { + case e: Exception => + log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createStatTableStatement)) + throw e + } _session = Some(session) _session