The different types of Kafka clients

Ambiyansyah Risyal
2 min read · Dec 15, 2022


Photo by mauro mora on Unsplash

Kafka clients can be broadly classified into three types: producers, consumers, and stream processors.

Producers are the clients that publish messages to Kafka topics. Here is an example of a Kafka producer in Go using the kafka-go library:

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Create a new producer
    producer := kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{"localhost:9092"},
        Topic:    "my-topic",
        Balancer: &kafka.LeastBytes{},
    })

    // Publish messages to the topic
    for i := 0; i < 10; i++ {
        message := kafka.Message{
            Key:   []byte(fmt.Sprintf("key-%d", i)),
            Value: []byte(fmt.Sprintf("value-%d", i)),
            Time:  time.Now(),
        }
        if err := producer.WriteMessages(context.Background(), message); err != nil {
            log.Fatal(err)
        }
    }

    // Close the producer
    if err := producer.Close(); err != nil {
        log.Fatal(err)
    }
}
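
Each single-message WriteMessages call above waits for that message to be delivered before the loop continues. Since WriteMessages is variadic, you can also hand the writer the whole batch in one call and let it group the records into fewer requests. A minimal sketch, reusing the producer from the example above:

    // Build the batch up front instead of writing one message at a time
    messages := make([]kafka.Message, 0, 10)
    for i := 0; i < 10; i++ {
        messages = append(messages, kafka.Message{
            Key:   []byte(fmt.Sprintf("key-%d", i)),
            Value: []byte(fmt.Sprintf("value-%d", i)),
            Time:  time.Now(),
        })
    }

    // A single variadic call sends the whole batch
    if err := producer.WriteMessages(context.Background(), messages...); err != nil {
        log.Fatal(err)
    }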

Consumers are the clients that consume messages from Kafka topics. Here is an example of a Kafka consumer in Go using the kafka-go library:

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Create a new consumer that joins the "my-group" consumer group
    consumer := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"localhost:9092"},
        GroupID:  "my-group",
        Topic:    "my-topic",
        MinBytes: 10e3, // 10KB
        MaxBytes: 10e6, // 10MB
    })

    // Read messages from the topic until an error occurs
    for {
        message, err := consumer.ReadMessage(context.Background())
        if err != nil {
            log.Println(err)
            break
        }

        // Process the message
        log.Printf("message at offset %d: key = %s, value = %s", message.Offset, string(message.Key), string(message.Value))
    }

    // Close the consumer and leave the consumer group
    if err := consumer.Close(); err != nil {
        log.Fatal(err)
    }
}
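
Note that ReadMessage commits offsets automatically when a GroupID is set. If you want to commit only after a message has been fully processed (at-least-once delivery), kafka-go also exposes FetchMessage and CommitMessages. A minimal sketch, reusing the consumer from the example above:

    for {
        // FetchMessage does not auto-commit the offset
        message, err := consumer.FetchMessage(context.Background())
        if err != nil {
            break
        }

        // Handle the message first...
        log.Printf("processing offset %d: %s", message.Offset, string(message.Value))

        // ...then commit its offset, so a crash mid-processing replays it
        if err := consumer.CommitMessages(context.Background(), message); err != nil {
            log.Fatal(err)
        }
    }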

Kafka Streams is a client library for building applications and microservices that use Apache Kafka as the underlying data backbone. The official Streams API is a Java library, and kafka-go does not provide a streams package, but the core read-process-write pattern behind it can be expressed with a Reader and a Writer. Here is an example of such a pipeline in Go using the kafka-go library:

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Read from the source topic as part of a consumer group
    source := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        GroupID: "my-stream-group",
        Topic:   "my-source-topic",
    })
    defer source.Close()

    // Write the processed records to the sink topic
    sink := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "my-sink-topic",
    })
    defer sink.Close()

    // Consume, transform, and produce in a loop
    for {
        message, err := source.ReadMessage(context.Background())
        if err != nil {
            log.Println(err)
            return
        }

        // Process the value (an identity transform here)
        processed := kafka.Message{
            Key:   message.Key,
            Value: message.Value,
        }

        // Forward the result to the sink topic
        if err := sink.WriteMessages(context.Background(), processed); err != nil {
            log.Println(err)
            return
        }
    }
}

In this example, the loop reads messages from the “my-source-topic” topic, processes the value of each message, and then writes the processed messages to the “my-sink-topic” topic.
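
The loop assumes both topics already exist (or that the broker auto-creates topics). If they don’t, kafka-go can create them over a plain broker connection; topic creation requests have to be sent to the cluster controller. A minimal sketch, assuming a single local broker:

    package main

    import (
        "log"
        "net"
        "strconv"

        "github.com/segmentio/kafka-go"
    )

    func main() {
        // Connect to any broker to discover the cluster controller
        conn, err := kafka.Dial("tcp", "localhost:9092")
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        controller, err := conn.Controller()
        if err != nil {
            log.Fatal(err)
        }

        // Topic creation requests must go to the controller broker
        controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
        if err != nil {
            log.Fatal(err)
        }
        defer controllerConn.Close()

        // Create the source and sink topics used by the pipeline
        err = controllerConn.CreateTopics(
            kafka.TopicConfig{Topic: "my-source-topic", NumPartitions: 1, ReplicationFactor: 1},
            kafka.TopicConfig{Topic: "my-sink-topic", NumPartitions: 1, ReplicationFactor: 1},
        )
        if err != nil {
            log.Fatal(err)
        }
    }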
