Pi Cluster Documentation

Kafka

Kafka Cluster installation

For the installation, the Strimzi Kafka operator will be used to deploy and manage a Kafka cluster.

Strimzi Operator installation procedure using Helm

Installation using Helm (Release 3):

  • Step 1: Add the Strimzi Helm repository:

    helm repo add strimzi https://strimzi.io/charts/
    
  • Step 2: Fetch the latest charts from the repository:

    helm repo update
    
  • Step 3: Create namespace

    kubectl create namespace kafka
    
  • Step 4: Install Strimzi Kafka operator

    helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator --namespace kafka
    
  • Step 5: Confirm that the deployment succeeded:

    kubectl -n kafka get pod
    
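As an additional check, the rollout of the operator Deployment can be verified (strimzi-cluster-operator is the Deployment name created by the chart):

    kubectl -n kafka rollout status deployment strimzi-cluster-operator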

Deploy Kafka cluster

Using the Strimzi operator, a Kafka cluster in KRaft mode (without ZooKeeper) can be deployed.

  • Step 1: Create a KafkaNodePool of 3 nodes with dual roles (KRaft controller / Kafka broker), configure its storage (5Gi persistent volumes) and create a Kafka cluster from that node pool using a specific Kafka release.

    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: dual-role
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      replicas: 3
      roles:
        - controller
        - broker
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 5Gi
            class: longhorn
            deleteClaim: false
            kraftMetadata: shared
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
      annotations:
        strimzi.io/node-pools: enabled
        strimzi.io/kraft: enabled
    spec:
      kafka:
        version: 4.0.0
        metadataVersion: 4.0-IV3
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 3
          transaction.state.log.replication.factor: 3
          transaction.state.log.min.isr: 2
          default.replication.factor: 3
          min.insync.replicas: 2
      entityOperator:
        topicOperator: {}
        userOperator: {}
    
  • Step 2: Apply manifest

    kubectl apply -f manifest.yml
    
  • Step 3: Check Kafka status

    kubectl get kafka -n kafka
    NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
    my-cluster   3                        3                     True
    
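Instead of polling manually, kubectl wait can be used to block until the Kafka resource reports the Ready condition (resource and namespace names match the manifest above):

    kubectl -n kafka wait kafka/my-cluster --for=condition=Ready --timeout=300s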

Create Topic

  • Step 1: Create a manifest file topic.yaml

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 1
      replicas: 3
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    
  • Step 2: Apply manifest

    kubectl apply -f topic.yaml
    
  • Step 3: Check Kafka topic status

    kubectl get kafkatopic my-topic -n kafka
    NAME       CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
    my-topic   my-cluster   1            3                    True
    
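Optionally, the topic configuration can also be inspected from inside the cluster using Kafka's own CLI, reusing the same Strimzi image used in the testing section below:

    kubectl -n kafka run kafka-topics -ti --image=quay.io/strimzi/kafka:0.29.0-kafka-3.2.0 --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --describe --topic my-topic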

Testing Kafka cluster

Once the cluster is running, you can run a simple producer to send messages to a Kafka topic (the topic will be created automatically if it does not already exist).

  • Step 1: Launch producer
    kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.29.0-kafka-3.2.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic
    
  • Step 2: In a different terminal, launch consumer
    kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.29.0-kafka-3.2.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
    
  • Step 3: In the producer terminal, wait for the prompt and start typing messages (press Ctrl+C to finish).

    Messages will be displayed in the consumer terminal.
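
The console consumer joins an auto-generated consumer group; the consumer groups known to the cluster can be listed with kafka-consumer-groups.sh, using the same image as above:

    kubectl -n kafka run kafka-consumer-groups -ti --image=quay.io/strimzi/kafka:0.29.0-kafka-3.2.0 --rm=true --restart=Never -- bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list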

Observability

Metrics

Kafka can be configured to generate Prometheus metrics using external exporters (Prometheus JMX Exporter and Kafka Exporter).

Prometheus JMX Exporter

Prometheus JMX Exporter collects JMX metrics and exposes them via HTTP for Prometheus consumption.

Kafka generates JMX metrics that can be collected by Prometheus JMX Exporter. The JMX Exporter runs as a Java agent within the Kafka broker process, exposing JMX metrics on an HTTP endpoint.
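
Outside Strimzi, the agent is typically attached to the broker JVM with a -javaagent flag pointing to the exporter JAR, a listening port and a rules file; the JAR path and port below are illustrative only (Strimzi handles this wiring automatically when metricsConfig is set, as shown later):

KAFKA_OPTS="-javaagent:/opt/jmx_prometheus_javaagent.jar=9404:/opt/kafka-metrics-config.yml"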

Prometheus Kafka Exporter

The metrics Kafka exposes via JMX are not sufficient to monitor Kafka brokers and clients.

Kafka Exporter is an open source project to enhance monitoring of Apache Kafka brokers and clients. It collects and exposes additional metrics related to Kafka consumer groups, consumer lags, topics, partitions, and offsets.
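
For example, once these metrics are scraped, the lag of every consumer group per topic can be obtained in Prometheus with a query over kafka_consumergroup_lag, one of the metrics exposed by Kafka Exporter:

sum(kafka_consumergroup_lag) by (consumergroup, topic)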

Strimzi Operator

Strimzi provides built-in support for JMX Exporter and Kafka Exporter.

When creating a Kafka cluster using the Strimzi operator, JMX Exporter and Kafka Exporter can be enabled by adding the following configuration to the Kafka manifest file:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: cluster
spec:
  kafka:
    # ...
    # Configure JMX Exporter
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  # Enable Kafka Exporter
  kafkaExporter:
    topicRegex: ".*"
    groupRegex: ".*"

The Prometheus JMX Exporter configuration file must be provided in a ConfigMap named kafka-metrics in the same namespace as the Kafka cluster. This file contains rules for mapping JMX metrics to Prometheus metrics. The following is a sample configuration provided by Strimzi that can be used as a starting point:

kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-metrics
  labels:
    app: strimzi
data:
  kafka-metrics-config.yml: |
    # See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
    lowercaseOutputName: true
    rules:
    # Special cases and very specific rules
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        broker: "$4:$5"
    - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_tls_info
      type: GAUGE
      labels:
        cipher: "$2"
        protocol: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_software
      type: GAUGE
      labels:
        clientSoftwareName: "$2"
        clientSoftwareVersion: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+-total):"
      name: kafka_server_$1_$4
      type: COUNTER
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+-total)
      name: kafka_server_$1_$4
      type: COUNTER
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    # Some percent metrics use MeanRate attribute
    # Ex) kafka.server<type=(KafkaRequestHandlerPool), name=(RequestHandlerAvgIdlePercent)><>MeanRate
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    # Generic gauges for percents
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
      labels:
        "$4": "$5"
    # Generic per-second counters with 0-2 key/value pairs
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
    # Generic gauges with 0-2 key/value pairs
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
    # Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
    # Note that these are missing the '_sum' metric!
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
        quantile: "0.$8"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        quantile: "0.$6"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        quantile: "0.$4"
    # KRaft overall related metrics
    # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
    - pattern: "kafka.server<type=raft-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-metrics><>(current-state): (.+)"
      name: kafka_server_raftmetrics_$1
      value: 1
      type: UNTYPED
      labels:
        $1: "$2"
    - pattern: "kafka.server<type=raft-metrics><>(.+):"
      name: kafka_server_raftmetrics_$1
      type: GAUGE
    # KRaft "low level" channels related metrics
    # distinguish between always increasing COUNTER (total and max) and variable GAUGE (all others) metrics
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftchannelmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+):"
      name: kafka_server_raftchannelmetrics_$1
      type: GAUGE
    # Broker metrics related to fetching metadata topic records in KRaft mode
    - pattern: "kafka.server<type=broker-metadata-metrics><>(.+):"
      name: kafka_server_brokermetadatametrics_$1
      type: GAUGE

Once the Kafka cluster is deployed with JMX Exporter and Kafka Exporter enabled, Prometheus can be configured to scrape metrics from the exporters.

If Kube-Prometheus-Stack is installed in the cluster, Prometheus can be configured to scrape metrics from Kafka brokers and exporters by creating PodMonitor resources.

The following resources can be created to scrape metrics from all pods created by Strimzi (Kafka, KafkaConnect, KafkaMirrorMaker2, etc.) in the kafka namespace:

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: cluster-operator-metrics
  labels:
    app: strimzi
spec:
  selector:
    matchLabels:
      strimzi.io/kind: cluster-operator
  namespaceSelector:
    matchNames:
      - kafka
  podMetricsEndpoints:
  - path: /metrics
    port: http
---
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: entity-operator-metrics
  labels:
    app: strimzi
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: entity-operator
  namespaceSelector:
    matchNames:
      - kafka
  podMetricsEndpoints:
  - path: /metrics
    port: healthcheck
---
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: bridge-metrics
  labels:
    app: strimzi
spec:
  selector:
    matchLabels:
      strimzi.io/kind: KafkaBridge
  namespaceSelector:
    matchNames:
      - kafka
  podMetricsEndpoints:
  - path: /metrics
    port: rest-api
---
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: kafka-resources-metrics
  labels:
    app: strimzi
spec:
  selector:
    matchExpressions:
      - key: "strimzi.io/kind"
        operator: In
        values: ["Kafka", "KafkaConnect", "KafkaMirrorMaker2"]
  namespaceSelector:
    matchNames:
      - kafka
  podMetricsEndpoints:
  - path: /metrics
    port: tcp-prometheus
    relabelings:
    - separator: ;
      regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
      replacement: $1
      action: labelmap
    - sourceLabels: [__meta_kubernetes_namespace]
      separator: ;
      regex: (.*)
      targetLabel: namespace
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_name]
      separator: ;
      regex: (.*)
      targetLabel: kubernetes_pod_name
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_node_name]
      separator: ;
      regex: (.*)
      targetLabel: node_name
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_host_ip]
      separator: ;
      regex: (.*)
      targetLabel: node_ip
      replacement: $1
      action: replace

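A minimal sketch of applying these PodMonitor manifests, assuming they are saved in a kafka-podmonitors.yml file and that Prometheus is configured to discover PodMonitor resources in the kafka namespace:

kubectl apply -f kafka-podmonitors.yml -n kafka
kubectl get podmonitor -n kafka
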
Grafana Dashboards

If Grafana’s dynamic provisioning of dashboards is configured, the Kafka Grafana dashboard is automatically deployed by the Strimzi operator Helm chart when providing the following values:

dashboards:
  enabled: true
  label: grafana_dashboard # this is the default value from the grafana chart
  labelValue: "1" # this is the default value from the grafana chart
  # Annotations to specify the Grafana folder
  annotations:
    grafana_folder: Strimzi
  extraLabels: {}

The Helm chart will deploy the dashboard in a Kubernetes ConfigMap that Grafana can dynamically load and place into the “Strimzi” folder.
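
A sketch of how these values could be applied to the operator release installed earlier, assuming they are stored in a strimzi-values.yml file:

helm upgrade strimzi-kafka-operator strimzi/strimzi-kafka-operator -f strimzi-values.yml --namespace kafka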

Schema Registry

A schema defines the structure of message data. It defines allowed data types, their format, and relationships. A schema acts as a blueprint for data, describing the structure of data records, the data types of individual fields, the relationships between fields, and any constraints or rules that apply to the data.
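
As an illustration, a minimal Avro schema describing a hypothetical user record could look like the following (record and field names are made up for this example):

{
  "type": "record",
  "name": "User",
  "namespace": "com.example",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "age", "type": "int" }
  ]
}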

Kafka Schema Registry is a component in the Apache Kafka ecosystem that provides a centralized schema management service for Kafka producers and consumers. It allows producers to register schemas for the data they produce, and consumers to retrieve and use these schemas for data validation and deserialization. The Schema Registry helps ensure that data exchanged through Kafka is compliant with a predefined schema, enabling data consistency, compatibility, and evolution across different systems and applications.

[Diagram: Schema Registry]

[Diagram: Schema Registry and Kafka]

When using Avro or another schema format, it is critical to manage schemas and evolve them thoughtfully. Schema compatibility checking is enabled in Kafka Schema Registry by versioning every single schema and comparing new schemas to previous versions. The type of compatibility required (backward, forward, full, none, etc.) determines how Kafka Schema Registry evaluates each new schema. New schemas that fail compatibility checks are rejected.
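
The compatibility level is managed through the registry's REST API. For instance, the global level could be set to BACKWARD with a request like the one below (the service name and port match the Schema Registry deployment described later; the call must be made from inside the cluster):

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "BACKWARD"}' \
  http://kafka-schema-registry:8081/config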

Some key benefits of using Kafka Schema Registry include:

  • Schema Evolution: As data formats and requirements evolve over time, it is common for producers and consumers to undergo changes to their data schemas. Kafka Schema Registry provides support for schema evolution, allowing producers to register new versions of schemas while maintaining compatibility with existing consumers. Consumers can retrieve the appropriate schema version for deserialization, ensuring that data is processed correctly even when schema changes occur.

  • Data Validation: Kafka Schema Registry enables data validation by allowing producers to register schemas with predefined data types, field names, and other constraints. Consumers can then retrieve and use these schemas to validate incoming data, ensuring that data conforms to the expected structure and format. This helps prevent data processing errors and improves data quality.

  • Schema Management: Kafka Schema Registry provides a centralized repository for managing schemas, making it easier to track, version, and manage changes. Producers and consumers can register, retrieve and manage schemas through a simple API, allowing for centralized schema governance and management.

  • Interoperability: Kafka Schema Registry promotes interoperability between different producers and consumers by providing a standardized way to define and manage data schemas. Producers and consumers written in different programming languages or using different serialization frameworks can use a common schema registry to ensure data consistency and compatibility across the ecosystem.

  • Backward and Forward Compatibility: Kafka Schema Registry allows producers to register backward and forward compatible schemas, enabling smooth upgrades and changes to data schemas without disrupting existing producers and consumers. Backward compatibility ensures that older consumers can still process data produced with a newer schema, while forward compatibility allows newer consumers to process data produced with an older schema.

Deploying Schema Registry

Official Confluent Docker images for Schema Registry can be installed using a Helm chart maintained by the community. Confluent's official Docker images support multiple architectures (x86/ARM). However, that Helm chart is quite old and no longer seems to be maintained (last update: 2 years ago).

Install Bitnami packaged Schema registry

Confluent Schema Registry packaged by Bitnami is kept up to date and supports multi-architecture Docker images.

  • Step 1: Prepare schema-registry-values.yml:

    kafka:
      enabled: false
    auth:
      protocol: {}
    service:
      ports:
        client: {}
    externalKafka:
      brokers:
        - PLAINTEXT://my-cluster-kafka-bootstrap:9092
    
  • Step 2: Install Bitnami Schema Registry:

    helm install schema-registry oci://registry-1.docker.io/bitnamicharts/schema-registry -f schema-registry-values.yml --namespace kafka
    
  • Step 3: Check that Schema Registry started

    kubectl logs kafka-schema-registry-0 schema-registry -n kafka
    
    [2023-08-19 09:06:38,783] INFO HV000001: Hibernate Validator 6.1.7.Final (org.hibernate.validator.internal.util.Version:21)
    [2023-08-19 09:06:39,019] INFO Started o.e.j.s.ServletContextHandler@7e94d093{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:921)
    [2023-08-19 09:06:39,029] INFO Started o.e.j.s.ServletContextHandler@270b6b5e{/ws,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:921)
    [2023-08-19 09:06:39,040] INFO Started NetworkTrafficServerConnector@660acfb{HTTP/1.1, (http/1.1, h2c)}{0.0.0.0:8081} (org.eclipse.jetty.server.AbstractConnector:333)
    [2023-08-19 09:06:39,041] INFO Started @6514ms (org.eclipse.jetty.server.Server:415)
    [2023-08-19 09:06:39,041] INFO Schema Registry version: 7.4.1 commitId: 8969f9f38b043ca55d4e97536b6bcb5ccc54f42f (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:47)
    [2023-08-19 09:06:39,042] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:49)
    

Testing Schema Registry

Once the cluster is running, you can run a producer and a consumer that exchange Avro messages whose schemas are stored in Schema Registry.

  • Step 1: Launch producer
    kubectl -n kafka run kafka-producer -ti --image=ricsanfre/kafka-python-client:latest --rm=true --restart=Never -- python avro_producer.py -b my-cluster-kafka-bootstrap:9092 -s http://kafka-schema-registry:8081 -t my-avro-topic
    

    Enter the required fields to build the message.

  • Step 2: In a different terminal, launch consumer
    kubectl -n kafka run kafka-consumer -ti --image=ricsanfre/kafka-python-client:latest --rm=true --restart=Never -- python avro_consumer.py -b my-cluster-kafka-bootstrap:9092 -s http://kafka-schema-registry:8081 -t my-avro-topic
    
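With the default subject naming strategy, the producer above registers its schema under the my-avro-topic-value subject. As an optional check, the registered versions can be listed through the Schema Registry REST API from a temporary pod (the curl image choice is illustrative):

    kubectl -n kafka run curl-test -ti --image=curlimages/curl --rm=true --restart=Never --command -- curl -s http://kafka-schema-registry:8081/subjects/my-avro-topic-value/versions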

Kafka UI (Kafdrop)

Kafdrop is a web UI for viewing Kafka topics and browsing consumer groups. The tool displays information such as brokers, topics, partitions, consumers, and lets you view messages.

  • Step 1: Add the Helm repository:

    helm repo add ricsanfre https://ricsanfre.github.io/helm-charts/
    
  • Step 2: Fetch the latest charts from the repository:

    helm repo update
    
  • Step 3: Prepare kafdrop-values.yml

    # Kafka broker connection
    kafka:
      brokerConnect: my-cluster-kafka-bootstrap:9092
    # JVM options
    jvm:
      opts: "-Xms32M -Xmx64M"
    
    # Adding connection to schema registry
    cmdArgs: "--schemaregistry.connect=http://kafka-schema-registry:8081"
    
    # Ingress resource
    ingress:
      enabled: true
      ## Add ingressClassName to the Ingress
      ingressClassName: nginx
      # ingress host
      hosts:
        - kafdrop.${CLUSTER_DOMAIN}
      ## TLS Secret Name
      tls:
        - secretName: kafdrop-tls
          hosts:
            - kafdrop.${CLUSTER_DOMAIN}
      ## Default ingress path
      path: /
      ## Ingress annotations
      annotations:
        # Enable cert-manager to create automatically the SSL certificate and store in Secret
        # Possible Cluster-Issuer values:
        #   * 'letsencrypt-issuer' (valid TLS certificate using IONOS API)
        #   * 'ca-issuer' (CA-signed certificate, not valid)
        cert-manager.io/cluster-issuer: letsencrypt-issuer
        cert-manager.io/common-name: kafdrop.${CLUSTER_DOMAIN}
    
    # Kafdrop docker images are not multi-arch. Only amd64 image is available
    affinity:
      nodeAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
          - matchExpressions:
            - key: kubernetes.io/arch
              operator: In
              values:
              - amd64
    

    Kafdrop is configured to connect to Schema Registry, so Avro messages can be decoded when Schema Registry is used. See the Helm chart value cmdArgs:

    • --schemaregistry.connect=http://kafka-schema-registry:8081
  • Step 4: Install Kafdrop helm chart

    helm upgrade -i kafdrop ricsanfre/kafdrop -f kafdrop-values.yml --namespace kafka
    
  • Step 5: Confirm that the deployment succeeded by opening the UI:

    https://kafdrop.picluster.ricsanfre.com/
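
If the Ingress is not reachable yet, the UI can also be checked through a temporary port-forward (Kafdrop listens on port 9000; the Service name is assumed to match the Helm release name):

    kubectl -n kafka port-forward svc/kafdrop 9000:9000

Then open http://localhost:9000 in a browser.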
