Details
-
New Feature
-
Status: Open
-
P3
-
Resolution: Unresolved
-
None
-
None
-
None
Description
I am trying to use the RedisIO connector with Redis cluster but it looks like the Jedis client that RedisIO uses only works on a standalone Redis server, not on a cluster. I get this error when trying to read from Redis:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: redis.clients.jedis.exceptions.JedisMovedDa taException: MOVED 15000 172.16.2.3:6379 at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371) at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339) at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219) at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308) at com.oracle.quanta.RedisToAtp.run(RedisToAtp.java:196) at com.oracle.quanta.RedisToAtp.main(RedisToAtp.java:54) Caused by: redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 15000 172.16.2.3:6379 at redis.clients.jedis.Protocol.processError(Protocol.java:116) at redis.clients.jedis.Protocol.process(Protocol.java:166) at redis.clients.jedis.Protocol.read(Protocol.java:220) at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:278) at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:230) at redis.clients.jedis.Connection.getMultiBulkReply(Connection.java:224) at redis.clients.jedis.Jedis.mget(Jedis.java:474) at org.apache.beam.sdk.io.redis.RedisIO$ReadFn.fetchAndFlush(RedisIO.java:517) at org.apache.beam.sdk.io.redis.RedisIO$ReadFn.finishBundle(RedisIO.java:500)
This is the code that I use:
PCollection<Event> events =
pipeline
/*
* Step #1: Read from Redis.
*/
.apply("Read Redis KV Store", RedisIO.read()
.withEndpoint(redisHost, 6379)
.withKeyPattern(redisKeyPattern))
Is there a way to configure RedisIO to work with a cluster? I would have expected it to use JedisCluster when working with Redis in cluster mode but from https://github.com/apache/beam/blob/master/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java it appears that it only uses the standalone Jedis client.