Apache Iggy
Go SDK

Examples

Getting started

A complete getting-started example is available in the examples/go directory. It demonstrates connecting to the server, creating streams and topics, sending messages, and consuming them.

Producer

package main

import (
	"fmt"
	"time"

	"github.com/apache/iggy/foreign/go/client"
	"github.com/apache/iggy/foreign/go/client/tcp"
	iggcon "github.com/apache/iggy/foreign/go/contracts"
)

// Numeric identifiers of the stream, topic, and partition that the producer
// example targets. All three are zero.
var (
	StreamId    uint32 = 0
	TopicId     uint32 = 0
	PartitionId uint32 = 0
)

// main connects to an Iggy server at 127.0.0.1:8090 over TCP, logs in, creates
// the "sample-stream" stream and the "sample-topic" topic, and then sends ten
// messages ("message-1" .. "message-10") to one partition, pausing 500ms
// between sends. Any failure aborts the program with a panic.
func main() {
	// Create the client with TCP transport
	cli, err := client.NewIggyClient(
		client.WithTcp(tcp.WithServerAddress("127.0.0.1:8090")),
	)
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Log in
	if _, err := cli.LoginUser("iggy", "iggy"); err != nil {
		panic(err)
	}

	// Create a stream
	if _, err := cli.CreateStream("sample-stream"); err != nil {
		panic(err)
	}

	// Build the stream and topic identifiers once; they do not change between
	// calls, so there is no need to recreate them for every message.
	// NOTE(review): this assumes the stream and topic created here are
	// assigned the numeric IDs StreamId and TopicId — confirm against the server.
	streamIdentifier, err := iggcon.NewIdentifier(StreamId)
	if err != nil {
		panic(err)
	}
	topicIdentifier, err := iggcon.NewIdentifier(TopicId)
	if err != nil {
		panic(err)
	}

	// Create a topic with 1 partition, no compression, no expiry
	if _, err := cli.CreateTopic(
		streamIdentifier,
		"sample-topic",
		1, // partitions count
		iggcon.CompressionAlgorithmNone,
		iggcon.IggyExpiryNeverExpire,
		0,   // max topic size
		nil, // replication factor
	); err != nil {
		panic(err)
	}

	// Send messages in a loop
	partitioning := iggcon.PartitionId(PartitionId)
	for i := 1; i <= 10; i++ {
		payload := fmt.Sprintf("message-%d", i)
		message, err := iggcon.NewIggyMessage([]byte(payload))
		if err != nil {
			panic(err)
		}

		if err := cli.SendMessages(
			streamIdentifier,
			topicIdentifier,
			partitioning,
			[]iggcon.IggyMessage{message},
		); err != nil {
			panic(err)
		}
		time.Sleep(500 * time.Millisecond)
	}
}

Consumer

package main

import (
	"fmt"
	"time"

	"github.com/apache/iggy/foreign/go/client"
	"github.com/apache/iggy/foreign/go/client/tcp"
	iggcon "github.com/apache/iggy/foreign/go/contracts"
)

// Numeric identifiers of the stream, topic, and partition that the consumer
// example polls. All three are zero.
var (
	StreamID    uint32 = 0
	TopicID     uint32 = 0
	PartitionID uint32 = 0
)

// main connects to an Iggy server at 127.0.0.1:8090 over TCP, logs in, and
// then polls one partition in an endless loop, printing the offset and payload
// of every message received. When a poll returns no messages it waits 500ms
// before trying again. Any failure aborts the program with a panic.
func main() {
	// Create the client with TCP transport
	cli, err := client.NewIggyClient(
		client.WithTcp(tcp.WithServerAddress("127.0.0.1:8090")),
	)
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Log in
	if _, err := cli.LoginUser("iggy", "iggy"); err != nil {
		panic(err)
	}

	// Build the stream and topic identifiers once; they are the same for
	// every poll, so they do not need to be recreated inside the loop.
	streamIdentifier, err := iggcon.NewIdentifier(StreamID)
	if err != nil {
		panic(err)
	}
	topicIdentifier, err := iggcon.NewIdentifier(TopicID)
	if err != nil {
		panic(err)
	}

	// Poll messages in a loop
	offset := uint64(0)
	consumer := iggcon.DefaultConsumer()

	for {
		pollMessages, err := cli.PollMessages(
			streamIdentifier,
			topicIdentifier,
			consumer,
			iggcon.OffsetPollingStrategy(offset),
			10,    // messages per batch
			false, // auto-commit
			&PartitionID,
		)
		if err != nil {
			panic(err)
		}

		if len(pollMessages.Messages) == 0 {
			time.Sleep(500 * time.Millisecond)
			continue
		}

		// Auto-commit is off, so the position is tracked locally: move the
		// offset forward by the number of messages in this batch.
		offset += uint64(len(pollMessages.Messages))
		for _, message := range pollMessages.Messages {
			fmt.Printf("Offset: %d, Payload: %s\n",
				message.Header.Offset, string(message.Payload))
		}
	}
}

For the full source code, see the examples/go directory.

On this page