Apache Iggy

MongoDB Sink

The MongoDB sink connector writes messages from Iggy streams to a MongoDB database. Each message is inserted as a document into the configured collection.

Configuration

type = "sink"
key = "mongodb-sink"
enabled = true
version = 1
name = "MongoDB Sink"
path = "/path/to/libmongodb_sink.so"
verbose = false

[[streams]]
stream = "my-stream"
topics = ["my-topic"]
schema = "json"
batch_length = 100
poll_interval = "1s"
consumer_group = "mongodb-sink-group"

[plugin_config]
connection_uri = "mongodb://localhost:27017"
database = "my_database"
collection = "my_collection"
batch_size = 100
include_metadata = true
include_checksum = true
include_origin_timestamp = true
payload_format = "json"
auto_create_collection = true
verbose_logging = false
max_retries = 3
retry_delay = "1s"

Plugin config options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| connection_uri | string | required | MongoDB connection string |
| database | string | required | Target database name |
| collection | string | required | Target collection name |
| batch_size | u32 | 100 | Number of documents to insert per batch |
| include_metadata | bool | true | Include message metadata (offset, timestamp) in documents |
| include_checksum | bool | true | Include message checksum |
| include_origin_timestamp | bool | true | Include client-provided timestamp |
| payload_format | string | "json" | How to store the payload ("json" or "bytea") |
| auto_create_collection | bool | true | Create the collection if it doesn't exist |
| verbose_logging | bool | false | Enable detailed logging |
| max_retries | u32 | 3 | Max retry attempts on failure |
| retry_delay | string | "1s" | Delay between retries |
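
To make the interplay of batch_size, max_retries, and retry_delay concrete, here is a minimal Python sketch of batched inserts with a fixed-delay retry. It illustrates the general technique only and is not the connector's actual implementation. The helper names `chunked` and `insert_with_retry`, the constant (non-exponential) delay, and the reading of max_retries as "retries after the first attempt" are all assumptions made for this example.

```python
import time
from typing import Callable, Iterator, List, Sequence


def chunked(docs: Sequence[dict], batch_size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most batch_size documents."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(docs), batch_size):
        yield list(docs[start:start + batch_size])


def insert_with_retry(
    insert_batch: Callable[[List[dict]], object],
    batch: List[dict],
    max_retries: int = 3,
    retry_delay_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call insert_batch(batch), retrying on any exception.

    Returns the number of attempts made. Assumption: max_retries counts
    retries after the first attempt, so at most max_retries + 1 calls are
    made before the last error is re-raised.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            insert_batch(batch)
            return attempts
        except Exception:
            if attempts > max_retries:
                raise  # retries exhausted: surface the last error
            sleep(retry_delay_s)  # fixed delay, mirroring retry_delay = "1s"
```

With PyMongo, `insert_batch` could be a collection's `insert_many` method; the `sleep` parameter exists only so the delay can be replaced in tests.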

Transforms

You can apply transforms to each message before it is written to MongoDB, for example to add or delete fields. See the transforms documentation for details.

[transforms.add_fields]
source = "iggy"
processed_at = { type = "timestamp_millis" }
trace_id = { type = "uuid_v7" }

[transforms.delete_fields]
fields = ["internal_field"]
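
As a rough illustration of what the two transforms above do to a single JSON payload, the following Python sketch applies the same field additions and deletion to a dictionary. It is not the connector's code: the function name `apply_transforms` is hypothetical, the order (add first, then delete) is an assumption based on the order in the config, and `uuid.uuid4()` is used as a stand-in for a UUIDv7 generator.

```python
import time
import uuid


def apply_transforms(message: dict) -> dict:
    """Mimic the add_fields and delete_fields config on one JSON payload."""
    out = dict(message)  # shallow copy so the input is left untouched

    # add_fields: one static value and two computed values
    out["source"] = "iggy"
    out["processed_at"] = int(time.time() * 1000)  # timestamp_millis
    out["trace_id"] = str(uuid.uuid4())  # stand-in for uuid_v7 (assumption)

    # delete_fields: remove the listed field if it is present
    out.pop("internal_field", None)
    return out


if __name__ == "__main__":
    print(apply_transforms({"user": "alice", "internal_field": "secret"}))
```

The resulting dictionary keeps `user`, gains `source`, `processed_at`, and `trace_id`, and no longer contains `internal_field`.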
