MongoDB Sink
The MongoDB sink connector writes messages from Iggy streams to a MongoDB database. Each message is inserted as a document into the configured collection.
Configuration
type = "sink"
key = "mongodb-sink"
enabled = true
version = 1
name = "MongoDB Sink"
path = "/path/to/libmongodb_sink.so"
verbose = false
[[streams]]
stream = "my-stream"
topics = ["my-topic"]
schema = "json"
batch_length = 100
poll_interval = "1s"
consumer_group = "mongodb-sink-group"
[plugin_config]
connection_uri = "mongodb://localhost:27017"
database = "my_database"
collection = "my_collection"
batch_size = 100
include_metadata = true
include_checksum = true
include_origin_timestamp = true
payload_format = "json"
auto_create_collection = true
verbose_logging = false
max_retries = 3
retry_delay = "1s"
Plugin config options
| Option | Type | Default | Description |
|---|---|---|---|
| connection_uri | string | required | MongoDB connection string |
| database | string | required | Target database name |
| collection | string | required | Target collection name |
| batch_size | u32 | 100 | Number of documents to insert per batch |
| include_metadata | bool | true | Include message metadata (offset, timestamp) in documents |
| include_checksum | bool | true | Include message checksum |
| include_origin_timestamp | bool | true | Include client-provided timestamp |
| payload_format | string | "json" | How to store the payload ("json" or "bytea") |
| auto_create_collection | bool | true | Create the collection if it doesn't exist |
| verbose_logging | bool | false | Enable detailed logging |
| max_retries | u32 | 3 | Max retry attempts on failure |
| retry_delay | string | "1s" | Delay between retries |
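Since only connection_uri, database, and collection are marked as required in the table, a shorter [plugin_config] should be possible. The fragment below is a sketch under that assumption: the host, database, and collection names are placeholders, and it assumes that any omitted option falls back to the default listed in the table.

```toml
[plugin_config]
# Required options (placeholder values, not real endpoints).
connection_uri = "mongodb://mongo.example.internal:27017"
database = "events"
collection = "iggy_messages"

# Optional overrides; anything omitted is assumed to use its table default.
batch_size = 500          # table default: 100
include_checksum = false  # table default: true
max_retries = 5           # table default: 3
retry_delay = "2s"        # table default: "1s"
```

The top-level keys and the [[streams]] section stay the same as in the full example above.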
Transforms
You can apply transforms before writing to MongoDB. See the transforms documentation for details.
[transforms.add_fields]
source = "iggy"
processed_at = { type = "timestamp_millis" }
trace_id = { type = "uuid_v7" }
[transforms.delete_fields]
fields = ["internal_field"]
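As a further illustration, the same two transforms could tag each document with a deployment name and drop more than one field. This sketch reuses only the value forms shown in the example above (a static string, a timestamp_millis generator, and a list of field names). The field names environment, ingested_at, debug_info, and password are hypothetical.

```toml
[transforms.add_fields]
# Static string value, same form as `source = "iggy"` in the example above.
environment = "staging"
# Generated value, same form as `processed_at` in the example above.
ingested_at = { type = "timestamp_millis" }

[transforms.delete_fields]
# Hypothetical field names to strip before the document is written.
fields = ["debug_info", "password"]
```

Under these assumptions, a JSON payload containing a password field would be stored without it, and with environment and ingested_at added. The order in which transforms run is not stated here, so check the transforms documentation before relying on it.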