Details
Type: Bug
Status: Resolved
Priority: P3
Resolution: Fixed
Description
I tried to use RabbitMqIO with the direct runner to generate an unbounded PCollection from a queue, and I encountered an NPE:
java.lang.NullPointerException
    at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:169)
....
After investigation, it looks like this is caused by checkpointMark.oldestTimestamp having no default value: getWatermark() is called before the currentTimestamp variable is first set, which raises an NPE. I worked around the problem on my side by reimplementing the class and overriding getWatermark() to return Instant.now() if checkpointMark.oldestTimestamp is null:
@Override
public Instant getWatermark() {
  if (checkpointMark.oldestTimestamp == null) {
    return Instant.now();
  }
  return checkpointMark.oldestTimestamp;
}
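For anyone who wants to try the guard outside of Beam, here is a minimal, self-contained sketch of the same idea. It is an illustration under assumptions, not the RabbitMqIO code: CheckpointMarkSketch and WatermarkSketch are hypothetical stand-ins for the real checkpoint mark and reader, java.time.Instant is used in place of the Instant type that Beam uses, and the current time is passed in as a parameter (instead of calling Instant.now() directly) so the behavior is deterministic.

```java
import java.time.Instant;

// Hypothetical stand-in for the reader's checkpoint mark (not the Beam class).
final class CheckpointMarkSketch {
    // Stays null until the first message has been read, as in the report.
    Instant oldestTimestamp;
}

// Hypothetical stand-in for the unbounded reader (not the Beam class).
final class WatermarkSketch {
    private final CheckpointMarkSketch checkpointMark = new CheckpointMarkSketch();

    // Null-safe watermark: falls back to the supplied "now" when no message
    // has been read yet, instead of returning null to the runner.
    Instant getWatermark(Instant now) {
        if (checkpointMark.oldestTimestamp == null) {
            return now;
        }
        return checkpointMark.oldestTimestamp;
    }

    // Assumed bookkeeping: remember the oldest message timestamp seen so far.
    void recordMessage(Instant timestamp) {
        Instant oldest = checkpointMark.oldestTimestamp;
        if (oldest == null || timestamp.isBefore(oldest)) {
            checkpointMark.oldestTimestamp = timestamp;
        }
    }
}
```

Before any message is recorded, getWatermark(now) returns the supplied instant; once a message has been recorded, it returns the oldest recorded timestamp.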
It looks like this bug has already been raised on the PR for RabbitMqIO.