Schema Extraction
Source and destination connectors can be configured to automatically extract the schema from the key and payload of a record. This is especially useful when the data is structured and the schema is known in advance. By default, Conduit extracts the schema from the key and the payload of a record and encodes them using the extracted schema.
Configuration parameters
These are the configuration parameters that control schema extraction on a
source connector (Note that sdk.schema.extract.payload.enabled
and sdk.schema.extract.key.enabled
are also available on destination
connectors):
sdk.schema.extract.type
: The type of schema extraction to perform. Supported value isavro
.sdk.schema.extract.payload.enabled
: A boolean value that indicates whether the payload should be extracted.sdk.schema.extract.payload.subject
: The subject of the payload schema.sdk.schema.extract.key.enabled
: A boolean value that indicates whether the key should be extracted.sdk.schema.extract.key.subject
: The subject of the key schema.
sdk.schema.extract.payload.enabled
and sdk.schema.extract.key.enabled
should be set to false
when producing raw (not structured) data, as shown in the example below.
If you are developing a connector, you can disable this automatically by updating the connector's default middleware. For more information about NewSource()
when developing a source connector, see here.
Example
The below pipeline will generate a single record and write it to a file. Notice that it's configured so that the generator source does not extract the schema or encode the data.
version: "2.2"
pipelines:
- id: generator-to-file
status: running
name: generator-to-file
description: Generates a single record, no schema generated, writes to file
connectors:
- id: file-src
type: source
plugin: builtin:generator
name: file-src
settings:
recordCount: "1"
collections.users.format.type: structured
collections.users.format.options.id: int
collections.users.format.options.name: string
sdk.schema.extract.payload.enabled: false
sdk.schema.extract.key.enabled: false
- id: file-dest
type: destination
plugin: builtin:file
name: file-dest
settings:
path: /tmp/file-destination.txt
When the pipeline is run, /tmp/file-destination.txt
will contain output similar to this:
{
"position": "MQ==",
"operation": "create",
"metadata": {
"conduit.source.connector.id": "generator-to-file:file-src",
"opencdc.collection": "users",
"opencdc.createdAt": "1723046776830339829"
},
"key": "c2F1cm9wc2lkYW4=",
"payload": {
"before": null,
"after": {
"id": 7819649577989235000,
"name": "Iambe"
}
}
}
Notice that the written record doesn't contain any schema information in its metadata. However, if you leave the schema extraction enabled, then you'll see something below in the record's metadata:
"opencdc.payload.schema.subject": "generator-to-file:file-src:users.payload",
"opencdc.payload.schema.version": "1"
To learn more about Schema Support, check out this page.