Apache Storm / STORM-4000

Processing late tuples from BaseWindowedBolt results in serialization exception


Details

    Description

      I am developing an Apache Storm (v2.5.0) topology that reads events from a spout (BaseRichSpout), counts the number of events in tumbling windows (BaseWindowedBolt), and prints the count (BaseRichBolt). The topology works fine, but there are some out-of-order events in my dataset. BaseWindowedBolt provides the withLateTupleStream method to route late events to a separate stream. However, when I try to process the late events, I get a serialization exception:

      Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
      Note: To register this class use: kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
      Serialization trace:
      defaultResources (org.apache.storm.task.WorkerTopologyContext)
      context (org.apache.storm.tuple.TupleImpl)
          at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
          at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
          at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
          at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
          at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
          at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
          at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
          at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
          at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
          at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
          at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
          ... 6 more

      I have defined my topology as follows:

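      import org.apache.storm.Config;
      import org.apache.storm.LocalCluster;
      import org.apache.storm.task.WorkerTopologyContext;
      import org.apache.storm.topology.TopologyBuilder;
      import org.apache.storm.topology.base.BaseWindowedBolt;
      import org.apache.storm.tuple.Fields;
      import org.apache.storm.tuple.TupleImpl;

      public class TestTopology {
          public static void main(String[] args) throws Exception {
              Config config = new Config();
              // Push every tuple through Kryo even in local mode (WorkerState.checkSerialize in the trace)
              config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
              // Explicit registrations for classes that appear in the serialization trace
              config.registerSerialization(TupleImpl.class);
              config.registerSerialization(WorkerTopologyContext.class);
              config.registerSerialization(Fields.class);

              // Run for 50 seconds; try-with-resources then closes the topology and the local cluster
              try (LocalCluster cluster = new LocalCluster();
                   LocalCluster.LocalTopology topology =
                           cluster.submitTopology("testTopology", config, getTopology().createTopology())) {
                  Thread.sleep(50000);
              }
          }

          static TopologyBuilder getTopology() {
              TopologyBuilder builder = new TopologyBuilder();
              builder.setSpout("eventSpout", new LateEventSpout());
              // 10-second event-time tumbling windows; late tuples go to the "lateEvents" stream
              builder.setBolt("windowBolt", new WindowBolt()
                              .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
                              .withTimestampField("time")
                              .withLateTupleStream("lateEvents"))
                      .shuffleGrouping("eventSpout");
              builder.setBolt("latePrintBolt", new LatePrintBolt())
                      .shuffleGrouping("windowBolt", "lateEvents");
              builder.setBolt("printBolt", new PrintBolt())
                      .shuffleGrouping("windowBolt");
              return builder;
          }
      }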

      Where `LateEventSpout` is:

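      import java.time.Instant;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Map;
      import org.apache.storm.spout.SpoutOutputCollector;
      import org.apache.storm.task.TopologyContext;
      import org.apache.storm.topology.OutputFieldsDeclarer;
      import org.apache.storm.topology.base.BaseRichSpout;
      import org.apache.storm.tuple.Fields;
      import org.apache.storm.tuple.Values;

      public class LateEventSpout extends BaseRichSpout {

          private SpoutOutputCollector collector;
          private final List<Long> eventTimes = new ArrayList<>();
          private int currentTime = 0;  // index into eventTimes
          private int id = 1;

          public LateEventSpout() {
              // eventTimes = [epoch+1s, epoch+2s, ..., epoch+61s] in milliseconds
              for (int i = 1; i <= 61; i++) {
                  eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
              }
          }

          @Override
          public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
              this.collector = collector;
          }

          @Override
          public void nextTuple() {
              int eventId = id++;
              Long eventTime = eventTimes.get(currentTime++);
              // Wrap around to the first timestamp, so later cycles replay older timestamps out of order
              if (currentTime == eventTimes.size()) {
                  currentTime = 0;
              }
              collector.emit(new Values(eventId, eventTime));
          }

          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("id", "time"));
          }
      }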
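      Because `nextTuple` wraps back to the first timestamp after epoch+61 s, the events of the second and later cycles carry timestamps older than events already seen, which is what makes them late. The sketch below is a hypothetical, Storm-free model of that effect. It assumes the watermark is simply the largest timestamp seen so far (zero lag), whereas Storm advances its watermark periodically, so the exact number of late tuples in a real run may differ. `LateEventModel`, `emittedTimes`, and `countLate` are illustrative names, not Storm APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the LateEventSpout timestamps; this is NOT Storm code.
// Assumption: the watermark is the largest timestamp seen so far (zero lag),
// whereas Storm advances watermarks periodically, so real counts can differ.
public class LateEventModel {

    // Reproduces the spout's cycle: epoch+1s .. epoch+61s in milliseconds, then wraps.
    static List<Long> emittedTimes(int count) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            times.add((long) (i % 61 + 1) * 1000L);
        }
        return times;
    }

    // An event counts as late if its timestamp is below the current watermark.
    static int countLate(List<Long> times) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (long ts : times) {
            if (ts < watermark) {
                late++;
            } else {
                watermark = ts;  // the watermark only moves forward
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // The first cycle is in order; most of the second cycle is late.
        System.out.println(countLate(emittedTimes(61)));   // 0
        System.out.println(countLate(emittedTimes(122)));  // 60
    }
}
```

      In this model, only the 61000 ms event of the second cycle escapes being late, because it ties the watermark instead of falling behind it.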

      And `WindowBolt` is:

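      import java.util.Map;
      import org.apache.storm.task.OutputCollector;
      import org.apache.storm.task.TopologyContext;
      import org.apache.storm.topology.OutputFieldsDeclarer;
      import org.apache.storm.topology.base.BaseWindowedBolt;
      import org.apache.storm.tuple.Fields;
      import org.apache.storm.tuple.Values;
      import org.apache.storm.windowing.TupleWindow;

      public class WindowBolt extends BaseWindowedBolt {

          private OutputCollector collector;

          @Override
          public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
              this.collector = collector;
          }

          @Override
          public void execute(TupleWindow inputWindow) {
              // Number of events that fall into this window
              int sum = inputWindow.get().size();
              collector.emit(new Values(inputWindow.getStartTimestamp(), inputWindow.getEndTimestamp(), sum));
          }

          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("start", "end", "sum"));
          }
      }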
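      The counts printed by this topology follow from how the 10-second tumbling windows partition the event timestamps. The following hypothetical model reproduces them without Storm; it is not Storm's implementation. It assumes that windows are `(start, end]` intervals aligned to multiples of the window length, which is consistent with the printed output where each window holds 10 events. It also assumes that a window is reported only once the watermark reaches its end. `TumblingCountModel`, `windowStart`, and `countClosedWindows` are illustrative names.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone model of the tumbling-window counts (not Storm's implementation).
// Assumptions: windows are (start, end] intervals aligned to multiples of the window
// length, and a window is reported only once the watermark reaches its end.
public class TumblingCountModel {

    // Start of the (start, end] window that contains a positive timestamp.
    static long windowStart(long ts, long lengthMs) {
        return Math.floorDiv(ts - 1, lengthMs) * lengthMs;
    }

    // Counts events per window start, keeping only windows whose end <= watermark.
    static Map<Long, Integer> countClosedWindows(long[] times, long lengthMs, long watermark) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : times) {
            counts.merge(windowStart(ts, lengthMs), 1, Integer::sum);
        }
        counts.keySet().removeIf(start -> start + lengthMs > watermark);
        return counts;
    }

    public static void main(String[] args) {
        long[] times = new long[61];
        for (int i = 0; i < times.length; i++) {
            times[i] = (i + 1) * 1000L;  // epoch+1s .. epoch+61s, as in LateEventSpout
        }
        // Watermark assumed to be the largest timestamp seen: 61000 ms
        countClosedWindows(times, 10_000L, 61_000L).forEach((start, sum) ->
                System.out.println(String.format("Start: %d, End: %d, Sum:%d", start, start + 10_000L, sum)));
    }
}
```

      In this model the partial window `(60000, 70000]`, holding only the 61000 ms event, is never closed because no timestamp exceeds 61000 ms. That matches the six lines the topology prints.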

       
      And `PrintBolt` just prints the `windowBolt` output (`LatePrintBolt` is similar):

      import java.util.Map;
      import org.apache.storm.task.OutputCollector;
      import org.apache.storm.task.TopologyContext;
      import org.apache.storm.topology.OutputFieldsDeclarer;
      import org.apache.storm.topology.base.BaseRichBolt;
      import org.apache.storm.tuple.Tuple;

      public class PrintBolt extends BaseRichBolt {
          @Override
          public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
              // Nothing to set up: this bolt only prints and emits nothing
          }

          @Override
          public void execute(Tuple input) {
              System.out.println(String.format("Start: %d, End: %d, Sum:%d",
                      input.getLongByField("start"), input.getLongByField("end"), input.getIntegerByField("sum")));
          }

          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              // No output stream
          }
      }

      If I don't add `LatePrintBolt` to the `TopologyBuilder`, I get the correct results:

      Start: 0, End: 10000, Sum:10
      Start: 10000, End: 20000, Sum:10
      Start: 20000, End: 30000, Sum:10
      Start: 30000, End: 40000, Sum:10
      Start: 40000, End: 50000, Sum:10
      Start: 50000, End: 60000, Sum:10

       
      However, when I also print the `lateEvents` stream, I get the same output until the first late event arrives; at that point I get the exception shown above.
       
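      I have debugged the issue. When `WindowedBoltExecutor` receives a late tuple, it emits the late tuple itself as a value on the late-tuple stream, and `BoltOutputCollectorImpl` wraps those values in a new tuple. The emitted values therefore contain the original `TupleImpl`, and serializing it reaches its `context` field, a `WorkerTopologyContext`, which Kryo cannot serialize (its `defaultResources` field holds the unregistered `SingletonImmutableBiMap` reported in the trace). That is what causes the error.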
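      The failure mode can be illustrated without Storm. In this analogy, Java's built-in serialization stands in for Kryo, and `FakeTuple` and `FakeContext` are hypothetical stand-ins for `TupleImpl` and `WorkerTopologyContext`. Serializing a payload that contains the tuple object fails because of the context the tuple references. Serializing a plain copy of the tuple's values succeeds.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Analogy only: java.io serialization stands in for Kryo, and the classes below are
// hypothetical stand-ins for TupleImpl and WorkerTopologyContext.
public class NestedTupleDemo {

    // Plays the role of WorkerTopologyContext: not serializable.
    static class FakeContext {}

    // Plays the role of TupleImpl: serializable itself, but it references the context.
    static class FakeTuple implements Serializable {
        final FakeContext context = new FakeContext();
        final List<Object> values;

        FakeTuple(List<Object> values) {
            this.values = values;
        }
    }

    // Returns true if the whole object graph of the payload can be serialized.
    static boolean trySerialize(Object payload) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(payload);
            return true;
        } catch (NotSerializableException e) {
            return false;  // some object reachable from the payload is not serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        FakeTuple late = new FakeTuple(List.<Object>of(42, 1000L));
        // Emitting the tuple object itself drags its context into the payload.
        System.out.println(trySerialize(List.of(late)));                 // false
        // Emitting a copy of the tuple's plain values does not.
        System.out.println(trySerialize(new ArrayList<>(late.values)));  // true
    }
}
```

      The analogy also suggests why registering `TupleImpl` and `WorkerTopologyContext` is not enough. Every object reachable from the emitted tuple, such as the `SingletonImmutableBiMap` named in the trace, must also be serializable and registered.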

       


          People

            Assignee: Unassigned
            Reporter: Jawad Tahir (jawadtahir)
            Votes: 0
            Watchers: 2
