BEAM-11016: Serialize AwsCredentialsProvider for AWS SDK v2 IO connectors

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.24.0
    • Fix Version/s: 2.27.0
    • Component/s: io-java-aws
    • Labels: None

    Description

      tclemons@tutanota.com reported in an email thread:

      I've been attempting to migrate to the AWS SDK v2 IO connectors (Beam 2.24.0) on Apache Spark 2.4.7. However, when switching over to the new API and running it, I keep getting the following exception:

       2020-09-30 20:38:32,428 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
       org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 6, 10.42.180.18, executor 0): java.lang.NullPointerException
           at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.lambda$new$e39e7721$1(SqsUnboundedSource.java:41)
           at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$MemoizingSupplier.get(Suppliers.java:129)
           at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource.getSqs(SqsUnboundedSource.java:74)
           at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.pull(SqsUnboundedReader.java:165)
           at org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader.advance(SqsUnboundedReader.java:108)
           at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:246)
           at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:224)
           at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:168)
           at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
           at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
           at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
           at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
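      The top frames are informative: SqsUnboundedSource.java:41 is a lambda, and the call arrives through Guava's Suppliers$MemoizingSupplier, so the SQS client is built lazily on the executor, after the source has been serialized and shipped there. The following is a sketch of that pattern, reconstructed from the trace rather than copied from the Beam source; ReadSketch, ClientProviderSketch, and SourceSketch are illustrative stand-ins, and only the call chain read.sqsClientProvider().getSqsClient() is taken from the report:

          import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
          import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
          import software.amazon.awssdk.services.sqs.SqsClient;

          // Hypothetical stand-ins for the Beam types involved in the trace.
          interface ClientProviderSketch {
            SqsClient getSqsClient();
          }

          interface ReadSketch extends java.io.Serializable {
            ClientProviderSketch sqsClientProvider(); // null after deserialization in the report
          }

          class SourceSketch {
            private final Supplier<SqsClient> sqs;

            SourceSketch(ReadSketch read) {
              // The lambda at SqsUnboundedSource.java:41 is evaluated lazily; if `read`
              // arrives on the executor with a null provider, this is the NPE site.
              this.sqs = Suppliers.memoize(() -> read.sqsClientProvider().getSqsClient());
            }

            SqsClient getSqs() {
              return sqs.get(); // Suppliers$MemoizingSupplier.get in the trace
            }
          }

      If the provider field does not survive Java serialization to the Spark executor, the memoized lambda dereferences null on its first evaluation, which matches the NPE above.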
       
      Examining the source of SqsUnboundedSource reveals a lambda where it's trying to chain a few references:

          read.sqsClientProvider().getSqsClient()

      Which is odd, as I explicitly set the client provider on the read transform. This was working well enough with the old SqsIO API to connect and process messages off the queue.
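      A workaround, pending the fix in 2.27.0, is to avoid shipping non-serializable SDK v2 credentials objects altogether: implement the client provider yourself so that only Serializable state (plain strings) crosses the wire, and rebuild the SqsClient on each worker. A minimal sketch, under the assumption that SqsClientProvider is the public interface whose getSqsClient() the stack trace chains through:

          import java.io.Serializable;
          import org.apache.beam.sdk.io.aws2.sqs.SqsClientProvider;
          import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
          import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
          import software.amazon.awssdk.regions.Region;
          import software.amazon.awssdk.services.sqs.SqsClient;

          // Carries only Serializable state; the non-serializable SDK v2 client and
          // credentials provider are rebuilt on whichever worker calls getSqsClient().
          class SerializableSqsClientProvider implements SqsClientProvider, Serializable {
            private final String accessKey;
            private final String secretKey;
            private final String region;

            SerializableSqsClientProvider(String accessKey, String secretKey, String region) {
              this.accessKey = accessKey;
              this.secretKey = secretKey;
              this.region = region;
            }

            @Override
            public SqsClient getSqsClient() {
              return SqsClient.builder()
                  .region(Region.of(region))
                  .credentialsProvider(
                      StaticCredentialsProvider.create(
                          AwsBasicCredentials.create(accessKey, secretKey)))
                  .build();
            }
          }

      Hard-coded static credentials are only for illustration; the point is that the provider's state must survive Java serialization to the Spark executors, which is exactly what this issue fixes for AwsCredentialsProvider in the AWS SDK v2 connectors.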


            People

              aromanenko Alexey Romanenko
              aromanenko Alexey Romanenko
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue


      Time Tracking

        Original Estimate: Not Specified
        Remaining Estimate: 0h
        Time Spent: 40m