SDKGo
Examples
Getting started
A complete getting-started example is available in the examples/go directory. It demonstrates connecting to the server, creating streams and topics, sending messages, and consuming them.
Producer
package main
import (
"fmt"
"time"
"github.com/apache/iggy/foreign/go/client"
"github.com/apache/iggy/foreign/go/client/tcp"
iggcon "github.com/apache/iggy/foreign/go/contracts"
)
var (
StreamId = uint32(0)
TopicId = uint32(0)
PartitionId = uint32(0)
)
func main() {
// Create the client with TCP transport
cli, err := client.NewIggyClient(
client.WithTcp(tcp.WithServerAddress("127.0.0.1:8090")),
)
if err != nil {
panic(err)
}
defer cli.Close()
// Log in
if _, err := cli.LoginUser("iggy", "iggy"); err != nil {
panic(err)
}
// Create a stream
if _, err := cli.CreateStream("sample-stream"); err != nil {
panic(err)
}
// Create a topic with 1 partition, no compression, no expiry
streamIdentifier, _ := iggcon.NewIdentifier(StreamId)
if _, err := cli.CreateTopic(
streamIdentifier,
"sample-topic",
1, // partitions count
iggcon.CompressionAlgorithmNone,
iggcon.IggyExpiryNeverExpire,
0, // max topic size
nil, // replication factor
); err != nil {
panic(err)
}
// Send messages in a loop
partitioning := iggcon.PartitionId(PartitionId)
for i := 1; i <= 10; i++ {
payload := fmt.Sprintf("message-%d", i)
message, _ := iggcon.NewIggyMessage([]byte(payload))
streamIdentifier, _ := iggcon.NewIdentifier(StreamId)
topicIdentifier, _ := iggcon.NewIdentifier(TopicId)
if err := cli.SendMessages(
streamIdentifier,
topicIdentifier,
partitioning,
[]iggcon.IggyMessage{message},
); err != nil {
panic(err)
}
time.Sleep(500 * time.Millisecond)
}
}Consumer
package main
import (
"fmt"
"time"
"github.com/apache/iggy/foreign/go/client"
"github.com/apache/iggy/foreign/go/client/tcp"
iggcon "github.com/apache/iggy/foreign/go/contracts"
)
var (
StreamID = uint32(0)
TopicID = uint32(0)
PartitionID = uint32(0)
)
func main() {
// Create the client with TCP transport
cli, err := client.NewIggyClient(
client.WithTcp(tcp.WithServerAddress("127.0.0.1:8090")),
)
if err != nil {
panic(err)
}
defer cli.Close()
// Log in
if _, err := cli.LoginUser("iggy", "iggy"); err != nil {
panic(err)
}
// Poll messages in a loop
offset := uint64(0)
consumer := iggcon.DefaultConsumer()
for {
streamIdentifier, _ := iggcon.NewIdentifier(StreamID)
topicIdentifier, _ := iggcon.NewIdentifier(TopicID)
pollMessages, err := cli.PollMessages(
streamIdentifier,
topicIdentifier,
consumer,
iggcon.OffsetPollingStrategy(offset),
10, // messages per batch
false, // auto-commit
&PartitionID,
)
if err != nil {
panic(err)
}
if len(pollMessages.Messages) == 0 {
time.Sleep(500 * time.Millisecond)
continue
}
offset += uint64(len(pollMessages.Messages))
for _, message := range pollMessages.Messages {
fmt.Printf("Offset: %d, Payload: %s\n",
message.Header.Offset, string(message.Payload))
}
}
}For the full source code, see the examples/go directory.