Details
Description
Summary of the issue I am experiencing when consuming data with Spark through a Kinesis direct URL:
Kinesis direct URL: https://kinesis-ae1.hdw.r53.deap.tv (fails with "Credential should be scoped to a valid region, not 'ae1'")
Kinesis default URL: https://kinesis.us-east-1.amazonaws.com (works)
Spark code for consuming data
SparkAWSCredentials credentials = commonService.getSparkAWSCredentials(kinApp.propConfig);
KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .checkpointAppName(applicationName)
    .streamName(streamName)
    .endpointUrl(endpointURL)
    .regionName(regionName)
    .initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(initPosition))
    .checkpointInterval(checkpointInterval)
    .kinesisCredentials(credentials)
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
    .build();
Spark version: 2.4.4
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.13.3</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.747</version>
</dependency>
The Spark application works fine with the default URL but fails with the error below when I switch to the direct URL. Publishing to the direct Kinesis URL works; the issue occurs only when I consume data.
2020-03-24 08:43:40,650 ERROR - Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: Credential should be scoped to a valid region, not 'ae1'. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidSignatureException; Request ID: fb43b636-8ce2-ec77-adb7-a8ead9e038c2)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1557)
at com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.listShards(KinesisProxy.java:326)
at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.getShardList(KinesisProxy.java:441)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.getShardList(KinesisShardSyncer.java:349)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.syncShardLeases(KinesisShardSyncer.java:159)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShardSyncer.checkAndCreateLeasesForNewShards(KinesisShardSyncer.java:112)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask.call(ShardSyncTask.java:84)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:683)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:614)
at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:191)
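For context on the error message: in AWS Signature Version 4, every signed request carries a credential scope of the form <yyyyMMdd>/<region>/<service>/aws4_request, and the service rejects the request if the region component is not a valid region. The 'ae1' in the error matches the label in the direct URL's hostname (kinesis-ae1), which suggests that the signing region was taken from the endpoint hostname rather than from the configured regionName. That is an assumption, not a confirmed diagnosis. The sketch below only illustrates how the scope string is composed for the two cases. CredentialScopeSketch and credentialScope are hypothetical names for illustration and are not part of the AWS SDK, KCL, or Spark.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; not part of the AWS SDK.
// It shows the SigV4 credential scope layout:
//   <yyyyMMdd>/<region>/<service>/aws4_request
final class CredentialScopeSketch {

    // Builds the credential scope string for a given signing date, region and service.
    static String credentialScope(LocalDate date, String region, String service) {
        return date.format(DateTimeFormatter.BASIC_ISO_DATE)
                + "/" + region + "/" + service + "/aws4_request";
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2020, 3, 24); // date taken from the log line above

        // Scope when the signing region is the real AWS region.
        System.out.println(credentialScope(day, "us-east-1", "kinesis"));

        // Scope when the signing region is 'ae1' (assumed to come from the
        // "kinesis-ae1" hostname); a scope like this would be rejected.
        System.out.println(credentialScope(day, "ae1", "kinesis"));
    }
}
```

If this assumption holds, the scope in the failing requests would contain ae1 where the working requests contain us-east-1, even though regionName is set on the builder.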