Skip to main content

Output Formats

One of the challenges to be solved when integrating Conduit with other systems, such as Kafka Connect and Debezium, is the data format. This is present in situations where raw data needs to be written, for example when writing messages to Kafka. By default, Conduit uses the OpenCDC format. Conduit also makes it possible to change the output format so that the data can be consumed by other systems.

note

Configuring different output formats is possible for connectors which use Conduit SDK's default middleware. Please consult the connector's documentation to confirm the feature is available.

The output format is configured at the level of a destination connector. This implies that different connectors in the same pipeline can use different formats.

Configuration parameters

There are two connector configuration parameters which control the output format:

  • sdk.record.format: used to choose the format itself
  • sdk.record.format.options: used to configure the specifics of the chosen format.

Available formats

Most formats come in the form of name/encoding. name is the name of the format and through that we control the structure of the output. encoding is the encoding used for the structure created by the converter. Currently, a json encoding is supported. More encoders are planned in future (e.g. Avro, Parquet, ...)

opencdc/json

This is the default format. An extensive description of this format is available here.

Options

None.

debezium/json

Generates output compatible with Debezium connectors.

Options

debezium.schema.name (Optional)

Name of schema to be used in converted Debezium records.

Default value: none

debezium.rawData.key (Optional)

If the record to be converted doesn't contain a structured payload or its payload cannot be parsed as JSON, then this key defines the field into which the "raw" payload data will be put.

Default value: opencdc.rawData

template

The user gets full control over the output format by using Go templates. The value provided to the template is a sdk.Record, so the template has access to all its fields (e.g. .Position, .Key, .Metadata, and so on). We also inject all template functions provided by sprig to make it easier to write templates.

Options

The only option for this formatter is Go template itself, as described here.

Examples

Example 1: A pipeline with a Debezium record formatter

The following pipeline has a generator source and a file destination. We use the debezium/json record formatter so that the file destination outputs the records in the Debezium format. Additionally, we set the schema name to be employee by setting the format options to debezium.schema.name=employee.

---
version: 2.2

pipelines:
- id: pipeline1
status: running
name: pipeline1
description: 'A pipeline with a Debezium record formatter'
connectors:
- id: source1
type: source
plugin: builtin:generator
name: source1
settings:
recordCount: "10"
readTime: "1s"
format.options: "name:string,company:string"
format.type: "structured"
- id: destination1
type: destination
plugin: "builtin:file"
name: destination1
settings:
sdk.record.format: "debezium/json"
sdk.record.format.options: 'debezium.schema.name=employee'
path: /tmp/file-destination.txt

If you run this pipeline and tail the destination file, you'll see something like this:

{
"schema": {
"type": "struct",
"name": "employee",
"fields": [
// omitted for brevity
]
},
"payload": {
"before": null,
"after": {
"company": "string 3cea28f1-8ec2-4035-b4d3-790a9239feb3",
"name": "string 8fbbbf0c-aaaa-499f-b145-ac5345c8c067"
},
"source": {
"conduit.source.connector.id": "pipeline1:source1",
"opencdc.readAt": "1680796794653155968",
"opencdc.version": "v1"
},
"op": "c",
"ts_ms": 1680796794653,
"transaction": null
}
}

Example 2: A pipeline with a template record formatter

The following pipeline has a generator source and a file destination. The generator source produces records which have a name field and a company field. In this pipeline, we use the template record formatter to extract only the id field and then prefix it with employee ID.

---
version: 2.2

pipelines:
- id: pipeline1
status: running
name: pipeline1
description: 'A pipeline with a template record formatter'
connectors:
- id: source1
type: source
plugin: builtin:generator
name: source1
settings:
recordCount: "3"
readTime: "1s"
format.options: "id:int,name:string,company:string"
format.type: "structured"
- id: destination1
type: destination
plugin: "builtin:file"
name: destination1
settings:
sdk.record.format: template
# print only the payload, format it as a string (normally it's a byte slice)
sdk.record.format.options: '{{ printf "employee ID %d" .Payload.After.id }}'
path: /tmp/file-destination.txt

If you run Conduit and then tail the destination file, you'll see something like this:

employee ID 636561854
employee ID 1848322815
employee ID 123675258