
Record routing

Overview

Record routing in Conduit makes it easier to build one-to-many and many-to-many pipeline topologies. In other words, it's possible to:

  • route data from one source to multiple collections in one destination
  • route data from multiple sources to multiple destinations.

Through this, a number of use cases are possible:

  • Keep two instances synchronized. For example, two database instances, each containing multiple tables, can be kept in sync with a single pipeline that has only one source and one destination connector.
  • Route data from one collection (table, topic, index, etc.) to multiple collections on a single server.
  • Route data to different instances or different systems altogether.

This can be done in two ways:

  • by using a connector that supports writing to multiple collections,
  • by using filter processors.

Using a connector that supports writing to multiple collections

The OpenCDC format recommends that a record's source or destination collection be written to its metadata under the opencdc.collection key.

A destination connector can then utilize that information and write the record to the correct collection.
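For example, a record read from a Kafka topic named employees_finance (used in the example below) would carry the collection name in its metadata. A minimal sketch showing only the relevant key:

{
  "metadata": {
    "opencdc.collection": "employees_finance"
  }
  // other record fields
}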

note

Ultimately, it's the connectors that provide the ability to read from multiple source collections or write to multiple destination collections. Consult the connector's documentation to check whether it supports this feature.

Example: Route data from multiple Kafka topics to multiple PostgreSQL tables on a single database server

Let's assume we have a pipeline that streams data from two Kafka topics, employees_finance and employees_legal, to PostgreSQL (both connectors support reading from multiple collections and writing to multiple collections). A pipeline configuration file could look like this:

---
version: 2.2
pipelines:
  - id: example-pipeline
    status: running
    name: example-pipeline
    description: 'Kafka to PostgreSQL'
    connectors:
      - id: kafka-source
        type: source
        plugin: "builtin:kafka"
        name: kafka-source
        settings:
          servers: "localhost:9092"
          topics: "employees_finance,employees_legal"

      - id: pg-destination
        type: destination
        plugin: "builtin:postgres"
        name: pg-destination
        settings:
          url: "postgresql://username:password@localhost/testdb?sslmode=disable"
          # No table is configured here; the destination picks the target table
          # based on the opencdc.collection metadata field, as described below.

The Kafka connector will read records like the ones below:

{
  "metadata": {
    "opencdc.collection": "employees_finance",
    // rest of metadata
  },
  "payload": {
    "after": {
      "name": "John"
    }
  }
  // other record fields
}
{
  "metadata": {
    "opencdc.collection": "employees_legal",
    // rest of metadata
  },
  "payload": {
    "after": {
      "name": "Adam"
    }
  }
  // other record fields
}

The PostgreSQL destination connector will check the value in the opencdc.collection metadata field. Based on that, it will write John's information to the employees_finance table and Adam's information to the employees_legal table.
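If you want to make this routing explicit, or override it, the table setting can reference the metadata field directly. The following is a sketch, assuming the PostgreSQL destination's table setting accepts a Go template that is evaluated per record; consult the connector documentation for the exact syntax and default value:

- id: pg-destination
  type: destination
  plugin: "builtin:postgres"
  name: pg-destination
  settings:
    url: "postgresql://username:password@localhost/testdb?sslmode=disable"
    # Assumption: the table setting is evaluated as a Go template for every record,
    # so each record lands in the table named by its opencdc.collection metadata.
    table: '{{ index .Metadata "opencdc.collection" }}'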

Using a filter processor with a condition

In some cases, records from a single source collection need to be routed to multiple destination connectors. Even if a destination connector can write to multiple collections, that feature sometimes cannot be used because the destinations require very different configurations. For example, the records may need to be written to different databases or to different systems altogether.

In scenarios like these, we can route records to multiple destination connectors using the built-in filter processor with a condition.

The advantage of this approach is that there will still be only one source, which means that we will read the data only once.

Example: Route data from a single Kafka topic to different PostgreSQL databases

Let's assume we have a pipeline that reads employee data from a single Kafka topic. Employees from the finance department (identified by department: finance in the record metadata) need to be written to one PostgreSQL database, while employees from the legal department (identified by department: legal) need to be written to a different database.

---
version: 2.2
pipelines:
  - id: example-pipeline
    status: running
    name: example-pipeline
    description: 'Kafka to PostgreSQL'
    connectors:
      - id: kafka-source
        type: source
        plugin: "builtin:kafka"
        name: kafka-source
        settings:
          servers: "localhost:9092"
          topics: "employees"

      - id: finance
        type: destination
        plugin: "builtin:postgres"
        name: finance
        settings:
          url: "postgresql://username:password@localhost/finance-db?sslmode=disable"
          table: "employees"
        processors:
          - id: employees_finance_only
            plugin: "filter"
            # Filter out records where the metadata field "department" is NOT set to "finance".
            condition: '{{ eq .Metadata.department "finance" | not }}'

      - id: legal
        type: destination
        plugin: "builtin:postgres"
        name: legal
        settings:
          url: "postgresql://username:password@localhost/legal-db?sslmode=disable"
          table: "employees"
        processors:
          - id: employees_legal_only
            plugin: "filter"
            # Filter out records where the metadata field "department" is NOT set to "legal".
            condition: '{{ eq .Metadata.department "legal" | not }}'
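For reference, the filter conditions above are evaluated against the department metadata field. Records flowing through the pipeline could look like the sketch below (the department values and names are illustrative assumptions about how the upstream producer tags its records):

{
  "metadata": {
    "department": "finance",
    // rest of metadata
  },
  "payload": {
    "after": {
      "name": "John"
    }
  }
  // other record fields
}
{
  "metadata": {
    "department": "legal",
    // rest of metadata
  },
  "payload": {
    "after": {
      "name": "Adam"
    }
  }
  // other record fields
}

With these records, the filter processor on the finance destination drops Adam's record (its department is not finance), and the filter processor on the legal destination drops John's record, so each database receives only its own department's employees.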