Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-16201

Kafka exception - org.apache.kafka.common.errors.NotLeaderOrFollowerException

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Blocker
    • Resolution: Unresolved
    • 3.6.1
    • None
    • core
    • None
    • AWS EKS

    Description

      I am deploying Kafka inside a Kubernetes cluster in HA mode (multiple brokers). The deployment consists of:

      Kubernetes
      Kafka 3.6.1
      Refer to the following files used in the deployment

      Dockerfile

       

      # Image that runs Apache Kafka on top of the Eclipse Temurin JDK 17 base image.
      FROM eclipse-temurin:17.0.9_9-jdk-jammy

      # Versions select the release tarball downloaded below; KAFKA_HOME is a
      # version-independent path whose bin/ directory is added to PATH.
      ENV KAFKA_VERSION=3.6.1
      ENV SCALA_VERSION=2.13
      ENV KAFKA_HOME=/opt/kafka
      ENV PATH=${PATH}:${KAFKA_HOME}/bin

      LABEL name="kafka" version=${KAFKA_VERSION}

      # Download the release, unpack it under /opt, link it to ${KAFKA_HOME} and
      # delete the tarball once, in the same layer, so it is not kept in the image.
      RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
       && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
       && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME} \
       && rm -f /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz

      # The entrypoint script generates the broker configuration and starts Kafka.
      COPY ./entrypoint.sh /
      RUN ["chmod", "+x", "/entrypoint.sh"]
      ENTRYPOINT ["/entrypoint.sh"]

       

       

      entrypoint.sh

       

      #!/bin/bash
      #
      # Container entrypoint for one Kafka node running in KRaft mode.
      #
      # Derives the node id from the pod hostname, rewrites the stock
      # kraft/server.properties with per-node values, appends the SASL/listener
      # settings, formats the node's storage directory and starts Kafka.
      #
      # Required environment:
      #   REPLICAS   number of nodes; ids 0..REPLICAS-1 become the quorum voters
      #   SERVICE    service name used to build the per-pod DNS names
      #   NAMESPACE  namespace used to build the per-pod DNS names
      #   SHARE_DIR  directory visible to every node; holds the cluster_id file
      #              and one log directory per node id
      # Optional environment:
      #   CLUSTER_ID_WAIT_SECONDS  how long a node waits for cluster_id to appear
      #                            (default 120)
      set -euo pipefail

      die() {
          printf '%s\n' "$*" >&2
          exit 1
      }

      : "${REPLICAS:?REPLICAS must be set}"
      : "${SERVICE:?SERVICE must be set}"
      : "${NAMESPACE:?NAMESPACE must be set}"
      : "${SHARE_DIR:?SHARE_DIR must be set}"
      [[ "$REPLICAS" =~ ^[1-9][0-9]*$ ]] \
          || die "REPLICAS must be a positive integer, got '$REPLICAS'"

      readonly CONFIG=/opt/kafka/config/kraft/server.properties
      readonly CLUSTER_ID_FILE="$SHARE_DIR/cluster_id"
      readonly CLUSTER_ID_WAIT_SECONDS="${CLUSTER_ID_WAIT_SECONDS:-120}"

      # Everything after the first 6 characters of the hostname is the node id,
      # e.g. "kafka-3" -> "3".
      NODE_ID=${HOSTNAME:6}
      [[ "$NODE_ID" =~ ^[0-9]+$ ]] \
          || die "cannot derive a numeric node id from hostname '$HOSTNAME'"

      LISTENERS="SASL://:9092,CONTROLLER://:9093,INTERNAL://:29092"

      # NOTE(review): the SASL listener advertises "kraft-<id>" while every other
      # name here is "kafka-<id>..."; clients must be able to resolve kraft-<id>
      # to this specific node - confirm that mapping exists.
      ADVERTISED_LISTENERS="SASL://kraft-$NODE_ID:9092,INTERNAL://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc.cluster.local:29092"

      # Comma-separated "<id>@<host>:9093" entry for each node id 0..REPLICAS-1.
      CONTROLLER_QUORUM_VOTERS=""
      for ((i = 0; i < REPLICAS; i++)); do
          CONTROLLER_QUORUM_VOTERS+="${CONTROLLER_QUORUM_VOTERS:+,}$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093"
      done

      mkdir -p "$SHARE_DIR/$NODE_ID"

      if [[ "$NODE_ID" == "0" && ! -s "$CLUSTER_ID_FILE" ]]; then
          # Node 0 creates the cluster id. Write to a temporary name and rename so
          # that other nodes never read a partially written file.
          CLUSTER_ID=$(kafka-storage.sh random-uuid)
          printf '%s\n' "$CLUSTER_ID" > "$CLUSTER_ID_FILE.tmp"
          mv "$CLUSTER_ID_FILE.tmp" "$CLUSTER_ID_FILE"
      else
          # Other nodes may start before node 0 has written the id; wait for it
          # instead of continuing with an empty value.
          for ((waited = 0; waited < CLUSTER_ID_WAIT_SECONDS; waited++)); do
              if [[ -s "$CLUSTER_ID_FILE" ]]; then
                  break
              fi
              sleep 1
          done
          [[ -s "$CLUSTER_ID_FILE" ]] \
              || die "timed out waiting for $CLUSTER_ID_FILE"
          CLUSTER_ID=$(<"$CLUSTER_ID_FILE")
      fi

      # Replace the per-node keys of the stock configuration. "+" is the sed
      # delimiter because the values contain "/" and ":"; the values must not
      # contain "+" themselves.
      sed -e "s+^node.id=.*+node.id=$NODE_ID+" \
          -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
          -e "s+^listeners=.*+listeners=$LISTENERS+" \
          -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
          -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \
          "$CONFIG" > "$CONFIG.updated"
      mv "$CONFIG.updated" "$CONFIG"

      JAAS="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\" user_admin=\"admin-secret\";"

      # Appended after the rewritten file, each entry preceded by a blank line.
      {
          printf '\nlistener.name.sasl.plain.sasl.jaas.config=%s\n' "$JAAS"
          printf '\nsasl.enabled.mechanisms=PLAIN\n'
          printf '\nlistener.security.protocol.map=SASL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT\n'
          printf '\ninter.broker.listener.name=INTERNAL\n'
      } >> "$CONFIG"

      # --ignore-formatted makes this step succeed when the log directory was
      # already formatted by an earlier start of the same node.
      kafka-storage.sh format --ignore-formatted -t "$CLUSTER_ID" -c "$CONFIG"

      exec kafka-server-start.sh "$CONFIG"

       

       

      Kafka.yaml

      # Namespace referenced by the claim, service and StatefulSet in this manifest.
      apiVersion: v1
      kind: Namespace
      metadata:
        name: kafka-kraft
      ---
      # 10Gi volume in storage class "manual", backed by the host directory
      # /mnt/data.
      # NOTE(review): a hostPath directory lives on a single node's filesystem;
      # confirm where the pods are scheduled on the EKS cluster named in this report.
      apiVersion: v1
      kind: PersistentVolume
      metadata:
        name: kafka-pv-volume
        labels:
          type: local
      spec:
        storageClassName: manual
        capacity:
          storage: 10Gi
        accessModes:
          - ReadWriteOnce
        hostPath:
          path: '/mnt/data'
      ---
      # Claim for 3Gi of "manual" storage; the only volume of that class defined
      # in this manifest is kafka-pv-volume (10Gi).
      apiVersion: v1
      kind: PersistentVolumeClaim
      metadata:
        name: kafka-pv-claim
        namespace: kafka-kraft
      spec:
        storageClassName: manual
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 3Gi
      ---
      # Headless service (clusterIP: None) selecting the pods labelled
      # app: kafka-app; it is the serviceName of the StatefulSet and its name is
      # passed to the container as SERVICE.
      apiVersion: v1
      kind: Service
      metadata:
        name: kafka-svc
        labels:
          app: kafka-app
        namespace: kafka-kraft
      spec:
        clusterIP: None
        ports:
          # Only the SASL client port is listed; entrypoint.sh also opens 9093
          # (CONTROLLER) and 29092 (INTERNAL).
          # NOTE(review): confirm whether those ports should be listed as well.
          - name: '9092'
            port: 9092
            protocol: TCP
            targetPort: 9092
        selector:
          app: kafka-app
      ---
      # Five Kafka pods managed as a StatefulSet named "kafka"; entrypoint.sh
      # takes the node id from the part of the pod hostname after "kafka-".
      apiVersion: apps/v1
      kind: StatefulSet
      metadata:
        name: kafka
        labels:
          app: kafka-app
        namespace: kafka-kraft
      spec:
        serviceName: kafka-svc
        replicas: 5
        selector:
          matchLabels:
            app: kafka-app
        template:
          metadata:
            labels:
              app: kafka-app
          spec:
            volumes:
              # Every replica mounts the same claim; entrypoint.sh relies on this
              # to share the cluster_id file and keeps per-node data in
              # $SHARE_DIR/<node id>.
              # NOTE(review): the claim is ReadWriteOnce - confirm that all five
              # pods can actually mount it at the same time.
              - name: kafka-storage
                persistentVolumeClaim:
                  claimName: kafka-pv-claim
            containers:
              - name: kafka-container
                image: myimage/kafka-kraft:1.0
                ports:
                  # 29092 (INTERNAL listener in entrypoint.sh) is not declared here.
                  - containerPort: 9092
                  - containerPort: 9093
                env:
                  # Read by entrypoint.sh. REPLICAS repeats spec.replicas and the
                  # two values have to be kept in sync by hand.
                  - name: REPLICAS
                    value: '5'
                  - name: SERVICE
                    value: kafka-svc
                  - name: NAMESPACE
                    value: kafka-kraft
                  - name: SHARE_DIR
                    value: /mnt/kafka
                volumeMounts:
                  - name: kafka-storage
                    mountPath: /mnt/kafka 

      After the deployment, all the containers are up and running. I then connect to the brokers using the following command:

       

      .\kafka-topics.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --command-config producer.properties --topic hello --create --replication-factor 5 

       

       

      producer.properties

       

      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;
      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=PLAIN
      metadata.max.age.ms=1000 

       

       

      A prompt is displayed to enter a message. After I enter some sample text, it throws the following error.

      [Producer clientId=console-producer] Received invalid metadata error in produce request on partition hello2-1 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now

       

      What I have tried so far

      • Tried both ZooKeeper and KRaft mode
      • Tried deleting and recreating the topics (this works only intermittently)

      Unfortunately, the problem persists and I am not able to produce messages.

      Attachments

        Activity

          People

            Unassigned Unassigned
            carbonrider Yogesh
            Votes:
            1 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: