Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.16.0, 1.17.0, 1.16.1, 1.16.2, 1.18.0, 1.17.1
Description
If the existing JSON plan file is not truncated when it is overwritten, and the newly generated JSON plan is shorter than the previous one, the tail of the old plan is left behind after the new content and the resulting file is invalid JSON.
How to reproduce
Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = 'blackhole');
[INFO] Execute statement succeed.
Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) with ('connector' = 'datagen');
[INFO] Execute statement succeed.
Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
[INFO] Execute statement succeed.
Flink SQL> set 'table.plan.force-recompile' = 'true';
[INFO] Execute statement succeed.
Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select * from (values (2, 'bye')) T (id, message);
[INFO] Execute statement succeed.
Run `cat -n debug.json` and check line 67, where leftover content from the previous plan follows the closing brace of the new plan:
1 { 2 "flinkVersion" : "1.17", 3 "nodes" : [ { 4 "id" : 15, 5 "type" : "stream-exec-values_1", 6 "tuples" : [ [ { 7 "kind" : "LITERAL", 8 "value" : "2", 9 "type" : "INT NOT NULL" 10 }, { 11 "kind" : "LITERAL", 12 "value" : "bye", 13 "type" : "CHAR(3) NOT NULL" 14 } ] ], 15 "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>", 16 "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])", 17 "inputProperties" : [ ] 18 }, { 19 "id" : 16, 20 "type" : "stream-exec-sink_1", 21 "configuration" : { 22 "table.exec.sink.keyed-shuffle" : "AUTO", 23 "table.exec.sink.not-null-enforcer" : "ERROR", 24 "table.exec.sink.type-length-enforcer" : "IGNORE", 25 "table.exec.sink.upsert-materialize" : "AUTO" 26 }, 27 "dynamicTableSink" : { 28 "table" : { 29 "identifier" : "`default_catalog`.`default_database`.`debug_sink`", 30 "resolvedTable" : { 31 "schema" : { 32 "columns" : [ { 33 "name" : "f0", 34 "dataType" : "INT" 35 }, { 36 "name" : "f1", 37 "dataType" : "VARCHAR(2147483647)" 38 } ], 39 "watermarkSpecs" : [ ] 40 }, 41 "partitionKeys" : [ ], 42 "options" : { 43 "connector" : "blackhole" 44 } 45 } 46 } 47 }, 48 "inputChangelogMode" : [ "INSERT" ], 49 "inputProperties" : [ { 50 "requiredDistribution" : { 51 "type" : "UNKNOWN" 52 }, 53 "damBehavior" : "PIPELINED", 54 "priority" : 0 55 } ], 56 "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>", 57 "description" : "Sink(table=[default_catalog.default_database.debug_sink], fields=[id, message])" 58 } ], 59 "edges" : [ { 60 "source" : 15, 61 "target" : 16, 62 "shuffle" : { 63 "type" : "FORWARD" 64 }, 65 "shuffleMode" : "PIPELINED" 66 } ] 67 } "$CONCAT$1", 68 "operands" : [ { 69 "kind" : "INPUT_REF", 70 "inputIndex" : 2, 71 "type" : "VARCHAR(2147483647)" 72 }, { 73 "kind" : "INPUT_REF", 74 "inputIndex" : 3, 75 "type" : "VARCHAR(2147483647)" 76 } ], 77 "type" : "VARCHAR(2147483647)" 78 } ], 79 "condition" : null, 80 "inputProperties" : [ { 81 "requiredDistribution" : { 82 "type" : "UNKNOWN" 83 }, 84 
"damBehavior" : "PIPELINED", 85 "priority" : 0 86 } ], 87 "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>", 88 "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0, CONCAT(f2, f3) AS f1])" 89 }, { 90 "id" : 14, 91 "type" : "stream-exec-sink_1", 92 "configuration" : { 93 "table.exec.sink.keyed-shuffle" : "AUTO", 94 "table.exec.sink.not-null-enforcer" : "ERROR", 95 "table.exec.sink.type-length-enforcer" : "IGNORE", 96 "table.exec.sink.upsert-materialize" : "AUTO" 97 }, 98 "dynamicTableSink" : { 99 "table" : { 100 "identifier" : "`default_catalog`.`default_database`.`debug_sink`", 101 "resolvedTable" : { 102 "schema" : { 103 "columns" : [ { 104 "name" : "f0", 105 "dataType" : "INT" 106 }, { 107 "name" : "f1", 108 "dataType" : "VARCHAR(2147483647)" 109 } ], 110 "watermarkSpecs" : [ ] 111 }, 112 "partitionKeys" : [ ], 113 "options" : { 114 "connector" : "blackhole" 115 } 116 } 117 } 118 }, 119 "inputChangelogMode" : [ "INSERT" ], 120 "inputProperties" : [ { 121 "requiredDistribution" : { 122 "type" : "UNKNOWN" 123 }, 124 "damBehavior" : "PIPELINED", 125 "priority" : 0 126 } ], 127 "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>", 128 "description" : "Sink(table=[default_catalog.default_database.debug_sink], fields=[f0, f1])" 129 } ], 130 "edges" : [ { 131 "source" : 12, 132 "target" : 13, 133 "shuffle" : { 134 "type" : "FORWARD" 135 }, 136 "shuffleMode" : "PIPELINED" 137 }, { 138 "source" : 13, 139 "target" : 14, 140 "shuffle" : { 141 "type" : "FORWARD" 142 }, 143 "shuffleMode" : "PIPELINED" 144 } ] 145 }
Attachments
Issue Links
- Discovered while testing
-
FLINK-31956 Extend the CompiledPlan to read from/write to Flink's FileSystem
- Closed
- links to