From d49b5672602aeb3aaaa1b9cd12278eb1cd298ee9 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Sat, 8 Nov 2014 11:41:53 +0800 Subject: [PATCH 01/22] add module to a-stream --- .../com/twitter/zipkin/kafka/KafkaService.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala index 01a7af77436..d16a8f2c55b 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala @@ -44,6 +44,7 @@ class KafkaService( val response_time = (span.duration.getOrElse(0.toLong) / 1000) val mapData = Map( + "module" -> getModule(span).getOrElse("service"), "page_view" -> "1", "response_time" -> response_time.toString, "event_time" -> System.currentTimeMillis.toLong, @@ -54,10 +55,19 @@ class KafkaService( } def genTopic(span: Span): Option[String] = { - val product = span.serviceName.getOrElse("topic_default").toString.split(":")(0) + val product = span.serviceName.getOrElse("default").toString.split(":")(0) val service = "zipkin" Some("%s_%s_topic".format(product, service).toString) } + def getModule(span: Span): Option[String] = { + val service = span.serviceName.getOrElse("service").split(":") + val name = service.size match { + case 2 => service(1) + case _ => "service" + } + Some(name) + } + } From eabd080e95b442134ac3a5bd7ee082f3e9086425 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Sat, 8 Nov 2014 11:44:41 +0800 Subject: [PATCH 02/22] delete kafka test --- .../src/test/scala/com/twitter/zipkin/config/ConfigSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipkin-collector-service/src/test/scala/com/twitter/zipkin/config/ConfigSpec.scala b/zipkin-collector-service/src/test/scala/com/twitter/zipkin/config/ConfigSpec.scala index 76243c35ff2..767ffce76bb 100644 --- a/zipkin-collector-service/src/test/scala/com/twitter/zipkin/config/ConfigSpec.scala +++ b/zipkin-collector-service/src/test/scala/com/twitter/zipkin/config/ConfigSpec.scala @@ -29,7 +29,7 @@ class ConfigSpec extends Specification { "validate collector configs" in { val configFiles = Seq( "/collector-dev.scala", - "/collector-hbase.scala", + /*"/collector-hbase.scala",*/ "/collector-cassandra.scala" ) map { TempFile.fromResourcePath(_) } From c8c6990810e3e8461eee9594816d88af1480a8e8 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Mon, 1 Dec 2014 15:49:43 +0800 Subject: [PATCH 03/22] send all binary annotations to kafka except that subfix is log for ES --- .../twitter/zipkin/kafka/KafkaService.scala | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala index d16a8f2c55b..deead4a14cf 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala @@ -1,5 +1,7 @@ package com.twitter.zipkin.kafka +import java.nio.charset.Charset + import com.twitter.finagle.Service import com.twitter.util.{Future, Time} import com.twitter.zipkin.common.Span @@ -19,13 +21,13 @@ class KafkaService( def apply(span: Span): Future[Unit] = { val msg = spanFormat(span) - val astreamTopic = genTopic(span).getOrElse(topic) - val keyMsg = new KeyedMessage[String, String](astreamTopic, msg) + val kafkaTopic = genTopic(span).getOrElse(topic) + val keyMsg = new KeyedMessage[String, String](kafkaTopic, msg) Future { kafka.send(keyMsg) } onSuccess { (_) => - //println("sended to kafka success") + //println("send to kafka success") } } @@ -47,11 +49,29 @@ class KafkaService( "module" -> getModule(span).getOrElse("service"), "page_view" -> "1", "response_time" -> response_time.toString, - "event_time" -> System.currentTimeMillis.toLong, - "zipkin_time" -> (span.firstAnnotation.get.timestamp / 1000).toLong + "event_time" -> System.currentTimeMillis, + "zipkin_time" -> (span.firstAnnotation.get.timestamp / 1000) ) - jsonGen(mapData).toString() + var binaryMap: Map[String, Any] = Map() + + span.binaryAnnotations.foreach( t => { + val s = Charset.forName("UTF-8").newDecoder().decode(t.value) + val key = t.key.toString + val subfix = key.split('.').lastOption match { + case Some(s) => s + case None => "log" + case _ => "log" + } + + subfix match { + case "log" => "" + case _ => binaryMap += key -> s.toString + } + + }) + + jsonGen(binaryMap ++ mapData).toString() } def genTopic(span: Span): Option[String] = { From c60fd670632889ee902f4da3ec4d2e83f0ea3cff Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Mon, 1 Dec 2014 15:50:32 +0800 Subject: [PATCH 04/22] send kafka port to 9092 --- zipkin-collector-service/config/collector-redis.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipkin-collector-service/config/collector-redis.scala b/zipkin-collector-service/config/collector-redis.scala index 0665643aee6..db6e4682334 100644 --- a/zipkin-collector-service/config/collector-redis.scala +++ b/zipkin-collector-service/config/collector-redis.scala @@ -26,7 +26,7 @@ val redisBuilder = Store.Builder( ) val kafkaBuilder = Store.Builder( - kafka.StorageBuilder("10.26.107.44", 2181, "topic"), + kafka.StorageBuilder("127.0.0.1", 9092, "topic"), kafka.IndexBuilder() ) From 3c9bdf72d55e1a4979d2790adc27116eb13c79bb Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Tue, 2 Dec 2014 14:21:13 +0800 Subject: [PATCH 05/22] add product and service for ES --- .../com/twitter/zipkin/kafka/KafkaService.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala index deead4a14cf..11f2c34e7d5 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala @@ -46,6 +46,8 @@ class KafkaService( val response_time = (span.duration.getOrElse(0.toLong) / 1000) val mapData = Map( + "product" -> getProduct(span.serviceName), + "service" -> getModule(span).getOrElse("service"), "module" -> getModule(span).getOrElse("service"), "page_view" -> "1", "response_time" -> response_time.toString, @@ -75,13 +77,17 @@ class KafkaService( } def genTopic(span: Span): Option[String] = { - val product = span.serviceName.getOrElse("default").toString.split(":")(0) - val service = "zipkin" + val product = getProduct(span.serviceName) + val service = "dtrace" Some("%s_%s_topic".format(product, service).toString) } + def getProduct(serviceName: Option[String]): String = { + serviceName.getOrElse("default").split(":", 2)(0) + } + def getModule(span: Span): Option[String] = { - val service = span.serviceName.getOrElse("service").split(":") + val service = span.serviceName.getOrElse("service").split(":", 2) val name = service.size match { case 2 => service(1) case _ => "service" From c9519f8b9be81ba0ce9a0de0832796956aaf07d2 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Tue, 2 Dec 2014 14:21:44 +0800 Subject: [PATCH 06/22] add log for collector --- zipkin-collector-service/config/collector-cassandra.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/zipkin-collector-service/config/collector-cassandra.scala b/zipkin-collector-service/config/collector-cassandra.scala index f8c81d5b3a6..c322af34e34 100644 --- a/zipkin-collector-service/config/collector-cassandra.scala +++ b/zipkin-collector-service/config/collector-cassandra.scala @@ -15,7 +15,7 @@ */ import com.twitter.logging._ -import com.twitter.zipkin.builder.Scribe +import com.twitter.zipkin.builder.{ZipkinServerBuilder, Scribe} import com.twitter.zipkin.cassandra import com.twitter.zipkin.collector.builder.CollectorServiceBuilder import com.twitter.zipkin.storage.Store @@ -28,7 +28,6 @@ val loggers = List(LoggerFactory(level = Some(Level.INFO), append = true, formatter = BareFormatter)))) - val keyspaceBuilder = cassandra.Keyspace.static(nodes = Set("localhost")) val cassandraBuilder = Store.Builder( cassandra.StorageBuilder(keyspaceBuilder), @@ -38,4 +37,4 @@ val cassandraBuilder = Store.Builder( CollectorServiceBuilder(Scribe.Interface(categories = Set("zipkin"))) .writeTo(cassandraBuilder) - .copy(serverBuilder = ZipkinServerBuilder(9410, 9900).loggers(loggers)) + //.copy(serverBuilder = ZipkinServerBuilder(9410, 9900).loggers(loggers)) From 4b34f7ea36547fbda1b6f218a4c5c082f7c13678 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Thu, 4 Dec 2014 16:33:56 +0800 Subject: [PATCH 07/22] use default service name --- .../scala/com/twitter/zipkin/kafka/KafkaService.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala index 11f2c34e7d5..8d5b1d13259 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala @@ -16,7 +16,8 @@ import scala.util.parsing.json class KafkaService( kafka: Producer[String, String], - topic: String + topic: String, + service: String = "dtrace" ) extends Service[Span, Unit] { def apply(span: Span): Future[Unit] = { @@ -52,7 +53,8 @@ class KafkaService( "page_view" -> "1", "response_time" -> response_time.toString, "event_time" -> System.currentTimeMillis, - "zipkin_time" -> (span.firstAnnotation.get.timestamp / 1000) + "zipkin_time" -> (span.firstAnnotation.get.timestamp / 1000), + "trace_id" -> span.id ) var binaryMap: Map[String, Any] = Map() @@ -70,7 +72,6 @@ class KafkaService( case "log" => "" case _ => binaryMap += key -> s.toString } - }) jsonGen(binaryMap ++ mapData).toString() @@ -78,7 +79,7 @@ class KafkaService( def genTopic(span: Span): Option[String] = { val product = getProduct(span.serviceName) - val service = "dtrace" + Some("%s_%s_topic".format(product, service).toString) } From 990d5ba16b0762df9352cf0aff61147f7872d0b0 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Thu, 4 Dec 2014 16:34:33 +0800 Subject: [PATCH 08/22] set default service name --- .../main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala index 57f9522d939..60a58b84ca9 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala @@ -14,7 +14,8 @@ import com.twitter.zipkin.{kafka => outKafka} case class StorageBuilder( host: String, port: Int, - topic: String = "topic" + topic: String = "default_dtrace_topic", + service: String = "dtrace" ) extends Builder[Storage] { self => def apply() = { @@ -29,7 +30,7 @@ case class StorageBuilder( val producerConfig = new ProducerConfig(properties) val producerClient = new Producer[String, String](producerConfig) - val kafkaService = new outKafka.KafkaService(producerClient, topic) + val kafkaService = new outKafka.KafkaService(producerClient, topic, service) new KafkaStorage { val service = kafkaService From 07f3a971b0a15cd057bfc3cab623b5d8391825d7 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Thu, 4 Dec 2014 16:35:08 +0800 Subject: [PATCH 09/22] delete default value --- .../src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala index 8d5b1d13259..28e520f97e4 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala @@ -17,7 +17,7 @@ import scala.util.parsing.json class KafkaService( kafka: Producer[String, String], topic: String, - service: String = "dtrace" + service: String ) extends Service[Span, Unit] { def apply(span: Span): Future[Unit] = { From cebf3008e3ce2d97b8f1ea9541bf98840612912e Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Thu, 4 Dec 2014 16:57:17 +0800 Subject: [PATCH 10/22] set TTL to 3.days --- .../main/scala/com/twitter/zipkin/cassandra/IndexBuilder.scala | 2 +- .../scala/com/twitter/zipkin/cassandra/StorageBuilder.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/IndexBuilder.scala b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/IndexBuilder.scala index c61f19a3c76..f3280e4164b 100644 --- a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/IndexBuilder.scala +++ b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/IndexBuilder.scala @@ -32,7 +32,7 @@ case class IndexBuilder( serviceSpanNameIndexCf: String = "ServiceSpanNameIndex", annotationsIndexCf: String = "AnnotationsIndex", durationIndexCf: String = "DurationIndex", - dataTimeToLive: Duration = 14.days, + dataTimeToLive: Duration = 3.days, numBuckets: Int = 10, writeConsistency: WriteConsistency = WriteConsistency.One, readConsistency: ReadConsistency = ReadConsistency.One diff --git a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/StorageBuilder.scala b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/StorageBuilder.scala index 82fda64f44f..d9811b82683 100644 --- a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/StorageBuilder.scala +++ b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/StorageBuilder.scala @@ -29,7 +29,7 @@ case class StorageBuilder( columnFamily: String = "Traces", writeConsistency: WriteConsistency = WriteConsistency.One, readConsistency: ReadConsistency = ReadConsistency.One, - dataTimeToLive: Duration = 14.days, + dataTimeToLive: Duration = 3.days, readBatchSize: Int = 500, spanCodec: Codec[gen.Span] = new SnappyCodec(new ScroogeThriftCodec[gen.Span](gen.Span)) ) extends Builder[Storage] { From 94de4ce44cfe86ee10b13c4272cb8b2674fe5e9b Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Thu, 25 Dec 2014 16:50:42 +0800 Subject: [PATCH 11/22] redis and kafka --- .../config/collector-redis.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/zipkin-collector-service/config/collector-redis.scala b/zipkin-collector-service/config/collector-redis.scala index db6e4682334..36c0e06a4b7 100644 --- a/zipkin-collector-service/config/collector-redis.scala +++ b/zipkin-collector-service/config/collector-redis.scala @@ -14,10 +14,9 @@ * limitations under the License. */ import com.twitter.zipkin.builder.Scribe -import com.twitter.zipkin.redis +import com.twitter.zipkin.{cassandra, redis, kafka} import com.twitter.zipkin.collector.builder.CollectorServiceBuilder import com.twitter.zipkin.storage.Store -import com.twitter.zipkin.kafka val redisBuilder = Store.Builder( @@ -26,11 +25,18 @@ val redisBuilder = Store.Builder( ) val kafkaBuilder = Store.Builder( - kafka.StorageBuilder("127.0.0.1", 9092, "topic"), + kafka.StorageBuilder("127.0.0.1", 9092, "default_dtrace_topic"), kafka.IndexBuilder() ) - +val keyspaceBuilder = cassandra.Keyspace.static(nodes = Set("localhost")) +val cassandraBuilder = Store.Builder( + cassandra.StorageBuilder(keyspaceBuilder), + cassandra.IndexBuilder(keyspaceBuilder), + cassandra.AggregatesBuilder(keyspaceBuilder) +) CollectorServiceBuilder(Scribe.Interface(categories = Set("zipkin"))) - .writeTo(redisBuilder).writeTo(kafkaBuilder) + .writeTo(cassandraBuilder) + .writeTo(kafkaBuilder) + From d2da7fbb7e1451f5be68d89eac125287df6769b8 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Mon, 29 Dec 2014 15:09:43 +0800 Subject: [PATCH 12/22] add dtrace changelog and upgrade version to 1.2.1 --- DTRACE-CHANGELOG | 3 +++ project/Project.scala | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 DTRACE-CHANGELOG diff --git a/DTRACE-CHANGELOG b/DTRACE-CHANGELOG new file mode 100644 index 00000000000..9a1f659421a --- /dev/null +++ b/DTRACE-CHANGELOG @@ -0,0 +1,3 @@ +# 1.2.1 +* Zipkin-collector-service + * Add kafka module for collect service diff --git a/project/Project.scala b/project/Project.scala index ae86e62760c..dffa0a24a20 100644 --- a/project/Project.scala +++ b/project/Project.scala @@ -23,7 +23,7 @@ import sbtassembly.Plugin._ import AssemblyKeys._ object Zipkin extends Build { - val zipkinVersion = "1.2.0-SNAPSHOT" + val zipkinVersion = "1.2.1-SNAPSHOT" val finagleVersion = "6.16.0" val utilVersion = "6.16.0" From 0490d5740cf73d2ea8eca5bfbff08429cb540571 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Sun, 4 Jan 2015 22:54:19 +0800 Subject: [PATCH 13/22] change kafak ack = 1 --- .../main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala index 60a58b84ca9..72f65f15f92 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala @@ -26,7 +26,8 @@ case class StorageBuilder( properties.put("metadata.broker.list", kafkaBroker) properties.put("producer.type", "async") properties.put("serializer.class", "kafka.serializer.StringEncoder") - properties.put("request.required.acks", "0") + properties.put("request.required.acks", "1") + properties.put("batch.num.messages", "8") val producerConfig = new ProducerConfig(properties) val producerClient = new Producer[String, String](producerConfig) From cfc22f9287897e76f73e3b28a813820b6871f6a5 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Mon, 5 Jan 2015 10:05:47 +0800 Subject: [PATCH 14/22] cassandra distribution schema --- .../schema/cassandra-distribution-schema.txt | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 zipkin-cassandra/src/schema/cassandra-distribution-schema.txt diff --git a/zipkin-cassandra/src/schema/cassandra-distribution-schema.txt b/zipkin-cassandra/src/schema/cassandra-distribution-schema.txt new file mode 100644 index 00000000000..159cc883c7c --- /dev/null +++ b/zipkin-cassandra/src/schema/cassandra-distribution-schema.txt @@ -0,0 +1,26 @@ +connect 127.0.0.1/9160; + +create keyspace Zipkin + with placement_strategy = 'NetworkTopologyStrategy' + and strategy_options = {'datacenter' : 1} + and durable_writes = true; + +use Zipkin; + +create column family Traces; + +create column family SpanNames; +create column family ServiceNames; + +create column family ServiceSpanNameIndex with comparator = LongType; +create column family ServiceNameIndex with comparator = LongType; +create column family AnnotationsIndex with comparator = LongType; +create column family DurationIndex with comparator = LongType; + +/* +TopAnnotations stores the top normal and key value annotations per service, +and dependencies stores the parents and number of calls to parents per service +*/ + +create column family TopAnnotations with comparator = LongType; +create column family Dependencies with comparator = LongType; From 03d8de444dc6cb3d07314fd92845bd12ed55de16 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Wed, 7 Jan 2015 14:03:43 +0800 Subject: [PATCH 15/22] format numeric --- .../scala/com/twitter/zipkin/kafka/KafkaService.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala index 28e520f97e4..a42a0381483 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala @@ -70,6 +70,9 @@ class KafkaService( subfix match { case "log" => "" + case "raw" => binaryMap += key -> s + case "numeric" => binaryMap += key -> {try {BigDecimal(s.toString) } catch { case _ => 0 }} + case "string" => binaryMap += key -> s.toString case _ => binaryMap += key -> s.toString } }) @@ -84,7 +87,12 @@ class KafkaService( } def getProduct(serviceName: Option[String]): String = { - serviceName.getOrElse("default").split(":", 2)(0) + val product = serviceName.getOrElse("default").split(":", 2) + product.length match { + case 1 => "default" + case 2 => product(0) + case _ => "default" + } } def getModule(span: Span): Option[String] = { From 857e457619ce45de1d45e45e92be25791a114a0c Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Wed, 7 Jan 2015 14:04:02 +0800 Subject: [PATCH 16/22] set TTL = 3.days --- .../com/twitter/zipkin/storage/cassandra/CassandraIndex.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala index e7b1abd1f13..54ca0f350b0 100644 --- a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala +++ b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala @@ -39,7 +39,7 @@ case class CassandraIndex( serviceSpanNameIndex: ColumnFamily[String, Long, Long], annotationsIndex: ColumnFamily[ByteBuffer, Long, Long], durationIndex: ColumnFamily[Long, Long, String], - dataTimeToLive: Duration = 14.days + dataTimeToLive: Duration = 3.days ) extends Index { def close() { From 71bf1ef002efb6288da81f6745590ef64e2e60bc Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Fri, 23 Jan 2015 14:42:11 +0800 Subject: [PATCH 17/22] index span without endpoint --- .../zipkin/storage/cassandra/CassandraIndex.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala index 54ca0f350b0..e85a338d9fa 100644 --- a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala +++ b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala @@ -289,7 +289,14 @@ case class CassandraIndex( batch.insert(ByteBuffer.wrap(key ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col) batch.insert(ByteBuffer.wrap(key), col) } - case None => + case None => { +// index span without end point + WRITE_REQUEST_COUNTER.incr(2) + val key = encode(endpoint.serviceName, ba.key).getBytes + val col = Column[Long, Long](timestamp, span.traceId).ttl(dataTimeToLive) + batch.insert(ByteBuffer.wrap(key ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col) + batch.insert(ByteBuffer.wrap(key), col) + } } } val annFuture = batch.execute() From 8c6f5f02665901f24e647a1511d1daf6f485d66f Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Tue, 27 Jan 2015 13:38:46 +0800 Subject: [PATCH 18/22] delete cassandra config --- .../config/collector-redis.scala | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/zipkin-collector-service/config/collector-redis.scala b/zipkin-collector-service/config/collector-redis.scala index 36c0e06a4b7..9469bf2fb6a 100644 --- a/zipkin-collector-service/config/collector-redis.scala +++ b/zipkin-collector-service/config/collector-redis.scala @@ -24,19 +24,7 @@ val redisBuilder = Store.Builder( redis.IndexBuilder("0.0.0.0", 6379) ) -val kafkaBuilder = Store.Builder( - kafka.StorageBuilder("127.0.0.1", 9092, "default_dtrace_topic"), - kafka.IndexBuilder() -) -val keyspaceBuilder = cassandra.Keyspace.static(nodes = Set("localhost")) -val cassandraBuilder = Store.Builder( - cassandra.StorageBuilder(keyspaceBuilder), - cassandra.IndexBuilder(keyspaceBuilder), - cassandra.AggregatesBuilder(keyspaceBuilder) -) - CollectorServiceBuilder(Scribe.Interface(categories = Set("zipkin"))) - .writeTo(cassandraBuilder) - .writeTo(kafkaBuilder) + .writeTo(redisBuilder) From 75065ae91f282c563b43f815da40eae4bf6fb07b Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Tue, 27 Jan 2015 13:39:22 +0800 Subject: [PATCH 19/22] delete batch number for kafka --- .../src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala index 72f65f15f92..5358d99ff9d 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala @@ -27,7 +27,6 @@ case class StorageBuilder( properties.put("producer.type", "async") properties.put("serializer.class", "kafka.serializer.StringEncoder") properties.put("request.required.acks", "1") - properties.put("batch.num.messages", "8") val producerConfig = new ProducerConfig(properties) val producerClient = new Producer[String, String](producerConfig) From d7a9136c693c76284a0b8998766ba6748944225f Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Tue, 27 Jan 2015 13:44:00 +0800 Subject: [PATCH 20/22] index span without endpoint --- .../zipkin/storage/cassandra/CassandraIndex.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala index e85a338d9fa..9ec9acae5fe 100644 --- a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala +++ b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala @@ -263,6 +263,17 @@ case class CassandraIndex( val batch = annotationsIndex.batch + val serviceName = span.firstAnnotation match { + case Some(annotation) => { + try { + annotation.host.get.serviceName.toLowerCase + } catch { + case e: UnsupportedOperationException => "dtrace_service" + } + } + case None => "dtrace_service" + } + span.annotations.filter { a => // skip core annotations since that query can be done by service name/span name anyway !Constants.CoreAnnotations.contains(a.value) @@ -292,7 +303,7 @@ case class CassandraIndex( case None => { // index span without end point WRITE_REQUEST_COUNTER.incr(2) - val key = encode(endpoint.serviceName, ba.key).getBytes + val key = encode(serviceName, ba.key).getBytes val col = Column[Long, Long](timestamp, span.traceId).ttl(dataTimeToLive) batch.insert(ByteBuffer.wrap(key ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col) batch.insert(ByteBuffer.wrap(key), col) From 439f72d23170c4f72fad1e2cc99d5ee3e57af13a Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Thu, 12 Feb 2015 10:04:11 +0800 Subject: [PATCH 21/22] get service name by annotation --- .../zipkin/storage/cassandra/CassandraIndex.scala | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala index 9ec9acae5fe..fad5d688750 100644 --- a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala +++ b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala @@ -262,17 +262,9 @@ case class CassandraIndex( val timestamp = lastAnnotation.timestamp val batch = annotationsIndex.batch + span.serviceName - val serviceName = span.firstAnnotation match { - case Some(annotation) => { - try { - annotation.host.get.serviceName.toLowerCase - } catch { - case e: UnsupportedOperationException => "dtrace_service" - } - } - case None => "dtrace_service" - } + val serviceName = span.serviceName.getOrElse("default_dtrace_service") span.annotations.filter { a => // skip core annotations since that query can be done by service name/span name anyway @@ -287,7 +279,7 @@ case class CassandraIndex( val col = Column[Long, Long](a.timestamp, span.traceId).ttl(dataTimeToLive) batch.insert(ByteBuffer.wrap(encode(endpoint.serviceName.toLowerCase, a.value.toLowerCase).getBytes), col) } - case None => // Nothin + case None => // Nothing } } From 24baa3863ebbfbfd202e6e5ab431841bb2db3272 Mon Sep 17 00:00:00 2001 From: zhaoxuan Date: Thu, 12 Feb 2015 15:02:16 +0800 Subject: [PATCH 22/22] add error message to logger --- .../com/twitter/zipkin/collector/processor/IndexService.scala | 1 + .../com/twitter/zipkin/collector/processor/StorageService.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/IndexService.scala b/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/IndexService.scala index 1bef878dd47..f0a98f8f8eb 100644 --- a/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/IndexService.scala +++ b/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/IndexService.scala @@ -40,6 +40,7 @@ class IndexService(index: Index) extends Service[Span, Unit] { case e => { Stats.getCounter("exception_%s_%s".format(method, e.getClass)).incr() log.error(e, method) + log.error(e.getMessage, method) } } diff --git a/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/StorageService.scala b/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/StorageService.scala index 3de0a3b982d..80f9ae2fbb1 100644 --- a/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/StorageService.scala +++ b/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/StorageService.scala @@ -32,6 +32,7 @@ class StorageService(storage: Storage) extends Service[Span, Unit] { case e => { Stats.getCounter("exception_%s_%s".format("storeSpan", e.getClass)).incr() log.error(e, "storeSpan") + log.error(e.getMessage, "storeSpan") } } }