
RFC: Combine Historical and Incremental Data #85

Conversation

@fuyufjh (Member) commented Jan 12, 2024

@st1page (Contributor) commented May 20, 2024

We encountered an interesting problem: what happens when users want to define a watermark on this table? If we directly apply the ideas from this RFC, the data inserted into the table by batch queries is unordered, so records are likely to be unexpectedly expired and deleted by the watermark. I can think of a few possibilities:

  1. (Completely infeasible) Define a watermark only on the Kafka source and sink it into the table. This cannot achieve the semantics users want: the table after the source erases the watermark semantics, so the watermark cannot be used downstream.
  2. Use insert ... select ... order by time asc to insert data into a table with a watermark definition, in time order (see the sketch after this list). In this case, we may need to ensure the following:
    • In the shuffle from BatchInsertExecutor to StreamDMLExecutor, we need to adopt a sort-merge or no-shuffle strategy so that the order of the insertion stream matches the output order of the batch query.
    • (Optimization) We need to implement a sorted scan from the Iceberg source (push-down of order by).
  3. Support the ALTER TABLE ... ADD WATERMARK ... syntax. With this method, users first import historical data into a table without a watermark, then add a watermark to the table, and only afterwards construct the upstream (subscribing to changes from Kafka) and the downstream (stream processing logic). The problem with this approach is that when we have a materialized view on the table, we cannot send watermarks downstream during the backfill of historical data, which is inefficient or even unacceptable for some stream operators that rely on watermarks. If users use features related to EOWC (emit-on-window-close), they may also have to accept a large burst of data being sent downstream when the first watermark arrives.
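
For concreteness, here is a minimal sketch of proposal 2. The schema and the iceberg_history source name are hypothetical; the WATERMARK clause follows RisingWave's syntax for append-only tables.

-- A table whose watermark is derived from the event-time column.
CREATE TABLE orders (
    order_id INT,
    event_time TIMESTAMP,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) APPEND ONLY;

-- Backfill historical data in time order, so the watermark generated from
-- the insertion stream never expires rows that are still on the way.
INSERT INTO orders
SELECT order_id, event_time
FROM iceberg_history              -- hypothetical batch-readable source
ORDER BY event_time ASC;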

@fuyufjh (Member, Author) commented May 20, 2024

The 3rd proposal, ALTER TABLE ... ADD WATERMARK ..., sounds very limited. It requires that the table have no downstream streaming jobs, which in effect asks users to prepare everything before running it. It works, but it is not very friendly.

I slightly prefer the 2nd proposal, insert ... select ... [order by time asc]. In my mind, the order by clause here is not enforced. For example, if the users ingest data from a Kafka source first, e.g. a topic named historical_events, the events will be almost ordered naturally. Furthermore, if Iceberg can provide such almost-ordered reading, it's also doable. (According to the Iceberg - Flink queries docs, there may be such a method.)

@xxchan (Member) left a comment

Some new ideas by @st1page from https://risingwave-labs.slack.com/archives/C07CU2YBKCG/p1721184731055789

Since we are adding the batch read function (risingwavelabs/risingwave#17673), we can combine a batch query with a connector. Then we don't need a batch source, and there is no need for ALTER TABLE any more.

tentative syntax:

CREATE TABLE orders (
    order_id INT,
    customer_name VARCHAR,
    data JSONB,
    PRIMARY KEY (order_id, customer_name)
) INITIAL WITH SELECT * FROM file_scan(
  'parquet',
  's3',
  'ap-southeast-2',
  'xxxxxxxxxx',
  'yyyyyyyy',
  's3://your-bucket/path/to/*'
) WITH (
  connector = 'kinesis',
  stream = 'wkx-dynamo-orders',
  scan.startup.mode='earliest',
  aws.region = 'us-east-1',
  kinesis.credentials.access = 'ABCDEFG',
  kinesis.credentials.secret = 'abcdefg',
) FORMAT DYNAMODB_CDC ENCODE JSON;

@xiangjinwu: Can be achieved by pause_on_create + insert into t select + resume

@fuyufjh (Member, Author) commented Jul 18, 2024

we can combine a batch query with a connector. Then we don't need a batch source, and there is no need for ALTER TABLE any more.

Is it just CREATE TABLE AS <select-query>?

Here, taking your example, the columns in the table definition and the columns in SELECT * actually duplicate each other. If we eliminate the duplication, we end up with something like CREATE TABLE AS.
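
For illustration, here is a hypothetical CTAS-like form the earlier example could collapse into (illustrative syntax only, not implemented):

CREATE TABLE orders
WITH (
    connector = 'kinesis',
    stream = 'wkx-dynamo-orders',
    scan.startup.mode = 'earliest',
    aws.region = 'us-east-1'
    -- credentials elided
) FORMAT DYNAMODB_CDC ENCODE JSON
AS SELECT * FROM file_scan(
    'parquet',
    's3',
    'ap-southeast-2',
    'xxxxxxxxxx',
    'yyyyyyyy',
    's3://your-bucket/path/to/*'
);

The column list is inferred from SELECT *, so the table definition and the batch query no longer repeat each other.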

@xxchan (Member) commented Jul 18, 2024

Is it just CREATE TABLE AS <select-query>?

Yes, I asked the same question. 😄 @st1page feels that, for this specific need, the CTAS syntax is weird, so he wants to introduce a separate syntax.

Specifically,

  • INITIAL WITH has an order
  • We can do INITIAL WITH for a source. The CTAS syntax is reasonable, but CSAS looks weird (to him)

@fuyufjh (Member, Author) commented Jul 18, 2024

  • INITIAL WITH has an order

Order is not that important when processing historical data. In particular, with multiple parallelisms involved, ordering might be less useful to users.

  • We can do INITIAL WITH for a source. The CTAS syntax is reasonable, but CSAS looks weird (to him)

I feel CREATE SOURCE + INITIAL WITH becomes more confusing once you take backfilling into consideration.


Hmmm, overall, I feel this is not better than the idea of CREATE TABLE/SOURCE with pause_on_create = true 😀

@st1page (Contributor) commented Jul 18, 2024

  • INITIAL WITH has an order

Order is not that important when processing historical data. In particular, with multiple parallelisms involved, ordering might be less useful to users.

Hmmm, overall, I feel this is not better than the idea of CREATE TABLE/SOURCE with pause_on_create = true 😀

LGTM. In detail, we need:

  • a WITH option pause_on_create
  • a new statement RESUME table_name;

And we might need to discuss the INSERT statement on sources later. A sketch of the resulting flow follows.
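
A minimal sketch of the agreed flow, reusing the earlier example (pause_on_create and RESUME are proposals here, not existing syntax):

-- 1. Create the table with its streaming connector, but paused on creation.
CREATE TABLE orders (
    order_id INT,
    customer_name VARCHAR,
    data JSONB,
    PRIMARY KEY (order_id, customer_name)
) WITH (
    connector = 'kinesis',
    stream = 'wkx-dynamo-orders',
    scan.startup.mode = 'earliest',
    aws.region = 'us-east-1',
    pause_on_create = 'true'      -- proposed WITH option
) FORMAT DYNAMODB_CDC ENCODE JSON;

-- 2. Backfill the historical data with a batch query.
INSERT INTO orders
SELECT * FROM file_scan(
    'parquet',
    's3',
    'ap-southeast-2',
    'xxxxxxxxxx',
    'yyyyyyyy',
    's3://your-bucket/path/to/*'
);

-- 3. Start consuming incremental changes from the connector.
RESUME orders;                    -- proposed statement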
