Implementing TLS and SASL Connection in Kafka Using Go (Golang)

Ambiyansyah Risyal
2 min read · Jan 14, 2023
Photo by King's Church International on Unsplash

Introduction

Apache Kafka is a distributed streaming platform that is widely used for building real-time data pipelines and streaming applications. It allows producers to send data to Kafka topics, and consumers to read data from those topics.

In a production environment, it is important to secure the communication between Kafka producers, consumers, and brokers. This can be achieved by using Transport Layer Security (TLS) and Simple Authentication and Security Layer (SASL).

TLS is a protocol that provides secure communication over a network. It uses encryption and authentication to protect data from being intercepted or modified by unauthorized parties. SASL is a framework that allows for the negotiation of authentication mechanisms between client and server.
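
To make the SASL side more concrete, here is a minimal sketch of the message a client sends for one common mechanism, PLAIN (RFC 4616). The function name plainInitialResponse and the credentials are placeholders chosen for illustration; in practice a Kafka client library builds this message for you.

```go
package main

import "fmt"

// plainInitialResponse builds the single message a client sends for the
// SASL PLAIN mechanism (RFC 4616): authzid NUL authcid NUL password.
// An empty authzid means "act as the authenticated user".
func plainInitialResponse(authzid, username, password string) []byte {
	return []byte(authzid + "\x00" + username + "\x00" + password)
}

func main() {
	// "alice" and "s3cret" are placeholder credentials for illustration.
	msg := plainInitialResponse("", "alice", "s3cret")

	// %q makes the NUL separators visible when printing.
	fmt.Printf("%q\n", msg)
}
```

Because PLAIN sends the password without any encryption of its own, it is normally used only on top of a TLS connection, which is why the two are combined in this article.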

In this article, we will see how to set up TLS and SASL connections to Kafka using Go (Golang).

Prerequisites

Before we start, make sure you have the following prerequisites:

  • Go 1.14 or higher
  • Apache Kafka 2.8.0 or higher
  • A valid TLS (SSL) certificate for your Kafka broker, plus a client certificate and key if the broker requires mutual TLS
  • A SASL username and password for your Kafka broker
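
If the broker certificate is signed by a private certificate authority, the client also has to trust that CA. The sketch below shows one way to do this with the standard library only. The helper name newTLSConfig and the file name ca.pem are assumptions made for this example, not something your setup necessarily uses.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// newTLSConfig returns a client-side TLS configuration that trusts only the
// CA certificates found in caPEM and refuses anything older than TLS 1.2.
func newTLSConfig(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no CA certificates found in PEM data")
	}
	return &tls.Config{
		RootCAs:    pool,
		MinVersion: tls.VersionTLS12,
	}, nil
}

func main() {
	// "ca.pem" is a hypothetical file name; point it at your own CA bundle.
	// os.ReadFile needs Go 1.16+; use ioutil.ReadFile on older versions.
	caPEM, err := os.ReadFile("ca.pem")
	if err != nil {
		fmt.Println("could not read CA file:", err)
		return
	}

	cfg, err := newTLSConfig(caPEM)
	if err != nil {
		fmt.Println("could not build TLS config:", err)
		return
	}
	fmt.Println("TLS config ready, minimum version:", cfg.MinVersion)
}
```

If the broker certificate comes from a publicly trusted CA, RootCAs can be left unset and Go falls back to the system trust store.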

Installing Go Libraries

We will be using the following Go libraries for this tutorial:

  • “github.com/segmentio/kafka-go” for Kafka connectivity
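  • “crypto/tls” from the Go standard library for TLS support
  • “github.com/segmentio/kafka-go/sasl/plain” or “github.com/segmentio/kafka-go/sasl/scram” for SASL support

Only kafka-go needs to be installed. The crypto/tls package ships with Go, and the SASL packages are part of the kafka-go module. To install it, run the following command:

go get github.com/segmentio/kafka-go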

Connecting to Kafka Broker with TLS

To connect to a Kafka broker with TLS, we need to configure the TLS settings in the Kafka client. Here is an example of how to do it:

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Load the client certificate and key (used when the broker requires mutual TLS)
	cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
	if err != nil {
		log.Fatal(err)
	}

	// Create a TLS configuration
	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
		// Set the minimum accepted TLS version
		MinVersion: tls.VersionTLS12,
	}

	// Create a Kafka dialer that uses the TLS configuration
	dialer := &kafka.Dialer{
		Timeout: 10 * time.Second,
		TLS:     tlsConfig,
	}

	// Connect to the first reachable Kafka broker
	brokers := []string{"broker1:9093", "broker2:9093"}
	var conn *kafka.Conn
	for _, broker := range brokers {
		conn, err = dialer.DialContext(context.Background(), "tcp", broker)
		if err == nil {
			break
		}
		log.Printf("could not connect to %s: %v", broker, err)
	}
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	fmt.Println("Connected to Kafka broker with TLS")
}
