Skip to content

Commit a99d70c

Browse files
Add data ingestion sample
1 parent 35dfea3 commit a99d70c

File tree

2 files changed

+168
-0
lines changed

2 files changed

+168
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
{
2+
"doc": "Schema for avro.",
3+
"name": "Products",
4+
"namespace": "products.avro",
5+
"type": "record",
6+
"fields": [
7+
{
8+
"name": "product_id",
9+
"type": "string"
10+
},
11+
{
12+
"name": "name",
13+
"type": "string"
14+
},
15+
{
16+
"name": "price",
17+
"type": "float"
18+
},
19+
{
20+
"name": "fulfillment_type",
21+
"type": "string"
22+
},
23+
{
24+
"name": "brand",
25+
"type": "string"
26+
},
27+
{
28+
"name": "review_count",
29+
"type": "int"
30+
},
31+
{
32+
"name": "rating_average",
33+
"type": "float"
34+
},
35+
{
36+
"name": "current_seller",
37+
"type": "string"
38+
},
39+
{
40+
"name": "category",
41+
"type": "string"
42+
},
43+
{
44+
"name": "quantity_sold",
45+
"type": "int"
46+
}
47+
]
48+
}

data-ingestion/kafka_producer.py

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import argparse
2+
import io
3+
import json
4+
from datetime import datetime
5+
from time import sleep
6+
7+
import numpy as np
8+
from bson import json_util
9+
from kafka import KafkaAdminClient, KafkaProducer
10+
from kafka.admin import NewTopic
11+
12+
# CLI arguments controlling the Kafka product-event stream.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-m",
    "--mode",
    default="setup",
    choices=["setup", "teardown"],
    # Fixed copy-paste from a driver-stats example: this script emits
    # product events, not driver stats.
    help="Whether to setup or teardown a Kafka topic with product events. Setup will teardown before beginning emitting events.",
)
parser.add_argument(
    "-b",
    "--bootstrap_servers",
    default="localhost:9092",
    help="Where the bootstrap server is",
)
parser.add_argument(
    "-c",
    "--schemas_path",
    default="./avro_schemas",
    help="Folder containing all generated avro schemas",
)

args = parser.parse_args()

# Number of distinct simulated products; schema files are expected to be
# indexed 0..NUM_PRODUCTS-1 (see create_streams).
NUM_PRODUCTS = 1
38+
def create_topic(admin, topic_name):
    """Create a Kafka topic if it does not already exist.

    Args:
        admin: KafkaAdminClient used to issue the create request.
        topic_name: Name of the topic to create.
    """
    try:
        # Single partition / single replica is fine for a local dev broker.
        topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
        admin.create_topics([topic])
        print(f"A new topic {topic_name} has been created!")
    except Exception as err:
        # The usual cause is TopicAlreadyExistsError, but the broad catch also
        # swallows network/auth failures — include the real error so those
        # aren't silently misreported as "already exists".
        print(f"Topic {topic_name} already exists. Skipping creation! ({err})")
49+
def create_streams(servers, schemas_path):
    """Continuously emit randomly generated product events to Kafka.

    Connects to the broker (retrying while it comes up), creates the
    ``product_input`` topic once, then loops forever sending one
    Kafka-Connect-style record (schema + payload) every 2 seconds.

    Args:
        servers: List of bootstrap server addresses.
        schemas_path: Folder containing products_schema_<id>.avsc files.
    """
    producer = None
    admin = None
    # Retry for up to ~100s while the broker starts up.
    for _ in range(10):
        try:
            producer = KafkaProducer(bootstrap_servers=servers)
            admin = KafkaAdminClient(bootstrap_servers=servers)
            print("Success: instantiated Kafka admin and producer.")
            break  # connected — original kept retrying all 10 attempts
        except Exception as e:
            print(
                f"Trying to instantiate admin and producer with bootstrap servers {servers} with error {e}"
            )
            sleep(10)

    topic_name = "product_input"
    # Create the topic once up front instead of on every emitted record.
    create_topic(admin=admin, topic_name=topic_name)

    while True:
        data = {}
        # Timestamp the event at emission time to simulate fresh data.
        data["created"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        data["product_id"] = np.random.randint(low=0, high=NUM_PRODUCTS)

        # Each product id has its own generated schema file.
        schema_path = f"{schemas_path}/products_schema_{data['product_id']}.avsc"
        with open(schema_path, "r") as f:
            parsed_schema = json.load(f)

        # Fill every schema field (except the keys set above) with a random value.
        for field in parsed_schema["fields"]:
            if field["name"] not in ["created", "product_id"]:
                data[field["name"]] = np.random.rand()

        # Kafka Connect JSON envelope: schema + payload.
        record = {
            "schema": {"type": "struct", "fields": parsed_schema["fields"]},
            "payload": data,
        }

        # Send message to the topic
        producer.send(
            topic_name, json.dumps(record, default=json_util.default).encode("utf-8")
        )
        print(record)
        sleep(2)
95+
def teardown_stream(topic_name, server=None):
    """Delete a Kafka topic, logging (but not raising) any failure.

    Args:
        topic_name: Topic to delete.
        server: Bootstrap server list; defaults to ["localhost:9092"].
    """
    # None-sentinel instead of a mutable default argument, which would be
    # shared across all calls.
    if server is None:
        server = ["localhost:9092"]
    try:
        admin = KafkaAdminClient(bootstrap_servers=server)
        print(admin.delete_topics([topic_name]))
        print(f"Topic {topic_name} deleted")
    except Exception as e:
        # Best-effort teardown: the topic may simply not exist yet.
        print(str(e))
105+
if __name__ == "__main__":
106+
parsed_args = vars(args)
107+
mode = parsed_args["mode"]
108+
servers = parsed_args["bootstrap_servers"]
109+
110+
# Tear down all previous streams
111+
print("Tearing down all existing topics!")
112+
for device_id in range(NUM_PRODUCTS):
113+
try:
114+
teardown_stream(f"device_{device_id}", [servers])
115+
except Exception as e:
116+
print(f"Topic device_{device_id} does not exist. Skipping...!")
117+
118+
if mode == "setup":
119+
schemas_path = parsed_args["schemas_path"]
120+
create_streams([servers], schemas_path)

0 commit comments

Comments
 (0)