Getting Started with Processors

A processor is a component that operates on a single record as it flows through a pipeline. It can either transform the record or filter it out based on some criteria. Since processors are part of pipelines, we recommend familiarizing yourself with pipeline semantics first.

(Diagram: Pipeline)

Processors are optional components in a pipeline (i.e. a pipeline can be started without them), and they are always attached to a single parent, which can be either a connector or a pipeline:

  • Connector processors:
    • Source processors only receive messages originating at a specific source connector. Source processors are created by specifying the corresponding source connector as the parent entity.
    • Destination processors only receive messages that are meant to be sent to a specific destination connector. Destination processors are created by specifying the corresponding destination connector as the parent entity.
  • Pipeline processors receive all messages that flow through the pipeline, regardless of the source or destination. Pipeline processors are created by specifying the pipeline as the parent entity.

Processor types

Conduit supports two types of processors: built-in processors, which ship with Conduit itself, and standalone processors, which you build and provide yourself.

How to use a processor

In the following examples we use the json.decode processor, but you could use any of the built-in processors, or even reference your own standalone processor.

info

There are different ways to reference a processor plugin by name and make sure you're using the one you intend. Please check out the Referencing Processors documentation for more information.
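
For example, a fully qualified reference could include a prefix indicating where the plugin comes from. The snippet below is only a sketch; the exact builtin: prefix syntax is an assumption here, so check the Referencing Processors documentation for the authoritative format.

processors:
  - id: extract-name
    plugin: builtin:json.decode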

Using a pipeline configuration file

Using a pipeline processor

Creating a pipeline processor through a pipeline configuration file can be done as shown below:

version: 2.2
pipelines:
  - id: example-pipeline
    connectors:
      # define source and destination connectors
      # ...
    processors:
      - id: extract-name
        plugin: json.decode
        settings:
          field: name
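
As a rough illustration of what this processor does (the values below are made up, and the exact record layout depends on your source), json.decode takes the raw JSON string stored in the configured field and turns it into structured data:

# before processing: the field holds raw JSON as a string
name: '{"first": "Jane", "last": "Doe"}'

# after processing: the field holds structured data
name:
  first: Jane
  last: Doe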

Using a connector processor

Similarly, we can configure a connector processor, i.e. a processor attached to a connector:

version: 2.2
pipelines:
  - id: example-pipeline
    connectors:
      - id: conn1
        # other connector configuration
        processors:
          - id: extract-name
            plugin: json.decode
            settings:
              field: name
      # other connectors

More details can be found in the Pipeline Configuration Files documentation.

Using the HTTP API

The processor endpoints live under the /v1/processors namespace. To attach a processor to either a connector or a pipeline, send a POST request to /v1/processors and set parent.type to TYPE_PIPELINE or TYPE_CONNECTOR (the default value is TYPE_UNSPECIFIED).

Here's what the entire request could look like:
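
The sketch below assumes Conduit's HTTP API is listening on localhost:8080 and that the request body follows the fields described in api.swagger.json; treat the port and the exact field names as assumptions and verify them against the API reference.

# create a pipeline processor attached to example-pipeline (hypothetical IDs and values)
curl -X POST http://localhost:8080/v1/processors \
  -H "Content-Type: application/json" \
  -d '{
    "plugin": "json.decode",
    "parent": {
      "type": "TYPE_PIPELINE",
      "id": "example-pipeline"
    },
    "config": {
      "settings": {
        "field": "name"
      }
    }
  }'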

tip

To see all the HTTP API requests you can perform, check out our HTTP API documentation. These are also described in our api.swagger.json.