diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md
index 717c00502302f..650c1b28dc398 100644
--- a/docs/content.zh/docs/dev/table/sql/create.md
+++ b/docs/content.zh/docs/dev/table/sql/create.md
@@ -643,6 +643,37 @@ CREATE TABLE my_ctas_table (
INSERT INTO my_ctas_table SELECT id, name FROM source_table;
```
+`CTAS` also allows you to reorder the columns defined in the `SELECT` part by specifying all column names without data types in the `CREATE` part. This feature is equivalent to the `INSERT INTO` statement.
+The columns specified must match the names and number of columns in the `SELECT` part. This definition cannot be combined with new columns, which require defining data types.
+
+Consider the example statement below:
+
+```sql
+CREATE TABLE my_ctas_table (
+ order_time, price, quantity, id
+) WITH (
+ 'connector' = 'kafka',
+ ...
+) AS SELECT id, price, quantity, order_time FROM source_table;
+```
+
+The resulting table `my_ctas_table` will be equivalent to creating the following table and inserting the data with the following statements:
+
+```sql
+CREATE TABLE my_ctas_table (
+ order_time TIMESTAMP(3),
+ price DOUBLE,
+ quantity DOUBLE,
+ id BIGINT
+) WITH (
+ 'connector' = 'kafka',
+ ...
+);
+
+INSERT INTO my_ctas_table (order_time, price, quantity, id)
+ SELECT id, price, quantity, order_time FROM source_table;
+```
+
**Note:** CTAS has these restrictions:
* Does not support creating a temporary table yet.
* Does not support creating partitioned table yet.
diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md
index 2957a65679017..3a564d3ffb1c3 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -643,6 +643,37 @@ CREATE TABLE my_ctas_table (
INSERT INTO my_ctas_table SELECT id, name FROM source_table;
```
+`CTAS` also allows you to reorder the columns defined in the `SELECT` part by specifying all column names without data types in the `CREATE` part. This feature is equivalent to the `INSERT INTO` statement.
+The columns specified must match the names and number of columns in the `SELECT` part. This definition cannot be combined with new columns, which require defining data types.
+
+Consider the example statement below:
+
+```sql
+CREATE TABLE my_ctas_table (
+ order_time, price, quantity, id
+) WITH (
+ 'connector' = 'kafka',
+ ...
+) AS SELECT id, price, quantity, order_time FROM source_table;
+```
+
+The resulting table `my_ctas_table` will be equivalent to creating the following table and inserting the data with the following statements:
+
+```sql
+CREATE TABLE my_ctas_table (
+ order_time TIMESTAMP(3),
+ price DOUBLE,
+ quantity DOUBLE,
+ id BIGINT
+) WITH (
+ 'connector' = 'kafka',
+ ...
+);
+
+INSERT INTO my_ctas_table (order_time, price, quantity, id)
+ SELECT id, price, quantity, order_time FROM source_table;
+```
+
**Note:** CTAS has these restrictions:
* Does not support creating a temporary table yet.
* Does not support creating partitioned table yet.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index d1ae33dd9ce9b..9e7bad6206c62 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1532,6 +1532,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
SqlDistribution distribution = null;
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
SqlParserPos pos = startPos;
+ boolean isColumnsIdentifiersOnly = false;
}
{
@@ -1541,12 +1542,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
tableName = CompoundIdentifier()
[
{ pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
- TableColumn(ctx)
- (
- TableColumn(ctx)
- )*
+ TableColumnsOrIdentifiers(pos, ctx)
{
pos = pos.plus(getPos());
+ isColumnsIdentifiersOnly = ctx.isColumnsIdentifiersOnly();
columnList = new SqlNodeList(ctx.columnList, pos);
constraints = ctx.constraints;
watermark = ctx.watermark;
@@ -1574,6 +1573,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
tableLike = SqlTableLike(getPos())
{
+ if (isColumnsIdentifiersOnly) {
+ throw SqlUtil.newContextException(
+ pos,
+ ParserResource.RESOURCE.columnsIdentifiersUnsupported());
+ }
+
return new SqlCreateTableLike(startPos.plus(getPos()),
tableName,
columnList,
@@ -1622,6 +1627,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
}
]
{
+ if (isColumnsIdentifiersOnly) {
+ throw SqlUtil.newContextException(
+ pos,
+ ParserResource.RESOURCE.columnsIdentifiersUnsupported());
+ }
+
return new SqlCreateTable(startPos.plus(getPos()),
tableName,
columnList,
@@ -1716,6 +1727,36 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
}
}
+void TableColumnsOrIdentifiers(SqlParserPos pos, TableCreationContext ctx) :
+{
+ final TableCreationContext tempCtx = new TableCreationContext();
+ final List identifiers = new ArrayList();
+}
+{
+ LOOKAHEAD(2)
+ (
+ TableColumn(tempCtx)
+ ( TableColumn(tempCtx))*
+ ) {
+ ctx.columnList = tempCtx.columnList;
+ ctx.constraints = tempCtx.constraints;
+ ctx.watermark = tempCtx.watermark;
+ }
+ |
+ (
+ AddCompoundColumnIdentifier(identifiers)
+ ( AddCompoundColumnIdentifier(identifiers))*
+ ) { ctx.columnList = identifiers; }
+}
+
+void AddCompoundColumnIdentifier(List list) :
+{
+ final SqlIdentifier name;
+}
+{
+ name = CompoundIdentifier() { list.add(name); }
+}
+
/**
* Parser a REPLACE TABLE AS statement
*/
@@ -1746,10 +1787,7 @@ SqlNode SqlReplaceTable() :
tableName = CompoundIdentifier()
[
{ pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
- TableColumn(ctx)
- (
- TableColumn(ctx)
- )*
+ TableColumnsOrIdentifiers(pos, ctx)
{
pos = getPos();
columnList = new SqlNodeList(ctx.columnList, pos);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index b26e5d1e73428..96de6b7992187 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -297,6 +297,10 @@ public static class TableCreationContext {
public List constraints = new ArrayList<>();
@Nullable public SqlWatermark watermark;
@Nullable public SqlDistribution distribution;
+
+ public boolean isColumnsIdentifiersOnly() {
+ return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
+ }
}
public String[] fullTableName() {
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
index c2edf89d356cd..c48fbfdf19ea5 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
@@ -118,7 +118,10 @@ public SqlCreateTableAs(
@Override
public void validate() throws SqlValidateException {
- super.validate();
+ if (!isSchemaWithColumnsIdentifiersOnly()) {
+ super.validate();
+ }
+
if (isTemporary()) {
throw new SqlValidateException(
getParserPosition(),
@@ -130,6 +133,13 @@ public SqlNode getAsQuery() {
return asQuery;
}
+ public boolean isSchemaWithColumnsIdentifiersOnly() {
+ // CREATE AS SELECT supports passing only column identifiers in the column list. If
+ // the first column in the list is an identifier, then we assume the rest of the
+ // columns are identifiers as well.
+ return !getColumnList().isEmpty() && getColumnList().get(0) instanceof SqlIdentifier;
+ }
+
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
super.unparse(writer, leftPrec, rightPrec);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 65bb7056dbd2e..47d7b78b2118c 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -158,7 +158,10 @@ public SqlReplaceTableAs(
@Override
public void validate() throws SqlValidateException {
- SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+ if (!isSchemaWithColumnsIdentifiersOnly()) {
+ SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+ }
+
// The following features are not currently supported by RTAS, but may be supported in the
// future
String errorMsg =
@@ -221,6 +224,13 @@ public boolean isTemporary() {
return isTemporary;
}
+ public boolean isSchemaWithColumnsIdentifiersOnly() {
+ // REPLACE table supports passing only column identifiers in the column list. If
+ // the first column in the list is an identifier, then we assume the rest of the
+ // columns are identifiers as well.
+ return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
+ }
+
/** Returns the column constraints plus the table constraints. */
public List getFullConstraints() {
return SqlConstraintValidator.getFullConstraints(tableConstraints, columnList);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 0dbe275031b5c..9c29119ba9e08 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -45,6 +45,10 @@ public interface ParserResource {
"Unsupported CREATE OR REPLACE statement for EXPLAIN. The statement must define a query using the AS clause (i.e. CTAS/RTAS statements).")
Resources.ExInst explainCreateOrReplaceStatementUnsupported();
+ @Resources.BaseMessage(
+ "Columns identifiers without types in the schema are supported on CTAS/RTAS statements only.")
+ Resources.ExInst columnsIdentifiersUnsupported();
+
@Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable to {0} language.")
Resources.ExInst createFunctionUsingJar(String language);
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 0345886c434ca..c732859e5a8d7 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2951,6 +2951,43 @@ void testCreateTableAsSelectWithPartitionKey() {
.node(new ValidationMatcher().ok());
}
+ @Test
+ void testCreateTableAsSelectWithColumnIdentifiers() {
+ // test with only column identifiers
+ sql("CREATE TABLE t (col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(new ValidationMatcher().ok());
+
+ // test mix of column identifiers and column with types is not allowed
+ sql("CREATE TABLE t (col1, col2 ^int^) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .fails("(?s).*Encountered \"int\" at line 1, column 28.*");
+ }
+
+ @Test
+ void testUnsupportedCreateTableStatementsWithColumnIdentifiers() {
+ String expectedErrorMsg =
+ "Columns identifiers without types in the schema are "
+ + "supported on CTAS/RTAS statements only.";
+
+ sql("CREATE TABLE t ^(a, h^) WITH " + "('connector' = 'kafka', 'kafka.topic' = 'log.test')")
+ .fails(expectedErrorMsg);
+
+ sql("CREATE TABLE t ^(a, h^) WITH "
+ + "('connector' = 'kafka', 'kafka.topic' = 'log.test') "
+ + "LIKE parent_table")
+ .fails(expectedErrorMsg);
+ }
+
+ @Test
+ void testReplaceTableAsSelectWithColumnIdentifiers() {
+ // test with only column identifiers
+ sql("REPLACE TABLE t (col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(new ValidationMatcher().ok());
+
+ // test mix of column identifiers and column with types is not allowed
+ sql("REPLACE TABLE t (col1, col2 ^int^) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .fails("(?s).*Encountered \"int\" at line 1, column 29.*");
+ }
+
@Test
void testReplaceTableAsSelect() {
// test replace table as select without options
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
index e743e5964a2d3..e3a1b08578cca 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
@@ -42,10 +42,10 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.validate.SqlValidator;
@@ -136,18 +136,16 @@ public PlannerQueryOperation maybeRewriteQuery(
}
}
- // if there are no new sink fields to include, then return the original query
- if (assignedFields.isEmpty()) {
- return origQueryOperation;
- }
-
// rewrite query
SqlCall newSelect =
- rewriterUtils.rewriteSelect(
- (SqlSelect) origQueryNode,
+ rewriterUtils.rewriteCall(
+ rewriterUtils,
+ sqlValidator,
+ (SqlCall) origQueryNode,
typeFactory.buildRelNodeRowType(sinkRowType),
assignedFields,
- targetPositions);
+ targetPositions,
+ () -> "Unsupported node type " + origQueryNode.getKind());
return (PlannerQueryOperation)
SqlNodeToOperationConversion.convert(flinkPlanner, catalogManager, newSelect)
@@ -212,6 +210,22 @@ public Schema mergeSchemas(
return schemaBuilder.build();
}
+ /** Reorders the columns from the source schema based on the columns identifiers list. */
+ public Schema reorderSchema(SqlNodeList sqlColumnList, ResolvedSchema sourceSchema) {
+ SchemaBuilder schemaBuilder =
+ new SchemaBuilder(
+ (FlinkTypeFactory) validator.getTypeFactory(),
+ dataTypeFactory,
+ validator,
+ escapeExpression);
+
+ schemaBuilder.reorderColumns(
+ sqlColumnList,
+ Schema.newBuilder().fromResolvedSchema(sourceSchema).build().getColumns());
+
+ return schemaBuilder.build();
+ }
+
/**
* Builder class for constructing a {@link Schema} based on the rules of the {@code CREATE TABLE
* ... AS SELECT} statement.
@@ -311,6 +325,33 @@ private void mergeColumns(List sinkCols, List sourceC
columns.putAll(sourceSchemaCols);
}
+ /** Reorders the columns from the source schema based on the columns identifiers list. */
+ private void reorderColumns(List identifiers, List sourceCols) {
+ Map sinkSchemaCols = new LinkedHashMap<>();
+ Map sourceSchemaCols = new LinkedHashMap<>();
+
+ populateColumnsFromSource(sourceCols, sourceSchemaCols);
+
+ if (identifiers.size() != sourceCols.size()) {
+ throw new ValidationException(
+ "The number of columns in the column list must match the number "
+ + "of columns in the source schema.");
+ }
+
+ for (SqlNode identifier : identifiers) {
+ String name = ((SqlIdentifier) identifier).getSimple();
+ if (!sourceSchemaCols.containsKey(name)) {
+ throw new ValidationException(
+ String.format("Column '%s' not found in the source schema. ", name));
+ }
+
+ sinkSchemaCols.put(name, sourceSchemaCols.get(name));
+ }
+
+ columns.clear();
+ columns.putAll(sinkSchemaCols);
+ }
+
/**
* Populates the schema columns from the source schema. The source schema is expected to
* contain only physical columns.
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 037fc4cde9272..dbff49f53103a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -140,7 +140,7 @@ Operation convertCreateTableAS(
}
private ResolvedCatalogTable createCatalogTable(
- SqlCreateTableAs sqlCreateTableAs, ResolvedSchema mergeSchema) {
+ SqlCreateTableAs sqlCreateTableAs, ResolvedSchema querySchema) {
Map tableOptions =
sqlCreateTableAs.getPropertyList().getList().stream()
.collect(
@@ -151,12 +151,20 @@ private ResolvedCatalogTable createCatalogTable(
String tableComment =
OperationConverterUtils.getTableComment(sqlCreateTableAs.getComment());
- Schema mergedSchema =
- mergeTableAsUtil.mergeSchemas(
- sqlCreateTableAs.getColumnList(),
- sqlCreateTableAs.getWatermark().orElse(null),
- sqlCreateTableAs.getFullConstraints(),
- mergeSchema);
+ Schema mergedSchema;
+ if (sqlCreateTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+ // If only column identifiers are provided, then these are used to
+ // order the columns in the schema.
+ mergedSchema =
+ mergeTableAsUtil.reorderSchema(sqlCreateTableAs.getColumnList(), querySchema);
+ } else {
+ mergedSchema =
+ mergeTableAsUtil.mergeSchemas(
+ sqlCreateTableAs.getColumnList(),
+ sqlCreateTableAs.getWatermark().orElse(null),
+ sqlCreateTableAs.getFullConstraints(),
+ querySchema);
+ }
Optional tableDistribution =
Optional.ofNullable(sqlCreateTableAs.getDistribution())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index cf4d96cf26fbc..b2b5d8f10397f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -111,7 +111,7 @@ private ResolvedCatalogTable createCatalogTable(
ConvertContext context,
MergeTableAsUtil mergeTableAsUtil,
SqlReplaceTableAs sqlReplaceTableAs,
- ResolvedSchema mergeSchema) {
+ ResolvedSchema querySchema) {
CatalogManager catalogManager = context.getCatalogManager();
// get table comment
@@ -129,13 +129,21 @@ private ResolvedCatalogTable createCatalogTable(
((SqlTableOption) p).getKeyString(),
((SqlTableOption) p).getValueString()));
- // merge schemas
- Schema mergedSchema =
- mergeTableAsUtil.mergeSchemas(
- sqlReplaceTableAs.getColumnList(),
- sqlReplaceTableAs.getWatermark().orElse(null),
- sqlReplaceTableAs.getFullConstraints(),
- mergeSchema);
+ Schema mergedSchema;
+ if (sqlReplaceTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+ // If only column identifiers are provided, then these are used to
+ // order the columns in the schema.
+ mergedSchema =
+ mergeTableAsUtil.reorderSchema(sqlReplaceTableAs.getColumnList(), querySchema);
+ } else {
+ // merge schemas
+ mergedSchema =
+ mergeTableAsUtil.mergeSchemas(
+ sqlReplaceTableAs.getColumnList(),
+ sqlReplaceTableAs.getWatermark().orElse(null),
+ sqlReplaceTableAs.getFullConstraints(),
+ querySchema);
+ }
// get distribution
Optional tableDistribution =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index d47a91b4fb605..8be16db924f01 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -220,63 +220,14 @@ object PreValidateReWriter {
}
}
- rewriteSqlCall(rewriterUtils, validator, source, targetRowType, assignedFields, targetPosition)
- }
-
- private def rewriteSqlCall(
- rewriterUtils: SqlRewriterUtils,
- validator: FlinkCalciteSqlValidator,
- call: SqlCall,
- targetRowType: RelDataType,
- assignedFields: util.LinkedHashMap[Integer, SqlNode],
- targetPosition: util.List[Int]): SqlCall = {
-
- def rewrite(node: SqlNode): SqlCall = {
- checkArgument(node.isInstanceOf[SqlCall], node)
- rewriteSqlCall(
- rewriterUtils,
- validator,
- node.asInstanceOf[SqlCall],
- targetRowType,
- assignedFields,
- targetPosition)
- }
-
- call.getKind match {
- case SqlKind.SELECT =>
- val sqlSelect = call.asInstanceOf[SqlSelect]
-
- if (targetPosition.nonEmpty && sqlSelect.getSelectList.size() != targetPosition.size()) {
- throw newValidationError(call, RESOURCE.columnCountMismatch())
- }
- rewriterUtils.rewriteSelect(sqlSelect, targetRowType, assignedFields, targetPosition)
- case SqlKind.VALUES =>
- call.getOperandList.toSeq.foreach {
- case sqlCall: SqlCall => {
- if (targetPosition.nonEmpty && sqlCall.getOperandList.size() != targetPosition.size()) {
- throw newValidationError(call, RESOURCE.columnCountMismatch())
- }
- }
- }
- rewriterUtils.rewriteValues(call, targetRowType, assignedFields, targetPosition)
- case kind if SqlKind.SET_QUERY.contains(kind) =>
- call.getOperandList.zipWithIndex.foreach {
- case (operand, index) => call.setOperand(index, rewrite(operand))
- }
- call
- case SqlKind.ORDER_BY =>
- val operands = call.getOperandList
- new SqlOrderBy(
- call.getParserPosition,
- rewrite(operands.get(0)),
- operands.get(1).asInstanceOf[SqlNodeList],
- operands.get(2),
- operands.get(3))
- // Not support:
- // case SqlKind.WITH =>
- // case SqlKind.EXPLICIT_TABLE =>
- case _ => throw new ValidationException(notSupported(call))
- }
+ rewriterUtils.rewriteCall(
+ rewriterUtils,
+ validator,
+ source,
+ targetRowType,
+ assignedFields,
+ targetPosition,
+ () => notSupported(source))
}
/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
index 64d426971d880..b9810b09f0d77 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
@@ -18,13 +18,19 @@
package org.apache.flink.table.planner.calcite
import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
-import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlSelect, rewriteSqlValues}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.calcite.PreValidateReWriter.{newValidationError, notSupported}
+import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlCall, rewriteSqlSelect, rewriteSqlValues}
+import org.apache.flink.util.Preconditions.checkArgument
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.runtime.{CalciteContextException, Resources}
import org.apache.calcite.sql.`type`.SqlTypeUtil
-import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlKind, SqlNode, SqlNodeList, SqlSelect}
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.validate.SqlValidatorException
+import org.apache.calcite.util.Static.RESOURCE
import java.util
import java.util.Collections
@@ -48,6 +54,24 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
rewriteSqlValues(svalues, targetRowType, assignedFields, targetPosition)
}
+ def rewriteCall(
+ rewriterUtils: SqlRewriterUtils,
+ validator: FlinkCalciteSqlValidator,
+ call: SqlCall,
+ targetRowType: RelDataType,
+ assignedFields: util.LinkedHashMap[Integer, SqlNode],
+ targetPosition: util.List[Int],
+ unsupportedErrorMessage: () => String): SqlCall = {
+ rewriteSqlCall(
+ rewriterUtils,
+ validator,
+ call,
+ targetRowType,
+ assignedFields,
+ targetPosition,
+ unsupportedErrorMessage)
+ }
+
// This code snippet is copied from the SqlValidatorImpl.
def maybeCast(
node: SqlNode,
@@ -82,6 +106,64 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
}
object SqlRewriterUtils {
+ def rewriteSqlCall(
+ rewriterUtils: SqlRewriterUtils,
+ validator: FlinkCalciteSqlValidator,
+ call: SqlCall,
+ targetRowType: RelDataType,
+ assignedFields: util.LinkedHashMap[Integer, SqlNode],
+ targetPosition: util.List[Int],
+ unsupportedErrorMessage: () => String): SqlCall = {
+
+ def rewrite(node: SqlNode): SqlCall = {
+ checkArgument(node.isInstanceOf[SqlCall], node)
+ rewriteSqlCall(
+ rewriterUtils,
+ validator,
+ node.asInstanceOf[SqlCall],
+ targetRowType,
+ assignedFields,
+ targetPosition,
+ unsupportedErrorMessage)
+ }
+
+ call.getKind match {
+ case SqlKind.SELECT =>
+ val sqlSelect = call.asInstanceOf[SqlSelect]
+
+ if (targetPosition.nonEmpty && sqlSelect.getSelectList.size() != targetPosition.size()) {
+ throw newValidationError(call, RESOURCE.columnCountMismatch())
+ }
+ rewriterUtils.rewriteSelect(sqlSelect, targetRowType, assignedFields, targetPosition)
+ case SqlKind.VALUES =>
+ call.getOperandList.toSeq.foreach {
+ case sqlCall: SqlCall => {
+ if (targetPosition.nonEmpty && sqlCall.getOperandList.size() != targetPosition.size()) {
+ throw newValidationError(call, RESOURCE.columnCountMismatch())
+ }
+ }
+ }
+ rewriterUtils.rewriteValues(call, targetRowType, assignedFields, targetPosition)
+ case kind if SqlKind.SET_QUERY.contains(kind) =>
+ call.getOperandList.zipWithIndex.foreach {
+ case (operand, index) => call.setOperand(index, rewrite(operand))
+ }
+ call
+ case SqlKind.ORDER_BY =>
+ val operands = call.getOperandList
+ new SqlOrderBy(
+ call.getParserPosition,
+ rewrite(operands.get(0)),
+ operands.get(1).asInstanceOf[SqlNodeList],
+ operands.get(2),
+ operands.get(3))
+ // Not support:
+ // case SqlKind.WITH =>
+ // case SqlKind.EXPLICIT_TABLE =>
+ case _ => throw new ValidationException(unsupportedErrorMessage())
+ }
+ }
+
def rewriteSqlSelect(
validator: FlinkCalciteSqlValidator,
select: SqlSelect,
@@ -156,6 +238,14 @@ object SqlRewriterUtils {
SqlStdOperatorTable.VALUES.createCall(values.getParserPosition, fixedNodes)
}
+ def newValidationError(
+ node: SqlNode,
+ e: Resources.ExInst[SqlValidatorException]): CalciteContextException = {
+ assert(node != null)
+ val pos = node.getParserPosition
+ SqlUtil.newContextException(pos, e)
+ }
+
/**
* Reorder sourceList to targetPosition. For example:
* - sourceList(f0, f1, f2).
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index e758296f2ecf0..6904763dbf635 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -658,6 +658,80 @@ public void testCreateTableInvalidDistribution() {
"Invalid bucket key 'f3'. A bucket key for a distribution must reference a physical column in the schema. Available columns are: [a]");
}
+ @Test
+ public void tesCreateTableAsWithOrderingColumns() {
+ CatalogTable catalogTable =
+ CatalogTable.newBuilder()
+ .schema(
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT().notNull())
+ .column("f1", DataTypes.TIMESTAMP(3))
+ .build())
+ .build();
+
+ catalogManager.createTable(
+ catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+ final String sql = "create table tbl1 (f1, f0) AS SELECT * FROM src1";
+
+ Operation ctas = parseAndConvert(sql);
+ Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
+ assertThat(operation)
+ .is(
+ new HamcrestCondition<>(
+ isCreateTableOperation(
+ withNoDistribution(),
+ withSchema(
+ Schema.newBuilder()
+ .column("f1", DataTypes.TIMESTAMP(3))
+ .column("f0", DataTypes.INT().notNull())
+ .build()))));
+ }
+
+ @Test
+ public void testCreateTableAsWithNotFoundColumnIdentifiers() {
+ CatalogTable catalogTable =
+ CatalogTable.newBuilder()
+ .schema(
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT().notNull())
+ .column("f1", DataTypes.INT())
+ .build())
+ .build();
+
+ catalogManager.createTable(
+ catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+ final String sql = "create table tbl1 (f1, f2) AS SELECT * FROM src1";
+
+ assertThatThrownBy(() -> parseAndConvert(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("Column 'f2' not found in the source schema.");
+ }
+
+ @Test
+ public void testCreateTableAsWithMismatchIdentifiersLength() {
+ CatalogTable catalogTable =
+ CatalogTable.newBuilder()
+ .schema(
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT().notNull())
+ .column("f1", DataTypes.INT())
+ .build())
+ .build();
+
+ catalogManager.createTable(
+ catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);
+
+ final String sql = "create table tbl1 (f1) AS SELECT * FROM src1";
+
+ assertThatThrownBy(() -> parseAndConvert(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "The number of columns in the column list "
+ + "must match the number of columns in the source schema.");
+ }
+
@Test
public void testCreateTableAsWithColumns() {
CatalogTable catalogTable =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index ee6e8bca6588a..78ed39bf9802b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -61,6 +61,50 @@ public void testReplaceTableAs() {
testCommonReplaceTableAs(sql, tableName, tableComment);
}
+ @Test
+ public void testReplaceTableAsWithOrderingColumns() {
+ String tableName = "replace_table";
+ String sql =
+ "REPLACE TABLE "
+ + tableName
+ + " (a, b) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT b, a FROM t1";
+ Schema tableSchema =
+ Schema.newBuilder()
+ .column("a", DataTypes.BIGINT().notNull())
+ .column("b", DataTypes.STRING())
+ .build();
+
+ testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
+ }
+
+ @Test
+ public void testReplaceTableAsWithNotFoundColumnIdentifiers() {
+ String tableName = "replace_table";
+ String sql =
+ "REPLACE TABLE "
+ + tableName
+ + " (a, d) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT b, a FROM t1";
+
+ assertThatThrownBy(() -> parseAndConvert(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("Column 'd' not found in the source schema.");
+ }
+
+ @Test
+ public void testReplaceTableAsWithMismatchIdentifiersLength() {
+ String tableName = "replace_table";
+ String sql =
+ "REPLACE TABLE "
+ + tableName
+ + " (a) WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT b, a FROM t1";
+
+ assertThatThrownBy(() -> parseAndConvert(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "The number of columns in the column list "
+ + "must match the number of columns in the source schema.");
+ }
+
@Test
public void testCreateOrReplaceTableAs() {
String tableName = "create_or_replace_table";
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
index 2ce2bc6dffefb..61e1c0d087b15 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
@@ -188,6 +188,28 @@ void testCreateOrReplaceTableAsSelectWithMixOfNewColumnsAndQueryColumns() throws
verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
}
+ @Test
+ void testReplaceTableAsSelectWithColumnOrdering() throws Exception {
+ tEnv().executeSql(
+ "REPLACE TABLE target"
+ + " (c, a)"
+ + " WITH ('connector' = 'values', 'bounded' = 'true')"
+ + " AS SELECT a, c FROM source")
+ .await();
+
+ // verify written rows
+ assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+ .isEqualTo("[" + "+I[Hi, 1], " + "+I[Hello, 2], " + "+I[Hello world, 3]" + "]");
+
+ // verify the table after replacing
+ CatalogTable expectCatalogTable =
+ getExpectCatalogTable(
+ new String[] {"c", "a"},
+ new AbstractDataType[] {DataTypes.STRING(), DataTypes.INT()});
+
+ verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+ }
+
@Test
void testCreateOrReplaceTableASWithSortLimit() throws Exception {
tEnv().executeSql(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index 364feaad4e631..d0512adcb8511 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -337,6 +337,49 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
" managed table relies on checkpoint to commit and the data is visible only after commit.")
}
+ @TestTemplate
+ def testCreateTableAsSelectWithColumnOrdering(): Unit = {
+ tEnv
+ .executeSql("""
+ |CREATE TABLE MyCtasTable(votes, person)
+ | WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'true'
+ |) AS
+ | SELECT
+ | `person`,
+ | `votes`
+ | FROM
+ | src
+ |""".stripMargin)
+ .await()
+ val actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable")
+ val expected = List(
+ "+I[1, jason]",
+ "+I[1, jason]",
+ "+I[1, jason]",
+ "+I[1, jason]"
+ )
+ assertThat(actual.sorted).isEqualTo(expected.sorted)
+ // test statement set
+ val statementSet = tEnv.createStatementSet()
+ statementSet.addInsertSql("""
+ |CREATE TABLE MyCtasTableUseStatement(votes, person)
+ | WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'true'
+ |) AS
+ | SELECT
+ | `person`,
+ | `votes`
+ | FROM
+ | src
+ |""".stripMargin)
+ statementSet.execute().await()
+ val actualUseStatement = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement")
+ assertThat(actualUseStatement.sorted).isEqualTo(expected.sorted)
+ }
+
@TestTemplate
def testCreateTableAsSelectWithNewColumnsOnly(): Unit = {
tEnv