
split

Split records.

Description

Splits a record into multiple records based on the field provided in the configuration. If the field is an array, each element of the array is converted into a separate record. Each new record contains the same data as the original record, except that the field specified in the configuration is replaced with the corresponding array element. The index of the element in the array is stored in the metadata of the new record under the key split.index.

This processor only applies to fields under the .Key, .Payload.Before and .Payload.After prefixes, and only if those fields contain structured data. If the record contains raw JSON data, use the json.decode processor to parse it into structured data first.
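The splitting behaviour described above can be illustrated with a small Python sketch. This is not Conduit's implementation: records are modelled as plain dictionaries, and the helper name `split_record` and its list-of-keys path argument are assumptions made purely for illustration. The sketch copies the record once per array element, replaces the array with that element, and stores the element's index as a string under `split.index`.

```python
import copy


def split_record(record, field_path):
    """Return one copy of `record` per element of the list at `field_path`.

    Hypothetical helper: `field_path` is a list of dictionary keys,
    for example ["payload", "after", "users"].
    """
    # Walk down to the dictionary that holds the target field.
    parent = record
    for key in field_path[:-1]:
        parent = parent[key]
    last = field_path[-1]

    values = parent[last]
    if not isinstance(values, list):
        # The target field has to contain an array, otherwise it is an error.
        raise ValueError("field " + ".".join(field_path) + " is not an array")

    results = []
    for index in range(len(values)):
        # Each new record starts as a full copy of the original.
        new_record = copy.deepcopy(record)
        target = new_record
        for key in field_path[:-1]:
            target = target[key]
        # Replace the array with the single element at this index.
        target[last] = target[last][index]
        # Record the element's position in the metadata, as a string.
        metadata = new_record.get("metadata") or {}
        metadata["split.index"] = str(index)
        new_record["metadata"] = metadata
        results.append(new_record)
    return results


# Sample record, shortened from the example further down this page.
record = {
    "position": None,
    "operation": "update",
    "metadata": None,
    "key": {"id": 123},
    "payload": {
        "before": None,
        "after": {
            "users": [
                {"age": 30, "name": "Alice"},
                {"age": 25, "name": "Bob"},
            ]
        },
    },
}

out = split_record(record, ["payload", "after", "users"])
```

The original record is left untouched because every output record is built from a deep copy. A target field that is not a list raises an error, mirroring the requirement that the field contain an array.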

Important: This processor currently only works using the pipeline architecture v2, which can be enabled using the flag --preview.pipeline-arch-v2. Using it without the flag will result in an error.

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "split"
        settings:
          # Field is the target field that should be split. Note that the target
          # field has to contain an array so it can be split, otherwise the
          # processor returns an error. This also means you can only split on
          # fields in structured data under `.Key` and `.Payload`.
          # For more information about the format, see [Referencing
          # fields](https://conduit.io/docs/using/processors/referencing-fields).
          # Type: string
          field: ""
          # Whether to decode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.decode.key.enabled: "true"
          # Whether to decode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.decode.payload.enabled: "true"
          # Whether to encode the record key using its corresponding schema from
          # the schema registry.
          # Type: bool
          sdk.schema.encode.key.enabled: "true"
          # Whether to encode the record payload using its corresponding schema
          # from the schema registry.
          # Type: bool
          sdk.schema.encode.payload.enabled: "true"

Examples

Split array into multiple records

This example takes the array in field .Payload.After.users and splits it into separate records, each containing one element.

Configuration parameters

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      # define source and destination ...
    processors:
      - id: example
        plugin: "split"
        settings:
          field: ".Payload.After.users"

Record difference

Before

{
  "position": null,
  "operation": "update",
  "metadata": null,
  "key": {
    "id": 123
  },
  "payload": {
    "before": null,
    "after": {
      "users": [
        {
          "age": 30,
          "name": "Alice"
        },
        {
          "age": 25,
          "name": "Bob"
        },
        {
          "age": 35,
          "name": "Charlie"
        }
      ]
    }
  }
}

After

{
  "position": null,
  "operation": "update",
  "metadata": {
    "split.index": "0"
  },
  "key": {
    "id": 123
  },
  "payload": {
    "before": null,
    "after": {
      "users": {
        "age": 30,
        "name": "Alice"
      }
    }
  }
}
{
  "position": null,
  "operation": "update",
  "metadata": {
    "split.index": "1"
  },
  "key": {
    "id": 123
  },
  "payload": {
    "before": null,
    "after": {
      "users": {
        "age": 25,
        "name": "Bob"
      }
    }
  }
}
{
  "position": null,
  "operation": "update",
  "metadata": {
    "split.index": "2"
  },
  "key": {
    "id": 123
  },
  "payload": {
    "before": null,
    "after": {
      "users": {
        "age": 35,
        "name": "Charlie"
      }
    }
  }
}
