Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-32374

ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting




      If the existing JSON plan file is not truncated when it is overwritten, and the newly generated JSON plan is shorter than the previous one, the resulting file will contain invalid JSON.

      How to reproduce

      Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = 'blackhole');
      [INFO] Execute statement succeed.
      Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) with ('connector' = 'datagen');
      [INFO] Execute statement succeed.
      Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
      [INFO] Execute statement succeed.
      Flink SQL> set 'table.plan.force-recompile' = 'true';
      [INFO] Execute statement succeed.
      Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select * from (values (2, 'bye')) T (id, message);
      [INFO] Execute statement succeed.

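      Run cat -n debug.json and check line 67: the new plan ends at the closing brace there, but leftover content from the previous, longer plan follows it.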

           1	{
           2	  "flinkVersion" : "1.17",
           3	  "nodes" : [ {
           4	    "id" : 15,
           5	    "type" : "stream-exec-values_1",
           6	    "tuples" : [ [ {
           7	      "kind" : "LITERAL",
           8	      "value" : "2",
           9	      "type" : "INT NOT NULL"
          10	    }, {
          11	      "kind" : "LITERAL",
          12	      "value" : "bye",
          13	      "type" : "CHAR(3) NOT NULL"
          14	    } ] ],
          15	    "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
          16	    "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
          17	    "inputProperties" : [ ]
          18	  }, {
          19	    "id" : 16,
          20	    "type" : "stream-exec-sink_1",
          21	    "configuration" : {
          22	      "table.exec.sink.keyed-shuffle" : "AUTO",
          23	      "table.exec.sink.not-null-enforcer" : "ERROR",
          24	      "table.exec.sink.type-length-enforcer" : "IGNORE",
          25	      "table.exec.sink.upsert-materialize" : "AUTO"
          26	    },
          27	    "dynamicTableSink" : {
          28	      "table" : {
          29	        "identifier" : "`default_catalog`.`default_database`.`debug_sink`",
          30	        "resolvedTable" : {
          31	          "schema" : {
          32	            "columns" : [ {
          33	              "name" : "f0",
          34	              "dataType" : "INT"
          35	            }, {
          36	              "name" : "f1",
          37	              "dataType" : "VARCHAR(2147483647)"
          38	            } ],
          39	            "watermarkSpecs" : [ ]
          40	          },
          41	          "partitionKeys" : [ ],
          42	          "options" : {
          43	            "connector" : "blackhole"
          44	          }
          45	        }
          46	      }
          47	    },
          48	    "inputChangelogMode" : [ "INSERT" ],
          49	    "inputProperties" : [ {
          50	      "requiredDistribution" : {
          51	        "type" : "UNKNOWN"
          52	      },
          53	      "damBehavior" : "PIPELINED",
          54	      "priority" : 0
          55	    } ],
          56	    "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
          57	    "description" : "Sink(table=[default_catalog.default_database.debug_sink], fields=[id, message])"
          58	  } ],
          59	  "edges" : [ {
          60	    "source" : 15,
          61	    "target" : 16,
          62	    "shuffle" : {
          63	      "type" : "FORWARD"
          64	    },
          65	    "shuffleMode" : "PIPELINED"
          66	  } ]
          67	} "$CONCAT$1",
          68	      "operands" : [ {
          69	        "kind" : "INPUT_REF",
          70	        "inputIndex" : 2,
          71	        "type" : "VARCHAR(2147483647)"
          72	      }, {
          73	        "kind" : "INPUT_REF",
          74	        "inputIndex" : 3,
          75	        "type" : "VARCHAR(2147483647)"
          76	      } ],
          77	      "type" : "VARCHAR(2147483647)"
          78	    } ],
          79	    "condition" : null,
          80	    "inputProperties" : [ {
          81	      "requiredDistribution" : {
          82	        "type" : "UNKNOWN"
          83	      },
          84	      "damBehavior" : "PIPELINED",
          85	      "priority" : 0
          86	    } ],
          87	    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
          88	    "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0, CONCAT(f2, f3) AS f1])"
          89	  }, {
          90	    "id" : 14,
          91	    "type" : "stream-exec-sink_1",
          92	    "configuration" : {
          93	      "table.exec.sink.keyed-shuffle" : "AUTO",
          94	      "table.exec.sink.not-null-enforcer" : "ERROR",
          95	      "table.exec.sink.type-length-enforcer" : "IGNORE",
          96	      "table.exec.sink.upsert-materialize" : "AUTO"
          97	    },
          98	    "dynamicTableSink" : {
          99	      "table" : {
         100	        "identifier" : "`default_catalog`.`default_database`.`debug_sink`",
         101	        "resolvedTable" : {
         102	          "schema" : {
         103	            "columns" : [ {
         104	              "name" : "f0",
         105	              "dataType" : "INT"
         106	            }, {
         107	              "name" : "f1",
         108	              "dataType" : "VARCHAR(2147483647)"
         109	            } ],
         110	            "watermarkSpecs" : [ ]
         111	          },
         112	          "partitionKeys" : [ ],
         113	          "options" : {
         114	            "connector" : "blackhole"
         115	          }
         116	        }
         117	      }
         118	    },
         119	    "inputChangelogMode" : [ "INSERT" ],
         120	    "inputProperties" : [ {
         121	      "requiredDistribution" : {
         122	        "type" : "UNKNOWN"
         123	      },
         124	      "damBehavior" : "PIPELINED",
         125	      "priority" : 0
         126	    } ],
         127	    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
         128	    "description" : "Sink(table=[default_catalog.default_database.debug_sink], fields=[f0, f1])"
         129	  } ],
         130	  "edges" : [ {
         131	    "source" : 12,
         132	    "target" : 13,
         133	    "shuffle" : {
         134	      "type" : "FORWARD"
         135	    },
         136	    "shuffleMode" : "PIPELINED"
         137	  }, {
         138	    "source" : 13,
         139	    "target" : 14,
         140	    "shuffle" : {
         141	      "type" : "FORWARD"
         142	    },
         143	    "shuffleMode" : "PIPELINED"
         144	  } ]
         145	}


        Issue Links



              qingyue Jane Chan
              qingyue Jane Chan
              0 Vote for this issue
              2 Start watching this issue

