Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-32374

ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting

    XMLWordPrintableJSON

Details

    Description

      If the existing JSON plan is not truncated when overwriting, and the newly generated JSON plan contents are shorter than the previous JSON plan content, the plan be an invalid JSON.

      How to reproduce

      Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = 'blackhole');
      [INFO] Execute statement succeed.
      
      Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) with ('connector' = 'datagen');
      [INFO] Execute statement succeed.
      
      Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
      [INFO] Execute statement succeed.
      
      Flink SQL> set 'table.plan.force-recompile' = 'true';
      [INFO] Execute statement succeed.
      
      Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select * from (values (2, 'bye')) T (id, message);
      [INFO] Execute statement succeed.
      

      cat -n debug.json, and check L#67

           1	{
           2	  "flinkVersion" : "1.17",
           3	  "nodes" : [ {
           4	    "id" : 15,
           5	    "type" : "stream-exec-values_1",
           6	    "tuples" : [ [ {
           7	      "kind" : "LITERAL",
           8	      "value" : "2",
           9	      "type" : "INT NOT NULL"
          10	    }, {
          11	      "kind" : "LITERAL",
          12	      "value" : "bye",
          13	      "type" : "CHAR(3) NOT NULL"
          14	    } ] ],
          15	    "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
          16	    "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
          17	    "inputProperties" : [ ]
          18	  }, {
          19	    "id" : 16,
          20	    "type" : "stream-exec-sink_1",
          21	    "configuration" : {
          22	      "table.exec.sink.keyed-shuffle" : "AUTO",
          23	      "table.exec.sink.not-null-enforcer" : "ERROR",
          24	      "table.exec.sink.type-length-enforcer" : "IGNORE",
          25	      "table.exec.sink.upsert-materialize" : "AUTO"
          26	    },
          27	    "dynamicTableSink" : {
          28	      "table" : {
          29	        "identifier" : "`default_catalog`.`default_database`.`debug_sink`",
          30	        "resolvedTable" : {
          31	          "schema" : {
          32	            "columns" : [ {
          33	              "name" : "f0",
          34	              "dataType" : "INT"
          35	            }, {
          36	              "name" : "f1",
          37	              "dataType" : "VARCHAR(2147483647)"
          38	            } ],
          39	            "watermarkSpecs" : [ ]
          40	          },
          41	          "partitionKeys" : [ ],
          42	          "options" : {
          43	            "connector" : "blackhole"
          44	          }
          45	        }
          46	      }
          47	    },
          48	    "inputChangelogMode" : [ "INSERT" ],
          49	    "inputProperties" : [ {
          50	      "requiredDistribution" : {
          51	        "type" : "UNKNOWN"
          52	      },
          53	      "damBehavior" : "PIPELINED",
          54	      "priority" : 0
          55	    } ],
          56	    "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
          57	    "description" : "Sink(table=[default_catalog.default_database.debug_sink], fields=[id, message])"
          58	  } ],
          59	  "edges" : [ {
          60	    "source" : 15,
          61	    "target" : 16,
          62	    "shuffle" : {
          63	      "type" : "FORWARD"
          64	    },
          65	    "shuffleMode" : "PIPELINED"
          66	  } ]
          67	} "$CONCAT$1",
          68	      "operands" : [ {
          69	        "kind" : "INPUT_REF",
          70	        "inputIndex" : 2,
          71	        "type" : "VARCHAR(2147483647)"
          72	      }, {
          73	        "kind" : "INPUT_REF",
          74	        "inputIndex" : 3,
          75	        "type" : "VARCHAR(2147483647)"
          76	      } ],
          77	      "type" : "VARCHAR(2147483647)"
          78	    } ],
          79	    "condition" : null,
          80	    "inputProperties" : [ {
          81	      "requiredDistribution" : {
          82	        "type" : "UNKNOWN"
          83	      },
          84	      "damBehavior" : "PIPELINED",
          85	      "priority" : 0
          86	    } ],
          87	    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
          88	    "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0, CONCAT(f2, f3) AS f1])"
          89	  }, {
          90	    "id" : 14,
          91	    "type" : "stream-exec-sink_1",
          92	    "configuration" : {
          93	      "table.exec.sink.keyed-shuffle" : "AUTO",
          94	      "table.exec.sink.not-null-enforcer" : "ERROR",
          95	      "table.exec.sink.type-length-enforcer" : "IGNORE",
          96	      "table.exec.sink.upsert-materialize" : "AUTO"
          97	    },
          98	    "dynamicTableSink" : {
          99	      "table" : {
         100	        "identifier" : "`default_catalog`.`default_database`.`debug_sink`",
         101	        "resolvedTable" : {
         102	          "schema" : {
         103	            "columns" : [ {
         104	              "name" : "f0",
         105	              "dataType" : "INT"
         106	            }, {
         107	              "name" : "f1",
         108	              "dataType" : "VARCHAR(2147483647)"
         109	            } ],
         110	            "watermarkSpecs" : [ ]
         111	          },
         112	          "partitionKeys" : [ ],
         113	          "options" : {
         114	            "connector" : "blackhole"
         115	          }
         116	        }
         117	      }
         118	    },
         119	    "inputChangelogMode" : [ "INSERT" ],
         120	    "inputProperties" : [ {
         121	      "requiredDistribution" : {
         122	        "type" : "UNKNOWN"
         123	      },
         124	      "damBehavior" : "PIPELINED",
         125	      "priority" : 0
         126	    } ],
         127	    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
         128	    "description" : "Sink(table=[default_catalog.default_database.debug_sink], fields=[f0, f1])"
         129	  } ],
         130	  "edges" : [ {
         131	    "source" : 12,
         132	    "target" : 13,
         133	    "shuffle" : {
         134	      "type" : "FORWARD"
         135	    },
         136	    "shuffleMode" : "PIPELINED"
         137	  }, {
         138	    "source" : 13,
         139	    "target" : 14,
         140	    "shuffle" : {
         141	      "type" : "FORWARD"
         142	    },
         143	    "shuffleMode" : "PIPELINED"
         144	  } ]
         145	}
      

      Attachments

        Issue Links

          Activity

            People

              qingyue Jane Chan
              qingyue Jane Chan
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: