Flink / FLINK-31373

PerRoundWrapperOperator should carry epoch information in watermark



    Description

      Currently, we use PerRoundWrapperOperator to wrap ordinary Flink operators so that they can be used in iterations.

      Each record already carries epoch information, so we know which iteration round it belongs to.
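      The following Java sketch illustrates the per-round idea in isolation. It does not use Flink's classes: EpochRecord, Operator, PerRoundWrapper, CountingOperator and PerRoundDemo are hypothetical names, and the sketch assumes, as the name PerRoundWrapperOperator suggests, that the wrapper keeps one instance of the wrapped operator per epoch and routes each record by the epoch it carries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical record wrapper: a payload plus the iteration round (epoch) it belongs to. */
final class EpochRecord<T> {
    final int epoch;
    final T value;

    EpochRecord(int epoch, T value) {
        this.epoch = epoch;
        this.value = value;
    }
}

/** Hypothetical stand-in for an ordinary operator that knows nothing about iterations. */
interface Operator<T> {
    void processElement(T value);
}

/** Hypothetical per-round wrapper: one wrapped operator instance per epoch, created on first use. */
final class PerRoundWrapper<T> {
    private final Supplier<Operator<T>> factory;
    private final Map<Integer, Operator<T>> operatorsByEpoch = new HashMap<>();

    PerRoundWrapper(Supplier<Operator<T>> factory) {
        this.factory = factory;
    }

    void processElement(EpochRecord<T> record) {
        // The epoch carried by the record selects the operator instance for that round.
        operatorsByEpoch
                .computeIfAbsent(record.epoch, epoch -> factory.get())
                .processElement(record.value);
    }

    int numRounds() {
        return operatorsByEpoch.size();
    }
}

/** Example operator with per-instance state: it counts the records it has seen. */
final class CountingOperator implements Operator<String> {
    int count;

    @Override
    public void processElement(String value) {
        count++;
    }
}

final class PerRoundDemo {
    public static void main(String[] args) {
        List<CountingOperator> created = new ArrayList<>();
        PerRoundWrapper<String> wrapper = new PerRoundWrapper<>(() -> {
            CountingOperator op = new CountingOperator();
            created.add(op);
            return op;
        });
        wrapper.processElement(new EpochRecord<>(0, "a"));
        wrapper.processElement(new EpochRecord<>(0, "b"));
        wrapper.processElement(new EpochRecord<>(1, "c"));
        // Prints "2 rounds, counts 2, 1": each round's operator saw only its own records.
        System.out.println(wrapper.numRounds() + " rounds, counts "
                + created.get(0).count + ", " + created.get(1).count);
    }
}
```

      In this model, keeping a separate instance per round keeps one round's state out of another, but it only works if every element can say which round it belongs to.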

      However, watermarks carry no epoch information. This works in most cases but fails in the following use case:

      • In DataStreamUtils#withBroadcast, elements from the non-broadcast inputs (including watermarks) are cached until the broadcast variables are ready. Once they are ready, the next incoming stream element triggers processing of the cached elements first. If that incoming element is a watermark, the current implementation of the iteration module fails with a NullPointerException in ProxyOutput#collect, because the watermark carries no epoch information.
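      The failure can be reproduced with a small, self-contained Java model. Everything here is hypothetical and simplified (Element, ProxyOutputSketch, CachingOperator and WithBroadcastDemo are not Flink classes), and it assumes, as the report implies, that the output takes its epoch from the element currently being processed. An untagged watermark then leaves the epoch null, which fails on unboxing, analogous to the NPE in ProxyOutput#collect; tagging the watermark with an epoch, as this issue proposes, lets the cached records be emitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stream element: a record or a watermark, with an optional epoch tag. */
final class Element {
    final boolean isWatermark;
    final long payload;   // record value, or watermark timestamp
    final Integer epoch;  // null models a watermark that carries no epoch

    private Element(boolean isWatermark, long payload, Integer epoch) {
        this.isWatermark = isWatermark;
        this.payload = payload;
        this.epoch = epoch;
    }

    static Element record(int epoch, long value) {
        return new Element(false, value, epoch);
    }

    static Element watermark(Integer epoch, long timestamp) {
        return new Element(true, timestamp, epoch);
    }
}

/** Hypothetical stand-in for ProxyOutput: tags every output with the current context epoch. */
final class ProxyOutputSketch {
    final List<String> emitted = new ArrayList<>();
    private Integer contextEpoch;

    void setContextEpoch(Integer epoch) {
        contextEpoch = epoch;
    }

    void collect(long value) {
        int epoch = contextEpoch; // unboxing a null epoch throws NullPointerException
        emitted.add("epoch=" + epoch + " value=" + value);
    }
}

/** Hypothetical model of the withBroadcast caching: buffer inputs until broadcast variables are ready. */
final class CachingOperator {
    private final Deque<Element> cache = new ArrayDeque<>();
    private final ProxyOutputSketch output;
    private boolean broadcastReady;

    CachingOperator(ProxyOutputSketch output) {
        this.output = output;
    }

    void markBroadcastReady() {
        broadcastReady = true;
    }

    void processElement(Element element) {
        if (!broadcastReady) {
            cache.add(element);
            return;
        }
        // The epoch context comes from the element that triggered the flush.
        output.setContextEpoch(element.epoch);
        // Flush cached records first (cached watermarks are simply dropped in this model).
        while (!cache.isEmpty()) {
            Element cached = cache.poll();
            if (!cached.isWatermark) {
                output.collect(cached.payload);
            }
        }
        if (!element.isWatermark) {
            output.collect(element.payload);
        }
    }
}

final class WithBroadcastDemo {
    static List<String> run(Element trigger) {
        ProxyOutputSketch output = new ProxyOutputSketch();
        CachingOperator op = new CachingOperator(output);
        op.processElement(Element.record(0, 1L)); // cached: broadcast variables not ready yet
        op.markBroadcastReady();
        op.processElement(trigger);               // flushes the cache
        return output.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(Element.watermark(0, 10L))); // prints [epoch=0 value=1]
        try {
            run(Element.watermark(null, 10L));               // untagged watermark
        } catch (NullPointerException e) {
            System.out.println("untagged watermark: NullPointerException");
        }
    }
}
```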


            People

              Assignee: Unassigned
              Reporter: Zhipeng Zhang (zhangzp)
              Votes: 0
              Watchers: 2

              Dates

                Created:
                Updated: