From ce71701bcfcf090ec9a365fdd10e24c94a7295e8 Mon Sep 17 00:00:00 2001
From: Sergio Pena
Date: Wed, 25 Sep 2024 16:58:52 -0500
Subject: [PATCH] [FLINK-35709][table-planner] Allow reordering source columns
 in CTAS and RTAS

---
 docs/content.zh/docs/dev/table/sql/create.md  | 31 ++++++
 docs/content/docs/dev/table/sql/create.md     | 31 ++++++
 .../src/main/codegen/includes/parserImpls.ftl | 54 +++++++++--
 .../flink/sql/parser/ddl/SqlCreateTable.java  |  4 +
 .../sql/parser/ddl/SqlCreateTableAs.java      | 12 ++-
 .../sql/parser/ddl/SqlReplaceTableAs.java     | 12 ++-
 .../sql/parser/utils/ParserResource.java      |  4 +
 .../sql/parser/FlinkSqlParserImplTest.java    | 37 ++++++++
 .../planner/operations/MergeTableAsUtil.java  | 59 ++++++++++--
 .../operations/SqlCreateTableConverter.java   | 22 +++--
 .../SqlReplaceTableAsConverter.java           | 24 +++--
 .../planner/calcite/PreValidateReWriter.scala | 65 ++-----------
 .../planner/calcite/SqlRewriterUtils.scala    | 94 ++++++++++++++++++-
 .../SqlDdlToOperationConverterTest.java       | 74 +++++++++++++++
 .../SqlRTASNodeToOperationConverterTest.java  | 44 +++++++++
 .../runtime/stream/sql/RTASITCase.java        | 22 +++++
 .../runtime/stream/sql/TableSinkITCase.scala  | 43 +++++++++
 17 files changed, 539 insertions(+), 93 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md
index 717c00502302f..650c1b28dc398 100644
--- a/docs/content.zh/docs/dev/table/sql/create.md
+++ b/docs/content.zh/docs/dev/table/sql/create.md
@@ -643,6 +643,37 @@ CREATE TABLE my_ctas_table (
 INSERT INTO my_ctas_table SELECT id, name FROM source_table;
 ```
 
+`CTAS` also allows you to reorder the columns defined in the `SELECT` part by specifying all column names, without data types, in the `CREATE` part. This works like the column list of an `INSERT INTO` statement.
+The columns specified must match the names and the number of the columns in the `SELECT` part, and they cannot be combined with new columns, which require data types.
+
+Consider the example statement below:
+
+```sql
+CREATE TABLE my_ctas_table (
+    order_time, price, quantity, id
+) WITH (
+    'connector' = 'kafka',
+    ...
+) AS SELECT id, price, quantity, order_time FROM source_table;
+```
+
+The resulting table `my_ctas_table` is equivalent to creating the following table and inserting the data with the following statements:
+
+```sql
+CREATE TABLE my_ctas_table (
+    order_time TIMESTAMP(3),
+    price DOUBLE,
+    quantity DOUBLE,
+    id BIGINT
+) WITH (
+    'connector' = 'kafka',
+    ...
+);
+
+INSERT INTO my_ctas_table (order_time, price, quantity, id)
+    SELECT id, price, quantity, order_time FROM source_table;
+```
+
 **Note:** CTAS has these restrictions:
 * Does not support creating a temporary table yet.
 * Does not support creating partitioned table yet.
diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md
index 2957a65679017..3a564d3ffb1c3 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -643,6 +643,37 @@ CREATE TABLE my_ctas_table (
 INSERT INTO my_ctas_table SELECT id, name FROM source_table;
 ```
 
+`CTAS` also allows you to reorder the columns defined in the `SELECT` part by specifying all column names, without data types, in the `CREATE` part. This works like the column list of an `INSERT INTO` statement.
+The columns specified must match the names and the number of the columns in the `SELECT` part, and they cannot be combined with new columns, which require data types.
+
+Consider the example statement below:
+
+```sql
+CREATE TABLE my_ctas_table (
+    order_time, price, quantity, id
+) WITH (
+    'connector' = 'kafka',
+    ...
+) AS SELECT id, price, quantity, order_time FROM source_table;
+```
+
+The resulting table `my_ctas_table` is equivalent to creating the following table and inserting the data with the following statements:
+
+```sql
+CREATE TABLE my_ctas_table (
+    order_time TIMESTAMP(3),
+    price DOUBLE,
+    quantity DOUBLE,
+    id BIGINT
+) WITH (
+    'connector' = 'kafka',
+    ...
+);
+
+INSERT INTO my_ctas_table (order_time, price, quantity, id)
+    SELECT id, price, quantity, order_time FROM source_table;
+```
+
 **Note:** CTAS has these restrictions:
 * Does not support creating a temporary table yet.
 * Does not support creating partitioned table yet.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index d1ae33dd9ce9b..9e7bad6206c62 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1532,6 +1532,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
     SqlDistribution distribution = null;
     SqlNodeList partitionColumns = SqlNodeList.EMPTY;
     SqlParserPos pos = startPos;
+    boolean isColumnsIdentifiersOnly = false;
 }
 {
@@ -1541,12 +1542,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
     tableName = CompoundIdentifier()
     [
         <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
-        TableColumn(ctx)
-        (
-            <COMMA> TableColumn(ctx)
-        )*
+        TableColumnsOrIdentifiers(pos, ctx)
         {
             pos = pos.plus(getPos());
+            isColumnsIdentifiersOnly = ctx.isColumnsIdentifiersOnly();
             columnList = new SqlNodeList(ctx.columnList, pos);
             constraints = ctx.constraints;
             watermark = ctx.watermark;
@@ -1574,6 +1573,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
         tableLike = SqlTableLike(getPos())
         {
+            if (isColumnsIdentifiersOnly) {
+                throw SqlUtil.newContextException(
+                    pos,
+                    ParserResource.RESOURCE.columnsIdentifiersUnsupported());
+            }
+
             return new SqlCreateTableLike(startPos.plus(getPos()),
                 tableName,
                 columnList,
@@ -1622,6 +1627,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
         }
     ]
     {
+        if (isColumnsIdentifiersOnly) {
+            throw SqlUtil.newContextException(
+                pos,
+                ParserResource.RESOURCE.columnsIdentifiersUnsupported());
+        }
+
         return new SqlCreateTable(startPos.plus(getPos()),
             tableName,
             columnList,
@@ -1716,6 +1727,36 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
     }
 }
 
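+/**
+* Parses either a list of typed table columns (with optional constraints and watermark)
+* or a list of bare column identifiers, used by CTAS/RTAS to reorder the query columns.
+* Both alternatives begin with an identifier, so LOOKAHEAD(2) lets the parser look past
+* the leading identifier to decide whether a data type or other column definition follows.
+*/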
+void TableColumnsOrIdentifiers(SqlParserPos pos, TableCreationContext ctx) :
+{
+    final TableCreationContext tempCtx = new TableCreationContext();
+    final List<SqlNode> identifiers = new ArrayList<SqlNode>();
+}
+{
+    LOOKAHEAD(2)
+    (
+        TableColumn(tempCtx)
+        (<COMMA> TableColumn(tempCtx))*
+    ) {
+        ctx.columnList = tempCtx.columnList;
+        ctx.constraints = tempCtx.constraints;
+        ctx.watermark = tempCtx.watermark;
+    }
+|
+    (
+        AddCompoundColumnIdentifier(identifiers)
+        (<COMMA> AddCompoundColumnIdentifier(identifiers))*
+    ) { ctx.columnList = identifiers; }
+}
+
+void AddCompoundColumnIdentifier(List<SqlNode> list) :
+{
+    final SqlIdentifier name;
+}
+{
+    name = CompoundIdentifier() { list.add(name); }
+}
+
 /**
 * Parser a REPLACE TABLE AS statement
 */
@@ -1746,10 +1787,7 @@ SqlNode SqlReplaceTable() :
     tableName = CompoundIdentifier()
     [
         <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
-        TableColumn(ctx)
-        (
-            <COMMA> TableColumn(ctx)
-        )*
+        TableColumnsOrIdentifiers(pos, ctx)
         {
             pos = getPos();
             columnList = new SqlNodeList(ctx.columnList, pos);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index b26e5d1e73428..96de6b7992187 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -297,6 +297,10 @@ public static class TableCreationContext {
         public List<SqlTableConstraint> constraints = new ArrayList<>();
         @Nullable public SqlWatermark watermark;
         @Nullable public SqlDistribution distribution;
+
+        public boolean isColumnsIdentifiersOnly() {
+            return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
+        }
     }
 
     public String[] fullTableName() {
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
index c2edf89d356cd..c48fbfdf19ea5 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
@@ -118,7 +118,10 @@ public SqlCreateTableAs(
 
     @Override
     public void validate() throws SqlValidateException {
-        super.validate();
+        if (!isSchemaWithColumnsIdentifiersOnly()) {
+            super.validate();
+        }
+
         if (isTemporary()) {
             throw new SqlValidateException(
                     getParserPosition(),
@@ -130,6 +133,13 @@ public SqlNode getAsQuery() {
         return asQuery;
     }
 
+    public boolean isSchemaWithColumnsIdentifiersOnly() {
+        // CREATE AS SELECT supports passing only column identifiers in the column list. If
+        // the first column in the list is an identifier, then we assume the rest of the
+        // columns are identifiers as well.
+        return !getColumnList().isEmpty() && getColumnList().get(0) instanceof SqlIdentifier;
+    }
+
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         super.unparse(writer, leftPrec, rightPrec);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 65bb7056dbd2e..47d7b78b2118c 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -158,7 +158,10 @@ public SqlReplaceTableAs(
 
     @Override
     public void validate() throws SqlValidateException {
-        SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+        if (!isSchemaWithColumnsIdentifiersOnly()) {
+            SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+        }
+
         // The following features are not currently supported by RTAS, but may be supported in the
         // future
         String errorMsg =
@@ -221,6 +224,13 @@ public boolean isTemporary() {
         return isTemporary;
     }
 
+    public boolean isSchemaWithColumnsIdentifiersOnly() {
+        // REPLACE table supports passing only column identifiers in the column list. If
+        // the first column in the list is an identifier, then we assume the rest of the
+        // columns are identifiers as well.
+        return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
+    }
+
     /** Returns the column constraints plus the table constraints. */
     public List<SqlTableConstraint> getFullConstraints() {
         return SqlConstraintValidator.getFullConstraints(tableConstraints, columnList);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 0dbe275031b5c..9c29119ba9e08 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -45,6 +45,10 @@ public interface ParserResource {
             "Unsupported CREATE OR REPLACE statement for EXPLAIN. The statement must define a query using the AS clause (i.e. CTAS/RTAS statements).")
     Resources.ExInst<ParseException> explainCreateOrReplaceStatementUnsupported();
 
+    @Resources.BaseMessage(
+            "Column identifiers without types in the schema are supported on CTAS/RTAS statements only.")
+    Resources.ExInst<ParseException> columnsIdentifiersUnsupported();
+
     @Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable to {0} language.")
     Resources.ExInst<ParseException> createFunctionUsingJar(String language);
 
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 0345886c434ca..c732859e5a8d7 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2951,6 +2951,43 @@ void testCreateTableAsSelectWithPartitionKey() {
                 .node(new ValidationMatcher().ok());
     }
 
+    @Test
+    void testCreateTableAsSelectWithColumnIdentifiers() {
+        // test with only column identifiers
+        sql("CREATE TABLE t (col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(new ValidationMatcher().ok());
+
+        // test mix of column identifiers and column with types is not allowed
+        sql("CREATE TABLE t (col1, col2 ^int^) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .fails("(?s).*Encountered \"int\" at line 1, column 28.*");
+    }
+
+    @Test
+    void testUnsupportedCreateTableStatementsWithColumnIdentifiers() {
+        String expectedErrorMsg =
+                "Column identifiers without types in the schema are "
+                        + "supported on CTAS/RTAS statements only.";
+
+        sql("CREATE TABLE t ^(a, h^) WITH "
+                        + "('connector' = 'kafka', 'kafka.topic' = 'log.test')")
+                .fails(expectedErrorMsg);
+
+        sql("CREATE TABLE t ^(a, h^) WITH "
+                        + "('connector' = 'kafka', 'kafka.topic' = 'log.test') "
+                        + "LIKE parent_table")
+                .fails(expectedErrorMsg);
+    }
+
+    @Test
+    void testReplaceTableAsSelectWithColumnIdentifiers() {
+        // test with only column identifiers
+        sql("REPLACE TABLE t (col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(new ValidationMatcher().ok());
+
+        // test mix of column identifiers and column with types is not allowed
+        sql("REPLACE TABLE t (col1, col2 ^int^) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .fails("(?s).*Encountered \"int\" at line 1, column 29.*");
+    }
+
     @Test
     void testReplaceTableAsSelect() {
         // test replace table as select without options
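For orientation before the planner changes below: the rule added to `MergeTableAsUtil` validates the identifier list against the query schema and rebuilds the column order. A minimal standalone sketch of that rule (plain Java; column types are modeled as strings, and all names here are illustrative rather than Flink API):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;

public class ReorderRuleSketch {

    // Reorders sourceCols (name -> type, in SELECT order) to follow the identifier list,
    // mirroring the count and name checks performed by reorderColumns below.
    static LinkedHashMap<String, String> reorder(
            List<String> identifiers, LinkedHashMap<String, String> sourceCols) {
        if (identifiers.size() != sourceCols.size()) {
            throw new IllegalArgumentException(
                    "The number of columns in the column list must match the number "
                            + "of columns in the source schema.");
        }
        LinkedHashMap<String, String> reordered = new LinkedHashMap<>();
        for (String name : identifiers) {
            String type = sourceCols.get(name);
            if (type == null) {
                throw new IllegalArgumentException(
                        String.format("Column '%s' not found in the source schema.", name));
            }
            reordered.put(name, type);
        }
        return reordered;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> source = new LinkedHashMap<>();
        source.put("id", "BIGINT");
        source.put("price", "DOUBLE");
        source.put("quantity", "DOUBLE");
        source.put("order_time", "TIMESTAMP(3)");

        // Mirrors: CREATE TABLE t (order_time, price, quantity, id) AS SELECT ... FROM src
        System.out.println(reorder(Arrays.asList("order_time", "price", "quantity", "id"), source));
        // -> {order_time=TIMESTAMP(3), price=DOUBLE, quantity=DOUBLE, id=BIGINT}
    }
}
```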
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
index e743e5964a2d3..e3a1b08578cca 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
@@ -42,10 +42,10 @@
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.validate.SqlValidator;
 
@@ -136,18 +136,16 @@ public PlannerQueryOperation maybeRewriteQuery(
             }
         }
 
-        // if there are no new sink fields to include, then return the original query
-        if (assignedFields.isEmpty()) {
-            return origQueryOperation;
-        }
-
         // rewrite query
         SqlCall newSelect =
-                rewriterUtils.rewriteSelect(
-                        (SqlSelect) origQueryNode,
+                rewriterUtils.rewriteCall(
+                        rewriterUtils,
+                        sqlValidator,
+                        (SqlCall) origQueryNode,
                         typeFactory.buildRelNodeRowType(sinkRowType),
                         assignedFields,
-                        targetPositions);
+                        targetPositions,
+                        () -> "Unsupported node type " + origQueryNode.getKind());
 
         return (PlannerQueryOperation)
                 SqlNodeToOperationConversion.convert(flinkPlanner, catalogManager, newSelect)
@@ -212,6 +210,22 @@ public Schema mergeSchemas(
         return schemaBuilder.build();
     }
 
+    /** Reorders the columns from the source schema based on the column identifiers list. */
+    public Schema reorderSchema(SqlNodeList sqlColumnList, ResolvedSchema sourceSchema) {
+        SchemaBuilder schemaBuilder =
+                new SchemaBuilder(
+                        (FlinkTypeFactory) validator.getTypeFactory(),
+                        dataTypeFactory,
+                        validator,
+                        escapeExpression);
+
+        schemaBuilder.reorderColumns(
+                sqlColumnList,
+                Schema.newBuilder().fromResolvedSchema(sourceSchema).build().getColumns());
+
+        return schemaBuilder.build();
+    }
+
     /**
      * Builder class for constructing a {@link Schema} based on the rules of the {@code CREATE TABLE
      * ... AS SELECT} statement.
@@ -311,6 +325,33 @@ private void mergeColumns(List<SqlNode> sinkCols, List<Schema.UnresolvedColumn> sourceCols)
             columns.putAll(sourceSchemaCols);
         }
 
+        /** Reorders the columns from the source schema based on the column identifiers list. */
+        private void reorderColumns(List<SqlNode> identifiers, List<Schema.UnresolvedColumn> sourceCols) {
+            Map<String, Schema.UnresolvedColumn> sinkSchemaCols = new LinkedHashMap<>();
+            Map<String, Schema.UnresolvedColumn> sourceSchemaCols = new LinkedHashMap<>();
+
+            populateColumnsFromSource(sourceCols, sourceSchemaCols);
+
+            if (identifiers.size() != sourceCols.size()) {
+                throw new ValidationException(
+                        "The number of columns in the column list must match the number "
+                                + "of columns in the source schema.");
+            }
+
+            for (SqlNode identifier : identifiers) {
+                String name = ((SqlIdentifier) identifier).getSimple();
+                if (!sourceSchemaCols.containsKey(name)) {
+                    throw new ValidationException(
+                            String.format("Column '%s' not found in the source schema.", name));
+                }
+
+                sinkSchemaCols.put(name, sourceSchemaCols.get(name));
+            }
+
+            columns.clear();
+            columns.putAll(sinkSchemaCols);
+        }
+
     /**
      * Populates the schema columns from the source schema. The source schema is expected to
      * contain only physical columns.
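The converter changes that follow dispatch between this new reorder path and the pre-existing merge path. As a usage-level sketch against the public `TableEnvironment` API (a hypothetical example; the connector choices are illustrative, and `REPLACE TABLE ... AS` additionally requires a catalog and connector with RTAS support):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CtasReorderExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE source_table (id BIGINT, price DOUBLE, quantity DOUBLE, "
                        + "order_time TIMESTAMP(3)) WITH ('connector' = 'datagen')");

        // Identifier-only column list: takes the reorderSchema(...) path below.
        tEnv.executeSql(
                "CREATE TABLE my_ctas_table (order_time, price, quantity, id) "
                        + "WITH ('connector' = 'blackhole') "
                        + "AS SELECT id, price, quantity, order_time FROM source_table");
    }
}
```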
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 037fc4cde9272..dbff49f53103a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -140,7 +140,7 @@ Operation convertCreateTableAS(
     }
 
     private ResolvedCatalogTable createCatalogTable(
-            SqlCreateTableAs sqlCreateTableAs, ResolvedSchema mergeSchema) {
+            SqlCreateTableAs sqlCreateTableAs, ResolvedSchema querySchema) {
         Map<String, String> tableOptions =
                 sqlCreateTableAs.getPropertyList().getList().stream()
                         .collect(
@@ -151,12 +151,20 @@ private ResolvedCatalogTable createCatalogTable(
         String tableComment =
                 OperationConverterUtils.getTableComment(sqlCreateTableAs.getComment());
 
-        Schema mergedSchema =
-                mergeTableAsUtil.mergeSchemas(
-                        sqlCreateTableAs.getColumnList(),
-                        sqlCreateTableAs.getWatermark().orElse(null),
-                        sqlCreateTableAs.getFullConstraints(),
-                        mergeSchema);
+        Schema mergedSchema;
+        if (sqlCreateTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+            // If only column identifiers are provided, then these are used to
+            // order the columns in the schema.
+            mergedSchema =
+                    mergeTableAsUtil.reorderSchema(sqlCreateTableAs.getColumnList(), querySchema);
+        } else {
+            mergedSchema =
+                    mergeTableAsUtil.mergeSchemas(
+                            sqlCreateTableAs.getColumnList(),
+                            sqlCreateTableAs.getWatermark().orElse(null),
+                            sqlCreateTableAs.getFullConstraints(),
+                            querySchema);
+        }
 
         Optional<TableDistribution> tableDistribution =
                 Optional.ofNullable(sqlCreateTableAs.getDistribution())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index cf4d96cf26fbc..b2b5d8f10397f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -111,7 +111,7 @@ private ResolvedCatalogTable createCatalogTable(
             ConvertContext context,
             MergeTableAsUtil mergeTableAsUtil,
             SqlReplaceTableAs sqlReplaceTableAs,
-            ResolvedSchema mergeSchema) {
+            ResolvedSchema querySchema) {
         CatalogManager catalogManager = context.getCatalogManager();
 
         // get table comment
@@ -129,13 +129,21 @@ private ResolvedCatalogTable createCatalogTable(
                                         ((SqlTableOption) p).getKeyString(),
                                         ((SqlTableOption) p).getValueString()));
 
-        // merge schemas
-        Schema mergedSchema =
-                mergeTableAsUtil.mergeSchemas(
-                        sqlReplaceTableAs.getColumnList(),
-                        sqlReplaceTableAs.getWatermark().orElse(null),
-                        sqlReplaceTableAs.getFullConstraints(),
-                        mergeSchema);
+        Schema mergedSchema;
+        if (sqlReplaceTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+            // If only column identifiers are provided, then these are used to
+            // order the columns in the schema.
+ mergedSchema = + mergeTableAsUtil.reorderSchema(sqlReplaceTableAs.getColumnList(), querySchema); + } else { + // merge schemas + mergedSchema = + mergeTableAsUtil.mergeSchemas( + sqlReplaceTableAs.getColumnList(), + sqlReplaceTableAs.getWatermark().orElse(null), + sqlReplaceTableAs.getFullConstraints(), + querySchema); + } // get distribution Optional tableDistribution = diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala index d47a91b4fb605..8be16db924f01 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala @@ -220,63 +220,14 @@ object PreValidateReWriter { } } - rewriteSqlCall(rewriterUtils, validator, source, targetRowType, assignedFields, targetPosition) - } - - private def rewriteSqlCall( - rewriterUtils: SqlRewriterUtils, - validator: FlinkCalciteSqlValidator, - call: SqlCall, - targetRowType: RelDataType, - assignedFields: util.LinkedHashMap[Integer, SqlNode], - targetPosition: util.List[Int]): SqlCall = { - - def rewrite(node: SqlNode): SqlCall = { - checkArgument(node.isInstanceOf[SqlCall], node) - rewriteSqlCall( - rewriterUtils, - validator, - node.asInstanceOf[SqlCall], - targetRowType, - assignedFields, - targetPosition) - } - - call.getKind match { - case SqlKind.SELECT => - val sqlSelect = call.asInstanceOf[SqlSelect] - - if (targetPosition.nonEmpty && sqlSelect.getSelectList.size() != targetPosition.size()) { - throw newValidationError(call, RESOURCE.columnCountMismatch()) - } - rewriterUtils.rewriteSelect(sqlSelect, targetRowType, assignedFields, targetPosition) - case SqlKind.VALUES => - call.getOperandList.toSeq.foreach { - case sqlCall: SqlCall => { - if (targetPosition.nonEmpty && sqlCall.getOperandList.size() != targetPosition.size()) { - throw newValidationError(call, RESOURCE.columnCountMismatch()) - } - } - } - rewriterUtils.rewriteValues(call, targetRowType, assignedFields, targetPosition) - case kind if SqlKind.SET_QUERY.contains(kind) => - call.getOperandList.zipWithIndex.foreach { - case (operand, index) => call.setOperand(index, rewrite(operand)) - } - call - case SqlKind.ORDER_BY => - val operands = call.getOperandList - new SqlOrderBy( - call.getParserPosition, - rewrite(operands.get(0)), - operands.get(1).asInstanceOf[SqlNodeList], - operands.get(2), - operands.get(3)) - // Not support: - // case SqlKind.WITH => - // case SqlKind.EXPLICIT_TABLE => - case _ => throw new ValidationException(notSupported(call)) - } + rewriterUtils.rewriteCall( + rewriterUtils, + validator, + source, + targetRowType, + assignedFields, + targetPosition, + () => notSupported(source)) } /** diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala index 64d426971d880..b9810b09f0d77 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala @@ -18,13 +18,19 @@ package org.apache.flink.table.planner.calcite import 
org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec -import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlSelect, rewriteSqlValues} +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.planner.calcite.PreValidateReWriter.{newValidationError, notSupported} +import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlCall, rewriteSqlSelect, rewriteSqlValues} +import org.apache.flink.util.Preconditions.checkArgument import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.runtime.{CalciteContextException, Resources} import org.apache.calcite.sql.`type`.SqlTypeUtil -import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlKind, SqlNode, SqlNodeList, SqlSelect} +import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil} import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.validate.SqlValidatorException +import org.apache.calcite.util.Static.RESOURCE import java.util import java.util.Collections @@ -48,6 +54,24 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) { rewriteSqlValues(svalues, targetRowType, assignedFields, targetPosition) } + def rewriteCall( + rewriterUtils: SqlRewriterUtils, + validator: FlinkCalciteSqlValidator, + call: SqlCall, + targetRowType: RelDataType, + assignedFields: util.LinkedHashMap[Integer, SqlNode], + targetPosition: util.List[Int], + unsupportedErrorMessage: () => String): SqlCall = { + rewriteSqlCall( + rewriterUtils, + validator, + call, + targetRowType, + assignedFields, + targetPosition, + unsupportedErrorMessage) + } + // This code snippet is copied from the SqlValidatorImpl. 
def maybeCast( node: SqlNode, @@ -82,6 +106,64 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) { } object SqlRewriterUtils { + def rewriteSqlCall( + rewriterUtils: SqlRewriterUtils, + validator: FlinkCalciteSqlValidator, + call: SqlCall, + targetRowType: RelDataType, + assignedFields: util.LinkedHashMap[Integer, SqlNode], + targetPosition: util.List[Int], + unsupportedErrorMessage: () => String): SqlCall = { + + def rewrite(node: SqlNode): SqlCall = { + checkArgument(node.isInstanceOf[SqlCall], node) + rewriteSqlCall( + rewriterUtils, + validator, + node.asInstanceOf[SqlCall], + targetRowType, + assignedFields, + targetPosition, + unsupportedErrorMessage) + } + + call.getKind match { + case SqlKind.SELECT => + val sqlSelect = call.asInstanceOf[SqlSelect] + + if (targetPosition.nonEmpty && sqlSelect.getSelectList.size() != targetPosition.size()) { + throw newValidationError(call, RESOURCE.columnCountMismatch()) + } + rewriterUtils.rewriteSelect(sqlSelect, targetRowType, assignedFields, targetPosition) + case SqlKind.VALUES => + call.getOperandList.toSeq.foreach { + case sqlCall: SqlCall => { + if (targetPosition.nonEmpty && sqlCall.getOperandList.size() != targetPosition.size()) { + throw newValidationError(call, RESOURCE.columnCountMismatch()) + } + } + } + rewriterUtils.rewriteValues(call, targetRowType, assignedFields, targetPosition) + case kind if SqlKind.SET_QUERY.contains(kind) => + call.getOperandList.zipWithIndex.foreach { + case (operand, index) => call.setOperand(index, rewrite(operand)) + } + call + case SqlKind.ORDER_BY => + val operands = call.getOperandList + new SqlOrderBy( + call.getParserPosition, + rewrite(operands.get(0)), + operands.get(1).asInstanceOf[SqlNodeList], + operands.get(2), + operands.get(3)) + // Not support: + // case SqlKind.WITH => + // case SqlKind.EXPLICIT_TABLE => + case _ => throw new ValidationException(unsupportedErrorMessage()) + } + } + def rewriteSqlSelect( validator: FlinkCalciteSqlValidator, select: SqlSelect, @@ -156,6 +238,14 @@ object SqlRewriterUtils { SqlStdOperatorTable.VALUES.createCall(values.getParserPosition, fixedNodes) } + def newValidationError( + node: SqlNode, + e: Resources.ExInst[SqlValidatorException]): CalciteContextException = { + assert(node != null) + val pos = node.getParserPosition + SqlUtil.newContextException(pos, e) + } + /** * Reorder sourceList to targetPosition. For example: * - sourceList(f0, f1, f2). diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index e758296f2ecf0..6904763dbf635 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -658,6 +658,80 @@ public void testCreateTableInvalidDistribution() { "Invalid bucket key 'f3'. A bucket key for a distribution must reference a physical column in the schema. 
Available columns are: [a]");
     }
 
+    @Test
+    public void testCreateTableAsWithOrderingColumns() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.TIMESTAMP(3))
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql = "create table tbl1 (f1, f0) AS SELECT * FROM src1";
+
+        Operation ctas = parseAndConvert(sql);
+        Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
+        assertThat(operation)
+                .is(
+                        new HamcrestCondition<>(
+                                isCreateTableOperation(
+                                        withNoDistribution(),
+                                        withSchema(
+                                                Schema.newBuilder()
+                                                        .column("f1", DataTypes.TIMESTAMP(3))
+                                                        .column("f0", DataTypes.INT().notNull())
+                                                        .build()))));
+    }
+
+    @Test
+    public void testCreateTableAsWithNotFoundColumnIdentifiers() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.INT())
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql = "create table tbl1 (f1, f2) AS SELECT * FROM src1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Column 'f2' not found in the source schema.");
+    }
+
+    @Test
+    public void testCreateTableAsWithMismatchIdentifiersLength() {
+        CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.INT())
+                                        .build())
+                        .build();
+
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+        final String sql = "create table tbl1 (f1) AS SELECT * FROM src1";
+
+        assertThatThrownBy(() -> parseAndConvert(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The number of columns in the column list "
+                                + "must match the number of columns in the source schema.");
+    }
+
     @Test
     public void testCreateTableAsWithColumns() {
         CatalogTable catalogTable =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index ee6e8bca6588a..78ed39bf9802b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -61,6 +61,50 @@ public void testReplaceTableAs() {
         testCommonReplaceTableAs(sql, tableName, tableComment);
     }
 
+    @Test
+    public void testReplaceTableAsWithOrderingColumns() {
+        String tableName = "replace_table";
+        String sql =
+                "REPLACE TABLE "
+                        + tableName
+                        + " (a, b) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT b, a FROM t1";
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.BIGINT().notNull())
+                        .column("b", DataTypes.STRING())
+                        .build();
+
+        testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
+    }
+
+    @Test
+    public void testReplaceTableAsWithNotFoundColumnIdentifiers() {
+        String tableName = "replace_table";
+        String sql =
+                "REPLACE TABLE "
+                        + tableName
+                        + " (a, d) 
WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT b, a FROM t1"; + + assertThatThrownBy(() -> parseAndConvert(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Column 'd' not found in the source schema."); + } + + @Test + public void testReplaceTableAsWithMismatchIdentifiersLength() { + String tableName = "replace_table"; + String sql = + "REPLACE TABLE " + + tableName + + " (a) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT b, a FROM t1"; + + assertThatThrownBy(() -> parseAndConvert(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "The number of columns in the column list " + + "must match the number of columns in the source schema."); + } + @Test public void testCreateOrReplaceTableAs() { String tableName = "create_or_replace_table"; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java index 2ce2bc6dffefb..61e1c0d087b15 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java @@ -188,6 +188,28 @@ void testCreateOrReplaceTableAsSelectWithMixOfNewColumnsAndQueryColumns() throws verifyCatalogTable(expectCatalogTable, getCatalogTable("target")); } + @Test + void testReplaceTableAsSelectWithColumnOrdering() throws Exception { + tEnv().executeSql( + "REPLACE TABLE target" + + " (c, a)" + + " WITH ('connector' = 'values', 'bounded' = 'true')" + + " AS SELECT a, c FROM source") + .await(); + + // verify written rows + assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString()) + .isEqualTo("[" + "+I[Hi, 1], " + "+I[Hello, 2], " + "+I[Hello world, 3]" + "]"); + + // verify the table after replacing + CatalogTable expectCatalogTable = + getExpectCatalogTable( + new String[] {"c", "a"}, + new AbstractDataType[] {DataTypes.STRING(), DataTypes.INT()}); + + verifyCatalogTable(expectCatalogTable, getCatalogTable("target")); + } + @Test void testCreateOrReplaceTableASWithSortLimit() throws Exception { tEnv().executeSql( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala index 364feaad4e631..d0512adcb8511 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala @@ -337,6 +337,49 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase " managed table relies on checkpoint to commit and the data is visible only after commit.") } + @TestTemplate + def testCreateTableAsSelectWithColumnOrdering(): Unit = { + tEnv + .executeSql(""" + |CREATE TABLE MyCtasTable(votes, person) + | WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) AS + | SELECT + | `person`, + | `votes` + | FROM + | src + |""".stripMargin) + .await() + val actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable") + val expected = List( + "+I[1, jason]", + "+I[1, jason]", + "+I[1, jason]", + "+I[1, jason]" + ) + assertThat(actual.sorted).isEqualTo(expected.sorted) + // test 
statement set + val statementSet = tEnv.createStatementSet() + statementSet.addInsertSql(""" + |CREATE TABLE MyCtasTableUseStatement(votes, person) + | WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) AS + | SELECT + | `person`, + | `votes` + | FROM + | src + |""".stripMargin) + statementSet.execute().await() + val actualUseStatement = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement") + assertThat(actualUseStatement.sorted).isEqualTo(expected.sorted) + } + @TestTemplate def testCreateTableAsSelectWithNewColumnsOnly(): Unit = { tEnv