The different types of Kafka clients
Kafka clients can be broadly classified into three types: producers, consumers, and streams.
Producers are the clients that publish messages to Kafka topics. Here is an example of a Kafka producer in Go using the kafka-go library:
package main
import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)
// main publishes ten keyed messages ("key-0".."key-9") to "my-topic" on the
// broker at localhost:9092 and then closes the writer. It exits with a
// non-zero status if either the write or the close fails.
func main() {
	// Create a new producer
	producer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    "my-topic",
		Balancer: &kafka.LeastBytes{},
	})

	// Build the whole batch up front so it can be handed to the writer at once.
	const messageCount = 10
	messages := make([]kafka.Message, 0, messageCount)
	for i := 0; i < messageCount; i++ {
		messages = append(messages, kafka.Message{
			Key:   []byte(fmt.Sprintf("key-%d", i)),
			Value: []byte(fmt.Sprintf("value-%d", i)),
			Time:  time.Now(),
		})
	}

	// WriteMessages is variadic: one call lets the writer batch the messages
	// instead of blocking once per message.
	writeErr := producer.WriteMessages(context.Background(), messages...)

	// Close before any log.Fatal: log.Fatal terminates the process
	// immediately, so the writer would otherwise never be closed on the
	// error path.
	closeErr := producer.Close()

	if writeErr != nil {
		if closeErr != nil {
			log.Printf("closing producer: %v", closeErr)
		}
		log.Fatal(writeErr)
	}
	if closeErr != nil {
		log.Fatal(closeErr)
	}
}
Consumers are the clients that consume messages from Kafka topics. Here is an example of a Kafka consumer in Go using the kafka-go library:
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
// main reads messages from "my-topic" as a member of consumer group
// "my-group" and logs each one. The loop ends on the first read error; the
// reader is then closed and the process exits with a non-zero status.
func main() {
	// Create a new consumer
	consumer := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		GroupID:  "my-group",
		Topic:    "my-topic",
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})

	ctx := context.Background()

	// Read messages from the topic
	var readErr error
	for {
		message, err := consumer.ReadMessage(ctx)
		if err != nil {
			// Break instead of calling log.Fatal here: log.Fatal exits the
			// process on the spot, which would make the Close call below
			// unreachable.
			readErr = err
			break
		}
		// Process the message
		log.Printf("message at offset %d: key = %s, value = %s", message.Offset, string(message.Key), string(message.Value))
	}

	// Close the consumer before reporting the read failure.
	if err := consumer.Close(); err != nil {
		log.Printf("closing consumer: %v", err)
	}
	log.Fatal(readErr)
}
Kafka Streams is a client library for building applications and microservices that use Apache Kafka as the underlying data backbone. Here is an example of a stream-style pipeline in Go using the kafka-go library:
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/kafkastream"
)
func main() {
// Create a new stream
stream := kafkastream.New(kafkastream.Config{
Brokers: []string{"localhost:9092"},
})
// Create a source topic
sourceTopic := kafka.Topic{
Name: "my-source-topic",
}
// Create a sink topic
sinkTopic := kafka.Topic{
Name: "my-sink-topic",
}
// Create a stream pipeline
pipeline, err := stream.NewPipeline(
sourceTopic,
kafkastream.MapValue(func(value []byte) []byte {
// Process the value
return value
}),
sinkTopic,
)
if err != nil {
log.Fatal(err)
}
// Start the pipeline
if err := pipeline.Start(context.Background()); err != nil {
log.Fatal(err)
}
// Stop the pipeline
if err := pipeline.Stop(); err != nil {
log.Fatal(err)
}
}
In this example, the pipeline reads messages from the “my-source-topic” topic, processes the value of each message, and then writes the processed messages to the “my-sink-topic” topic.