Skip to main content

activemq

Author: Meroxa, Inc.Conduit team logo
0

Latest release

Description

The ActiveMQ Classic connector is one of Conduit plugins. The connector provides both a source and a destination connector for ActiveMQ Artemis.

It uses the stomp protocol to connect to ActiveMQ.

What data does the OpenCDC record consist of?

  • record.Position: json object with the destination and the messageId frame header.
  • record.Operation: currently fixed as "create".
  • record.Metadata: a string to string map, with keys prefixed as activemq.header.{STOMP_HEADER_NAME}.
  • record.Key: the messageId frame header.
  • record.Payload.Before: empty
  • record.Payload.After: the message body

Source Parameters

version: 2.2
pipelines:
- id: example
status: running
connectors:
- id: example-source
type: source
plugin: "activemq"
name: example-source
settings:
# Destination is the name of the STOMP destination.
# Type: string
destination: ""
# Password is the password to use when connecting to the broker.
# Type: string
password: ""
# URL is the URL of the ActiveMQ Artemis broker.
# Type: string
url: ""
# User is the username to use when connecting to the broker.
# Type: string
user: ""
# ConsumerWindowSize is the size of the consumer window. It maps to
# the "consumer-window-size" header in the STOMP SUBSCRIBE frame.
# Type: string
consumerWindowSize: "-1"
# RecvTimeoutHeartbeat specifies the minimum amount of time between
# the client expecting to receive heartbeat notifications from the
# server
# Type: duration
recvTimeoutHeartbeat: "2s"
# SendTimeoutHeartbeat specifies the maximum amount of time between
# the client sending heartbeat notifications from the server
# Type: duration
sendTimeoutHeartbeat: "2s"
# SubscriptionType is the subscription type. It can be either ANYCAST
# or MULTICAST, with ANYCAST being the default. Maps to the
# "subscription-type" header in the STOMP SUBSCRIBE frame.
# Type: string
subscriptionType: "ANYCAST"
# CaCertPath is the path to the CA certificate file.
# Type: string
tls.caCertPath: ""
# ClientCertPath is the path to the client certificate file.
# Type: string
tls.clientCertPath: ""
# ClientKeyPath is the path to the client key file.
# Type: string
tls.clientKeyPath: ""
# Enabled is a flag to enable or disable TLS.
# Type: bool
tls.enabled: "false"
# InsecureSkipVerify is a flag to disable server certificate
# verification.
# Type: bool
tls.insecureSkipVerify: "false"
# Maximum delay before an incomplete batch is read from the source.
# Type: duration
sdk.batch.delay: "0"
# Maximum size of batch before it gets read from the source.
# Type: int
sdk.batch.size: "0"
# Specifies whether to use a schema context name. If set to false, no
# schema context name will be used, and schemas will be saved with the
# subject name specified in the connector (not safe because of name
# conflicts).
# Type: bool
sdk.schema.context.enabled: "true"
# Schema context name to be used. Used as a prefix for all schema
# subject names. If empty, defaults to the connector ID.
# Type: string
sdk.schema.context.name: ""
# Whether to extract and encode the record key with a schema.
# Type: bool
sdk.schema.extract.key.enabled: "true"
# The subject of the key schema. If the record metadata contains the
# field "opencdc.collection" it is prepended to the subject name and
# separated with a dot.
# Type: string
sdk.schema.extract.key.subject: "key"
# Whether to extract and encode the record payload with a schema.
# Type: bool
sdk.schema.extract.payload.enabled: "true"
# The subject of the payload schema. If the record metadata contains
# the field "opencdc.collection" it is prepended to the subject name
# and separated with a dot.
# Type: string
sdk.schema.extract.payload.subject: "payload"
# The type of the payload schema.
# Type: string
sdk.schema.extract.type: "avro"

Destination Parameters

version: 2.2
pipelines:
- id: example
status: running
connectors:
- id: example-destination
type: destination
plugin: "activemq"
name: example-destination
settings:
# Destination is the name of the STOMP destination.
# Type: string
destination: ""
# Password is the password to use when connecting to the broker.
# Type: string
password: ""
# URL is the URL of the ActiveMQ Artemis broker.
# Type: string
url: ""
# User is the username to use when connecting to the broker.
# Type: string
user: ""
# DestinationHeader maps to the "destination" header in the STOMP SEND
# frame. Useful when using ANYCAST.
# Type: string
destinationHeader: ""
# DestinationType is the routing type of the destination. It can be
# either ANYCAST or MULTICAST, with ANYCAST being the default. Maps to
# the "destination-type" header in the STOMP SEND frame.
# Type: string
destinationType: "ANYCAST"
# RecvTimeoutHeartbeat specifies the minimum amount of time between
# the client expecting to receive heartbeat notifications from the
# server
# Type: duration
recvTimeoutHeartbeat: "2s"
# SendTimeoutHeartbeat specifies the maximum amount of time between
# the client sending heartbeat notifications from the server
# Type: duration
sendTimeoutHeartbeat: "2s"
# CaCertPath is the path to the CA certificate file.
# Type: string
tls.caCertPath: ""
# ClientCertPath is the path to the client certificate file.
# Type: string
tls.clientCertPath: ""
# ClientKeyPath is the path to the client key file.
# Type: string
tls.clientKeyPath: ""
# Enabled is a flag to enable or disable TLS.
# Type: bool
tls.enabled: "false"
# InsecureSkipVerify is a flag to disable server certificate
# verification.
# Type: bool
tls.insecureSkipVerify: "false"
# Maximum delay before an incomplete batch is written to the
# destination.
# Type: duration
sdk.batch.delay: "0"
# Maximum size of batch before it gets written to the destination.
# Type: int
sdk.batch.size: "0"
# Allow bursts of at most X records (0 or less means that bursts are
# not limited). Only takes effect if a rate limit per second is set.
# Note that if `sdk.batch.size` is bigger than `sdk.rate.burst`, the
# effective batch size will be equal to `sdk.rate.burst`.
# Type: int
sdk.rate.burst: "0"
# Maximum number of records written per second (0 means no rate
# limit).
# Type: float
sdk.rate.perSecond: "0"
# The format of the output record. See the Conduit documentation for a
# full list of supported formats
# (https://conduit.io/docs/using/connectors/configuration-parameters/output-format).
# Type: string
sdk.record.format: "opencdc/json"
# Options to configure the chosen output record format. Options are
# normally key=value pairs separated with comma (e.g.
# opt1=val2,opt2=val2), except for the `template` record format, where
# options are a Go template.
# Type: string
sdk.record.format.options: ""
# Whether to extract and decode the record key with a schema.
# Type: bool
sdk.schema.extract.key.enabled: "true"
# Whether to extract and decode the record payload with a schema.
# Type: bool
sdk.schema.extract.payload.enabled: "true"