Introduction
The highly performant and modular runtime for statically typed, yet dynamically loaded connectors. Ingest the data from the external sources and push it further to the Iggy streams, or fetch the data from the Iggy streams and push it further to the external sources. Create your own Rust plugins by simply implementing either the Source or Sink trait and build custom pipelines for the data processing.
The docker image is available, and can be fetched via docker pull apache/iggy-connect.
Architecture
Connectors are dynamically loaded shared libraries (.so on Linux, .dylib on macOS, .dll on Windows) loaded at runtime via dlopen2. Data crossing the FFI boundary between the runtime and plugins is serialized using postcard, a compact binary format. Each plugin receives its own Tokio runtime inside the SDK, ensuring isolation.
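The boundary described above can be pictured with a small sketch. It is not the SDK's code: the `Message` struct, the `example_consume` entry point, and the hand-rolled wire format are all assumptions made for illustration, and the format stands in for postcard so that the example needs only the standard library. The point it shows is that only a pointer and a length cross the boundary, and each side encodes or decodes on its own.

```rust
/// Hypothetical message shape used only in this sketch.
#[derive(Debug, PartialEq)]
pub struct Message {
    pub offset: u64,
    pub payload: Vec<u8>,
}

/// Stand-in wire format (not postcard): 8-byte little-endian offset, then the payload.
pub fn encode(message: &Message) -> Vec<u8> {
    let mut buffer = Vec::with_capacity(8 + message.payload.len());
    buffer.extend_from_slice(&message.offset.to_le_bytes());
    buffer.extend_from_slice(&message.payload);
    buffer
}

/// Inverse of `encode`; returns `None` when the buffer is too short.
pub fn decode(bytes: &[u8]) -> Option<Message> {
    if bytes.len() < 8 {
        return None;
    }
    let mut head = [0u8; 8];
    head.copy_from_slice(&bytes[..8]);
    Some(Message {
        offset: u64::from_le_bytes(head),
        payload: bytes[8..].to_vec(),
    })
}

/// Hypothetical plugin entry point: the runtime passes a pointer and a length,
/// never a Rust type. Returns 0 on success and -1 on failure.
/// A real plugin would also export this symbol unmangled so it can be looked up.
pub extern "C" fn example_consume(ptr: *const u8, len: usize) -> i32 {
    if ptr.is_null() {
        return -1;
    }
    // SAFETY: the caller must pass a pointer to `len` readable bytes that stay
    // valid for the duration of this call.
    let bytes = unsafe { std::slice::from_raw_parts(ptr, len) };
    match decode(bytes) {
        Some(_message) => 0,
        None => -1,
    }
}
```

On the runtime side, the call would look like `example_consume(bytes.as_ptr(), bytes.len())` with `bytes` produced by `encode`.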
Data Flow
The connector runtime operates in two directions - source (ingest) and sink (egress):
Source Flow (Ingest)
External systems push data into Iggy streams via source plugins.
Transforms: add_fields, delete_fields, filter_fields, update_fields
Sink Flow (Egress)
Iggy streams forward data to external systems via sink plugins.
Transforms: add_fields, delete_fields, filter_fields, update_fields
Key details:
- Transforms run inside the runtime process, not inside the plugins. They are applied after decoding for sources (before sending to Iggy) and after consuming from Iggy for sinks (before forwarding to the external system).
- State persistence is file-based (using MessagePack serialization) and applies to source connectors only, tracking the last polled position.
- Consumer groups handle offset tracking for sink connectors automatically, so sinks do not need to manage their own offsets.
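The ordering in these key details can be sketched as follows. Everything here is an assumption for illustration: `Record`, `SourceState`, `source_step`, `sink_step`, and the two sample transforms are hypothetical names, not runtime APIs. The sketch only shows where transforms sit in each direction and that a position is tracked on the source side alone.

```rust
/// A decoded record as field name and value pairs (hypothetical shape).
pub type Record = Vec<(String, String)>;

/// A transform returns `None` to drop the record.
pub type Transform = fn(Record) -> Option<Record>;

/// Runs the transforms in order, stopping as soon as one drops the record.
pub fn apply_all(record: Record, transforms: &[Transform]) -> Option<Record> {
    let mut current = record;
    for transform in transforms {
        current = transform(current)?;
    }
    Some(current)
}

/// Source-side position; the real runtime persists its state to a file.
#[derive(Debug, Default, PartialEq)]
pub struct SourceState {
    pub last_position: u64,
}

/// Source direction: decoded records are transformed, then returned as the
/// batch that would be sent to Iggy. The position advances for every record.
pub fn source_step(
    decoded: Vec<(u64, Record)>,
    transforms: &[Transform],
    state: &mut SourceState,
) -> Vec<Record> {
    let mut outgoing = Vec::new();
    for (position, record) in decoded {
        if let Some(transformed) = apply_all(record, transforms) {
            outgoing.push(transformed);
        }
        state.last_position = position;
    }
    outgoing
}

/// Sink direction: records consumed from Iggy are transformed before being
/// forwarded. No position is kept here; consumer groups track offsets.
pub fn sink_step(consumed: Vec<Record>, transforms: &[Transform]) -> Vec<Record> {
    consumed
        .into_iter()
        .filter_map(|record| apply_all(record, transforms))
        .collect()
}

/// Sample transform: appends a constant field.
pub fn tag_origin(mut record: Record) -> Option<Record> {
    record.push(("origin".to_string(), "demo".to_string()));
    Some(record)
}

/// Sample transform: drops records that have no fields.
pub fn drop_empty(record: Record) -> Option<Record> {
    if record.is_empty() { None } else { Some(record) }
}
```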
Available Connectors
| Type | Connectors |
|---|---|
| Source | PostgreSQL, Elasticsearch, Random |
| Sink | PostgreSQL, MongoDB, Elasticsearch, Quickwit, Apache Iceberg, Stdout |
Transforms
The following built-in transforms are available:
- add_fields - add new fields to the message payload.
- delete_fields - remove existing fields from the message payload.
- filter_fields - keep only the specified fields, removing everything else.
- update_fields - update existing fields with computed values such as timestamp_millis or uuid_v7.
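To make the four operations concrete, here is a minimal sketch over a flat string map. It assumes a simplified `Payload` type and function signatures invented for this example; the built-in transforms work on the supported payload formats and are configured through the runtime, so their real behavior (for instance, whether add_fields overwrites an existing key) may differ.

```rust
use std::collections::BTreeMap;
use std::time::{SystemTime, UNIX_EPOCH};

/// Simplified payload for this sketch: a flat map of string fields.
pub type Payload = BTreeMap<String, String>;

/// add_fields: insert new fields. Assumption: existing keys are left untouched.
pub fn add_fields(payload: &mut Payload, fields: &[(&str, &str)]) {
    for &(key, value) in fields {
        payload.entry(key.to_string()).or_insert_with(|| value.to_string());
    }
}

/// delete_fields: remove the named fields if they are present.
pub fn delete_fields(payload: &mut Payload, keys: &[&str]) {
    for key in keys {
        payload.remove(*key);
    }
}

/// filter_fields: keep only the named fields and remove everything else.
pub fn filter_fields(payload: &mut Payload, keep: &[&str]) {
    payload.retain(|key, _| keep.contains(&key.as_str()));
}

/// update_fields: overwrite fields that already exist with a computed value.
pub fn update_fields<F: Fn() -> String>(payload: &mut Payload, keys: &[&str], compute: F) {
    for key in keys {
        if let Some(value) = payload.get_mut(*key) {
            *value = compute();
        }
    }
}

/// One possible computed value: milliseconds since the Unix epoch.
pub fn timestamp_millis() -> String {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    elapsed.as_millis().to_string()
}
```

For example, `update_fields(&mut payload, &["updated_at"], timestamp_millis)` refreshes an existing `updated_at` field and leaves the payload unchanged when that field is absent.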
Supported Schemas
Messages can be decoded and encoded using the following formats: JSON, Raw bytes, Text, Protocol Buffers, and FlatBuffers.
Features
- High Performance: Utilizes Rust's performance characteristics to ensure fast data ingestion and egress.
- Low memory footprint: Designed with memory efficiency in mind, minimizing the memory footprint of the connectors.
- Modular Design: Designed with modularity in mind, allowing for easy extension and customization.
- Dynamic Loading: Supports dynamic loading of plugins, allowing integration with various data sources and sinks at runtime.
- Statically Typed: Ensures type safety and compile-time checks, reducing runtime errors.
- Easy Customization: Provides a simple interface for implementing custom connectors, making it easy to create new plugins.
- Data transformation: Supports data transformation with the help of existing functions.
- Powerful configuration: Define your sinks, sources, and transformations in the configuration file or fetch them from a remote HTTP API.
- Flexible configuration providers: Support for local file-based and HTTP-based configuration providers for centralized configuration management.
Quick Start
- Build the project in release mode (or in debug mode, updating the connector paths in the config accordingly), and make sure that the plugins specified under path in the core/connectors/runtime/example_config/connectors/ directory are available. The configuration must be provided in toml format.
- Run docker compose up -d from /examples/rust/src/sink-data-producer, which starts the Quickwit server used by the example sink connector. At this point, you can access the Quickwit UI at http://localhost:7280. Check this dashboard again later, after the events index has been created.
- Set the environment variable IGGY_CONNECTORS_CONFIG_PATH=core/connectors/runtime/example_config/config.toml (adjust the path as needed) so that it points to the runtime configuration file.
- Start the Iggy server and run the following commands via the Iggy CLI to create the example streams and topics used by the sample connectors:
    iggy --username iggy --password iggy stream create example_stream
    iggy --username iggy --password iggy topic create example_stream example_topic 1 none 1d
    iggy --username iggy --password iggy stream create qw
    iggy --username iggy --password iggy topic create qw records 1 none 1d
- Execute cargo run --example sink-data-producer -r, which starts the example data producer application. It sends messages to the previously created qw stream and records topic (used by the Quickwit sink connector).
- Start the connector runtime with cargo run --bin iggy-connectors -r. You should be able to browse the Quickwit UI with records being constantly added to the events index. At the same time, you should see new messages being added to the example stream and topic1 topic by the test source connector; you can use the Iggy Web UI to browse the data. The messages will have the basic field transformations applied.
Configuration
The same rules apply to overriding the configuration via environment variables as for the main Iggy server.
You can provide the following envs during the runtime startup:
IGGY_CONNECTORS_ENV_PATH - path to the .env file for custom environment variables
IGGY_CONNECTORS_CONFIG_PATH - path to the connectors runtime configuration file
Any configuration section can be overridden with the IGGY_CONNECTORS_ prefix, followed by the section name and the key name, e.g. IGGY_CONNECTORS_IGGY_USERNAME.
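The naming rule can be expressed as a small helper. This is a sketch of the convention only, not the runtime's configuration loader: `override_key` and `resolve` are hypothetical names, and the sketch assumes a single section level with keys joined by underscores, as in the IGGY_CONNECTORS_IGGY_USERNAME example.

```rust
use std::collections::HashMap;

/// Prefix shared by all connector runtime overrides.
pub const PREFIX: &str = "IGGY_CONNECTORS";

/// Builds the environment variable name for a section and key,
/// e.g. ("iggy", "username") gives "IGGY_CONNECTORS_IGGY_USERNAME".
pub fn override_key(section: &str, key: &str) -> String {
    format!("{}_{}_{}", PREFIX, section.to_uppercase(), key.to_uppercase())
}

/// Prefers the value from the environment snapshot and falls back to the
/// value read from the configuration file.
pub fn resolve(
    section: &str,
    key: &str,
    file_value: &str,
    env: &HashMap<String, String>,
) -> String {
    env.get(&override_key(section, key))
        .cloned()
        .unwrap_or_else(|| file_value.to_string())
}
```

Passing the environment as a map keeps the helper easy to test; in a running process the snapshot could be built with `std::env::vars().collect()`.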
Runtime
All the connectors are implemented as Rust libraries and can be used as a part of the connector runtime. The runtime is responsible for managing the lifecycle of the connectors and providing the necessary infrastructure for the connectors to run. For more information, please refer to the runtime documentation.
Sink
Sinks are responsible for consuming the messages from the configured stream(s) and topic(s) and sending them further to the specified destination. For example, the Quickwit sink connector is responsible for sending the messages to the Quickwit indexer.
Please refer to the Sink documentation for the details about the configuration and the sample implementation.
When implementing Sink, make sure to use the sink_connector! macro to expose the FFI interface and allow the connector runtime to register the sink.
Each sink should have its own custom configuration, which is passed along with the unique plugin ID to the expected new() method.
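The shape of a sink, with its own configuration and an ID handed to new(), can be sketched as below. This is deliberately not the SDK's interface: `DemoSink` is a hypothetical, synchronous stand-in for the real Sink trait, `BufferSink` and `BufferSinkConfig` are invented for the example, and the sink_connector! macro is not used, so the sketch compiles with the standard library alone.

```rust
/// Hypothetical, synchronous stand-in for the SDK's Sink trait.
pub trait DemoSink {
    /// Handles one batch of raw payloads and returns how many were accepted.
    fn consume(&mut self, messages: &[Vec<u8>]) -> Result<usize, String>;
}

/// Custom configuration that belongs to this one sink (hypothetical field).
#[derive(Debug, Clone)]
pub struct BufferSinkConfig {
    pub prefix: String,
}

/// A sink that collects text lines in memory instead of calling an external system.
pub struct BufferSink {
    id: u32,
    config: BufferSinkConfig,
    pub lines: Vec<String>,
}

impl BufferSink {
    /// Mirrors the expected constructor shape: plugin ID plus custom configuration.
    pub fn new(id: u32, config: BufferSinkConfig) -> Self {
        BufferSink { id, config, lines: Vec::new() }
    }
}

impl DemoSink for BufferSink {
    fn consume(&mut self, messages: &[Vec<u8>]) -> Result<usize, String> {
        for payload in messages {
            // Payloads are treated as UTF-8 text in this sketch.
            let text = std::str::from_utf8(payload)
                .map_err(|e| format!("sink {}: invalid UTF-8: {}", self.id, e))?;
            self.lines.push(format!("{}{}", self.config.prefix, text));
        }
        Ok(messages.len())
    }
}
```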
Source
Sources are responsible for producing the messages to the configured stream(s) and topic(s). For example, the Test source connector generates random messages that are then sent to the configured stream and topic.
Please refer to the Source documentation for the details about the configuration and the sample implementation.
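As a rough picture of a source that generates random messages, consider the sketch below. It makes several assumptions: `DemoSource` is a hypothetical, synchronous stand-in for the real Source trait, `RandomSource` and its configuration are invented names, and a small xorshift generator replaces a proper random number crate so that the example stays dependency-free and repeatable.

```rust
/// Hypothetical, synchronous stand-in for the SDK's Source trait.
pub trait DemoSource {
    /// Returns the next batch of raw payloads to publish.
    fn poll(&mut self) -> Vec<Vec<u8>>;
}

/// Custom configuration for this source (hypothetical fields).
#[derive(Debug, Clone)]
pub struct RandomSourceConfig {
    pub batch_size: usize,
    pub seed: u64,
}

pub struct RandomSource {
    id: u32,
    config: RandomSourceConfig,
    rng_state: u64,
    pub produced: u64,
}

impl RandomSource {
    pub fn new(id: u32, config: RandomSourceConfig) -> Self {
        // xorshift must not start from zero, so a zero seed is bumped to 1.
        let rng_state = if config.seed == 0 { 1 } else { config.seed };
        RandomSource { id, config, rng_state, produced: 0 }
    }

    /// xorshift64 step: cheap and deterministic, not cryptographically secure.
    fn next_random(&mut self) -> u64 {
        let mut x = self.rng_state;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.rng_state = x;
        x
    }
}

impl DemoSource for RandomSource {
    fn poll(&mut self) -> Vec<Vec<u8>> {
        let mut batch = Vec::with_capacity(self.config.batch_size);
        for _ in 0..self.config.batch_size {
            let value = self.next_random();
            // Each message is a small JSON object encoded as bytes.
            let json = format!(
                "{{\"source\":{},\"seq\":{},\"value\":{}}}",
                self.id, self.produced, value
            );
            batch.push(json.into_bytes());
            self.produced += 1;
        }
        batch
    }
}
```

Two sources created with the same seed produce identical batches, which makes the sketch convenient for tests.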
Building the connectors
A new connector can be built simply by implementing either the Sink or Source trait. Please check the sink or source documentation, as well as the existing examples under the /sinks and /sources directories.
Transformations
Field transformations (depending on the supported payload formats) can be applied to the messages either before they are sent to the specified topic (e.g. when produced by the source connectors), or before they are consumed by the sink connectors. To add a new transformation, simply implement the Transform trait and extend the existing load function. Each transform may have its own custom configuration.
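The two steps named above, implementing a trait and extending a load function, might look roughly like this. The sketch assumes a simplified `Transform` trait, a flat `Payload` map, and a `load` signature made up for the example; the real trait and load function live in the SDK and will differ. `UppercaseValues` is a hypothetical new transform, not one of the built-ins.

```rust
use std::collections::BTreeMap;

/// Simplified payload for this sketch: a flat map of string fields.
pub type Payload = BTreeMap<String, String>;

/// Simplified stand-in for the SDK's Transform trait.
pub trait Transform {
    fn name(&self) -> &'static str;
    /// Returns `None` to drop the message.
    fn apply(&self, payload: Payload) -> Option<Payload>;
}

/// An existing transform with its own configuration (the keys to delete).
pub struct DeleteFields {
    pub keys: Vec<String>,
}

impl Transform for DeleteFields {
    fn name(&self) -> &'static str {
        "delete_fields"
    }
    fn apply(&self, mut payload: Payload) -> Option<Payload> {
        for key in &self.keys {
            payload.remove(key.as_str());
        }
        Some(payload)
    }
}

/// Step 1: the new transform implements the trait (hypothetical example).
pub struct UppercaseValues;

impl Transform for UppercaseValues {
    fn name(&self) -> &'static str {
        "uppercase_values"
    }
    fn apply(&self, payload: Payload) -> Option<Payload> {
        Some(payload.into_iter().map(|(k, v)| (k, v.to_uppercase())).collect())
    }
}

/// Step 2: the load function gains one match arm for the new transform.
pub fn load(kind: &str, args: &[&str]) -> Option<Box<dyn Transform>> {
    match kind {
        "delete_fields" => Some(Box::new(DeleteFields {
            keys: args.iter().map(|k| k.to_string()).collect(),
        })),
        "uppercase_values" => Some(Box::new(UppercaseValues)),
        _ => None,
    }
}
```

Keeping the lookup in one function means a configuration that names an unknown transform fails at load time rather than while messages are flowing.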
To find out more about the transforms, stream decoders or encoders, please refer to the SDK documentation.