Skip to main content

Schema Extraction

Source and destination connectors can be configured to automatically extract the schema from the key and payload of a record. This is especially useful when the data is structured and the schema is known in advance. By default, Conduit extracts the schema from the key and the payload of a record and encodes them using the extracted schema.

Configuration parameters

These are the configuration parameters that control schema extraction on a source connector (Note that sdk.schema.extract.payload.enabled and sdk.schema.extract.key.enabled are also available on destination connectors):

  • sdk.schema.extract.type: The type of schema extraction to perform. Supported value is avro.
  • sdk.schema.extract.payload.enabled: A boolean value that indicates whether the payload should be extracted.
  • sdk.schema.extract.payload.subject: The subject of the payload schema.
  • sdk.schema.extract.key.enabled: A boolean value that indicates whether the key should be extracted.
  • sdk.schema.extract.key.subject: The subject of the key schema.
caution

sdk.schema.extract.payload.enabled and sdk.schema.extract.key.enabled should be set to false when producing raw (not structured) data, as shown in the example below.

If you are developing a connector, you can disable this automatically by updating the connector's default middleware. For more information about NewSource() when developing a source connector, see here.

Example

The below pipeline will generate a single record and write it to a file. Notice that it's configured so that the generator source does not extract the schema or encode the data.

version: "2.2"
pipelines:
- id: generator-to-file
status: running
name: generator-to-file
description: Generates a single record, no schema generated, writes to file
connectors:
- id: file-src
type: source
plugin: builtin:generator
name: file-src
settings:
recordCount: "1"
collections.users.format.type: structured
collections.users.format.options.id: int
collections.users.format.options.name: string

sdk.schema.extract.payload.enabled: false
sdk.schema.extract.key.enabled: false

- id: file-dest
type: destination
plugin: builtin:file
name: file-dest
settings:
path: /tmp/file-destination.txt

When the pipeline is run, /tmp/file-destination.txt will contain output similar to this:

{
"position": "MQ==",
"operation": "create",
"metadata": {
"conduit.source.connector.id": "generator-to-file:file-src",
"opencdc.collection": "users",
"opencdc.createdAt": "1723046776830339829"
},
"key": "c2F1cm9wc2lkYW4=",
"payload": {
"before": null,
"after": {
"id": 7819649577989235000,
"name": "Iambe"
}
}
}

Notice that the written record doesn't contain any schema information in its metadata. However, if you leave the schema extraction enabled, then you'll see something below in the record's metadata:

"opencdc.payload.schema.subject": "generator-to-file:file-src:users.payload",
"opencdc.payload.schema.version": "1"
tip

To learn more about Schema Support, check out this page.