  1. Kafka
  2. KAFKA-13857

The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: admin
    • Labels: None

    Description

      The broker currently handles a LIST_OFFSETS request through the following call chain:

      KafkaApis.handleListOffsetRequest() ->
      
      KafkaApis.handleListOffsetRequestV1AndAbove() ->
      
      ReplicaManager.fetchOffsetForTimestamp() ->
      
      Partition.fetchOffsetForTimestamp()

       

      In the last method, when no isolationLevel is supplied (the None case), the broker already supports returning localLog.logEndOffset:

      val lastFetchableOffset = isolationLevel match {
        case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
        case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
        case None => localLog.logEndOffset
      } 
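
The selection above can be mirrored in a small standalone Java sketch. The `LogOffsets` class and the `lastFetchableOffset` helper are hypothetical stand-ins for the broker's local log, not Kafka classes, and the nested `IsolationLevel` enum is a simplified copy for illustration only. `Optional.empty()` plays the role of Scala's `None`.

```java
import java.util.Optional;

public class LastFetchableOffsetSketch {
    // Simplified stand-in for Kafka's IsolationLevel enum (illustration only).
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // Hypothetical snapshot of the three offsets tracked for a partition's local log.
    static final class LogOffsets {
        final long lastStableOffset;
        final long highWatermark;
        final long logEndOffset;

        LogOffsets(long lastStableOffset, long highWatermark, long logEndOffset) {
            this.lastStableOffset = lastStableOffset;
            this.highWatermark = highWatermark;
            this.logEndOffset = logEndOffset;
        }
    }

    // Same three-way choice as the Scala match: an absent level yields the log end offset.
    static long lastFetchableOffset(Optional<IsolationLevel> isolationLevel, LogOffsets log) {
        if (!isolationLevel.isPresent()) {
            return log.logEndOffset;
        }
        switch (isolationLevel.get()) {
            case READ_COMMITTED:
                return log.lastStableOffset;
            default: // READ_UNCOMMITTED
                return log.highWatermark;
        }
    }

    public static void main(String[] args) {
        LogOffsets log = new LogOffsets(90L, 100L, 120L);
        System.out.println(lastFetchableOffset(Optional.of(IsolationLevel.READ_COMMITTED), log));
        System.out.println(lastFetchableOffset(Optional.of(IsolationLevel.READ_UNCOMMITTED), log));
        System.out.println(lastFetchableOffset(Optional.empty(), log));
    }
}
```

With the sample values in `main`, the three calls return the last stable offset, the high watermark, and the log end offset respectively. Only the third is currently unreachable from the AdminClient.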
      
      

       

       

      KafkaAdminClient is an operations and maintenance tool. Its listOffsets() method therefore serves a different purpose from the offset-lookup methods of KafkaConsumer (offsetsForTimes, beginningOffsets, endOffsets), and it should not be constrained by the isolationLevel value in the ListOffsetsOptions parameter.

      In the current KafkaAdminClient.listOffsets() implementation, both the AdminClient and the broker treat isolationLevel as a required parameter:
      1) If the caller passes new ListOffsetsOptions(null), listOffsets() throws a NullPointerException, because the request builder dereferences the value:

      ListOffsetsRequest.Builder(...) -> isolationLevel.id()

      2) The broker-side conversion of isolationLevel does not handle a value that is neither READ_UNCOMMITTED nor READ_COMMITTED:

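      // Broker side (Scala): every client request is assumed to carry an isolation level
      val isolationLevelOpt = if (isClientRequest)
        Some(offsetRequest.isolationLevel)
      else
        None

      // Request class (Java): the wire id is converted back to the enum
      public IsolationLevel isolationLevel() {
          return IsolationLevel.forId(data.isolationLevel());
      }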
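
Both failure modes described in points 1) and 2) can be reproduced in isolation with a minimal sketch. The nested enum is a hypothetical stand-in modeled on Kafka's `IsolationLevel`; in particular, rejecting unknown ids with an `IllegalArgumentException` is an assumption about `forId`, not something confirmed by this ticket. `wireId` and `describe` are illustrative helpers, not Kafka APIs.

```java
public class IsolationLevelHandlingSketch {
    // Hypothetical stand-in modeled on Kafka's IsolationLevel enum.
    enum IsolationLevel {
        READ_UNCOMMITTED((byte) 0),
        READ_COMMITTED((byte) 1);

        private final byte id;

        IsolationLevel(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }

        // Assumption: ids other than 0 and 1 are rejected instead of meaning "no isolation level".
        static IsolationLevel forId(byte id) {
            switch (id) {
                case 0:
                    return READ_UNCOMMITTED;
                case 1:
                    return READ_COMMITTED;
                default:
                    throw new IllegalArgumentException("Unknown isolation level " + id);
            }
        }
    }

    // Mirrors a request builder that dereferences the level to obtain its wire id.
    static byte wireId(IsolationLevel level) {
        return level.id(); // throws NullPointerException when level is null
    }

    // Runs an action and reports either "ok" or the simple name of the exception thrown.
    static String describe(Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(() -> wireId(null)));
        System.out.println(describe(() -> IsolationLevel.forId((byte) 2)));
    }
}
```

Under these assumptions, neither the client path (a null level) nor the wire path (an unrecognized id) offers a way to express "no isolation level", which is the gap the ticket describes.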

       

      Suggestion:

      Add a new enum constant `NONE` to IsolationLevel, dedicated solely to the AdminClient.listOffsets() method.
      This change may require increasing the highestSupportedVersion of ApiMessageType.LIST_OFFSETS by one.
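
One possible shape of this suggestion, as a sketch only: the enum below is a hypothetical extension of `IsolationLevel`, the wire id `2` for `NONE` is an arbitrary assumption, and `toBrokerOption` is an illustrative helper showing how the broker could map `NONE` onto the existing "no isolation level" branch that returns logEndOffset.

```java
import java.util.Optional;

public class IsolationLevelNoneSketch {
    // Hypothetical extended enum; the id chosen for NONE (2) is an assumption.
    enum IsolationLevel {
        READ_UNCOMMITTED((byte) 0),
        READ_COMMITTED((byte) 1),
        NONE((byte) 2);

        private final byte id;

        IsolationLevel(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }

        static IsolationLevel forId(byte id) {
            for (IsolationLevel level : values()) {
                if (level.id == id) {
                    return level;
                }
            }
            throw new IllegalArgumentException("Unknown isolation level " + id);
        }
    }

    // Illustrative broker-side conversion: NONE becomes an empty Optional,
    // which corresponds to the branch that returns the log end offset.
    static Optional<IsolationLevel> toBrokerOption(IsolationLevel level) {
        return level == IsolationLevel.NONE ? Optional.empty() : Optional.of(level);
    }

    public static void main(String[] args) {
        IsolationLevel decoded = IsolationLevel.forId((byte) 2);
        System.out.println(decoded + " -> " + toBrokerOption(decoded).isPresent());
    }
}
```

Because an older broker would not recognize the new id, a design along these lines would need the request version bump mentioned above so that clients only send `NONE` to brokers that understand it.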

       

       


          People

            Assignee: Unassigned
            Reporter: RivenSun