Beam / BEAM-12225

Replace AWS API used to list shards from DescribeStream to ListShards

Details

    • Type: Improvement
    • Status: Triage Needed
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.27.0
    • Fix Version/s: 2.32.0
    • Component/s: io-java-kinesis
    • Labels: None

    Description

      We use Google Dataflow with Apache Beam to read data from AWS Kinesis streams. We started experiencing problems with the GetShardIterator API, which is used to establish the initial set of active shards to read from at a given timestamp.

      Requests for some shards unexpectedly failed with AmazonKinesisException (error code “InternalFailure”) when we tried to get an iterator for them, which caused our Dataflow jobs to fail. This led us to look for potential solutions.

      As far as we know, AWS recently changed the behavior of the GetShardIterator API: instead of throwing an exception, it now returns an iterator even for closed shards. The documentation describes this behavior as follows:
      “If the shard is closed, GetShardIterator returns a valid iterator for the last sequence number of the shard. A shard can be closed as a result of using SplitShard or MergeShards.”

      Because of this change, Beam will try to read a closed shard, find no records, and then move on to reading its children. If the children were created long ago, reading from the Kinesis stream may begin much further back in time, and Beam will not start processing data from the expected starting point. This causes unnecessary delay and additional processing costs.

      In the current implementation, shards must be validated with GetShardIterator because the DescribeStream API used to list them returns all shards, closed as well as open. AWS recommends using the ListShards API for such use cases. It not only eliminates the problem of thrown exceptions but also reduces the time required to list valid shards. Another issue with DescribeStream is its low transaction limit: only 10 transactions per second per account, whereas the ListShards limit is 100 transactions per second per data stream.
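To illustrate the open/closed distinction described above, here is a minimal sketch in plain Java. It relies on one documented Kinesis fact: an open shard has no ending sequence number. The `ShardInfo` and `OpenShardFilter` classes and the sample values are hypothetical stand-ins, not Beam or AWS SDK types, and the sketch does not cover selecting shards that were open at a past timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for a Kinesis shard description. */
final class ShardInfo {
    final String shardId;
    final String parentShardId;        // null when the shard has no parent
    final String endingSequenceNumber; // null while the shard is still open

    ShardInfo(String shardId, String parentShardId, String endingSequenceNumber) {
        this.shardId = shardId;
        this.parentShardId = parentShardId;
        this.endingSequenceNumber = endingSequenceNumber;
    }

    /** A shard is open as long as no ending sequence number has been assigned. */
    boolean isOpen() {
        return endingSequenceNumber == null;
    }
}

final class OpenShardFilter {
    /** Returns the IDs of the open shards, preserving the order of the listing. */
    static List<String> openShardIds(List<ShardInfo> allShards) {
        List<String> ids = new ArrayList<>();
        for (ShardInfo shard : allShards) {
            if (shard.isOpen()) {
                ids.add(shard.shardId);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Made-up listing: one parent closed by a split, plus its two open children.
        List<ShardInfo> listing = Arrays.asList(
            new ShardInfo("shardId-000000000000", null, "100"),
            new ShardInfo("shardId-000000000001", "shardId-000000000000", null),
            new ShardInfo("shardId-000000000002", "shardId-000000000000", null));
        System.out.println(openShardIds(listing));
    }
}
```

A listing call that returns only the relevant shards makes this kind of client-side filtering, and the per-shard GetShardIterator validation, unnecessary.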

      Using the ListShards API instead of the DescribeStream API:

      • reduces the time required to start reading data (it should address the problem of long-lasting pipeline creation: BEAM-9759),
      • allows for more transactions per second,
      • adopts the API that AWS recommends for listing shards,
      • eliminates the problem of data being read from unexpectedly old child shards.
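ListShards returns its results in pages linked by a next token, so a caller has to loop until no token is returned. The sketch below shows that loop against a hypothetical `ShardLister` interface. `ShardPage`, `ShardLister`, and `ShardPaginator` are illustrative names, not Beam or AWS SDK types, and a real implementation would wrap the SDK's ListShards call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical page of results: shard IDs plus the token for the next page. */
final class ShardPage {
    final List<String> shardIds;
    final String nextToken; // null on the last page

    ShardPage(List<String> shardIds, String nextToken) {
        this.shardIds = shardIds;
        this.nextToken = nextToken;
    }
}

/**
 * Hypothetical minimal client. Note: the real ListShards API does not accept a
 * stream name together with a next token, so an implementation would send only
 * one of the two per request.
 */
interface ShardLister {
    ShardPage listShards(String streamName, String nextToken);
}

final class ShardPaginator {
    /** Collects shard IDs from every page until the service returns no token. */
    static List<String> listAllShardIds(ShardLister client, String streamName) {
        List<String> all = new ArrayList<>();
        String token = null;
        do {
            ShardPage page = client.listShards(streamName, token);
            all.addAll(page.shardIds);
            token = page.nextToken;
        } while (token != null);
        return all;
    }

    public static void main(String[] args) {
        // In-memory fake with two pages, used only to exercise the loop.
        ShardLister fake = (stream, token) -> token == null
            ? new ShardPage(Arrays.asList("shardId-000000000001"), "page-2")
            : new ShardPage(Arrays.asList("shardId-000000000002"), null);
        System.out.println(listAllShardIds(fake, "example-stream"));
    }
}
```

Keeping the client behind a small interface makes the pagination logic testable without network access, which is why the fake above is enough to run it.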

      Potential issues with the ListShards API:

      • according to the documentation, a fine-grained IAM policy might need to be updated to allow the ListShards API.
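As an illustration of the policy update mentioned above, a fine-grained policy could gain a statement along these lines. This is a sketch only: `REGION`, `ACCOUNT_ID`, and `STREAM_NAME` are placeholders, and the other actions a pipeline needs are omitted here.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kinesis:ListShards"],
      "Resource": "arn:aws:kinesis:REGION:ACCOUNT_ID:stream/STREAM_NAME"
    }
  ]
}
```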

            People

              Assignee: Rafał Ochyra
              Reporter: Rafał Ochyra
              Votes: 2
              Watchers: 4

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 3h