FLINK-33768

FLIP-379: Support dynamic source parallelism inference for batch jobs


Details

      Flink 1.19 adds dynamic source parallelism inference for batch jobs: source connectors can now infer their parallelism from the actual amount of data to consume. Previous versions could only assign a fixed default parallelism to source vertices.

      To enable dynamic parallelism inference, a source connector must implement the inference interface. The FileSource connector already does so.
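As a rough illustration of what implementing such an interface involves, here is a minimal, self-contained sketch. The release note does not name the interface; FLIP-379 describes it as `DynamicParallelismInference`, but every type and method below (`InferenceContext`, `ParallelismInference`, `SizeBasedSource`, `infer`) is a simplified, hypothetical stand-in and not Flink's actual API. The sizing rule (one task per `bytesPerTask` of input, capped by the upper bound) is also an assumption chosen for illustration, not necessarily what FileSource does.

```java
public class SourceParallelismSketch {

    /** Hypothetical stand-in for the information the scheduler hands to the source. */
    static final class InferenceContext {
        final int upperBound;     // highest parallelism the scheduler will accept
        final long bytesPerTask;  // target amount of data per source task

        InferenceContext(int upperBound, long bytesPerTask) {
            this.upperBound = upperBound;
            this.bytesPerTask = bytesPerTask;
        }
    }

    /** Hypothetical stand-in for the inference interface a connector implements. */
    interface ParallelismInference {
        int inferParallelism(InferenceContext context);
    }

    /** Example source that sizes itself from the total bytes it has to read. */
    static final class SizeBasedSource implements ParallelismInference {
        private final long[] inputSizesInBytes;

        SizeBasedSource(long... inputSizesInBytes) {
            this.inputSizesInBytes = inputSizesInBytes;
        }

        @Override
        public int inferParallelism(InferenceContext context) {
            long total = 0;
            for (long size : inputSizesInBytes) {
                total += size;
            }
            // Ceiling division: number of tasks needed at bytesPerTask each.
            long wanted = (total + context.bytesPerTask - 1) / context.bytesPerTask;
            // Clamp to [1, upperBound].
            return (int) Math.max(1, Math.min(wanted, context.upperBound));
        }
    }

    /** Convenience helper for trying the sketch out. */
    static int infer(int upperBound, long bytesPerTask, long... inputSizesInBytes) {
        return new SizeBasedSource(inputSizesInBytes)
                .inferParallelism(new InferenceContext(upperBound, bytesPerTask));
    }

    public static void main(String[] args) {
        System.out.println(infer(8, 128L, 100L, 100L, 100L));
    }
}
```

The key point is that the source, not the scheduler, decides the parallelism, while the scheduler still bounds the result.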

      Additionally, the configuration `execution.batch.adaptive.auto-parallelism.default-source-parallelism` now serves as the upper bound of source parallelism inference and no longer defaults to 1. If it is not set, the upper bound of allowed parallelism set via `execution.batch.adaptive.auto-parallelism.max-parallelism` is used. If that configuration is also not set, the default parallelism set via `parallelism.default` or StreamExecutionEnvironment#setParallelism() is used.
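The fallback order described above can be sketched as a small resolution function. This is only an illustration of the stated order, not Flink's implementation; the class and method names are hypothetical, and `null` is used here to mean "not set by the user".

```java
public class SourceUpperBoundSketch {

    /**
     * Resolves the upper bound for source parallelism inference.
     *
     * @param defaultSourceParallelism value of
     *     execution.batch.adaptive.auto-parallelism.default-source-parallelism, or null if unset
     * @param maxParallelism value of
     *     execution.batch.adaptive.auto-parallelism.max-parallelism, or null if unset
     * @param defaultParallelism value from parallelism.default or
     *     StreamExecutionEnvironment#setParallelism()
     */
    static int resolveUpperBound(
            Integer defaultSourceParallelism, Integer maxParallelism, int defaultParallelism) {
        if (defaultSourceParallelism != null) {
            return defaultSourceParallelism; // first choice: explicit source bound
        }
        if (maxParallelism != null) {
            return maxParallelism; // second choice: general adaptive upper bound
        }
        return defaultParallelism; // last resort: job-wide default parallelism
    }

    public static void main(String[] args) {
        System.out.println(resolveUpperBound(4, 128, 8));
        System.out.println(resolveUpperBound(null, 128, 8));
        System.out.println(resolveUpperBound(null, null, 8));
    }
}
```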

    Description

      Currently, for JobVertices without a configured parallelism, the AdaptiveBatchScheduler dynamically infers the vertex parallelism from the volume of input data. Source vertices are handled differently: they use the value of `execution.batch.adaptive.auto-parallelism.default-source-parallelism` as a fixed parallelism. If the user does not set it, the default value of 1 is used as the source parallelism, which is only a temporary implementation.

      We aim to support dynamic source parallelism inference for batch jobs. See FLIP-379 for details.


          People

            xiasun xingbe