52 changes: 51 additions & 1 deletion README.md
@@ -1 +1,51 @@
kafkaquery
# CodeFeedr KafkaQuery

CodeFeedr KafkaQuery is a tool that lets you operate on JSON data in a Kafka instance using Flink SQL.

## Getting Started

Run ```sbt pack``` to create a package folder which includes the program launch script ```kafkaquery``` in the following directory: ```kafkatime/target/pack/bin```

Add CodeFeedr to your PATH:
```
export PATH=$PATH:<path to bin folder>

e.g. export PATH=$PATH:/mnt/c/Users/MyName/Documents/kafkaquery/kafkatime/target/pack/bin
```

Specify the Kafka and ZooKeeper addresses either in your environment variables or as [arguments](#address) when executing the launch script.<br/>


| Property | Default value |Environment variable name (optional) |
|-------------------|----------------|-------------------------------------|
| Kafka Address | localhost:9092 |KAFKA_ADDR |
| ZooKeeper Address | localhost:2181 |ZK_ADDR |
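
For example, to point the tool at a non-default setup you can export both variables in your shell (the addresses below are placeholders):

```
export KAFKA_ADDR=192.168.1.10:9092
export ZK_ADDR=192.168.1.10:2181
```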

## Usage

Execute the launch script, which can be found in ```kafkaquery/kafkatime/target/pack/bin```

//TODO record a nice video <br/>
![Some Text](https://media.giphy.com/media/xT9IgzoKnwFNmISR8I/giphy.gif)
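
To check that everything is set up, you can list the available topics (assuming the launch script is named ```kafkaquery```, as produced by ```sbt pack```):

```
kafkaquery --topics
```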

## Commands


| Command | Description | Example |
|---------|-------------|---------|
| --help <br/> -h | Lists all available commands & options. | --help |
| --topics | Lists all available topics. | --topics |
| --topic \<topic_name> | Displays information about the data format of the specified topic. | --topic "pypi_releases_min" |
| --schema \<topic_name>=\<avro_schema> | Updates (or adds, if not present) the schema for the specified topic. | --schema "my_topic"="{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"city\",\"type\":\"string\"}]}" |
| --infer-schema \<topic_name> | Infers the Avro schema from the last record in the specified topic and registers it. | --infer-schema "my_topic" |
| --zookeeper \<zookeeper_address> <a id="address"></a> | Sets the ZooKeeper address for this execution. The default is taken from the environment variable ZK_ADDR; if it is not set, "localhost:2181" is used. | --zookeeper 192.168.1.10:4242 |
| --kafka \<kafka_address> | Sets the Kafka address for this execution. The default is taken from the environment variable KAFKA_ADDR; if it is not set, "localhost:9092" is used. | --kafka 192.168.1.10:9161 |
| --query \<query> <br/> -q \<query> | Executes the query using [Flink SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html) and writes the result to the console. | --query "SELECT title FROM pypi_releases_min" |
| | | |
| <b>Options for --query</b> | | |
| -q \<query> --port \<port> <br/> -q \<query> -p \<port> | Executes the query and writes the result to a local socket on the specified port. | -q "SELECT crate.id FROM crate_releases_min" -p 1234 |
| -q \<query> --kafka-topic \<name> <br/> -q \<query> -k \<name> | Executes the query and writes the result to the specified Kafka topic. | -q "SELECT crate.id FROM crate_releases_min" -k "myTopic" |
| -q \<query> --timeout \<duration> <br/> -q \<query> -t \<duration> | Executes the query and terminates the program once there have been no new records for the specified duration (in seconds). TO BE DISCUSSED | -q "SELECT crate.id FROM crate_releases_min" -t 42 |
| -q \<query> --from-earliest | Executes the query, printing results starting from the earliest retrievals. This is the default behaviour. | -q "SELECT crate.id FROM crate_releases_min" --from-earliest |
| -q \<query> --from-latest | Executes the query, printing results starting from the latest retrievals. | -q "SELECT crate.id FROM crate_releases_min" --from-latest |
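
As a sketch of how these options combine (topic name and addresses are illustrative, and the script name assumes the ```sbt pack``` output described above):

```
kafkaquery --kafka 192.168.1.10:9161 --zookeeper 192.168.1.10:4242 -q "SELECT crate.id FROM crate_releases_min" -k "myTopic"
```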

Binary file removed docs/Architecture Diagram.pdf
66 changes: 0 additions & 66 deletions docs/CLI docs.md

This file was deleted.

28 changes: 28 additions & 0 deletions docs/CodeFeedrCoreSupport.md
@@ -0,0 +1,28 @@
## Support for a CodeFeedr core plugin

To add support for a CodeFeedr plugin, the [input stages](http://codefeedr.org/codefeedr/mydoc_pipeline.html#inputstage) that produce data for the requested topics need to override the getSchema method.
The method should return the Avro schema of the stage's output type, generated by avro4s.
To update the schema information in ZooKeeper, every input stage must be run as part of a CodeFeedr pipeline at least once. <br/>
Supported output types are Scala case classes (possibly nested), all primitives, Dates, SQL Timestamps, Arrays & Lists.<br/>
The following is an example for the PyPI plugin:
```Scala
import com.sksamuel.avro4s.AvroSchema
import org.apache.avro.Schema

// Input stage that streams PyPI releases into the pipeline.
class PyPiReleasesStage(stageId: String = "pypi_releases_min", sourceConfig: PyPiSourceConfig = PyPiSourceConfig()) extends InputStage[PyPiRelease](Some(stageId)) {

  // Produces the stream of releases consumed by downstream stages.
  override def main(context: Context): DataStream[PyPiRelease] =
    context.env
      .addSource(new PyPiReleasesSource(sourceConfig))

  // Returns the avro4s-generated schema of the stage's output type,
  // which is registered in ZooKeeper when the pipeline runs.
  override def getSchema: Schema = {
    // DateSchemaFor makes avro4s encode Date fields as Avro logical dates.
    implicit val dateSchema: DateSchemaFor = new DateSchemaFor(true)
    AvroSchema[PyPiRelease]
  }
}
```
Don't forget to specify the ZooKeeper address in the buffer properties:
```Scala
new PipelineBuilder().setBufferProperty(KafkaBuffer.ZOOKEEPER, "localhost:2181")
```
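Putting it together, a minimal sketch of running the stage once so that its schema lands in ZooKeeper might look as follows; ```append```, ```build``` and ```startLocal``` are assumed from the CodeFeedr core pipeline API and may differ in your codefeedr version:
```Scala
// Sketch, assuming the CodeFeedr core pipeline API:
// run the stage once so its Avro schema is written to ZooKeeper.
new PipelineBuilder()
  .setBufferProperty(KafkaBuffer.ZOOKEEPER, "localhost:2181")
  .append(new PyPiReleasesStage())
  .build()
  .startLocal()
```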
19 changes: 0 additions & 19 deletions docs/Description_DFD.md

This file was deleted.

Binary file removed docs/Extended Architecture Diagram.pdf
Binary file removed docs/REPL_DFD.pdf