Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-15448

Streams StandbyTaskUpdateListener

    XMLWordPrintableJSON

Details

    • New Feature
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • None
    • 3.7.0
    • streams

    Description

      KIP-869: https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener

      In addition to the new metrics in KIP-869, it would be great to have a callback that allows for monitoring of Standby Task status. The StateRestoreListener is currently not called for Standby Tasks for good reasons (the API wouldn't make sense for Standby). I've attached an interface which would be nice to have:

       

      ```
      public interface StandbyTaskUpdateListener {

      public enum SuspendReason

      { MIGRATED, PROMOTED; }

       
      /**

      • Method called upon the creation of the Standby Task.
        *
      • @param topicPartition the TopicPartition of the Standby Task.
      • @param storeName the name of the store being watched by this Standby Task.
      • @param earliestOffset the earliest offset available on the Changelog topic.
      • @param startingOffset the offset from which the Standby Task starts watching.
      • @param currentEndOffset the current latest offset on the associated changelog partition.
        */
        void onTaskCreated(final TopicPartition topicPartition,
        final String storeName,
        final long earliestOffset
        final long startingOffset,
        final long currentEndOffset);

        /**
      • Method called after restoring a batch of records. In this case the maximum size of the batch is whatever
      • the value of the MAX_POLL_RECORDS is set to.
        *
      • This method is called after restoring each batch and it is advised to keep processing to a minimum.
      • Any heavy processing will hold up recovering the next batch, hence slowing down the restore process as a
      • whole.
        *
      • If you need to do any extended processing or connecting to an external service consider doing so asynchronously.
        *
      • @param topicPartition the TopicPartition containing the values to restore
      • @param storeName the name of the store undergoing restoration
      • @param batchEndOffset the inclusive ending offset for the current restored batch for this TopicPartition
      • @param numRestored the total number of records restored in this batch for this TopicPartition
      • @param currentEndOffset the current end offset of the changelog topic partition.
        */
        void onBatchRestored(final TopicPartition topicPartition,
        final String storeName,
        final long batchEndOffset,
        final long numRestored,
        final long currentEndOffset);

        /**
      • Method called after a Standby Task is closed, either because the task migrated to a new instance or because the task was promoted to an Active task.
        */
        void onTaskSuspended(final TopicPartition topicPartition,
        final String storeName,
        final long storeOffset,
        final long currentEndOffset,
        final SuspendReason reason);
        }
        ```

      Attachments

        Activity

          People

            coltmcnealy-lh Colt McNealy
            coltmcnealy-lh Colt McNealy
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: