Extensibility in Connectors API #9175
-
Hello, and thanks for this very cool product. The native Postgres CDC support is awesome, and I see you have some other items under research as well. I'm wondering whether you currently have (or plan to add) an extensible API for developers to write their own connectors. Put differently, if there is a connector my team is really hungry for (in this case, MongoDB change streams), would it be crazy to try to roll our own hookup? Best, Abe
-
Hi @lermana, thank you for your appreciation of this project. At present, RisingWave has no plans to expose its source connector API, for two reasons:
1. While we can provide a framework, we cannot guarantee the stability of its API in the short term.
2. Developing a production-ready CDC (change data capture) tool is typically more difficult than people might expect. The complexity lies in several aspects, including security, rate limiting, monitoring, and compatibility.
Therefore, we suggest using Kafka Connect, which is mature and widely used, or other ETL tools to normalize the stream before ingesting it into RisingWave. For MongoDB, you can set up a MongoDB Kafka Connector to consume the change streams into Kafka in JSON format. You can find more information about this connector at https://www.mongodb.com/docs/kafka-connector/current/ . Using this existing connector will likely be much easier than writing your own. Since [RisingWave supports the jsonb type](https://www.risingwave.dev/docs/current/data-type-jsonb/), it's technically possible to first ingest the JSON change streams into RisingWave and transform the data as desired in the materialized view phase. Here is a sample of the change stream in JSON (https://www.mongodb.com/docs/kafka-connector/current/introduction/data-formats/#json):

```json
{
  "_id": {
    "ts": {
      "$timestamp": {
        "t": 1649964602,
        "i": 1
      }
    },
    "h": 1560227458199354643,
    "v": 2,
    "op": "i",
    "ns": "mydb.mycol"
  },
  "fullDocument": {
    "_id": ObjectId("61edc26e8b462f82196bde90"),
    "name": "John Smith",
    "email": "[email protected]",
    "age": 35
  },
  "ns": {
    "db": "mydb",
    "coll": "mycol"
  },
  "documentKey": {
    "_id": ObjectId("61edc26e8b462f82196bde90")
  }
}
```
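To make the jsonb idea concrete, here is a minimal sketch of the ingest-then-transform pattern in RisingWave SQL. The names (`mongo_events`, `payload`, `customers`) are hypothetical, and the row is inserted by hand rather than arriving from Kafka; the point is only to show typed columns being extracted from a change-stream-shaped document in the materialized view phase.

```sql
-- Hypothetical table holding raw change-stream documents as jsonb.
CREATE TABLE mongo_events (payload JSONB);

-- A hand-written row shaped like the sample document above, standing in
-- for what a Kafka-fed source would deliver.
INSERT INTO mongo_events VALUES
  ('{"fullDocument": {"name": "John Smith", "email": "jsmith@example.com", "age": 35}}'::jsonb);

-- Transform in the materialized view phase: extract typed columns from
-- the jsonb payload using the Postgres-style -> and ->> operators.
CREATE MATERIALIZED VIEW customers AS
SELECT
  payload -> 'fullDocument' ->> 'name'       AS name,
  payload -> 'fullDocument' ->> 'email'      AS email,
  (payload -> 'fullDocument' ->> 'age')::int AS age
FROM mongo_events;
```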
I can help build a demo for this integration in the coming days. Before we proceed, do you have any concerns about this solution?
-
Hi Tao,

Anytime, and thanks for the quick and detailed response!

I definitely understand where you're coming from around not publicizing an extensible API for CDC connectors. I was kind of hoping we could leverage RisingWave as an all-in-one (we're definitely excited about streaming Postgres changes right into a data-persisting system that we can also execute SQL against), but maybe using Kafka as an intermediary for MongoDB change streams wouldn't be so bad.

Just to make sure I understand your point about jsonb data, can I ask you to clarify whether you mean a) that's how we'd pull the Mongo streams in from a Kafka stream, or b) that we could theoretically write raw Mongo change streams to RisingWave (which could transform further as needed), but we'd just need something (not necessarily Kafka) to capture changes from Mongo and execute the writes into RisingWave?

Also, if I'm not being annoying, may I ask whether you currently support (or plan to support) anything in the realm of Kubernetes event capture?

Thanks,
Abe
-
Awesome, would love to see that.

…On Thu, Apr 13, 2023 at 11:48 PM Tao Wu wrote:

> a) that's how we'd pull the Mongo streams in from a Kafka stream, or b) that we could theoretically write raw Mongo change streams to RisingWave (which could transform further as needed), but we'd just need something (not necessarily Kafka) to capture changes from Mongo and execute the writes into RisingWave?

I plan to write a demo to showcase how to achieve that. While I may not be able to provide a quick answer, based on my prior experience, I believe it's achievable. I can deliver this demo to you by tomorrow.
-
Thanks Tao, will check this out! Appreciate the detail. Quick question: do you recommend Kafka over Flink for this (i.e., does RisingWave integrate better with Kafka)? Wondering because I think Flink has a MongoDB CDC hookup too.
…On Mon, Apr 17, 2023 at 8:00 PM Tao Wu wrote:

Hi @lermana, FYI the MongoDB + Debezium demo will be delayed due to a Debezium format issue that we previously overlooked (#9243). We will fix it as soon as possible. Before that, you could ingest sample jsonb data into RisingWave using the INSERT command to test the other functionalities of RisingWave.

In terms of setting up the ingestion process, I've built a Docker Compose file to showcase how to extract the MongoDB change streams into Kafka: https://github.com/risingwavelabs/risingwave/pull/9235/files (under debezium-mongo):
```yaml
debezium:
  image: debezium/connect:1.9
  container_name: debezium
  depends_on:
    - message_queue
    - mongodb
  ports:
    - "8083:8083"
  healthcheck:
    test: curl -f localhost:8083
    interval: 1s
    start_period: 120s
  environment:
    BOOTSTRAP_SERVERS: message_queue:29092
    GROUP_ID: 1
    CONFIG_STORAGE_TOPIC: debezium_mongodb_config
    OFFSET_STORAGE_TOPIC: debezium_mongodb_offset
    STATUS_STORAGE_TOPIC: debezium_mongodb_status
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://message_queue:8081

register-mongodb-connector:
  image: curlimages/curl:7.79.1
  container_name: register-mongodb-connector
  depends_on:
    debezium: { condition: service_healthy }
  command: >
    /bin/sh -c "
    echo 'Waiting for Kafka Connect to start...' &&
    while ! curl -s http://debezium:8083/ | grep -q 'version'; do sleep 5; done &&
    echo 'Kafka Connect started.' &&
    echo 'Registering MongoDB connector...' &&
    curl -X POST -H 'Content-Type: application/json' --data '{
      \"name\": \"mongodb-connector\",
      \"config\": {
        \"connector.class\": \"io.debezium.connector.mongodb.MongoDbConnector\",
        \"tasks.max\": \"1\",
        \"mongodb.hosts\": \"mongodb:27017\",
        \"mongodb.name\": \"dbserver1\",
        \"mongodb.user\": \"admin\",
        \"mongodb.password\": \"admin123\",
        \"database.history.kafka.bootstrap.servers\": \"message_queue:29092\",
        \"database.include.list\": \"random_data\"
      }
    }' http://debezium:8083/connectors &&
    echo 'MongoDB connector registered.'"
  restart: always
```
Essentially, we simply need to set up a Kafka Connect server pre-installed with the Debezium plugin and register the MongoDB CDC task with it. It will then continuously consume the MongoDB changelogs.
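For completeness, once Debezium is writing the changelog to Kafka, the RisingWave side of the pipeline might look roughly like the sketch below. This is an assumption-heavy illustration rather than the promised demo: it presumes a RisingWave build that understands Debezium's MongoDB envelope (the format issue tracked in #9243 concerns exactly that), JSON-encoded values rather than the Avro converter configured above, and a collection named `mycol` in the `random_data` database (Debezium names topics `<mongodb.name>.<database>.<collection>`).

```sql
-- Hypothetical: materialize the Debezium changelog for one collection.
-- Topic and broker names come from the compose file above; verify the
-- FORMAT/ENCODE clause against the RisingWave version you are running.
CREATE TABLE mycol_cdc (
  _id JSONB PRIMARY KEY,
  payload JSONB
) WITH (
  connector = 'kafka',
  topic = 'dbserver1.random_data.mycol',
  properties.bootstrap.server = 'message_queue:29092'
) FORMAT DEBEZIUM_MONGO ENCODE JSON;
```

From there, fields can be pulled out of `payload` in a materialized view, just as in the jsonb example earlier in the thread.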