mysql
Latest release
- conduit-connector-mysql_0.1.3_Darwin_arm64.tar.gz
- conduit-connector-mysql_0.1.3_Darwin_x86_64.tar.gz
- conduit-connector-mysql_0.1.3_Linux_arm64.tar.gz
- conduit-connector-mysql_0.1.3_Linux_x86_64.tar.gz
- conduit-connector-mysql_0.1.3_Windows_arm64.tar.gz
- conduit-connector-mysql_0.1.3_Windows_x86_64.tar.gz
Description
Source
A source connector pulls data from an external resource and pushes it to downstream resources via Conduit.
Snapshot mode
Snapshot mode is the first stage of the source sync process. It reads all rows from the configured tables as record snapshots.
In snapshot mode, the record payload is an opencdc.StructuredData value in which each key is a column name and each value is that column's value.
Change Data Capture mode
When the connector switches to CDC mode, it starts streaming changes from the binlog position obtained at the start of the snapshot. It relies on the row-based binlog format to capture changes at the level of individual rows.
Unsafe snapshot
By default, the connector errors out if it finds a table that has neither a primary key nor a specified sorting column, because the snapshot can't be guaranteed to be consistent. Enabling the unsafeSnapshot parameter lets the connector snapshot such tables anyway. Changes made to the table during the snapshot are still captured by CDC mode.
As of writing, the unsafe snapshot is implemented using batches with LIMIT and OFFSET, so expect it to be slow for large tables. You can optimize the snapshot by specifying a sorting column (see source configuration parameters).
The position within the table is currently not recorded, so the unsafe snapshot restarts from the first row every time it runs.
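The difference between LIMIT/OFFSET batching and batching on a sorting column can be sketched as follows. This is a minimal illustration, not the connector's implementation: it uses Python's built-in sqlite3 module as a stand-in for MySQL, the function names and the events table are hypothetical, and the sorting-column variant assumes the column's values are unique.

```python
import sqlite3


def offset_batches(conn, table, fetch_size):
    """Read a table in LIMIT/OFFSET batches, the approach used by the unsafe snapshot."""
    offset = 0
    while True:
        # Identifiers cannot be bound as parameters; `table` is assumed trusted.
        rows = conn.execute(
            f"SELECT * FROM {table} LIMIT ? OFFSET ?", (fetch_size, offset)
        ).fetchall()
        if not rows:
            return
        yield rows
        # Each batch makes the database skip all previously read rows again.
        offset += len(rows)


def keyset_batches(conn, table, sorting_column, fetch_size):
    """Read a table in batches ordered by a sorting column, resuming after the last value seen."""
    last_seen = None
    while True:
        if last_seen is None:
            rows = conn.execute(
                f"SELECT * FROM {table} ORDER BY {sorting_column} LIMIT ?",
                (fetch_size,),
            ).fetchall()
        else:
            rows = conn.execute(
                f"SELECT * FROM {table} WHERE {sorting_column} > ? "
                f"ORDER BY {sorting_column} LIMIT ?",
                (last_seen, fetch_size),
            ).fetchall()
        if not rows:
            return
        yield rows
        # Assumes unique values in the sorting column, otherwise rows could be skipped.
        last_seen = rows[-1][sorting_column]


def demo():
    """Build a small in-memory table and return the batch sizes of both strategies."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # allows access to columns by name
    conn.execute("CREATE TABLE events (seq INTEGER, body TEXT)")
    conn.executemany(
        "INSERT INTO events VALUES (?, ?)", [(i, f"event-{i}") for i in range(1, 8)]
    )
    offset_sizes = [len(batch) for batch in offset_batches(conn, "events", 3)]
    keyset_sizes = [len(batch) for batch in keyset_batches(conn, "events", "seq", 3)]
    return offset_sizes, keyset_sizes
```

Both strategies return the same rows here, but the OFFSET variant has to skip over every previously read row on each fetch, which is why it slows down on large tables.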
Schema
The source connector uses Avro to decode MySQL rows. The table below shows the MySQL-to-Avro type mapping that the connector uses:
MySQL Type | Avro Type |
---|---|
TINYINT | int |
SMALLINT | int |
MEDIUMINT | int |
INT | int |
BIGINT | long |
DECIMAL | double |
NUMERIC | double |
FLOAT | double |
DOUBLE | double |
BIT | bytes |
CHAR | string |
VARCHAR | string |
TINYTEXT | string |
TEXT | string |
MEDIUMTEXT | string |
LONGTEXT | string |
BINARY | bytes |
VARBINARY | bytes |
TINYBLOB | bytes |
BLOB | bytes |
MEDIUMBLOB | bytes |
LONGBLOB | bytes |
DATE | string |
TIME | string |
DATETIME | string |
TIMESTAMP | string |
YEAR | long |
ENUM | string |
SET | string |
JSON | string |
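To make the mapping concrete, the table above can be expressed as a lookup that builds an Avro record schema from column definitions. This is a sketch under assumptions: the helper names are hypothetical, length and modifier suffixes such as (255) or unsigned are simply ignored, and nullability is not modeled, so the connector's real schemas may differ.

```python
# MySQL type -> Avro type, copied from the table above.
MYSQL_TO_AVRO = {
    "TINYINT": "int", "SMALLINT": "int", "MEDIUMINT": "int", "INT": "int",
    "BIGINT": "long",
    "DECIMAL": "double", "NUMERIC": "double", "FLOAT": "double", "DOUBLE": "double",
    "BIT": "bytes",
    "CHAR": "string", "VARCHAR": "string", "TINYTEXT": "string", "TEXT": "string",
    "MEDIUMTEXT": "string", "LONGTEXT": "string",
    "BINARY": "bytes", "VARBINARY": "bytes", "TINYBLOB": "bytes", "BLOB": "bytes",
    "MEDIUMBLOB": "bytes", "LONGBLOB": "bytes",
    "DATE": "string", "TIME": "string", "DATETIME": "string", "TIMESTAMP": "string",
    "YEAR": "long",
    "ENUM": "string", "SET": "string", "JSON": "string",
}


def avro_type_for(mysql_type):
    """Return the Avro type for a MySQL column type such as 'VARCHAR(255)'."""
    # Keep only the base type name: drop "(...)" and trailing modifiers.
    base = mysql_type.strip().split("(")[0].split()[0].upper()
    return MYSQL_TO_AVRO[base]


def avro_record_schema(name, columns):
    """Build an Avro record schema (as a dict) from {column_name: mysql_type}."""
    return {
        "type": "record",
        "name": name,
        "fields": [
            {"name": column, "type": avro_type_for(mysql_type)}
            for column, mysql_type in columns.items()
        ],
    }
```

For example, a hypothetical users table with columns id BIGINT, email VARCHAR(255), and created_at DATETIME yields fields of type long, string, and string respectively.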
Destination
The MySQL destination takes an opencdc.Record and converts it into a valid SQL query. Each record is parsed and upserted individually. Batched writes are planned and should greatly improve performance over the current implementation.
Upsert Behavior
If the target table contains a column with a unique constraint (a PRIMARY KEY or UNIQUE index), records are upserted. Otherwise, they are appended. Support for updating tables without unique constraints is tracked as a separate issue.
If the target table already contains a record with the same key, the destination overwrites it with the newly received values. Because keys must be unique, this can overwrite and thus lose data, so keys should be assigned correctly by the source.
If the record has no key, it is simply appended.
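The upsert-or-append decision can be illustrated with MySQL's INSERT ... ON DUPLICATE KEY UPDATE statement. This is a sketch, not the query the connector actually generates: the function names are hypothetical, the %s placeholders follow the style of common Python MySQL drivers, and the VALUES(col) form is used for brevity even though newer MySQL 8.0 releases prefer row aliases.

```python
def quote_ident(name):
    """Quote a MySQL identifier with backticks, escaping embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def build_write_query(table, payload, has_unique_constraint):
    """Return (sql, args) that appends or upserts one record payload.

    payload is a {column: value} dict. When the table has a unique
    constraint, the statement overwrites the existing row on conflict.
    """
    columns = list(payload)
    column_list = ", ".join(quote_ident(c) for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f"INSERT INTO {quote_ident(table)} ({column_list}) VALUES ({placeholders})"
    if has_unique_constraint:
        updates = ", ".join(
            f"{quote_ident(c)} = VALUES({quote_ident(c)})" for c in columns
        )
        sql += f" ON DUPLICATE KEY UPDATE {updates}"
    return sql, [payload[c] for c in columns]
```

Without a unique constraint the statement is a plain INSERT, which matches the append behavior described above.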
Multicollection mode
Planned, not yet implemented. You can upvote the tracking issue to signal interest in getting this feature implemented sooner.
Requirements and compatibility
The connector is tested against MySQL v8.0. Compatibility with older versions isn't guaranteed.
MySQL Server Requirements:
- Binary Log (binlog) must be enabled.
- Binlog format must be set to ROW.
- Binlog row image must be set to FULL.
- Tables must have sortable primary keys.
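The binlog requirements above correspond to the MySQL system variables log_bin, binlog_format, and binlog_row_image. The following sketch checks them from the output of a SHOW VARIABLES query. The helper name and the shape of the input dict are assumptions, and the primary key requirement is not covered here.

```python
# Expected values for the binlog-related server variables.
REQUIRED_VARIABLES = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}

# Query whose result rows (Variable_name, Value) can be turned into a dict.
CHECK_QUERY = (
    "SHOW GLOBAL VARIABLES WHERE Variable_name IN "
    "('log_bin', 'binlog_format', 'binlog_row_image')"
)


def unmet_requirements(variables):
    """Return a list of human-readable problems for a {variable: value} dict."""
    problems = []
    for name, expected in REQUIRED_VARIABLES.items():
        actual = str(variables.get(name, "<unset>"))
        # Compare case-insensitively, since values may be reported in any case.
        if actual.upper() != expected:
            problems.append(f"{name} is {actual}, expected {expected}")
    return problems
```

An empty list means the three binlog settings match what the connector expects.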
MySQL User Privileges:
For Snapshot and CDC modes, the following privileges are required:
- SELECT
- LOCK TABLES
- RELOAD
- REPLICATION CLIENT
- REPLICATION SLAVE
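As an illustration, the privileges listed above can be granted in a single statement. The sketch below only builds the SQL text. The account name conduit and the host % in the usage note are hypothetical, and the grant is made on *.* because RELOAD, REPLICATION CLIENT, and REPLICATION SLAVE are global privileges in MySQL. You may prefer to grant SELECT and LOCK TABLES on a narrower scope in a separate statement.

```python
# Privileges required by the source connector for snapshot and CDC modes.
SOURCE_PRIVILEGES = [
    "SELECT",
    "LOCK TABLES",
    "RELOAD",
    "REPLICATION CLIENT",
    "REPLICATION SLAVE",
]


def grant_statement(user, host="%"):
    """Build a GRANT statement for the given account (existing user assumed)."""
    # Escape single quotes so the account name stays a valid string literal.
    safe_user = user.replace("'", "''")
    safe_host = host.replace("'", "''")
    privileges = ", ".join(SOURCE_PRIVILEGES)
    return f"GRANT {privileges} ON *.* TO '{safe_user}'@'{safe_host}';"
```

Calling grant_statement("conduit") produces a statement that a database administrator can review and run.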
Source Parameters
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      - id: example-source
        type: source
        plugin: "mysql"
        name: example-source
        settings:
          # DSN is the connection string for the MySQL database.
          # Type: string
          dsn: ""
          # Tables represents the tables to read from.
          # - By default, no tables are included, but can be modified by
          #   adding a comma-separated string of regex patterns.
          # - They are applied in the order that they are provided, so the
          #   final regex supersedes all previous ones.
          # - To include all tables, use "*". You can then filter that list
          #   by adding a comma-separated string of regex patterns.
          # - To set an "include" regex, add "+" or nothing in front of the
          #   regex.
          # - To set an "exclude" regex, add "-" in front of the regex.
          # - e.g. "-.*meta$, wp_postmeta" will exclude all tables ending
          #   with "meta" but include the table "wp_postmeta".
          # Type: string
          tables: ""
          # DisableCanalLogs disables verbose logs.
          # Type: bool
          disableCanalLogs: ""
          # FetchSize limits how many rows should be retrieved on each database
          # fetch.
          # Type: int
          fetchSize: "10000"
          # SortingColumn allows to force using a custom column to sort the
          # snapshot.
          # Type: string
          tableConfig.*.sortingColumn: ""
          # UnsafeSnapshot allows a snapshot of a table with neither a primary
          # key nor a defined sorting column. The opencdc.Position won't record
          # the last record read from a table.
          # Type: bool
          unsafeSnapshot: ""
          # Maximum delay before an incomplete batch is read from the source.
          # Type: duration
          sdk.batch.delay: "0"
          # Maximum size of batch before it gets read from the source.
          # Type: int
          sdk.batch.size: "0"
          # Specifies whether to use a schema context name. If set to false, no
          # schema context name will be used, and schemas will be saved with the
          # subject name specified in the connector (not safe because of name
          # conflicts).
          # Type: bool
          sdk.schema.context.enabled: "true"
          # Schema context name to be used. Used as a prefix for all schema
          # subject names. If empty, defaults to the connector ID.
          # Type: string
          sdk.schema.context.name: ""
          # Whether to extract and encode the record key with a schema.
          # Type: bool
          sdk.schema.extract.key.enabled: "true"
          # The subject of the key schema. If the record metadata contains the
          # field "opencdc.collection" it is prepended to the subject name and
          # separated with a dot.
          # Type: string
          sdk.schema.extract.key.subject: "key"
          # Whether to extract and encode the record payload with a schema.
          # Type: bool
          sdk.schema.extract.payload.enabled: "true"
          # The subject of the payload schema. If the record metadata contains
          # the field "opencdc.collection" it is prepended to the subject name
          # and separated with a dot.
          # Type: string
          sdk.schema.extract.payload.subject: "payload"
          # The type of the payload schema.
          # Type: string
          sdk.schema.extract.type: "avro"
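The include/exclude rules described for the tables parameter can be sketched as an ordered filter. This is an illustration of the documented behavior, not the connector's code: the function name is hypothetical, and the sketch assumes that patterns are matched as unanchored regular expressions (so "wp_postmeta" also matches longer names containing it unless anchored).

```python
import re


def filter_tables(all_tables, rules):
    """Apply a comma-separated list of include/exclude rules in order.

    "*" selects every table, "+pattern" or "pattern" includes matches,
    and "-pattern" excludes matches. Later rules override earlier ones.
    """
    selected = set()  # by default, no tables are included
    for raw_rule in rules.split(","):
        rule = raw_rule.strip()
        if not rule:
            continue
        if rule == "*":
            selected = set(all_tables)
            continue
        exclude = rule.startswith("-")
        pattern = rule[1:] if rule[0] in "+-" else rule
        regex = re.compile(pattern)
        for table in all_tables:
            if regex.search(table):  # assumption: unanchored matching
                if exclude:
                    selected.discard(table)
                else:
                    selected.add(table)
    # Preserve the original table order in the result.
    return [table for table in all_tables if table in selected]
```

With the tables users, wp_posts, wp_postmeta, and wp_usermeta, the rule string "*,-.*meta$,wp_postmeta" keeps everything except wp_usermeta, because the final include rule re-adds wp_postmeta after the exclusion.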
Destination Parameters
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      - id: example-destination
        type: destination
        plugin: "mysql"
        name: example-destination
        settings:
          # DSN is the connection string for the MySQL database.
          # Type: string
          dsn: ""
          # Key is the primary key of the specified table.
          # Type: string
          key: ""
          # Table is used as the target table into which records are inserted.
          # Type: string
          table: ""
          # Maximum delay before an incomplete batch is written to the
          # destination.
          # Type: duration
          sdk.batch.delay: "0"
          # Maximum size of batch before it gets written to the destination.
          # Type: int
          sdk.batch.size: "0"
          # Allow bursts of at most X records (0 or less means that bursts are
          # not limited). Only takes effect if a rate limit per second is set.
          # Note that if `sdk.batch.size` is bigger than `sdk.rate.burst`, the
          # effective batch size will be equal to `sdk.rate.burst`.
          # Type: int
          sdk.rate.burst: "0"
          # Maximum number of records written per second (0 means no rate
          # limit).
          # Type: float
          sdk.rate.perSecond: "0"
          # The format of the output record. See the Conduit documentation for a
          # full list of supported formats
          # (https://conduit.io/docs/using/connectors/configuration-parameters/output-format).
          # Type: string
          sdk.record.format: "opencdc/json"
          # Options to configure the chosen output record format. Options are
          # normally key=value pairs separated with comma (e.g.
          # opt1=val2,opt2=val2), except for the `template` record format, where
          # options are a Go template.
          # Type: string
          sdk.record.format.options: ""
          # Whether to extract and decode the record key with a schema.
          # Type: bool
          sdk.schema.extract.key.enabled: "true"
          # Whether to extract and decode the record payload with a schema.
          # Type: bool
          sdk.schema.extract.payload.enabled: "true"
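The interaction between sdk.batch.size, sdk.rate.perSecond, and sdk.rate.burst described in the comments above can be summarized in a few lines. This is a sketch of the documented rule only. The function name is hypothetical and the SDK's internal logic may differ in detail.

```python
def effective_batch_size(batch_size, rate_per_second, rate_burst):
    """Return the batch size that applies once rate limiting is considered.

    The burst limit only matters when a rate limit is set (rate_per_second > 0)
    and bursts are limited (rate_burst > 0). In that case a larger batch size
    is capped at the burst size.
    """
    burst_applies = rate_per_second > 0 and rate_burst > 0
    if burst_applies and batch_size > rate_burst:
        return rate_burst
    return batch_size
```

For example, a batch size of 1000 with a rate limit of 50 records per second and a burst of 100 results in batches of 100 records.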