Schema
Since the Iggy server supports several transport protocols, it needs a common schema that represents the data in a unified way across all of them. Commands (requests), responses, data models and status codes must be the same for all transports.
Currently, there are 2 ways of representing the data: JSON (text) and binary. With the binary format, the message payload can use any serialization format you prefer, such as bincode, SBE, FlatBuffers, Protobuf or even a custom one, as the server simply expects raw bytes as the message payload.
The binary format is more compact and efficient, but it is not human-readable; it is used by the TCP and QUIC transports. The JSON format is used by the HTTP transport; all the existing endpoints are listed in server.http.
All multi-byte integer fields use little-endian encoding throughout.
Request schema
Every request is represented as a binary message. The message consists of 3 parts: length, code and payload:
- length - 4-byte integer (u32) which represents the total length: code (4 bytes) + payload (N bytes)
- code - 4-byte integer (u32) which represents the request code
- payload - binary data of N bytes length
For example, if the payload is 100 bytes, the length will have a value of 104 (100 bytes for the payload + 4 constant bytes for the code). The message as a whole will be 108 (4 + 4 + 100) bytes in size.
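The framing described above can be sketched in a few lines of Rust. This is a minimal illustration, not the SDK's implementation; the function name `encode_request` is hypothetical, and the little-endian encoding follows the rule stated at the top of this page.

```rust
/// Frames a request as: length (u32 LE) | code (u32 LE) | payload.
/// The length field counts the 4-byte code plus the payload, not itself.
fn encode_request(code: u32, payload: &[u8]) -> Vec<u8> {
    let length = 4 + payload.len() as u32;
    let mut frame = Vec::with_capacity(8 + payload.len());
    frame.extend_from_slice(&length.to_le_bytes());
    frame.extend_from_slice(&code.to_le_bytes());
    frame.extend_from_slice(payload);
    frame
}
```

With a 100-byte payload this yields a length field of 104 and a 108-byte frame, matching the example above. A PING (code 1) with an empty payload is 8 bytes in total.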
Response schema
Every response is represented as a binary message. The message consists of 3 parts: status, length and payload:
- status - 4-byte integer (u32) which represents the status code. The status code is 0 for success and any other value for an error.
- length - 4-byte integer (u32) which represents the length of the payload (N bytes)
- payload - binary data of N bytes length
In case of an error, the length is always equal to 0 and the payload is empty.
When fetching a resource which may not exist, such as a stream, topic or user, the response has a status code of 0 (OK) but an empty payload, as there is no data to return.
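A response can be decoded along these lines. This is a sketch under one assumption: the length field counts only the payload bytes, which is what a length of 0 on errors implies. The names `Response`, `read_u32_le` and `decode_response` are hypothetical.

```rust
/// A decoded response: status code plus payload bytes (empty on error).
#[derive(Debug, PartialEq)]
struct Response {
    status: u32,
    payload: Vec<u8>,
}

/// Reads a little-endian u32 at `at`, or None if the buffer is too short.
fn read_u32_le(buf: &[u8], at: usize) -> Option<u32> {
    let b = buf.get(at..at + 4)?;
    Some(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}

/// Parses status (u32 LE) | length (u32 LE) | payload.
/// Assumption: `length` is the number of payload bytes that follow.
fn decode_response(buf: &[u8]) -> Option<Response> {
    let status = read_u32_le(buf, 0)?;
    let length = read_u32_le(buf, 4)? as usize;
    let payload = buf.get(8..8 + length)?.to_vec();
    Some(Response { status, payload })
}
```

A caller would treat a status of 0 with an empty payload as "OK, but nothing found", and any non-zero status as an error.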
Request codes
PING = 1
GET_STATS = 10
GET_SNAPSHOT = 11
GET_CLUSTER_METADATA = 12
GET_ME = 20
GET_CLIENT = 21
GET_CLIENTS = 22
GET_USER = 31
GET_USERS = 32
CREATE_USER = 33
DELETE_USER = 34
UPDATE_USER = 35
UPDATE_PERMISSIONS = 36
CHANGE_PASSWORD = 37
LOGIN_USER = 38
LOGOUT_USER = 39
GET_PERSONAL_ACCESS_TOKENS = 41
CREATE_PERSONAL_ACCESS_TOKEN = 42
DELETE_PERSONAL_ACCESS_TOKEN = 43
LOGIN_WITH_PERSONAL_ACCESS_TOKEN = 44
POLL_MESSAGES = 100
SEND_MESSAGES = 101
FLUSH_UNSAVED_BUFFER = 102
GET_CONSUMER_OFFSET = 120
STORE_CONSUMER_OFFSET = 121
DELETE_CONSUMER_OFFSET = 122
GET_STREAM = 200
GET_STREAMS = 201
CREATE_STREAM = 202
DELETE_STREAM = 203
UPDATE_STREAM = 204
PURGE_STREAM = 205
GET_TOPIC = 300
GET_TOPICS = 301
CREATE_TOPIC = 302
DELETE_TOPIC = 303
UPDATE_TOPIC = 304
PURGE_TOPIC = 305
CREATE_PARTITIONS = 402
DELETE_PARTITIONS = 403
DELETE_SEGMENTS = 503
GET_CONSUMER_GROUP = 600
GET_CONSUMER_GROUPS = 601
CREATE_CONSUMER_GROUP = 602
DELETE_CONSUMER_GROUP = 603
JOIN_CONSUMER_GROUP = 604
LEAVE_CONSUMER_GROUP = 605
Shared
Identifier
pub struct Identifier {
pub kind: IdKind,
pub length: u8,
pub value: Vec<u8>
}
pub enum IdKind {
Numeric, // Value = 1 (default)
String // Value = 2
}
The Identifier is a struct which represents the identifier of a stream, topic, user or consumer group. It consists of the kind (numeric or string), the length (length of the identifier value) and the value (actual value of the identifier). The value is a vector of bytes, which can be either a numeric value (e.g. 1 encoded as a constant 4 bytes for the u32 type) or a UTF-8 string (e.g. "my-stream") of variable length, 255 bytes at most.
Serialization
- kind - 1 byte
- length - 1 byte
- value - N bytes (4 bytes for numeric, 1-255 bytes for string)
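The two identifier kinds can be serialized as in the following sketch. The helper names `encode_numeric_id` and `encode_string_id` are hypothetical; the numeric value is written as a little-endian u32, following the encoding rule stated at the top of this page.

```rust
/// Encodes a numeric identifier: kind = 1, length = 4, value = u32 LE.
fn encode_numeric_id(id: u32) -> Vec<u8> {
    let mut out = vec![1u8, 4u8];
    out.extend_from_slice(&id.to_le_bytes());
    out
}

/// Encodes a string identifier: kind = 2, length = byte length, value = UTF-8 bytes.
/// Returns None when the value is empty or longer than 255 bytes.
fn encode_string_id(id: &str) -> Option<Vec<u8>> {
    let bytes = id.as_bytes();
    if bytes.is_empty() || bytes.len() > 255 {
        return None;
    }
    let mut out = vec![2u8, bytes.len() as u8];
    out.extend_from_slice(bytes);
    Some(out)
}
```

Note that the limit applies to bytes, not characters: a name containing multi-byte UTF-8 characters reaches 255 bytes sooner.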
Consumer
pub struct Consumer {
pub kind: ConsumerKind,
pub id: Identifier,
}
pub enum ConsumerKind {
Consumer, // Value = 1 (default)
ConsumerGroup // Value = 2
}
The Consumer represents the type of the consumer. It can be either a consumer or a consumer group. This type is used when interacting with the message-related commands such as PollMessages, GetConsumerOffset, StoreConsumerOffset and DeleteConsumerOffset.
Serialization
- kind - 1 byte
- id - variable (Identifier: 2 + N bytes)
IggyMessage
pub struct IggyMessage {
pub header: IggyMessageHeader,
pub user_headers: Option<Bytes>,
pub payload: Bytes,
}
pub struct IggyMessageHeader {
pub checksum: u64,
pub id: u128,
pub offset: u64,
pub timestamp: u64,
pub origin_timestamp: u64,
pub user_headers_length: u32,
pub payload_length: u32,
pub reserved: u64,
}
Each message consists of a fixed 64-byte header followed by optional user headers and the payload.
Serialization (64-byte header + variable data)
- checksum - 8 bytes (u64, xxHash3)
- id - 16 bytes (u128, UUIDv4)
- offset - 8 bytes (u64)
- timestamp - 8 bytes (u64, server-assigned)
- origin_timestamp - 8 bytes (u64, client-provided)
- user_headers_length - 4 bytes (u32)
- payload_length - 4 bytes (u32)
- reserved - 8 bytes (u64, must be 0)
- user_headers - N bytes (length from user_headers_length)
- payload - N bytes (length from payload_length)
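To make the 64-byte layout concrete, here is a sketch that writes the header fields at their fixed positions in the order listed above. `MessageHeader` and `encode_header` are hypothetical names. The checksum is passed in as a plain value rather than computed, since xxHash3 is not part of the Rust standard library, and this page does not say which bytes the checksum covers.

```rust
/// Mirror of the header fields for this sketch; all integers are little-endian.
struct MessageHeader {
    checksum: u64,
    id: u128,
    offset: u64,
    timestamp: u64,
    origin_timestamp: u64,
    user_headers_length: u32,
    payload_length: u32,
    reserved: u64,
}

/// Writes each field at its fixed byte range: 8 + 16 + 8 + 8 + 8 + 4 + 4 + 8 = 64.
fn encode_header(h: &MessageHeader) -> [u8; 64] {
    let mut out = [0u8; 64];
    out[0..8].copy_from_slice(&h.checksum.to_le_bytes());
    out[8..24].copy_from_slice(&h.id.to_le_bytes());
    out[24..32].copy_from_slice(&h.offset.to_le_bytes());
    out[32..40].copy_from_slice(&h.timestamp.to_le_bytes());
    out[40..48].copy_from_slice(&h.origin_timestamp.to_le_bytes());
    out[48..52].copy_from_slice(&h.user_headers_length.to_le_bytes());
    out[52..56].copy_from_slice(&h.payload_length.to_le_bytes());
    out[56..64].copy_from_slice(&h.reserved.to_le_bytes());
    out
}
```

The user headers and the payload would be appended directly after these 64 bytes.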
PolledMessages
pub struct PolledMessages {
pub partition_id: u32,
pub current_offset: u64,
pub count: u32,
pub messages: Vec<IggyMessage>,
}
The PolledMessages is returned by the PollMessages command.
Serialization
- partition_id - 4 bytes
- current_offset - 8 bytes
- count - 4 bytes
- messages - N bytes (each message is 64-byte header + variable data)
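A PolledMessages payload can be walked as sketched below. The assumption here is that messages are laid out back to back, each as header, then user headers, then payload, in the order of the IggyMessage serialization list. The function names are hypothetical, and only the message payloads are extracted to keep the example short.

```rust
/// Reads a little-endian u32 at `at`, or None if out of bounds.
fn u32_at(buf: &[u8], at: usize) -> Option<u32> {
    let b = buf.get(at..at + 4)?;
    Some(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}

/// Reads a little-endian u64 at `at`, or None if out of bounds.
fn u64_at(buf: &[u8], at: usize) -> Option<u64> {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(buf.get(at..at + 8)?);
    Some(u64::from_le_bytes(raw))
}

/// Returns (partition_id, current_offset, message payloads).
fn decode_polled(buf: &[u8]) -> Option<(u32, u64, Vec<Vec<u8>>)> {
    let partition_id = u32_at(buf, 0)?;
    let current_offset = u64_at(buf, 4)?;
    let count = u32_at(buf, 12)?;
    let mut pos = 16;
    let mut payloads = Vec::new();
    for _ in 0..count {
        // Within the 64-byte header, the two length fields sit at bytes 48 and 52.
        let user_headers_len = u32_at(buf, pos + 48)? as usize;
        let payload_len = u32_at(buf, pos + 52)? as usize;
        let start = pos + 64 + user_headers_len;
        payloads.push(buf.get(start..start + payload_len)?.to_vec());
        pos = start + payload_len;
    }
    Some((partition_id, current_offset, payloads))
}
```

Every read is bounds-checked, so a truncated buffer yields None instead of a panic.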
ConsumerOffsetInfo
Returned by the GetConsumerOffset command.
pub struct ConsumerOffsetInfo {
pub partition_id: u32,
pub current_offset: u64,
pub stored_offset: u64,
}
Serialization
- partition_id - 4 bytes
- current_offset - 8 bytes
- stored_offset - 8 bytes
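Decoding this fixed 20-byte layout is straightforward. The sketch below uses hypothetical names (`OffsetInfo`, `decode_offset_info`) and returns None for a short buffer, which also covers the empty payload returned when nothing is found.

```rust
/// Mirror of ConsumerOffsetInfo for this sketch.
#[derive(Debug, PartialEq)]
struct OffsetInfo {
    partition_id: u32,
    current_offset: u64,
    stored_offset: u64,
}

/// Copies 8 bytes starting at `at` and reads them as a little-endian u64.
/// The caller must have checked that the buffer is long enough.
fn u64_le(buf: &[u8], at: usize) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&buf[at..at + 8]);
    u64::from_le_bytes(raw)
}

/// Parses partition_id (4 bytes) | current_offset (8 bytes) | stored_offset (8 bytes).
fn decode_offset_info(buf: &[u8]) -> Option<OffsetInfo> {
    if buf.len() < 20 {
        return None;
    }
    Some(OffsetInfo {
        partition_id: u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]),
        current_offset: u64_le(buf, 4),
        stored_offset: u64_le(buf, 12),
    })
}
```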
HeaderKey and HeaderValue
pub struct HeaderKey(String);
pub struct HeaderValue {
pub kind: HeaderKind,
pub value: Vec<u8>
}
pub enum HeaderKind {
Raw,
String,
Bool,
Int8,
Int16,
Int32,
Int64,
Int128,
Uint8,
Uint16,
Uint32,
Uint64,
Uint128,
Float32,
Float64
}
System
Ping
Code: 1 - Empty payload
Get stats
Code: 10 - Empty payload
Get snapshot
Code: 11 - Empty payload
Get cluster metadata
Code: 12 - Empty payload
Get me
Code: 20 - Empty payload
Get client
pub struct GetClient {
pub client_id: u32
}
Code: 21
- client_id - 4 bytes
Get clients
Code: 22 - Empty payload
Streams
Get stream
pub struct GetStream {
pub stream_id: Identifier
}
Code: 200
- stream_id - variable (Identifier)
Get streams
Code: 201 - Empty payload
Create stream
pub struct CreateStream {
pub name: String
}
Code: 202
- name_length - 1 byte
- name - 1-255 bytes
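The length-prefixed name used here (and by several other commands) can be built as follows. `encode_create_stream` is a hypothetical helper name; the resulting bytes would be the payload of a request with code 202.

```rust
/// Builds the CreateStream payload: name_length (1 byte) followed by the name bytes.
/// Returns None when the name is empty or longer than 255 bytes.
fn encode_create_stream(name: &str) -> Option<Vec<u8>> {
    let bytes = name.as_bytes();
    if bytes.is_empty() || bytes.len() > 255 {
        return None;
    }
    let mut payload = vec![bytes.len() as u8];
    payload.extend_from_slice(bytes);
    Some(payload)
}
```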
Delete stream
pub struct DeleteStream {
pub stream_id: Identifier
}
Code: 203
- stream_id - variable (Identifier)
Update stream
pub struct UpdateStream {
pub stream_id: Identifier,
pub name: String
}
Code: 204
- stream_id - variable (Identifier)
- name_length - 1 byte
- name - 1-255 bytes
Purge stream
pub struct PurgeStream {
pub stream_id: Identifier
}
Code: 205
- stream_id - variable (Identifier)
Topics
Get topic
pub struct GetTopic {
pub stream_id: Identifier,
pub topic_id: Identifier
}
Code: 300
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
Get topics
pub struct GetTopics {
pub stream_id: Identifier
}
Code: 301
- stream_id - variable (Identifier)
Create topic
pub struct CreateTopic {
pub stream_id: Identifier,
pub partitions_count: u32,
pub compression_algorithm: CompressionAlgorithm,
pub message_expiry: IggyExpiry,
pub max_topic_size: MaxTopicSize,
pub replication_factor: Option<u8>,
pub name: String,
}
pub enum CompressionAlgorithm {
None, // Value = 1
Gzip, // Value = 2
Lz4, // Value = 3
Zstd, // Value = 4
}
Code: 302
- stream_id - variable (Identifier)
- partitions_count - 4 bytes (u32)
- compression_algorithm - 1 byte
- message_expiry - 8 bytes (u64, 0 = no expiry)
- max_topic_size - 8 bytes (u64, 0 = unlimited)
- replication_factor - 1 byte (0 = none)
- name_length - 1 byte
- name - 1-255 bytes
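The CreateTopic payload combines fixed-width fields with two variable-length ones. The sketch below assumes the stream identifier has already been serialized as described in the Identifier section and is passed in as raw bytes; `encode_create_topic` is a hypothetical name, and the expiry and size values are passed through as plain u64 numbers.

```rust
/// Builds the CreateTopic payload in the field order listed above.
/// `stream_id` is an already serialized Identifier (kind, length, value).
fn encode_create_topic(
    stream_id: &[u8],
    partitions_count: u32,
    compression_algorithm: u8,
    message_expiry: u64,
    max_topic_size: u64,
    replication_factor: Option<u8>,
    name: &str,
) -> Option<Vec<u8>> {
    let name = name.as_bytes();
    if name.is_empty() || name.len() > 255 {
        return None;
    }
    // 23 = 4 + 1 + 8 + 8 + 1 + 1 bytes of fixed-width fields.
    let mut payload = Vec::with_capacity(stream_id.len() + 23 + name.len());
    payload.extend_from_slice(stream_id);
    payload.extend_from_slice(&partitions_count.to_le_bytes());
    payload.push(compression_algorithm);
    payload.extend_from_slice(&message_expiry.to_le_bytes());
    payload.extend_from_slice(&max_topic_size.to_le_bytes());
    // A missing replication factor is written as 0.
    payload.push(replication_factor.unwrap_or(0));
    payload.push(name.len() as u8);
    payload.extend_from_slice(name);
    Some(payload)
}
```

For a numeric stream identifier (6 bytes) and the name "orders", the payload is 6 + 23 + 6 = 35 bytes.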
Delete topic
pub struct DeleteTopic {
pub stream_id: Identifier,
pub topic_id: Identifier
}
Code: 303
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
Update topic
pub struct UpdateTopic {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub compression_algorithm: CompressionAlgorithm,
pub message_expiry: IggyExpiry,
pub max_topic_size: MaxTopicSize,
pub replication_factor: Option<u8>,
pub name: String,
}
Code: 304
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- compression_algorithm - 1 byte
- message_expiry - 8 bytes (u64)
- max_topic_size - 8 bytes (u64)
- replication_factor - 1 byte
- name_length - 1 byte
- name - 1-255 bytes
Purge topic
pub struct PurgeTopic {
pub stream_id: Identifier,
pub topic_id: Identifier
}
Code: 305
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
Partitions
Create partitions
pub struct CreatePartitions {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partitions_count: u32
}
Code: 402
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- partitions_count - 4 bytes
Delete partitions
pub struct DeletePartitions {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partitions_count: u32
}
Code: 403
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- partitions_count - 4 bytes
Segments
Delete segments
pub struct DeleteSegments {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partition_id: u32,
pub segments_count: u32
}
Code: 503
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- partition_id - 4 bytes
- segments_count - 4 bytes
Messages
Poll messages
pub struct PollMessages {
pub consumer: Consumer,
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partition_id: Option<u32>,
pub strategy: PollingStrategy,
pub count: u32,
pub auto_commit: bool,
}
pub struct PollingStrategy {
pub kind: PollingKind,
pub value: u64,
}
pub enum PollingKind {
Offset, // Value = 1 (default)
Timestamp, // Value = 2
First, // Value = 3
Last, // Value = 4
Next, // Value = 5
}
Code: 100
- consumer - variable (Consumer)
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- partition_id - 5 bytes (1 byte flag + 4 bytes u32; flag: 1=Some, 0=None)
- strategy - 9 bytes (1 byte kind + 8 bytes u64 value)
- count - 4 bytes
- auto_commit - 1 byte
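The PollMessages payload can be assembled as in this sketch. The identifiers are passed in already serialized (see the Identifier section). Two assumptions are made here: when partition_id is None, its 4 value bytes are zero-filled so that the field stays 5 bytes wide, and auto_commit is written as 1 for true and 0 for false. `encode_poll_messages` is a hypothetical name.

```rust
/// Builds the PollMessages payload in the field order listed above.
/// `consumer_id`, `stream_id` and `topic_id` are already serialized Identifiers.
fn encode_poll_messages(
    consumer_kind: u8,
    consumer_id: &[u8],
    stream_id: &[u8],
    topic_id: &[u8],
    partition_id: Option<u32>,
    strategy_kind: u8,
    strategy_value: u64,
    count: u32,
    auto_commit: bool,
) -> Vec<u8> {
    let mut payload = Vec::new();
    // Consumer: 1 byte kind followed by its Identifier.
    payload.push(consumer_kind);
    payload.extend_from_slice(consumer_id);
    payload.extend_from_slice(stream_id);
    payload.extend_from_slice(topic_id);
    // Optional partition: flag byte, then the value (assumed 0 when absent).
    payload.push(partition_id.is_some() as u8);
    payload.extend_from_slice(&partition_id.unwrap_or(0).to_le_bytes());
    // Polling strategy: 1 byte kind + 8 byte value.
    payload.push(strategy_kind);
    payload.extend_from_slice(&strategy_value.to_le_bytes());
    payload.extend_from_slice(&count.to_le_bytes());
    payload.push(auto_commit as u8);
    payload
}
```

With three numeric identifiers the payload is 7 + 6 + 6 + 5 + 9 + 4 + 1 = 38 bytes.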
Send messages
pub struct SendMessages {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partitioning: Partitioning,
pub messages: Vec<IggyMessage>
}
pub struct Partitioning {
pub kind: PartitioningKind,
pub length: u8,
pub value: Vec<u8>
}
pub enum PartitioningKind {
Balanced, // Value = 1 (default)
PartitionId, // Value = 2
MessagesKey // Value = 3
}
Code: 101
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- partitioning - variable (1 byte kind + 1 byte length + N bytes value)
- messages - each message: 64-byte header + user_headers + payload
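The partitioning field follows the same kind, length, value shape as the Identifier. The sketch below shows one way to build each kind. It assumes that Balanced carries an empty value (length 0) and that PartitionId carries a 4-byte little-endian u32; neither detail is spelled out above, and the function names are hypothetical.

```rust
/// Balanced: kind = 1 with an empty value (assumed).
fn partitioning_balanced() -> Vec<u8> {
    vec![1, 0]
}

/// PartitionId: kind = 2, value = partition id as u32 LE (assumed width).
fn partitioning_partition_id(partition_id: u32) -> Vec<u8> {
    let mut out = vec![2u8, 4u8];
    out.extend_from_slice(&partition_id.to_le_bytes());
    out
}

/// MessagesKey: kind = 3, value = arbitrary key bytes.
/// Returns None when the key does not fit the 1-byte length field.
fn partitioning_messages_key(key: &[u8]) -> Option<Vec<u8>> {
    if key.len() > 255 {
        return None;
    }
    let mut out = vec![3u8, key.len() as u8];
    out.extend_from_slice(key);
    Some(out)
}
```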
Flush unsaved buffer
pub struct FlushUnsavedBuffer {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partition_id: u32,
pub fsync: bool,
}
Code: 102
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- partition_id - 4 bytes
- fsync - 1 byte (1=true, 0=false)
Consumer offsets
Get consumer offset
pub struct GetConsumerOffset {
pub consumer: Consumer,
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partition_id: Option<u32>
}
Code: 120
- consumer - variable (Consumer)
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- partition_id - 5 bytes (1 byte flag + 4 bytes u32)
Store consumer offset
pub struct StoreConsumerOffset {
pub consumer: Consumer,
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partition_id: Option<u32>,
pub offset: u64
}
Code: 121
- consumer - variable (Consumer)
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- partition_id - 5 bytes (1 byte flag + 4 bytes u32)
- offset - 8 bytes
Delete consumer offset
pub struct DeleteConsumerOffset {
pub consumer: Consumer,
pub stream_id: Identifier,
pub topic_id: Identifier,
pub partition_id: Option<u32>
}
Code: 122
- consumer - variable (Consumer)
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- partition_id - 5 bytes (1 byte flag + 4 bytes u32)
Consumer groups
Get consumer group
pub struct GetConsumerGroup {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub group_id: Identifier
}
Code: 600
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- group_id - variable (Identifier)
Get consumer groups
pub struct GetConsumerGroups {
pub stream_id: Identifier,
pub topic_id: Identifier
}
Code: 601
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
Create consumer group
pub struct CreateConsumerGroup {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub name: String,
}
Code: 602
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- name_length - 1 byte
- name - 1-255 bytes
Delete consumer group
pub struct DeleteConsumerGroup {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub group_id: Identifier
}
Code: 603
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- group_id - variable (Identifier)
Join consumer group
pub struct JoinConsumerGroup {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub group_id: Identifier
}
Code: 604
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- group_id - variable (Identifier)
Leave consumer group
pub struct LeaveConsumerGroup {
pub stream_id: Identifier,
pub topic_id: Identifier,
pub group_id: Identifier
}
Code: 605
- stream_id - variable (Identifier)
- topic_id - variable (Identifier)
- group_id - variable (Identifier)
Users
Get user
pub struct GetUser {
pub user_id: Identifier
}
Code: 31
- user_id - variable (Identifier)
Get users
Code: 32 - Empty payload
Create user
pub struct CreateUser {
pub username: String,
pub password: String,
pub status: u8,
pub permissions: Option<Permissions>,
}
Code: 33
- username_length - 1 byte
- username - 1-255 bytes
- password_length - 1 byte
- password - 1-255 bytes
- status - 1 byte
- has_permissions - 1 byte (1=present, 0=absent)
- permissions_length - 4 bytes (u32, always present, 0 when no permissions)
- permissions - N bytes (only when has_permissions=1 and permissions_length > 0)
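The CreateUser layout mixes 1-byte length prefixes with a flag and a 4-byte length for the optional permissions. In the sketch below, the permissions are passed as already serialized bytes, because their internal format is not described in this section. The names `push_short_str` and `encode_create_user` are hypothetical.

```rust
/// Appends a 1-byte length prefix and the UTF-8 bytes of `value`.
/// Returns None when the value is empty or longer than 255 bytes.
fn push_short_str(out: &mut Vec<u8>, value: &str) -> Option<()> {
    let bytes = value.as_bytes();
    if bytes.is_empty() || bytes.len() > 255 {
        return None;
    }
    out.push(bytes.len() as u8);
    out.extend_from_slice(bytes);
    Some(())
}

/// Builds the CreateUser payload in the field order listed above.
/// `permissions` holds already serialized permissions bytes, if any.
fn encode_create_user(
    username: &str,
    password: &str,
    status: u8,
    permissions: Option<&[u8]>,
) -> Option<Vec<u8>> {
    let mut payload = Vec::new();
    push_short_str(&mut payload, username)?;
    push_short_str(&mut payload, password)?;
    payload.push(status);
    // The flag and the u32 length are always written; the bytes only when present.
    let perms = permissions.unwrap_or(&[]);
    payload.push(permissions.is_some() as u8);
    payload.extend_from_slice(&(perms.len() as u32).to_le_bytes());
    payload.extend_from_slice(perms);
    Some(payload)
}
```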
Delete user
pub struct DeleteUser {
pub user_id: Identifier
}
Code: 34
- user_id - variable (Identifier)
Update user
pub struct UpdateUser {
pub user_id: Identifier,
pub username: Option<String>,
pub status: Option<u8>,
}
Code: 35
- user_id - variable (Identifier)
- has_username - 1 byte (1=present, 0=absent)
- username_length - 1 byte (only when present)
- username - N bytes (only when present)
- has_status - 1 byte (1=present, 0=absent)
- status - 1 byte (only when present)
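Unlike the optional partition_id in the message commands, the optional fields here are omitted entirely when absent, so the payload length varies. A sketch of that encoding, with the hypothetical name `encode_update_user` and an already serialized user identifier:

```rust
/// Builds the UpdateUser payload. Absent fields contribute only their 0 flag byte.
/// `user_id` is an already serialized Identifier.
fn encode_update_user(
    user_id: &[u8],
    username: Option<&str>,
    status: Option<u8>,
) -> Option<Vec<u8>> {
    let mut payload = user_id.to_vec();
    match username {
        Some(name) => {
            let bytes = name.as_bytes();
            // Assumption: a present username must be 1-255 bytes, as in CreateUser.
            if bytes.is_empty() || bytes.len() > 255 {
                return None;
            }
            payload.push(1);
            payload.push(bytes.len() as u8);
            payload.extend_from_slice(bytes);
        }
        None => payload.push(0),
    }
    match status {
        Some(value) => payload.extend_from_slice(&[1, value]),
        None => payload.push(0),
    }
    Some(payload)
}
```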
Update permissions
pub struct UpdatePermissions {
pub user_id: Identifier,
pub permissions: Option<Permissions>,
}
Code: 36
- user_id - variable (Identifier)
- has_permissions - 1 byte (1=present, 0=absent)
- permissions_length - 4 bytes (u32, always present)
- permissions - N bytes (only when has_permissions=1)
Change password
pub struct ChangePassword {
pub user_id: Identifier,
pub current_password: String,
pub new_password: String,
}
Code: 37
- user_id - variable (Identifier)
- current_password_length - 1 byte
- current_password - N bytes
- new_password_length - 1 byte
- new_password - N bytes
Authentication
Login user
pub struct LoginUser {
pub username: String,
pub password: String,
pub version: Option<String>,
pub context: Option<String>,
}
Code: 38
- username_length - 1 byte
- username - N bytes
- password_length - 1 byte
- password - N bytes
- version_length - 4 bytes (u32, 0 = absent)
- version - N bytes (only when version_length > 0)
- context_length - 4 bytes (u32, 0 = absent)
- context - N bytes (only when context_length > 0)
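LoginUser uses two different length prefixes: 1 byte for the credentials and 4 bytes for the optional version and context, where a length of 0 marks the field as absent. The sketch below illustrates this with hypothetical helper names; it is not the SDK's implementation.

```rust
/// Appends a 1-byte length prefix and the bytes of `value` (at most 255 bytes).
fn push_u8_prefixed(out: &mut Vec<u8>, value: &str) -> Option<()> {
    let bytes = value.as_bytes();
    if bytes.len() > 255 {
        return None;
    }
    out.push(bytes.len() as u8);
    out.extend_from_slice(bytes);
    Some(())
}

/// Appends a u32 LE length prefix and the bytes; None is written as length 0.
fn push_u32_prefixed(out: &mut Vec<u8>, value: Option<&str>) {
    let bytes = value.map(str::as_bytes).unwrap_or(&[]);
    out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
    out.extend_from_slice(bytes);
}

/// Builds the LoginUser payload in the field order listed above.
fn encode_login_user(
    username: &str,
    password: &str,
    version: Option<&str>,
    context: Option<&str>,
) -> Option<Vec<u8>> {
    let mut payload = Vec::new();
    push_u8_prefixed(&mut payload, username)?;
    push_u8_prefixed(&mut payload, password)?;
    push_u32_prefixed(&mut payload, version);
    push_u32_prefixed(&mut payload, context);
    Some(payload)
}
```

Because 0 means absent, an empty version or context string is indistinguishable from a missing one on the wire.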
Logout user
Code: 39 - Empty payload
Personal access tokens
Get personal access tokens
Code: 41 - Empty payload
Create personal access token
pub struct CreatePersonalAccessToken {
pub name: String,
pub expiry: u64,
}
Code: 42
- name_length - 1 byte
- name - 1-255 bytes
- expiry - 8 bytes (u64, 0 = no expiry)
Delete personal access token
pub struct DeletePersonalAccessToken {
pub name: String,
}
Code: 43
- name_length - 1 byte
- name - 1-255 bytes
Login with personal access token
pub struct LoginWithPersonalAccessToken {
pub token: String,
}
Code: 44
- token_length - 1 byte
- token - 1-255 bytes