OpenCDC record
An OpenCDC record in Conduit aims to standardize the format of data records exchanged between different connectors within a data processing pipeline. The primary objective is to ensure compatibility between various combinations of source and destination connectors.
Benefits
- Support for Operations: The format should support representing records
for
create
,update
,delete
, andsnapshot
operations. - Standard Metadata Fields: Definition of standard metadata fields to provide essential information about each record. These can vary depending on the record. See Metadata Fields for more information.
- Integration with Data Tools: We believe that being strict about the record format Conduit consumes and produces will make it easier to integrate with other data processing tools.
Fields
.Position
uniquely represents the position of record. This is used to track the position of a record in a source connector, enabling Conduit to resume a stopped pipeline..Operation
defines what triggered the creation of a record. There are four possibilities:create
,update
,delete
orsnapshot
. The first three operations are encountered during normal CDC operation, whilesnapshot
is meant to represent records during an initial load. Depending on the operation, the record will contain either the payload before the change, after the change, both or none (see fields.Payload.Before
and.Payload.After
)..Key
represents a value that should identify the entity (e.g. database row)..Metadata
contains additional information regarding the record..Payload.Before
holds the payload before the operation ocurred. These could be present in operations such asupdate
anddelete
..Payload.After
holds the payload after the operation ocurred. These could be present in operations such ascreate
,snapshot
orupdate
.
We're indicating .Position
, and not .position
as defined in
its Record
message,
to show its Go template notation as used by
the Go representation of an OpenCDC record.
This field is public and must start with an uppercase letter.
Representation
Conduit uses Protocol Buffers (protobuf) to define an OpenCDC record. Its definition can be found in the Buf Schema Registry.
When processing records in Conduit, you can always expect a similar structure to the following:
{
"position": "c3RhbmRpbmc=",
"operation": "update",
"metadata": {
"file.path": "./example.in",
"opencdc.readAt": "1663858188836816000",
"opencdc.version": "v1"
},
"key": "cGFkbG9jay1rZXk=",
"payload": {
"before": "eWVsbG93",
"after": {
"bool": true,
"float32": 1.2,
"float64": 1.2,
"int": 1,
"int32": 1,
"int64": 1,
"string": "orange"
}
}
}
.Position
, .Key
, and .Payload.Before
are represented as Base64
encoded
in the example above because these will be a byte slice when represented as
JSON.
Data types
There are no limitations when it comes to data types a source connector can read from a source. Certain type limitations might apply depending on how the data is moved internally in Conduit, from source connector to Conduit and from Conduit to destination connectors.
The key and the payload data can be represented in two ways: raw and structured.
Raw data
Raw data in Conduit is an array of bytes that can represent the source data in
its original form (such as a file) or encoded in a certain way (for example, a
JSON representation of a database row or an Avro encoded structure). It's
represented by the opencdc.RawData
Go type. For example, when writing a
connector or a processor in Go, you can construct raw data like this:
opencdc.RawData([]byte{1, 3, 5})
Structured data
Structured data in Conduit is a map in which the keys are field names and values
are field values. An example of that is the below record's .Payload.After
field:
{
// other record fields
"payload": {
"before": "eWVsbG93",
"after": {
"bool_field": true,
"float_field": 1.2,
"int_field": 1,
"string_field": "orange"
}
}
}
When writing a connector or a processor in Go, it's represented by
the opencdc.StructuredData
type.
The supported data types for values in opencdc.StructuredData
depend on following:
- connector or processor type (built-in or standalone)
- schema support (enabled or disabled).
In built-in connectors, the field values can be of any Go type, given that there's no (de)serialization involved.
In standalone connectors with schema middleware enabled (which is the default), any type that is supported by the Apache Avro™ format is also supported by Conduit.
If the schema middleware is disabled in a connector, then the supported types are limited to what Protobuf allows as a value. That translates to the following Go types:
bool
int
,int32
,int64
,uint
,uint32
,uint64
float32
,float64
string
[]byte
(stored as a string, base64-encoded)map[string]interface{}
(a map of strings to any of the values that are supported)[]interface{}
(a slice of any value that is supported)
A notable limitation is timestamps, i.e. time.Time
values are not supported.
Metadata fields
As part of an OpenCDC record, there will be a set of fields provided that will vary depending on the connector. These fields can be common to all OpenCDC records as part of our standard, some related to Conduit, and others that will be provided by each Connector implementation independently. These fields can be useful to define conventions that will be then used by Conduit to expand its functionality. Notice that all these fields use a dot notation syntax to indicate what they refer to, preventing accidental clashes. Here are the ones you can find:
opencdc.version
Contains the version of the OpenCDC format (e.g., "v1"). This field exists to ensure the OpenCDC format version can be easily identified in case the record gets marshaled into a different untyped format (e.g. JSON).
{
// other record fields
"metadata": {
"opencdc.version": "v1",
// rest of metadata
},
// other record fields
}
opencdc.createdAt
Contains the time when the record was created in the 3rd party system. The expected format is a Unix timestamp in nanoseconds.
{
// other record fields
"metadata": {
"opencdc.createdAt": "1663858188836816000",
// rest of metadata
},
// other record fields
}
opencdc.readAt
Contains the time when the record was read from the 3rd party system. The expected format is a Unix timestamp in nanoseconds.
{
// other record fields
"metadata": {
"opencdc.readAt": "1663858188836816000",
// rest of metadata
},
// other record fields
}
opencdc.collection
Contains the name of the collection from which the record originated and/or where it should be written to.
It's up to the connector to populate this field. In other words, not all records may have this field.
{
// other record fields
"metadata": {
"opencdc.collection": "employees",
// rest of metadata
},
// other record fields
}
opencdc.key.schema.*
opencdc.key.schema.subject
, opencdc.key.schema.version
Contains the subject and version of the schema for the records' .Key
field.
This field will only be populated when using structured data.
{
// other record fields
"metadata": {
"opencdc.key.schema.subject": "employees.key.v1",
"opencdc.key.schema.version": "1",
// rest of metadata
},
// other record fields
}
opencdc.payload.schema.*
opencdc.payload.schema.subject
, opencdc.payload.schema.version
Contains the subject and version of the schema for the records' .Payload.After
and .Paylod.Before
fields.
This field will only be populated when using structured data.
{
// other record fields
"metadata": {
"opencdc.payload.schema.subject": "connector-id:collection.payload",
"opencdc.payload.schema.version": "1",
// rest of metadata
},
// other record fields
}
conduit.source.plugin.name
The name of the source plugin that created the record.
{
// other record fields
"metadata": {
"conduit.source.plugin.name": "builtin:file",
// rest of metadata
},
// other record fields
}
conduit.source.plugin.version
The version of the source plugin that created the record.
{
// other record fields
"metadata": {
"conduit.source.plugin.version": "v1.0.2",
// rest of metadata
},
// other record fields
}
conduit.source.connector.id
conduit.source.connector.id
is the ID of the source connector that received the record.
{
// other record fields
"metadata": {
"conduit.source.connector.id": "connectorID",
// rest of metadata
},
// other record fields
}
conduit.destination.plugin.name
The name of the destination plugin that has written the record.
{
// other record fields
"metadata": {
"conduit.destination.plugin.name": "builtin:file",
// rest of metadata
},
// other record fields
}
conduit.destination.plugin.version
The version of the destination plugin that has written the record.
{
// other record fields
"metadata": {
"conduit.destination.plugin.version": "v0.9.1",
// rest of metadata
},
// other record fields
}
conduit.dlq.nack.error
Contains the error that caused a record to be nacked and pushed to the dead-letter queue (DLQ).
conduit.dlq.nack.node.id
The ID of the internal node that nacked the record.
Connector-specific metadata
These metadata fields will be provided by each connector implementation allowing them to add any necessary metadata. As previously mentioned, to avoid unintended conflicts of metadata keys, the convention these will follow are the same as before, indicating first the connector name that's adding them.
Taking the same previous record example, you'll notice there
is a metadata key named file.path
, which would indicate this field was added
by a file
plugin.
{
// other record fields
"metadata": {
"file.path": "./example.in",
// rest of metadata
},
// other record fields
}