Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
None
-
None
-
None
Description
The server side currently handles the LIST_OFFSETS request process as follows:
KafkaApis.handleListOffsetRequest() -> KafkaApis.handleListOffsetRequestV1AndAbove() -> ReplicaManager.fetchOffsetForTimestamp() -> Partition.fetchOffsetForTimestamp()
In the last method above, it is obvious that when the client side does not pass the isolationLevel value, the server side supports returning localLog.logEndOffset.
val lastFetchableOffset = isolationLevel match { case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark case None => localLog.logEndOffset }
KafkaAdminClient is an operation and maintenance management tool, which should be different from the listOffsets-related methods (offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer, and it should not be limited by the value of isolationLevel in the ListOffsetsOptions parameter.
In the current KafkaAdminClient.listOffsets() method, both the AdminClient and the server consider isolationLevel as a required parameter:
1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException will be thrown when AdminClient executes listOffsets() method.
ListOffsetsRequest.Builder(...) -> isolationLevel.id()
2) The current logic for converting isolationLevel on the server side has not yet handled the case where the user passes in a value that is neither READ_UNCOMMITTED nor READ_COMMITTED :
val isolationLevelOpt = if (isClientRequest) Some(offsetRequest.isolationLevel) else None
public IsolationLevel isolationLevel() { return IsolationLevel.forId(data.isolationLevel()); }
Suggestion:
Added a new enum `NONE` in IsolationLevel, only dedicated to AdminClient.listOffsets() method.
This change may cause the highestSupportedVersion of ApiMessageType.LIST_OFFSETS to increase by one.