Add a native MySQL Input connector #211

@zxqfd555

Description

Is your feature request related to a problem? Please describe.

Pathway already has a MySQL output connector (pw.io.mysql.write), but there is no corresponding input connector. Users who want to stream data from MySQL into Pathway pipelines must implement custom workarounds on the Python side, which is brittle and bypasses all the engine-level optimizations. We need a first-class, native pw.io.mysql.read connector.

Describe the solution you'd like

Add a native MySQL input connector implemented fully in Rust, exposed as pw.io.mysql.read, consistent with the existing connector patterns (PostgreSQL, MS SQL Server, MongoDB).

Crates to use:

  • mysql v26+ — MIT OR Apache-2.0 license. Already used by the existing MySQL output connector; handles static polling, query execution, and type deserialization.
  • mysql_cdc v0.1.5 — MIT license. Pure-Rust binlog replication client implementing the MySQL replica protocol. Used for streaming (CDC) mode only.

Read modes:

Static mode — execute a single SELECT query, snapshot the result set, emit all rows as insertions, then close the connection. Uses only the mysql crate. Requires no special server configuration, no binlog access, no replication privileges — a plain read-only database user is sufficient.

Streaming mode (CDC via binlog replication) — register as a replica using mysql_cdc::BinlogClient, then consume a stream of WriteRowsEvent, UpdateRowsEvent, and DeleteRowsEvent entries from the MySQL binary log in ROW format. This is the correct approach for real-time CDC.
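One way to picture how the three row-event kinds could turn into Pathway-style changes is the minimal sketch below. The `WriteRows`, `UpdateRows`, and `DeleteRows` classes are hypothetical stand-ins written for this illustration, not the mysql_cdc types, and treating an update as a retraction of the old row followed by an insertion of the new one is an assumption about how the connector might emit it.

```python
from dataclasses import dataclass
from typing import Iterator

Row = tuple  # one table row as a tuple of column values


# Hypothetical stand-ins for the binlog row events (not the mysql_cdc types).
@dataclass(frozen=True)
class WriteRows:
    rows: list[Row]


@dataclass(frozen=True)
class UpdateRows:
    rows: list[tuple[Row, Row]]  # (before image, after image) pairs


@dataclass(frozen=True)
class DeleteRows:
    rows: list[Row]


def to_diffs(event) -> Iterator[tuple[Row, int]]:
    """Yield (row, diff) pairs: +1 for an insertion, -1 for a retraction."""
    if isinstance(event, WriteRows):
        for row in event.rows:
            yield row, +1
    elif isinstance(event, UpdateRows):
        # Assumption: an update is emitted as "retract old, insert new".
        for before, after in event.rows:
            yield before, -1
            yield after, +1
    elif isinstance(event, DeleteRows):
        for row in event.rows:
            yield row, -1
    else:
        raise TypeError(f"unsupported event type: {type(event).__name__}")
```

Under this model, an insert followed by an update and a delete of the same row nets out to no rows, which is what a downstream aggregation would expect.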

CDC approach and tradeoffs:

MySQL's binlog-based CDC works by having the client masquerade as a MySQL replica. The server sends binlog events over the replication protocol. The client tracks its position either as a (binlog filename, byte offset) pair or, preferably, as a GTID set (mysql_cdc::BinlogOptions::from_mysql_gtid). GTID-based tracking is strongly recommended because it survives source failover and server restarts without manual position recalculation.

Tradeoffs vs. the PostgreSQL situation:

| | MySQL binlog | PostgreSQL logical replication slot |
| --- | --- | --- |
| Server-side WAL retention | Controlled by binlog_expire_logs_seconds (default: 30 days). Binlog files are rotated and deleted on schedule regardless of whether any replica has consumed them. | The replication slot holds WAL until the consumer acknowledges. This can cause unbounded WAL accumulation if the consumer is slow or disconnected (the "WAL retention trap"). |
| Risk of falling behind | If Pathway is offline longer than the binlog retention window, the saved position becomes invalid and a full re-snapshot is required. | Slot lag causes disk exhaustion on the server. |
| Consumer-side persistence | Store the current GTID set in Pathway's persistence layer after each committed transaction. On restart, resume from that GTID set. | Store LSN in the persistence layer; slot holds server-side state. |

This means MySQL CDC does not have the WAL retention trap that PostgreSQL has (replication slots can bloat disk), but it has the mirror problem: the server does not hold binlog for you. If the persistence state is lost or the connector is offline too long, the saved GTID set may reference binlog files that no longer exist.

Persistence handling:

  • In streaming mode, persist the current GTID set to Pathway's persistence storage after every successfully processed transaction (BinlogClient::commit). On restart, resume via BinlogOptions::from_mysql_gtid(saved_gtid_set).
  • If the saved GTID set references a binlog file that no longer exists on the server (the server will respond with an error upon replication handshake), the connector must raise a descriptive error at startup, for example: "Saved binlog position is no longer available on the MySQL server (binlog may have been purged). Manual re-snapshot or persistence reset is required." Do not silently fall back to from_start() or from_end(): the former would re-emit changes that were already processed (duplicate emission), and the latter would skip every change made while the connector was offline (data loss).
  • This error path must have a corresponding code comment referencing this issue so future contributors understand the design decision.
  • Static mode uses no persistence mechanism and does not interact with the binlog at all.
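The "position no longer available" condition can be sketched as a set comparison. The issue relies on the server rejecting the replication handshake; the sketch below instead models the same condition on the client side by comparing the saved GTID set with the server's purged set, which is an assumption for illustration only. All names (`BinlogPositionLostError`, `parse_gtid_set`, `is_resumable`, `check_resume`) are hypothetical, and the parser assumes the plain textual form `uuid:1-5:7` with normalized (already merged) intervals and no tagged GTIDs.

```python
class BinlogPositionLostError(RuntimeError):
    """Hypothetical error for a saved position the server can no longer serve."""


def parse_gtid_set(text: str) -> dict[str, list[tuple[int, int]]]:
    """Parse 'uuid:1-5:7,uuid2:1-3' into {uuid: [(1, 5), (7, 7)], ...}."""
    result: dict[str, list[tuple[int, int]]] = {}
    for part in text.replace("\n", "").split(","):
        part = part.strip()
        if not part:
            continue
        uuid, *ranges = part.split(":")
        intervals = result.setdefault(uuid.lower(), [])
        for item in ranges:
            start, _, end = item.partition("-")
            intervals.append((int(start), int(end or start)))
    return result


def is_resumable(saved, purged) -> bool:
    """True if every purged transaction is already contained in the saved set."""
    for uuid, purged_intervals in purged.items():
        saved_intervals = saved.get(uuid, [])
        for low, high in purged_intervals:
            # Relies on normalized intervals: one saved interval must cover it.
            if not any(s <= low and high <= e for s, e in saved_intervals):
                return False
    return True


def check_resume(saved_text: str, purged_text: str) -> None:
    """Raise instead of silently falling back to from_start() or from_end()."""
    if not is_resumable(parse_gtid_set(saved_text), parse_gtid_set(purged_text)):
        raise BinlogPositionLostError(
            "Saved binlog position is no longer available on the MySQL server "
            "(binlog may have been purged). Manual re-snapshot or persistence "
            "reset is required."
        )
```

For example, a saved set of `uuid:1-100` can resume against a purged set of `uuid:1-50`, while a saved set of `uuid:1-10` cannot, because transactions 11 to 50 are gone from the server.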

Implementation scope:

  • Python API: pw.io.mysql.read(host, port, user, password, database, table, schema, *, mode="static"|"streaming", ...)
  • Rust connector: static mode via the mysql crate only; streaming mode via mysql_cdc
  • Type mapping: cover all Pathway-supported types, mirroring the existing output connector's mapping
  • Integration tests (following the pattern of PostgreSQL, MS SQL Server, MongoDB integration tests):
    • Read in static mode
    • Read in streaming mode
    • Field parsing tests: for each MySQL column type that has a Pathway equivalent, perform a round-trip (write via pw.io.mysql.write, read back via pw.io.mysql.read in both static and streaming mode, assert data equality) — similar to the existing *_parsing tests
    • A Docker Compose setup with a MySQL container to run the integration tests in CI

Describe alternatives you've considered

One alternative for streaming would be polling-based CDC: periodically SELECT rows modified after a stored updated_at timestamp. This avoids the need for binlog access but requires the table to have an appropriate timestamp column, misses hard deletes entirely, and adds query load to the source database. It is not suitable as the primary streaming mechanism.
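The hard-delete blind spot is easy to reproduce. The sketch below uses an in-memory SQLite database purely as a stand-in for MySQL so that it runs without a server; the `items` table, its columns, and the `poll_changes` helper are made up for this illustration.

```python
import sqlite3


def poll_changes(conn, watermark):
    """Return rows modified after `watermark` and the advanced watermark."""
    rows = conn.execute(
        "SELECT id, value, updated_at FROM items "
        "WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()
    new_watermark = rows[-1][2] if rows else watermark
    return rows, new_watermark


# SQLite stands in for MySQL here; updated_at is a plain integer timestamp.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE items (id INTEGER PRIMARY KEY, value TEXT, updated_at INTEGER)"
)
conn.executemany("INSERT INTO items VALUES (?, ?, ?)", [(1, "a", 10), (2, "b", 11)])

# First poll sees both rows.
first, watermark = poll_changes(conn, 0)

# One row is updated, the other is hard-deleted.
conn.execute("UPDATE items SET value = 'a2', updated_at = 12 WHERE id = 1")
conn.execute("DELETE FROM items WHERE id = 2")

# Second poll reports the update but carries no trace of the delete.
second, watermark = poll_changes(conn, watermark)
```

After the second poll, a consumer would still believe row 2 exists, which is why this approach cannot serve as the primary streaming mechanism.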

Another alternative is using an external CDC tool (e.g., Debezium) as a bridge, but that defeats the purpose of a native connector.

Additional context

  • The mysql crate (MIT OR Apache-2.0) is already a dependency of the existing pw.io.mysql.write connector, so mysql_cdc (MIT) is the only new dependency. Both are license-compatible with Pathway's BSL 1.1 / Apache 2.0 dual licensing.
  • MySQL server must have binlog_format = ROW and binary logging enabled (log_bin = ON) for streaming mode to work. The connector should verify this at connection time and emit a clear error if not. Static mode has no such requirements.
  • mysql_cdc does not currently support SSL. This is a known limitation that should be documented in the Python API docstring and flagged with a // TODO in the Rust code for streaming mode.
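The connection-time check for streaming mode could look roughly like the sketch below. It assumes the server variables have already been fetched into a dict, for instance with `SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')`; the `BinlogConfigError` type and the `check_streaming_prerequisites` function are hypothetical names, and the real check would live in the Rust connector.

```python
class BinlogConfigError(RuntimeError):
    """Hypothetical error for a server that cannot serve row-based CDC."""


def check_streaming_prerequisites(variables: dict[str, str]) -> None:
    """Validate the server settings that streaming mode depends on."""
    problems = []
    if variables.get("log_bin", "").upper() != "ON":
        problems.append("binary logging is disabled (log_bin must be ON)")
    binlog_format = variables.get("binlog_format", "")
    if binlog_format.upper() != "ROW":
        problems.append(f"binlog_format is {binlog_format!r} (must be ROW)")
    if problems:
        raise BinlogConfigError(
            "MySQL server is not configured for streaming mode: "
            + "; ".join(problems)
        )
```

Collecting all problems before raising lets the user fix the server configuration in one pass instead of discovering the issues one at a time.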

Labels: enhancement (New feature or request)
