avro.encode

Encodes a record's field into the Avro format.

Description

The processor takes a record's field and encodes it using a schema into the Avro format. It provides two strategies for determining the schema:

  • preRegistered (recommended): This strategy downloads an existing schema from the schema registry and uses it to encode the record. The schema must already be registered in the schema registry. It is downloaded only once and cached locally.

  • autoRegister (for development purposes): This strategy infers the schema by inspecting the structured data and registers it in the schema registry. If the record schema is known in advance, it's recommended to use the preRegistered strategy and register the schema manually, as autoRegister comes with limitations.

    The strategy uses reflection to traverse the structured data of each record and determine the type of each field. If a field is set to nil, the processor doesn't have enough information to determine its type and defaults to a nullable string. Because of this, two records with the same structure are not guaranteed to produce the same schema, or even a backwards compatible one. The processor registers each inferred schema in the schema registry under the same subject, so schema compatibility checks need to be disabled for this subject to prevent failures. If the schema subject does not exist before running this processor, the processor automatically sets the correct compatibility settings in the schema registry.
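
The inference limitation described above can be illustrated with a minimal Python sketch. It is not Conduit's implementation: the function `infer_schema`, the generated record names, and the mapping of Python types to Avro types are assumptions made for illustration. Only the fallback to a nullable string for missing values comes from the description above. Python's `None` stands in for nil.

```python
def infer_schema(value, name="record"):
    """Toy Avro schema inference; the type mapping is an illustrative assumption."""
    if value is None:
        # No type information available: fall back to a nullable string.
        return ["null", "string"]
    if isinstance(value, bool):  # check bool before int, bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        # Assumption: every nested mapping is treated as a record with sorted fields.
        return {
            "type": "record",
            "name": name,
            "fields": [
                {"name": key, "type": infer_schema(val, name=f"{name}_{key}")}
                for key, val in sorted(value.items())
            ],
        }
    raise TypeError(f"unsupported type: {type(value).__name__}")


# Two records with the same structure, one with a nil field.
first = infer_schema({"myInt": 1, "myString": "bar"})
second = infer_schema({"myInt": None, "myString": "bar"})

print(first["fields"][0])
print(second["fields"][0])
```

Both inputs have the same structure, yet the inferred type of `myInt` differs between them. Registering both under one subject is the situation in which compatibility checks can fail.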

This processor is the counterpart to avro.decode.

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "avro.encode"
        settings:
          # The password to use with basic authentication. This option is
          # required if auth.basic.username contains a value. If both
          # auth.basic.username and auth.basic.password are empty basic
          # authentication is disabled.
          # Type: string
          auth.basic.password: ""
          # The username to use with basic authentication. This option is
          # required if auth.basic.password contains a value. If both
          # auth.basic.username and auth.basic.password are empty basic
          # authentication is disabled.
          # Type: string
          auth.basic.username: ""
          # The field that will be encoded.
          # For more information about the format, see [Referencing
          # fields](https://conduit.io/docs/processors/referencing-fields).
          # Type: string
          field: ".Payload.After"
          # The subject name under which the inferred schema will be registered
          # in the schema registry.
          # Type: string
          schema.autoRegister.subject: ""
          # The subject of the schema in the schema registry used to encode the
          # record.
          # Type: string
          schema.preRegistered.subject: ""
          # The version of the schema in the schema registry used to encode the
          # record.
          # Type: int
          schema.preRegistered.version: ""
          # Strategy to use to determine the schema for the record. Available
          # strategies are:
          # * `preRegistered` (recommended) - Download an existing schema from
          #   the schema registry. This strategy is further configured with
          #   options starting with `schema.preRegistered.*`.
          # * `autoRegister` (for development purposes) - Infer the schema from
          #   the record and register it in the schema registry. This strategy
          #   is further configured with options starting with
          #   `schema.autoRegister.*`.
          # For more information about the behavior of each strategy read the
          # main processor description.
          # Type: string
          schema.strategy: ""
          # The path to a file containing PEM encoded CA certificates. If this
          # option is empty, Conduit falls back to using the host's root CA set.
          # Type: string
          tls.ca.cert: ""
          # The path to a file containing a PEM encoded certificate. This option
          # is required if tls.client.key contains a value. If both
          # tls.client.cert and tls.client.key are empty TLS is disabled.
          # Type: string
          tls.client.cert: ""
          # The path to a file containing a PEM encoded private key. This option
          # is required if tls.client.cert contains a value. If both
          # tls.client.cert and tls.client.key are empty TLS is disabled.
          # Type: string
          tls.client.key: ""
          # URL of the schema registry (e.g. http://localhost:8085)
          # Type: string
          url: ""
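
The pairing rules stated in the parameter comments (username with password, client certificate with client key, and the two available strategies) can be checked before a pipeline is deployed. The following is a rough sketch of such a check, not part of Conduit: the function name `validate_settings` and the wording of the messages are hypothetical, and only the rules themselves are taken from the comments above.

```python
def validate_settings(settings):
    """Return a list of problems found in an avro.encode settings mapping."""

    def is_set(key):
        # An option counts as set when it is present and non-empty.
        return bool(settings.get(key, ""))

    problems = []
    # Basic authentication: username and password are required together.
    if is_set("auth.basic.username") != is_set("auth.basic.password"):
        problems.append(
            "auth.basic.username and auth.basic.password must both be set or both be empty"
        )
    # Client TLS: certificate and private key are required together.
    if is_set("tls.client.cert") != is_set("tls.client.key"):
        problems.append(
            "tls.client.cert and tls.client.key must both be set or both be empty"
        )
    # Only the two documented strategies are accepted.
    if settings.get("schema.strategy") not in ("preRegistered", "autoRegister"):
        problems.append("schema.strategy must be preRegistered or autoRegister")
    return problems


ok = validate_settings({
    "field": ".Payload.After",
    "schema.strategy": "autoRegister",
    "schema.autoRegister.subject": "example-autoRegister",
    "url": "http://127.0.0.1:54322",
})
bad = validate_settings({
    "schema.strategy": "autoRegister",
    "auth.basic.username": "user",  # password missing
})
print(ok)
print(bad)
```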

Examples

Auto-register schema

This example shows the usage of the avro.encode processor with the autoRegister schema strategy. The processor encodes the record's .Payload.After field using the schema that is extracted from the data and registered on the fly under the subject example-autoRegister.

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "avro.encode"
        settings:
          field: ".Payload.After"
          schema.autoRegister.subject: "example-autoRegister"
          schema.strategy: "autoRegister"
          url: "http://127.0.0.1:54322"

Record difference

Before

{
  "position": "dGVzdC1wb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "key1": "val1"
  },
  "key": null,
  "payload": {
    "before": null,
    "after": {
      "myFloat": 2.3,
      "myInt": 1,
      "myMap": {
        "bar": 2.2,
        "foo": true
      },
      "myString": "bar",
      "myStruct": {
        "bar": false,
        "foo": 1
      }
    }
  }
}

After

{
  "position": "dGVzdC1wb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "key1": "val1"
  },
  "key": null,
  "payload": {
    "before": null,
    "after": "\u0000\u0000\u0000\u0000\u0001ffffff\u0002@\u0002������\u0001@\u0001\u0006bar\u0000\u0002"
  }
}
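
The `ffffff\u0002@` run and the replacement characters in the encoded value look like the binary encoding of the two floating-point numbers in the record. The sketch below checks that reading with Python's standard library only. Treating `myFloat` and `myMap.bar` as Avro doubles is an inference from the sample bytes, not something this page states.

```python
import struct

# Avro writes a double as its 8-byte IEEE 754 representation, little-endian.
float_bytes = struct.pack("<d", 2.3)    # myFloat
map_bar_bytes = struct.pack("<d", 2.2)  # myMap.bar

# Rendering raw bytes as text substitutes U+FFFD for bytes that are not valid UTF-8.
rendered = map_bar_bytes.decode("utf-8", errors="replace")

print(float_bytes)
print(map_bar_bytes)
print(rendered.count("\ufffd"))
```

Under that assumption, the garbled characters in the output are not corruption of the record itself. They are raw bytes of the value 2.2 that cannot be displayed as text.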

Pre-register schema

This example shows the usage of the avro.encode processor with the preRegistered schema strategy. When using this strategy, the schema has to be manually pre-registered. In this example we use the following schema:

{
  "type": "record",
  "name": "record",
  "fields": [
    {"name": "myString", "type": "string"},
    {"name": "myInt", "type": "int"}
  ]
}

The processor encodes the record's .Key field using the above schema.

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "avro.encode"
        settings:
          field: ".Key"
          schema.preRegistered.subject: "example-preRegistered"
          schema.preRegistered.version: "1"
          schema.strategy: "preRegistered"
          url: "http://127.0.0.1:54322"

Record difference

Before

{
  "position": "dGVzdC1wb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "key1": "val1"
  },
  "key": {
    "myInt": 1,
    "myString": "bar"
  },
  "payload": {
    "before": null,
    "after": null
  }
}

After

{
  "position": "dGVzdC1wb3NpdGlvbg==",
  "operation": "create",
  "metadata": {
    "key1": "val1"
  },
  "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002",
  "payload": {
    "before": null,
    "after": null
  }
}
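
The encoded key can be reproduced by hand, which helps when reading such values in logs. The sketch below is an illustration, not Conduit's code. It assumes the value is framed with a zero byte followed by a 4-byte big-endian schema ID (the framing commonly used with Confluent-compatible schema registries) and that the schema ID is 1; both are inferred from the sample bytes. The field encoding follows the Avro binary rules: strings are a zig-zag varint length followed by UTF-8 bytes, and ints are zig-zag varints. The names `zigzag_varint` and `encode_key` are hypothetical.

```python
import struct


def zigzag_varint(n):
    """Encode a signed integer as an Avro int/long (zig-zag, then base-128 varint)."""
    z = (n << 1) ^ (n >> 63)  # zig-zag maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_key(schema_id, my_string, my_int):
    """Encode the example record; fields are written in schema order."""
    # Assumed framing: magic byte 0, then the schema ID as a big-endian uint32.
    header = b"\x00" + struct.pack(">I", schema_id)
    data = my_string.encode("utf-8")
    body = zigzag_varint(len(data)) + data + zigzag_varint(my_int)
    return header + body


encoded = encode_key(1, "bar", 1)
print(encoded)
```

Here `\u0006` is the zig-zag encoding of the string length 3 and the trailing `\u0002` is the zig-zag encoding of the integer 1, which matches the `key` value in the record shown above.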
