diff --git a/DTRACE-CHANGELOG b/DTRACE-CHANGELOG new file mode 100644 index 00000000000..9a1f659421a --- /dev/null +++ b/DTRACE-CHANGELOG @@ -0,0 +1,3 @@ +# 1.2.1 +* Zipkin-collector-service + * Add kafka module for collect service diff --git a/project/Project.scala b/project/Project.scala index ae86e62760c..dffa0a24a20 100644 --- a/project/Project.scala +++ b/project/Project.scala @@ -23,7 +23,7 @@ import sbtassembly.Plugin._ import AssemblyKeys._ object Zipkin extends Build { - val zipkinVersion = "1.2.0-SNAPSHOT" + val zipkinVersion = "1.2.1-SNAPSHOT" val finagleVersion = "6.16.0" val utilVersion = "6.16.0" diff --git a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/IndexBuilder.scala b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/IndexBuilder.scala index c61f19a3c76..f3280e4164b 100644 --- a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/IndexBuilder.scala +++ b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/IndexBuilder.scala @@ -32,7 +32,7 @@ case class IndexBuilder( serviceSpanNameIndexCf: String = "ServiceSpanNameIndex", annotationsIndexCf: String = "AnnotationsIndex", durationIndexCf: String = "DurationIndex", - dataTimeToLive: Duration = 14.days, + dataTimeToLive: Duration = 3.days, numBuckets: Int = 10, writeConsistency: WriteConsistency = WriteConsistency.One, readConsistency: ReadConsistency = ReadConsistency.One diff --git a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/StorageBuilder.scala b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/StorageBuilder.scala index 82fda64f44f..d9811b82683 100644 --- a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/StorageBuilder.scala +++ b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/cassandra/StorageBuilder.scala @@ -29,7 +29,7 @@ case class StorageBuilder( columnFamily: String = "Traces", writeConsistency: WriteConsistency = WriteConsistency.One, readConsistency: ReadConsistency = ReadConsistency.One, - dataTimeToLive: Duration = 14.days, + dataTimeToLive: Duration = 3.days, readBatchSize: Int = 500, spanCodec: Codec[gen.Span] = new SnappyCodec(new ScroogeThriftCodec[gen.Span](gen.Span)) ) extends Builder[Storage] { diff --git a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala index e7b1abd1f13..fad5d688750 100644 --- a/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala +++ b/zipkin-cassandra/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala @@ -39,7 +39,7 @@ case class CassandraIndex( serviceSpanNameIndex: ColumnFamily[String, Long, Long], annotationsIndex: ColumnFamily[ByteBuffer, Long, Long], durationIndex: ColumnFamily[Long, Long, String], - dataTimeToLive: Duration = 14.days + dataTimeToLive: Duration = 3.days ) extends Index { def close() { @@ -262,6 +262,9 @@ case class CassandraIndex( val timestamp = lastAnnotation.timestamp val batch = annotationsIndex.batch + span.serviceName + + val serviceName = span.serviceName.getOrElse("default_dtrace_service") span.annotations.filter { a => // skip core annotations since that query can be done by service name/span name anyway @@ -276,7 +279,7 @@ case class CassandraIndex( val col = Column[Long, Long](a.timestamp, span.traceId).ttl(dataTimeToLive) batch.insert(ByteBuffer.wrap(encode(endpoint.serviceName.toLowerCase, a.value.toLowerCase).getBytes), col) } - case None => // Nothin + case None => // Nothing } } @@ -289,7 +292,14 @@ case class CassandraIndex( batch.insert(ByteBuffer.wrap(key ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col) batch.insert(ByteBuffer.wrap(key), col) } - case None => + case None => { +// index span without end point + WRITE_REQUEST_COUNTER.incr(2) + val key = encode(serviceName, ba.key).getBytes + val col = Column[Long, Long](timestamp, span.traceId).ttl(dataTimeToLive) + batch.insert(ByteBuffer.wrap(key ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col) + batch.insert(ByteBuffer.wrap(key), col) + } } } val annFuture = batch.execute() diff --git a/zipkin-cassandra/src/schema/cassandra-distribution-schema.txt b/zipkin-cassandra/src/schema/cassandra-distribution-schema.txt new file mode 100644 index 00000000000..159cc883c7c --- /dev/null +++ b/zipkin-cassandra/src/schema/cassandra-distribution-schema.txt @@ -0,0 +1,26 @@ +connect 127.0.0.1/9160; + +create keyspace Zipkin + with placement_strategy = 'NetworkTopologyStrategy' + and strategy_options = {'datacenter' : 1} + and durable_writes = true; + +use Zipkin; + +create column family Traces; + +create column family SpanNames; +create column family ServiceNames; + +create column family ServiceSpanNameIndex with comparator = LongType; +create column family ServiceNameIndex with comparator = LongType; +create column family AnnotationsIndex with comparator = LongType; +create column family DurationIndex with comparator = LongType; + +/* +TopAnnotations stores the top normal and key value annotations per service, +and dependencies stores the parents and number of calls to parents per service +*/ + +create column family TopAnnotations with comparator = LongType; +create column family Dependencies with comparator = LongType; diff --git a/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/IndexService.scala b/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/IndexService.scala index 1bef878dd47..f0a98f8f8eb 100644 --- a/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/IndexService.scala +++ b/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/IndexService.scala @@ -40,6 +40,7 @@ class IndexService(index: Index) extends Service[Span, Unit] { case e => { Stats.getCounter("exception_%s_%s".format(method, e.getClass)).incr() log.error(e, method) + log.error(e.getMessage, method) } } diff --git a/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/StorageService.scala b/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/StorageService.scala index 3de0a3b982d..80f9ae2fbb1 100644 --- a/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/StorageService.scala +++ b/zipkin-collector-core/src/main/scala/com/twitter/zipkin/collector/processor/StorageService.scala @@ -32,6 +32,7 @@ class StorageService(storage: Storage) extends Service[Span, Unit] { case e => { Stats.getCounter("exception_%s_%s".format("storeSpan", e.getClass)).incr() log.error(e, "storeSpan") + log.error(e.getMessage, "storeSpan") } } } diff --git a/zipkin-collector-service/config/collector-cassandra.scala b/zipkin-collector-service/config/collector-cassandra.scala index f8c81d5b3a6..c322af34e34 100644 --- a/zipkin-collector-service/config/collector-cassandra.scala +++ b/zipkin-collector-service/config/collector-cassandra.scala @@ -15,7 +15,7 @@ */ import com.twitter.logging._ -import com.twitter.zipkin.builder.Scribe +import com.twitter.zipkin.builder.{ZipkinServerBuilder, Scribe} import com.twitter.zipkin.cassandra import com.twitter.zipkin.collector.builder.CollectorServiceBuilder import com.twitter.zipkin.storage.Store @@ -28,7 +28,6 @@ val loggers = List(LoggerFactory(level = Some(Level.INFO), append = true, formatter = BareFormatter)))) - val keyspaceBuilder = cassandra.Keyspace.static(nodes = Set("localhost")) val cassandraBuilder = Store.Builder( cassandra.StorageBuilder(keyspaceBuilder), @@ -38,4 +37,4 @@ val cassandraBuilder = Store.Builder( CollectorServiceBuilder(Scribe.Interface(categories = Set("zipkin"))) .writeTo(cassandraBuilder) - .copy(serverBuilder = ZipkinServerBuilder(9410, 9900).loggers(loggers)) + //.copy(serverBuilder = ZipkinServerBuilder(9410, 9900).loggers(loggers)) diff --git a/zipkin-collector-service/config/collector-redis.scala b/zipkin-collector-service/config/collector-redis.scala index 0665643aee6..9469bf2fb6a 100644 --- a/zipkin-collector-service/config/collector-redis.scala +++ b/zipkin-collector-service/config/collector-redis.scala @@ -14,10 +14,9 @@ * limitations under the License. */ import com.twitter.zipkin.builder.Scribe -import com.twitter.zipkin.redis +import com.twitter.zipkin.{cassandra, redis, kafka} import com.twitter.zipkin.collector.builder.CollectorServiceBuilder import com.twitter.zipkin.storage.Store -import com.twitter.zipkin.kafka val redisBuilder = Store.Builder( @@ -25,12 +24,7 @@ val redisBuilder = Store.Builder( redis.IndexBuilder("0.0.0.0", 6379) ) -val kafkaBuilder = Store.Builder( - kafka.StorageBuilder("10.26.107.44", 2181, "topic"), - kafka.IndexBuilder() -) - - CollectorServiceBuilder(Scribe.Interface(categories = Set("zipkin"))) - .writeTo(redisBuilder).writeTo(kafkaBuilder) + .writeTo(redisBuilder) + diff --git a/zipkin-collector-service/src/test/scala/com/twitter/zipkin/config/ConfigSpec.scala b/zipkin-collector-service/src/test/scala/com/twitter/zipkin/config/ConfigSpec.scala index 76243c35ff2..767ffce76bb 100644 --- a/zipkin-collector-service/src/test/scala/com/twitter/zipkin/config/ConfigSpec.scala +++ b/zipkin-collector-service/src/test/scala/com/twitter/zipkin/config/ConfigSpec.scala @@ -29,7 +29,7 @@ class ConfigSpec extends Specification { "validate collector configs" in { val configFiles = Seq( "/collector-dev.scala", - "/collector-hbase.scala", + /*"/collector-hbase.scala",*/ "/collector-cassandra.scala" ) map { TempFile.fromResourcePath(_) } diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala index 01a7af77436..a42a0381483 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/KafkaService.scala @@ -1,5 +1,7 @@ package com.twitter.zipkin.kafka +import java.nio.charset.Charset + import com.twitter.finagle.Service import com.twitter.util.{Future, Time} import com.twitter.zipkin.common.Span @@ -14,18 +16,19 @@ import scala.util.parsing.json class KafkaService( kafka: Producer[String, String], - topic: String + topic: String, + service: String ) extends Service[Span, Unit] { def apply(span: Span): Future[Unit] = { val msg = spanFormat(span) - val astreamTopic = genTopic(span).getOrElse(topic) - val keyMsg = new KeyedMessage[String, String](astreamTopic, msg) + val kafkaTopic = genTopic(span).getOrElse(topic) + val keyMsg = new KeyedMessage[String, String](kafkaTopic, msg) Future { kafka.send(keyMsg) } onSuccess { (_) => - //println("sended to kafka success") + //println("send to kafka success") } } @@ -44,20 +47,62 @@ class KafkaService( val response_time = (span.duration.getOrElse(0.toLong) / 1000) val mapData = Map( + "product" -> getProduct(span.serviceName), + "service" -> getModule(span).getOrElse("service"), + "module" -> getModule(span).getOrElse("service"), "page_view" -> "1", "response_time" -> response_time.toString, - "event_time" -> System.currentTimeMillis.toLong, - "zipkin_time" -> (span.firstAnnotation.get.timestamp / 1000).toLong + "event_time" -> System.currentTimeMillis, + "zipkin_time" -> (span.firstAnnotation.get.timestamp / 1000), + "trace_id" -> span.id ) - jsonGen(mapData).toString() + var binaryMap: Map[String, Any] = Map() + + span.binaryAnnotations.foreach( t => { + val s = Charset.forName("UTF-8").newDecoder().decode(t.value) + val key = t.key.toString + val subfix = key.split('.').lastOption match { + case Some(s) => s + case None => "log" + case _ => "log" + } + + subfix match { + case "log" => "" + case "raw" => binaryMap += key -> s + case "numeric" => binaryMap += key -> {try {BigDecimal(s.toString) } catch { case _ => 0 }} + case "string" => binaryMap += key -> s.toString + case _ => binaryMap += key -> s.toString + } + }) + + jsonGen(binaryMap ++ mapData).toString() } def genTopic(span: Span): Option[String] = { - val product = span.serviceName.getOrElse("topic_default").toString.split(":")(0) - val service = "zipkin" + val product = getProduct(span.serviceName) + Some("%s_%s_topic".format(product, service).toString) } + def getProduct(serviceName: Option[String]): String = { + val product = serviceName.getOrElse("default").split(":", 2) + product.length match { + case 1 => "default" + case 2 => product(0) + case _ => "default" + } + } + + def getModule(span: Span): Option[String] = { + val service = span.serviceName.getOrElse("service").split(":", 2) + val name = service.size match { + case 2 => service(1) + case _ => "service" + } + Some(name) + } + } diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala index 57f9522d939..5358d99ff9d 100644 --- a/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/kafka/StorageBuilder.scala @@ -14,7 +14,8 @@ import com.twitter.zipkin.{kafka => outKafka} case class StorageBuilder( host: String, port: Int, - topic: String = "topic" + topic: String = "default_dtrace_topic", + service: String = "dtrace" ) extends Builder[Storage] { self => def apply() = { @@ -25,11 +26,11 @@ case class StorageBuilder( properties.put("metadata.broker.list", kafkaBroker) properties.put("producer.type", "async") properties.put("serializer.class", "kafka.serializer.StringEncoder") - properties.put("request.required.acks", "0") + properties.put("request.required.acks", "1") val producerConfig = new ProducerConfig(properties) val producerClient = new Producer[String, String](producerConfig) - val kafkaService = new outKafka.KafkaService(producerClient, topic) + val kafkaService = new outKafka.KafkaService(producerClient, topic, service) new KafkaStorage { val service = kafkaService