# split

Split records.
## Description

Split a record into multiple records based on the field provided in the
configuration. If the field is an array, each element of the array is turned
into a separate record. Each new record contains the same data as the original
record, except that the configured field is replaced with the corresponding
array element. The index of the element in the array is stored in the metadata
of the new record under the key `split.index`.
This processor only works with the `.Key`, `.Payload.Before` and
`.Payload.After` prefixes, and only if the referenced field contains structured
data. If the record contains raw JSON data, use the `json.decode` processor to
parse it into structured data first.
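The documented behavior can be illustrated with a short Python sketch. This is only an illustration of the semantics described above, not Conduit's actual Go implementation; the `split_record` helper and its simplified field resolution (handling only `.Payload.After.*` paths) are assumptions made for the example.

```python
import copy

def split_record(record: dict, field: str) -> list[dict]:
    """Illustrative sketch: split `record` on the array at `field`.

    Only resolves fields under .Payload.After, which is enough to mirror
    the example in this page. Conduit's real processor also supports
    .Key and .Payload.Before.
    """
    key = field.removeprefix(".Payload.After.")
    values = record["payload"]["after"][key]
    if not isinstance(values, list):
        # The real processor errors out when the field is not an array.
        raise ValueError(f"field {field} does not contain an array")

    out = []
    for i, element in enumerate(values):
        clone = copy.deepcopy(record)
        # Replace the array with the single element for this record.
        clone["payload"]["after"][key] = element
        # Store the element's index in metadata under "split.index",
        # creating the metadata map if the original had none.
        clone["metadata"] = {**(clone.get("metadata") or {}),
                             "split.index": str(i)}
        out.append(clone)
    return out

record = {
    "position": None,
    "operation": "update",
    "metadata": None,
    "key": {"id": 123},
    "payload": {
        "before": None,
        "after": {"users": [
            {"age": 30, "name": "Alice"},
            {"age": 25, "name": "Bob"},
            {"age": 35, "name": "Charlie"},
        ]},
    },
}

records = split_record(record, ".Payload.After.users")
print(len(records))                           # 3
print(records[0]["metadata"]["split.index"])  # 0
```

Each output record keeps the original key, position, and operation; only the target field and the metadata differ.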
**Important:** This processor currently only works with pipeline architecture
v2, which can be enabled using the `--preview.pipeline-arch-v2` flag. Using it
without the flag results in an error.
## Configuration parameters

**YAML**
```yaml
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "split"
        settings:
          # Field is the target field that should be split. Note that the target
          # field has to contain an array so it can be split, otherwise the
          # processor returns an error. This also means you can only split on
          # fields in structured data under `.Key` and `.Payload`.
          # For more information about the format, see [Referencing
          # fields](https://conduit.io/docs/using/processors/referencing-fields).
          # Type: string
          field: ""
          # Whether to decode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.decode.key.enabled: "true"
          # Whether to decode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.decode.payload.enabled: "true"
          # Whether to encode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.encode.key.enabled: "true"
          # Whether to encode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.encode.payload.enabled: "true"
```
**Table**

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| `field` | string | null | Field is the target field that should be split. Note that the target field has to contain an array so it can be split, otherwise the processor returns an error. This also means you can only split on fields in structured data under `.Key` and `.Payload`. For more information about the format, see [Referencing fields](https://conduit.io/docs/using/processors/referencing-fields). |
| `sdk.schema.decode.key.enabled` | bool | true | Whether to decode the record key using its corresponding schema from the schema registry. |
| `sdk.schema.decode.payload.enabled` | bool | true | Whether to decode the record payload using its corresponding schema from the schema registry. |
| `sdk.schema.encode.key.enabled` | bool | true | Whether to encode the record key using its corresponding schema from the schema registry. |
| `sdk.schema.encode.payload.enabled` | bool | true | Whether to encode the record payload using its corresponding schema from the schema registry. |
## Examples

### Split array into multiple records

This example takes the array in field `.Payload.After.users` and splits it
into separate records, each containing one element.
#### Configuration parameters

**YAML**
```yaml
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "split"
        settings:
          field: ".Payload.After.users"
```
**Table**

| Name | Value |
| --- | --- |
| `field` | `.Payload.After.users` |
#### Record difference

Before:

```json
{
  "position": null,
  "operation": "update",
  "metadata": null,
  "key": {
    "id": 123
  },
  "payload": {
    "before": null,
    "after": {
      "users": [
        { "age": 30, "name": "Alice" },
        { "age": 25, "name": "Bob" },
        { "age": 35, "name": "Charlie" }
      ]
    }
  }
}
```

After (three records):

```json
{
  "position": null,
  "operation": "update",
  "metadata": {
    "split.index": "0"
  },
  "key": {
    "id": 123
  },
  "payload": {
    "before": null,
    "after": {
      "users": { "age": 30, "name": "Alice" }
    }
  }
}
{
  "position": null,
  "operation": "update",
  "metadata": {
    "split.index": "1"
  },
  "key": {
    "id": 123
  },
  "payload": {
    "before": null,
    "after": {
      "users": { "age": 25, "name": "Bob" }
    }
  }
}
{
  "position": null,
  "operation": "update",
  "metadata": {
    "split.index": "2"
  },
  "key": {
    "id": 123
  },
  "payload": {
    "before": null,
    "after": {
      "users": { "age": 35, "name": "Charlie" }
    }
  }
}
```