- Introduction
- Features
- Deployment
3.1. Basic
3.2. Docker
3.2.1. Standalone
3.2.2. Distributed
3.2.2.1. Additional Node
3.2.2.2. Entry Node - Configuration
4.1. Specific Options
4.2. Tuning
4.2.1. Concurrency
4.2.2. Inherited Storage Driver Usage Warnings - Usage
5.1. Record Operations
5.1.1. Create
5.1.2. Read
5.1.3. Update
5.1.4. Delete
5.1.5. List
5.1.6. End-to-end Latency
5.2. Topic Operations
5.2.1. Create
5.2.2. Read
5.2.3. Update
5.2.4. Delete
5.2.5. List
5.3. Custom Kafka Headers
5.3.1. Expressions - Performance
6.1. Compariosn of Mongoose Kafka Storage Driver and Kafka Benchmark
6.1.1. Records Creating - Open Issues
- Development
8.1. Build
8.2. Test
8.2.1. Manual
8.2.2. Automated
8.2.2.1. Unit
8.2.2.2. Integration
8.2.2.3. Functional
Mongoose and Kafka are using quite different concepts. So it's necessary to determine how Kafka-specific terms are mapped to the Mongoose abstractions.
Kafka | Mongoose |
---|---|
Record | Data Item |
Topic | Path Item |
Partition | N/A |
- Item types:
data item
- Record/Messagepath
- Topic
- Data item operation types:
create
read
- Path item operation types:
create
read
delete
list
- Storage specific:
- Records' keys
- Compression type for producer's data
Java 11+ is required to build/run.
-
Get the latest
mongoose-base
jar from the maven repo and put it to your working directory. Note the particular version, which is referred as BASE_VERSION below. -
Get the latest
mongoose-storage-driver-preempt
jar from the maven repo and put it to the~/.mongoose/<BASE_VERSION>/ext
directory. -
Get the latest
mongoose-storage-driver-kafka
jar from the maven repo and put it to the~/.mongoose/<BASE_VERSION>/ext
directory.
java -jar mongoose-base-<BASE_VERSION>.jar \
--storage-driver-type=kafka \
--storage-net-node-addrs=<NODE_IP_ADDRS> \
--storage-net-node-port=9092 \
--storage-driver-create-key-enabled=false \
--storage-driver-compression-type=none \
--item-data-size=1KB \
...
docker run \
--network host \
emcmongoose/mongoose-storage-driver-kafka \
--storage-net-node-addrs=<NODE_IP_ADDRS> \
--storage-driver-create-key-enabled=false \
--storage-driver-compression-type=none \
...
docker run \
--network host \
--expose 1099 \
emcmongoose/mongoose-storage-driver-kafka \
--run-node
docker run \
--network host \
emcmongoose/mongoose-storage-driver-kafka \
--load-step-node-addrs=<ADDR1,ADDR2,...> \
--storage-net-node-addrs=<NODE_IP_ADDRS> \
--storage-driver-create-key-enabled=false \
--storage-driver-compression-type=none \
Name | Type | Default Value | Description |
---|---|---|---|
storage-driver-record-timeoutMillis | long | 10000 | The event read and create timeout in milliseconds |
storage-driver-create-key-enabled | boolean | false | Creates a record with or without a key |
storage-net-sndBuf | integer | 131072 | The size of the TCP send buffer to use when sending data. If the value is -1, the OS default will be used. |
storage-net-rcvBuf | integer | 32768 | The size of the TCP receive buffer to use when reading data. If the value is -1, the OS default will be used. |
storage-driver-request-size | integer | 1048576 | The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. |
storage-net-linger | integer | 0 | The delay before sending the records. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. |
storage-driver-buffer-memory | long | 33554432 | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. |
storage-driver-compression-type | string | none | The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, snappy, lz4, or zstd. |
storage-net-node-addrs | list | "" | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form host1:port1,host2:port2 |
storage-net-node-port | integer | 9092 | The common port number to access the storage nodes, may be overriden adding the port number to the storage-driver-addrs, for example: "127.0.0.1:9020,127.0.0.1:9022,..." |
There are two configuration options controlling the load operations concurrency level.
-
storage-driver-limit-concurrency
Limits the count of the active load operations at any moment of the time. The best practice is to set it to 0 (unlimited concurrency for the asynchronous operations, aka the top gear of the "burst mode"). -
storage-driver-threads
The count of the threads running/submitting the load operations execution. The meaningful values are usually only few times more than the count of the available CPU threads.
See the design notes
Mongoose should perform the load operations on the records when the configuration option item-type
is set to data
.
ProducerApi
has a KafkaProducer
class with function send(), which can send a record to topic.
- Steps:
Note:
KafkaProducer contains thread, the number of threads is equal to the number of producers.
ConsumerApi
has a KafkaConsumer
class, provided with function poll(). According to Kafka documentation, on each poll Consumer begins to consume records from last offset.
- Steps:
Note:
num.consumer.fetchers — the number fetcher threads used to fetch data, default value = 1.
Not supported.
deleteRecords() function from AdminClient(AdminClient API) class, deletes all records before the one with giving offset.
Not supported.
The end-to-end latency is a time span between the CREATE and READ operations executed for the same item. The end-to-end latency may be measured using Mongoose's Pipeline Load extension which is included into this extension's docker image. To do this, it's necessary to produce the raw operations trace data.
Scenario example: https://github.com/emc-mongoose/mongoose-storage-driver-pravega/blob/master/src/test/robot/api/storage/data/e2e_latency.js
Command line example:
docker run \
--network host \
--volume "$(pwd)"/src/test/robot/api/storage/data:/root \
--volume /tmp/log:/root/.mongoose/<BASE_VERSION>/log \
emcmongoose/mongoose-storage-driver-kafka \
--item-output-path=topic1 \
--run-scenario=/root/e2e_latency.js \
--load-step-id=e2e_latency \
--item-data-size=10KB \
--load-op-limit-count=100000 \
--output-metrics-trace-persist
Once the raw operations trace data is obtained, it may be used to produce the end-to-end latency data using the tool: https://github.com/emc-mongoose/e2e-latency-generator
Scenario example e2e_latency.js:
var topic = "topic"
var sharedConfig = {
"storage": {
"driver": {
"type": "kafka"
}
},
"output": {
"metrics": {
"trace": {
"persist": true
}
}
},
"load": {
"op": {
"limit": {
"recycle": 1000
}
}
}
};
var createConfig = {
"item" : {
"type" : "data",
"output" : {
"path" : topic
}
},
"load" : {
"op" : {
"limit" : {
"count" : 1000000
}
}
}
};
var readConfig = {
"item" : {
"type" : "data",
"input" : {
"path" : topic
}
},
"load" : {
"op" : {
"type" : "read",
"limit" : {
"count" : 1000000
},
"recycle" : true
}
}
};
PipelineLoad
.config(sharedConfig)
.append(createConfig)
.append(readConfig)
.run()
Command line example:
java -jar mongoose-base-<BASE_VERSION>.jar \
--storage-driver-type=kafka \
--storage-net-node-port=<NODE_IP_ADDRS> \
--run-scenario=e2e_latency.js \
--item-data-size=1 \
--load-step-id=e2e_latency_test
Results:
topic/h7r9j6cz0mtk,0,24783469
topic/z0zoqp868v75,39948230,27580282
topic/tlp1bns9jc7p,40276731,29334974
topic/6txhtx4ljv5,40471469,31805473
topic/oxcouyv1dycw,40510647,32548057
topic/gohqxf7oafbp,40635783,33145122
topic/g6auzxpfz4id,40708983,33710917
topic/azdtimn0292s,40824477,34392279
topic/m9iflaa0ezrf,40845198,34853497
topic/r9etwfl5is4,40865059,35485910
...
topic/t54d9deejguy,2954340782,30107011025
topic/16vhzvhw11sp,2954341641,30107029774
topic/nyjq2l2uk1kd,2954342243,30107050436
topic/oh7zv7sii4nd,2954342925,30107081205
topic/7eqxrpt20pa1,2954343556,30107102342
topic/gl1w13xmz1v,2954344453,30107120725
topic/2k9i0cdxglpf,2954345143,30107141259
topic/iuswkxqxeorx,2954345930,30107164887
topic/suy5gu6ajf4n,2954346629,30107250413
topic/8b7s40r29iel,2954348924,30107249488
Each record has has the following columns:
- Item path
- Item writing start time offset in microseconds
- The calculated end-to-end latency
In this chart above, the min latency value is 24 783 469 μs, max is 30 107 249 488 μs. The ratio between max and min is ~ 30 082 466 019.
Heatmap Output:
Y axis is logarithmic between the detected latency value min and max. By default it's height is 100 px and corresponding 100 rows. X axis is linear. By default it's width is the count of pixels equal max timestamp minus min.
Mongoose should perform the load operations on the topic when the configuration option item-type
is set to path
.
Apache Kafka has AdminClient Api
, which provides function for managing and inspecting topics.
createTopics() creates a batch of new topics.
- Steps:
java -jar mongoose-base-4.2.11.jar \
--storage-driver-type=kafka \
--storage-net-node-addrs=127.0.0.1 \
--storage-net-node-port=9092 \
--item-type=path \
--load-op-limit-count=100 \
--storage-driver-limit-concurrency=100 \
This example creates 100 simple topics. Each topic has one partition and replication factor of one.
Note:
storage-driver-limit-concurrency must equal to load-batch-size because of concurrencyTrottle.acquire(batchSize).
Mongoose's implementation of a topic reading reads the whole topic in one operation. This is achieved by invocation of the poll method for each topic until it'll return an empty record collection. If collection is empty, mark the topic as read.
The read operation marks as failed when an exception occurs. According to Kafka docs the poll() method throws exceptions.
- Steps:
java -jar mongoose-base-4.2.11.jar \
--storage-driver-type=kafka \
--storage-net-node-addrs=127.0.0.1 \
--storage-net-node-port=9092 \
--item-data-size=1 \
--run-scenario=readTopicsLoad.js \
Using scenario:
PreconditionLoad
.config({
"item" : {
"type" : "data",
"output" : {
"path" : "topic1"
}
},
"load" : {
"op" : {
"limit" : {
"count" : 5
}
}
}
})
.run();
PreconditionLoad
.config({
"item" : {
"type" : "data",
"output" : {
"path" : "topic2"
}
},
"load" : {
"op" : {
"limit" : {
"count" : 5
}
}
}
})
.run();
var topic_list_file = "topic_list.csv"
ReadLoad
.config({
"item" : {
"type" : "path",
"input" : {
"file" : topic_list_file
}
},
"load" : {
"op" : {
"limit" : {
"count" : 2
},
"recycle" : true
}
}
})
.run();
This example creates two topics and writes 5 records to both of them. Then it reads each topic as a Path operation.
Note:
KafkaConsumer raises no exceptions when the user subscribes and polls to the topic which doesn't exist. So that operation will mark as SUCCESSFUL. See Transfer Size to deal with it.
Not supported
deleteTopics() deletes a batch of topics.
- Steps:
listTopics() returns list of topics
- Steps:
Scenario example:
var customKafkaHeadersConfig = {
"storage" : {
"driver" : {
"create" : {
"headers" : {
"header-name-0" : "header_value_0",
"header-name-1" : "header_value_1",
// ...
"header-name-N" : "header_value_N"
}
}
}
}
};
Load
.config(customKafkaHeadersConfig)
.run();
Note:
Don't use the command line arguments for the custom Kafka headers setting.
Scenario example, note both the parameterized header name and value:
var varKafkaHeadersConfig = {
"storage" : {
"driver" : {
"create" : {
"headers" : {
"x-amz-meta-${math:random(30) + 1}" : "${date:format("yyyy-MM-dd'T'HH:mm:ssZ").format(date:from(rnd.nextLong(time:millisSinceEpoch())))}"
}
}
}
}
};
Load
.config(varKafkaHeadersConfig)
.run();
Note about KAFKA benchmark:
Set KAFKA_HEAP_OPTS="-Xmx1024M" in kafka-run-class.sh
Command line example of KAFKA benchmark:
./bin/kafka-run-class.sh \
org.apache.kafka.tools.ProducerPerformance --throughput=-1 \
--topic=test-one \
--num-records=2000000 \
--record-size=1 \
--producer-props bootstrap.servers=localhost:9092 \
buffer.memory=33554432 \
batch.size=200
Result:
2000000 records sent,
16953.750170 records/sec (0.02 MB/sec),
58455.55 ms avg latency,
80970.00 ms max latency,
57215 ms 50th,
79765 ms 95th,
80630 ms 99th,
80933 ms 99.9th.
Command line example of KAFKA storage driver:
docker run --network host \
emcmongoose/mongoose-storage-driver-kafka:4.2.8 \
--load-batch-size=200 \
--load-op-limit-count=2000000 \
--storage-driver-threads=1 \
--storage-driver-limit-concurrency=0 \
--item-data-size=1 \
--storage-driver-limit-queue-input=5000
Result:
- Load Step Id: linear_20190607.181733.007
Operation Type: CREATE
Node Count: 1
Concurrency:
Limit Per Storage Driver: 0
Actual:
Last: 1
Mean: 0.9955257270693513
Operations Count:
Successful: 2000000
Failed: 0
Transfer Size: 1.907MB
Duration [s]:
Elapsed: 93.913
Sum: 9763.191236
Throughput [op/s]:
Last: 31365.05419108817
Mean: 21505.37634408602
Bandwidth [MB/s]:
Last: 0.029912046614730996
Mean: 0.020509125084005375
Operations Duration [us]:
Avg: 4881.9910592758015
Min: 275
LoQ: 880
Med: 1426
HiQ: 1973
Max: 998292
Operations Latency [us]:
Avg: 4876.804555168968
Min: 9
LoQ: 879
Med: 1424
HiQ: 1971
Max: 344294
...
Computer configuration:
- OS - Ubuntu 18.04.2 LTS
- Memory - 3.8 GiB
- Processor - Intel® Core™ i5-6200U CPU @ 2.30GHz × 4
- OS type - 64-bit
Issue | Description |
---|
Use command below to build the driver
./gradlew clean jar
-
Copy the storage driver's jar file into the mongoose's
ext
directory:cp -f build/libs/mongoose-storage-driver-kafka-*.jar ~/.mongoose/<MONGOOSE_BASE_VERSION>/ext/
Note that the Kafka storage driver depends on the Preemptive Storage Driver extension so it should be also put into the
ext
directory -
Build and install the corresponding Kafka version.
-
Run the Kafka standalone node:
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
-
Run Mongoose's default scenario with some specific command-line arguments:
java -jar mongoose-base-<BASE_VERSION>.jar \ --storage-driver-type=kafka \ --storage-net-node-addrs=<NODE_IP_ADDRS> \ --storage-net-node-port=9092 \ --item-data-size=1KB \ --load-op-limit-count=100 \
./gradlew clean test
Note:
To run integration tests manually you need to run KAFKA
./gradlew integrationTest
./gradlew jar
export SUITE=api.storage
TEST=create_record ./gradlew robotest