# SDK
The SDK provides the commonly used structs and traits such as `Sink` and `Source`, along with the `sink_connector` and `source_connector` macros to be used when developing connectors.

Moreover, it contains both the `decoders` and `encoders` modules, implementing the `StreamDecoder` and `StreamEncoder` traits respectively, which are used when consuming or producing data from/to Iggy streams.
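For orientation, a bare-bones sink built on top of these pieces might look roughly like the sketch below. The `Sink` trait and the `sink_connector` macro are the ones mentioned above, but the exact method names and signatures (`open`, `consume`, `close` and their arguments) are assumptions for illustration and may differ from the current SDK.

```rust
// A rough sketch only: the method names and signatures below are assumptions
// based on the trait description above and may not match the SDK exactly.
use iggy_connector_sdk::{
    sink_connector, ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata,
};

sink_connector!(StdoutSink);

#[derive(Debug, Default)]
pub struct StdoutSink;

#[async_trait::async_trait]
impl Sink for StdoutSink {
    async fn open(&mut self) -> Result<(), Error> {
        // Set up any connections or state the sink needs.
        Ok(())
    }

    async fn consume(
        &self,
        _topic: &TopicMetadata,
        _metadata: MessagesMetadata,
        messages: Vec<ConsumedMessage>,
    ) -> Result<(), Error> {
        // Handle a batch of messages consumed from an Iggy stream.
        println!("Received {} messages", messages.len());
        Ok(())
    }

    async fn close(&mut self) -> Result<(), Error> {
        // Release resources when the connector shuts down.
        Ok(())
    }
}
```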
The SDK is a work in progress, and it would certainly benefit from support for multiple format schemas, such as Protobuf, Avro, Flatbuffers etc., including decoding/encoding the data between the different formats (when applicable) and supporting data transformations whenever possible (easy for JSON, but complex for Bincode, for example).
Last but not least, the different `transforms` are available to transform (add, update, delete etc.) particular fields of the data being processed, via external configuration. It's as simple as adding a new transform to the `transforms` section of the particular connector configuration:
```toml
[sources.random.transforms.add_fields]
enabled = true

[[sources.random.transforms.add_fields.fields]]
key = "message"
value.static = "hello"
```
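Conceptually, assuming the source emits JSON payloads, the `add_fields` transform above appends a static field to each record, so a payload like `{"id": 1}` would leave the transform as `{"id": 1, "message": "hello"}`.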
## Protocol Buffers Support

The SDK includes support for the Protocol Buffers (protobuf) format, with both encoding and decoding capabilities. Protocol Buffers provide efficient serialization and are particularly useful for high-performance data streaming scenarios.
### Configuration Example
Here's a complete example configuration for using Protocol Buffers with Iggy connectors:
```toml
[iggy]
address = "localhost:8090"
username = "iggy"
password = "iggy"

[sources.protobuf_source]
enabled = true
name = "Protobuf Source"
path = "target/release/libiggy_connector_protobuf_source"

[[sources.protobuf_source.streams]]
stream = "protobuf_stream"
topic = "protobuf_topic"
schema = "proto"
batch_size = 1000
send_interval = "5ms"

[sources.protobuf_source.config]
schema_path = "schemas/message.proto"
message_type = "com.example.Message"
use_any_wrapper = true

[sinks.protobuf_sink]
enabled = true
name = "Protobuf Sink"
path = "target/release/libiggy_connector_protobuf_sink"

[[sinks.protobuf_sink.streams]]
stream = "protobuf_stream"
topic = "protobuf_topic"
schema = "proto"

[[sinks.protobuf_sink.transforms]]
type = "proto_convert"
target_format = "json"
preserve_structure = true
field_mappings = { "old_field" = "new_field", "legacy_id" = "id" }

[[sinks.protobuf_sink.transforms]]
type = "proto_convert"
target_format = "proto"
preserve_structure = false
```
## Key Configuration Options

### Source Configuration

- `schema_path`: Path to the `.proto` file containing message definitions (an example schema is sketched after this list)
- `message_type`: Fully qualified name of the protobuf message type to use
- `use_any_wrapper`: Whether to wrap messages in `google.protobuf.Any` for type safety
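As a point of reference, a minimal `schemas/message.proto` matching the `message_type = "com.example.Message"` from the configuration above could look like the sketch below; the fields shown are purely illustrative placeholders.

```proto
syntax = "proto3";

package com.example;

// Hypothetical message definition; the field names and numbers are
// placeholders and should match whatever your connector actually handles.
message Message {
  string id = 1;
  string payload = 2;
  int64 created_at = 3;
}
```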
### Transform Options

- `proto_convert`: Transform for converting between protobuf and other formats
- `target_format`: Target format for conversion (`json`, `proto`, `text`)
- `preserve_structure`: Whether to preserve the original message structure during conversion
- `field_mappings`: Mapping of field names for transformation (e.g., `"old_field" = "new_field"`)
## Supported Features
- Encoding: Convert JSON, Text, and Raw data to protobuf format
- Decoding: Parse protobuf messages into JSON format with type information
- Transforms: Convert between protobuf and other formats (JSON, Text)
- Field Mapping: Transform field names during format conversion
- Any Wrapper: Support for the `google.protobuf.Any` message wrapper
## Programmatic Usage

### Dynamic Schema Loading
You can load or reload schemas programmatically:
```rust
use iggy_connector_sdk::decoders::proto::{ProtoStreamDecoder, ProtoConfig};
use std::path::PathBuf;

// Start without a local schema, relying on the Any wrapper for incoming messages.
let mut decoder = ProtoStreamDecoder::new(ProtoConfig {
    schema_path: None,
    use_any_wrapper: true,
    ..Default::default()
});

// Load (or reload) a schema at runtime from a .proto file.
let config_with_schema = ProtoConfig {
    schema_path: Some(PathBuf::from("schemas/user.proto")),
    message_type: Some("com.example.User".to_string()),
    ..Default::default()
};

match decoder.update_config(config_with_schema, true) {
    Ok(()) => println!("Schema loaded successfully"),
    Err(e) => eprintln!("Failed to load schema: {}", e),
}
```
### Schema Registry Integration
```rust
use iggy_connector_sdk::encoders::proto::{ProtoStreamEncoder, ProtoEncoderConfig};

let mut encoder = ProtoStreamEncoder::new_with_config(ProtoEncoderConfig {
    schema_registry_url: Some("http://schema-registry:8081".to_string()),
    message_type: Some("com.example.Event".to_string()),
    use_any_wrapper: false,
    ..Default::default()
});

if let Err(e) = encoder.load_schema() {
    eprintln!("Schema reload failed: {}", e);
}
```
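The registry-backed encoder above and the file-based decoder from the previous example differ only in where the schema comes from (`schema_registry_url` versus `schema_path`); in both cases `load_schema()` can be called again later to reload the schema without recreating the component.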
### Creating Converters with Schema
```rust
use iggy_connector_sdk::transforms::proto_convert::{ProtoConvert, ProtoConvertConfig};
use iggy_connector_sdk::Schema;
use std::collections::HashMap;
use std::path::PathBuf;

let converter = ProtoConvert::new(ProtoConvertConfig {
    source_format: Schema::Proto,
    target_format: Schema::Json,
    schema_path: Some(PathBuf::from("schemas/user.proto")),
    message_type: Some("com.example.User".to_string()),
    field_mappings: Some(HashMap::from([
        ("user_id".to_string(), "id".to_string()),
        ("full_name".to_string(), "name".to_string()),
    ])),
    ..ProtoConvertConfig::default()
});

let mut converter_with_manual_loading = ProtoConvert::new(ProtoConvertConfig::default());
if let Err(e) = converter_with_manual_loading.load_schema() {
    eprintln!("Manual schema loading failed: {}", e);
}
```
## Usage Notes

- Automatic Loading: Schemas are loaded automatically when `schema_path` or `descriptor_set` is provided in the config
- Manual Loading: Use the `load_schema()` method for dynamic schema loading or reloading
- Error Handling: Schema loading errors are handled gracefully, with a fallback to the Any wrapper mode
- Immutable Design: Converters are created with a fixed configuration - create new instances for different schemas
- When `use_any_wrapper` is enabled, messages are wrapped in `google.protobuf.Any` for better type safety
- The `proto_convert` transform can be used to convert protobuf messages to JSON for easier processing
- Field mappings allow you to rename fields during format conversion
- Protocol Buffers provide efficient binary serialization compared to JSON