clone
Clone records.
Description
Clone all records N times. For each input record, the processor
outputs the original record plus N clones (for a total of N+1 records). Each clone
is identical to the original, except the metadata field clone.index
is
set to the clone's index (0 for the original, 1 to N for the clones).
Important: Add a condition to this processor if you only want to clone some records.
Important: This processor currently only works using the pipeline architecture
v2, which can be enabled using the flag --preview.pipeline-arch-v2
.
Using it without the flag will result in an error.
Configuration parameters
- YAML
- Table
version: 2.2
pipelines:
- id: example
status: running
connectors:
# define source and destination ...
processors:
- id: example
plugin: "clone"
settings:
# The number of times to clone each record (e.g. if count is 2, the
# processor will output 3 records for every input record).
# Type: int
count: ""
# Whether to decode the record key using its corresponding schema from
# the schema registry.
# Type: bool
sdk.schema.decode.key.enabled: "true"
# Whether to decode the record payload using its corresponding schema
# from the schema registry.
# Type: bool
sdk.schema.decode.payload.enabled: "true"
# Whether to encode the record key using its corresponding schema from
# the schema registry.
# Type: bool
sdk.schema.encode.key.enabled: "true"
# Whether to encode the record payload using its corresponding schema
# from the schema registry.
# Type: bool
sdk.schema.encode.payload.enabled: "true"
Name | Type | Default | Description |
---|---|---|---|
count | int | null | The number of times to clone each record (e.g. if count is 2, the processor will output 3 records for every input record). |
sdk.schema.decode.key.enabled | bool | true | Whether to decode the record key using its corresponding schema from the schema registry. |
sdk.schema.decode.payload.enabled | bool | true | Whether to decode the record payload using its corresponding schema from the schema registry. |
sdk.schema.encode.key.enabled | bool | true | Whether to encode the record key using its corresponding schema from the schema registry. |
sdk.schema.encode.payload.enabled | bool | true | Whether to encode the record payload using its corresponding schema from the schema registry. |
Examples
Clone record into multiple records
This example takes a record and clones it once, producing 2 records, each containing the same data, except for the metadata field clone.index
.
Configuration parameters
- YAML
- Table
version: 2.2
pipelines:
- id: example
status: running
connectors:
# define source and destination ...
processors:
- id: example
plugin: "clone"
settings:
count: "1"
Name | Value |
---|---|
count | 1 |
Record difference
Before | After | ||||
1 | { | 1 | { | ||
2 | "position": null, | 2 | "position": null, | ||
3 | "operation": "create", | 3 | "operation": "create", | ||
4 | "metadata": { | 4 | "metadata": { | ||
5 | + | "clone.index": "0", | |||
5 | "foo": "bar" | 6 | "foo": "bar" | ||
6 | }, | 7 | }, | ||
7 | "key": { | 8 | "key": { | ||
8 | "id": 123 | 9 | "id": 123 | ||
9 | }, | 10 | }, | ||
10 | "payload": { | 11 | "payload": { | ||
11 | "before": null, | 12 | "before": null, | ||
12 | "after": { | 13 | "after": { | ||
13 | "age": 30, | 14 | "age": 30, | ||
14 | "name": "Alice" | 15 | "name": "Alice" | ||
15 | } | 16 | } | ||
16 | } | 17 | } | ||
17 | } | 18 | } | ||
19 | + | { | |||
20 | + | "position": null, | |||
21 | + | "operation": "create", | |||
22 | + | "metadata": { | |||
23 | + | "clone.index": "1", | |||
24 | + | "foo": "bar" | |||
25 | + | }, | |||
26 | + | "key": { | |||
27 | + | "id": 123 | |||
28 | + | }, | |||
29 | + | "payload": { | |||
30 | + | "before": null, | |||
31 | + | "after": { | |||
32 | + | "age": 30, | |||
33 | + | "name": "Alice" | |||
34 | + | } | |||
35 | + | } | |||
36 | + | } |