
Adding built-in Processors to Conduit

Built-in processors perform better than standalone ones, so in some cases it's desirable to have a custom build of Conduit that includes additional built-in processors.

The simplest way to achieve this is to write a small application that embeds Conduit (i.e., uses Conduit as a library) and adds one or more processors to its default configuration.

note

For this example, we'll add a custom "foo" processor to Conduit as a built-in processor. We'll name our application custom-conduit-app and host it under github.com/conduitio-labs.

Initialize your own Conduit Application

First we'll create the directory structure for our custom Conduit application and initialize a Go module:

mkdir -p custom-conduit-app && touch custom-conduit-app/main.go
cd custom-conduit-app
go mod init github.com/conduitio-labs/custom-conduit-app

Add the processor as a built-in plugin

Once that is done, we need to write a main function that adds our custom processor to the default Conduit configuration. Later we'll run Conduit with this custom configuration.

Write the main function

custom-conduit-app/main.go
package main

import (
	"context"

	"github.com/conduitio/conduit-commons/opencdc"
	sdk "github.com/conduitio/conduit-processor-sdk"
	"github.com/conduitio/conduit/cmd/conduit/cli"
	"github.com/conduitio/conduit/pkg/conduit"
	"github.com/conduitio/conduit/pkg/foundation/log"
	"github.com/conduitio/conduit/pkg/plugin/processor/builtin"
	// Placeholder path: replace it with the module that contains your processor.
	foo "github.com/yourusername/conduit-processor-foo"
)

func main() {
	// Get the default configuration, including all built-in processors
	cfg := conduit.DefaultConfig()

	// Import a full processor from an external repo.
	// This assumes the foo package exports a NewFooProcessor constructor.
	cfg.ProcessorPlugins["foo"] = builtin.Constructor(foo.NewFooProcessor)

	// Add a simple custom processor in-line
	cfg.ProcessorPlugins["bar"] = func(logger log.CtxLogger) sdk.Processor {
		specs := sdk.Specification{
			Name:        "bar",
			Summary:     "A simple pass-through processor",
			Description: "This processor simply passes records through without modification",
			Version:     "v0.1.0",
		}
		return sdk.NewProcessorFunc(specs, func(ctx context.Context, record opencdc.Record) (opencdc.Record, error) {
			// This is a simple pass-through processor
			// You can modify the record here if needed
			return record, nil
		})
	}

	// Start Conduit with the extended configuration
	cli.Run(cfg)
}

Add your dependencies

go get github.com/conduitio/conduit
go get github.com/conduitio/conduit-processor-sdk
go get github.com/conduitio/conduit-commons/opencdc
go mod tidy

Build your application binary

This custom version of Conduit can be built with:

custom-conduit-app
go build -o custom-conduit main.go

Check the custom processor is included

To interact with Conduit, the service needs to be running first.

important

Running Conduit also starts any pipelines found in the default pipelines directory (./custom-conduit-app/pipelines).

If you are only following this guide, no pipelines have been configured yet, so no data will be processed.

custom-conduit-app
./custom-conduit run

In a different terminal session, you can check that the custom processor has been included in the build by listing the processor plugins and confirming that it appears with the builtin: prefix:

Using the Conduit CLI

custom-conduit-app
./custom-conduit processor-plugins list
+------------------------------+----------------------------------------+
| NAME                         | SUMMARY                                |
+------------------------------+----------------------------------------+
| builtin:avro.decode@v0.1.0   | Decode Avro data into structured data. |
| builtin:avro.encode@v0.1.0   | Encode structured data into Avro data. |
| builtin:base64.decode@v0.1.0 | Decode base64 data.                    |
| builtin:base64.encode@v0.1.0 | Encode data to base64.                 |
| builtin:bar@v0.1.0           | A simple pass-through processor        |
| ...                          | ...                                    |
+------------------------------+----------------------------------------+

Using the API

curl localhost:8080/v1/processors/plugins | jq '.[].name'
"builtin:avro.decode@v0.1.0"
"builtin:avro.encode@v0.1.0"
"builtin:base64.decode@v0.1.0"
"builtin:base64.encode@v0.1.0"
"builtin:bar@v0.1.0"
...

Next steps

Once you have confirmed that your custom processor is available in your application, you are ready to transfer any pipeline you had previously tested to the custom Conduit application:

custom-conduit-app
mkdir pipelines
mv $PREVIOUSLY_TESTED_CONDUIT/pipelines/* pipelines/
./custom-conduit run

Or configure it to read from the desired pipelines directory:

custom-conduit-app
./custom-conduit run --pipelines.path $YOUR_PIPELINES_DIRECTORY

note

If you don't have any pipelines configured yet and would like to get started, check out this page on how to build a pipeline. Just remember to replace any conduit command in the terminal with your custom binary, ./custom-conduit.
