Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35709][table-planner] Allow reordering source columns in CTAS and RTAS #25401

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/content.zh/docs/dev/table/sql/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,37 @@ CREATE TABLE my_ctas_table (
INSERT INTO my_ctas_table SELECT id, name FROM source_table;
```

`CTAS` also allows you to reorder the columns produced by the `SELECT` part by specifying all of their names, without data types, in the `CREATE` part. This behaves like an `INSERT INTO` statement with an explicit column list.
The specified columns must match the names and the number of the columns in the `SELECT` part, and they cannot be mixed with new columns, which require data types.

Consider the example statement below:

```sql
CREATE TABLE my_ctas_table (
order_time, price, quantity, id
) WITH (
'connector' = 'kafka',
...
) AS SELECT id, price, quantity, order_time FROM source_table;
```

The resulting table `my_ctas_table` will be equivalent to creating the following table and inserting the data with the following statements:

```
CREATE TABLE my_ctas_table (
order_time TIMESTAMP(3),
price DOUBLE,
quantity DOUBLE,
id BIGINT
) WITH (
'connector' = 'kafka',
...
);

INSERT INTO my_ctas_table (order_time, price, quantity, id)
SELECT id, price, quantity, order_time FROM source_table;
```

**Note:** CTAS has these restrictions:
* Does not support creating a temporary table yet.
* Does not support creating partitioned table yet.
Expand Down
31 changes: 31 additions & 0 deletions docs/content/docs/dev/table/sql/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,37 @@ CREATE TABLE my_ctas_table (
INSERT INTO my_ctas_table SELECT id, name FROM source_table;
```

`CTAS` also allows you to reorder the columns produced by the `SELECT` part by specifying all of their names, without data types, in the `CREATE` part. This behaves like an `INSERT INTO` statement with an explicit column list.
The specified columns must match the names and the number of the columns in the `SELECT` part, and they cannot be mixed with new columns, which require data types.

Consider the example statement below:

```sql
CREATE TABLE my_ctas_table (
order_time, price, quantity, id
) WITH (
'connector' = 'kafka',
...
) AS SELECT id, price, quantity, order_time FROM source_table;
```

The resulting table `my_ctas_table` will be equivalent to creating the following table and inserting the data with the following statements:

```
CREATE TABLE my_ctas_table (
order_time TIMESTAMP(3),
price DOUBLE,
quantity DOUBLE,
id BIGINT
) WITH (
'connector' = 'kafka',
...
);

INSERT INTO my_ctas_table (order_time, price, quantity, id)
SELECT id, price, quantity, order_time FROM source_table;
```

**Note:** CTAS has these restrictions:
* Does not support creating a temporary table yet.
* Does not support creating partitioned table yet.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
SqlDistribution distribution = null;
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
SqlParserPos pos = startPos;
boolean isColumnsIdentifiersOnly = false;
}
{
<TABLE>
Expand All @@ -1541,12 +1542,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
tableName = CompoundIdentifier()
[
<LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
TableColumn(ctx)
(
<COMMA> TableColumn(ctx)
)*
TableColumnsOrIdentifiers(pos, ctx)
{
pos = pos.plus(getPos());
isColumnsIdentifiersOnly = ctx.isColumnsIdentifiersOnly();
columnList = new SqlNodeList(ctx.columnList, pos);
constraints = ctx.constraints;
watermark = ctx.watermark;
Expand Down Expand Up @@ -1574,6 +1573,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
<LIKE>
tableLike = SqlTableLike(getPos())
{
if (isColumnsIdentifiersOnly) {
throw SqlUtil.newContextException(
pos,
ParserResource.RESOURCE.columnsIdentifiersUnsupported());
}

return new SqlCreateTableLike(startPos.plus(getPos()),
tableName,
columnList,
Expand Down Expand Up @@ -1622,6 +1627,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
}
]
{
if (isColumnsIdentifiersOnly) {
throw SqlUtil.newContextException(
pos,
ParserResource.RESOURCE.columnsIdentifiersUnsupported());
}

return new SqlCreateTable(startPos.plus(getPos()),
tableName,
columnList,
Expand Down Expand Up @@ -1716,6 +1727,36 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
}
}

/**
 * Parses the parenthesized schema part of a CREATE/REPLACE TABLE statement: either a
 * regular list of column definitions (with data types, and possibly constraints and a
 * watermark) or a plain list of column identifiers without types. The identifier-only
 * form is intended for CTAS/RTAS statements, where it reorders the SELECT columns;
 * callers reject it for other statements via TableCreationContext#isColumnsIdentifiersOnly.
 * LOOKAHEAD(2) disambiguates the two alternatives — presumably on the token following
 * the first identifier (a data type vs. a comma/closing paren); confirm against the
 * TableColumn production if this grammar is changed.
 */
void TableColumnsOrIdentifiers(SqlParserPos pos, TableCreationContext ctx) :
{
    final TableCreationContext tempCtx = new TableCreationContext();
    final List<SqlNode> identifiers = new ArrayList<SqlNode>();
}
{
    LOOKAHEAD(2)
    (
        // Full column definitions; parsed into a scratch context first.
        TableColumn(tempCtx)
        (<COMMA> TableColumn(tempCtx))*
    ) {
        // Copy every parsed piece into the caller-provided context.
        ctx.columnList = tempCtx.columnList;
        ctx.constraints = tempCtx.constraints;
        ctx.watermark = tempCtx.watermark;
    }
|
    (
        // Identifier-only schema: bare column names, no data types.
        AddCompoundColumnIdentifier(identifiers)
        (<COMMA> AddCompoundColumnIdentifier(identifiers))*
    ) { ctx.columnList = identifiers; }
}

/**
 * Parses a single (possibly compound, e.g. {@code a.b.c}) column identifier and appends
 * it to the given list. Helper for the identifier-only schema form of CTAS/RTAS.
 */
void AddCompoundColumnIdentifier(List<SqlNode> list) :
{
    final SqlIdentifier name;
}
{
    name = CompoundIdentifier() { list.add(name); }
}

/**
 * Parses a REPLACE TABLE AS statement.
*/
Expand Down Expand Up @@ -1746,10 +1787,7 @@ SqlNode SqlReplaceTable() :
tableName = CompoundIdentifier()
[
<LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
TableColumn(ctx)
(
<COMMA> TableColumn(ctx)
)*
TableColumnsOrIdentifiers(pos, ctx)
{
pos = getPos();
columnList = new SqlNodeList(ctx.columnList, pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ public static class TableCreationContext {
public List<SqlTableConstraint> constraints = new ArrayList<>();
@Nullable public SqlWatermark watermark;
@Nullable public SqlDistribution distribution;

/**
 * Returns whether the parsed schema consists solely of bare column identifiers
 * (names without data types), as produced by the CTAS/RTAS identifier-only form.
 * The grammar never mixes the two forms, so inspecting the first element suffices.
 */
public boolean isColumnsIdentifiersOnly() {
    if (columnList.isEmpty()) {
        return false;
    }
    return columnList.get(0) instanceof SqlIdentifier;
}
}

public String[] fullTableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ public SqlCreateTableAs(

@Override
public void validate() throws SqlValidateException {
super.validate();
if (!isSchemaWithColumnsIdentifiersOnly()) {
super.validate();
}

if (isTemporary()) {
throw new SqlValidateException(
getParserPosition(),
Expand All @@ -130,6 +133,13 @@ public SqlNode getAsQuery() {
return asQuery;
}

/**
 * Returns whether this CTAS statement declares its schema with bare column identifiers
 * only (names without data types). The parser guarantees the column list is homogeneous,
 * so checking the first element is sufficient.
 */
public boolean isSchemaWithColumnsIdentifiersOnly() {
    if (getColumnList().isEmpty()) {
        return false;
    }
    return getColumnList().get(0) instanceof SqlIdentifier;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
super.unparse(writer, leftPrec, rightPrec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ public SqlReplaceTableAs(

@Override
public void validate() throws SqlValidateException {
SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
if (!isSchemaWithColumnsIdentifiersOnly()) {
SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
}

// The following features are not currently supported by RTAS, but may be supported in the
// future
String errorMsg =
Expand Down Expand Up @@ -221,6 +224,13 @@ public boolean isTemporary() {
return isTemporary;
}

/**
 * Returns whether this RTAS statement declares its schema with bare column identifiers
 * only (names without data types). The parser guarantees the column list is homogeneous,
 * so checking the first element is sufficient.
 */
public boolean isSchemaWithColumnsIdentifiersOnly() {
    if (columnList.isEmpty()) {
        return false;
    }
    return columnList.get(0) instanceof SqlIdentifier;
}

/** Returns the column constraints plus the table constraints. */
public List<SqlTableConstraint> getFullConstraints() {
return SqlConstraintValidator.getFullConstraints(tableConstraints, columnList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public interface ParserResource {
"Unsupported CREATE OR REPLACE statement for EXPLAIN. The statement must define a query using the AS clause (i.e. CTAS/RTAS statements).")
Resources.ExInst<ParseException> explainCreateOrReplaceStatementUnsupported();

@Resources.BaseMessage(
"Columns identifiers without types in the schema are supported on CTAS/RTAS statements only.")
Resources.ExInst<ParseException> columnsIdentifiersUnsupported();

@Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable to {0} language.")
Resources.ExInst<ParseException> createFunctionUsingJar(String language);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2951,6 +2951,43 @@ void testCreateTableAsSelectWithPartitionKey() {
.node(new ValidationMatcher().ok());
}

@Test
void testCreateTableAsSelectWithColumnIdentifiers() {
    // An identifier-only schema is accepted for CTAS.
    final String identifiersOnly =
            "CREATE TABLE t (col1) WITH ('test' = 'zm') AS SELECT col1 FROM b";
    sql(identifiersOnly).node(new ValidationMatcher().ok());

    // Mixing bare identifiers with typed columns must fail at parse time.
    final String mixedColumns =
            "CREATE TABLE t (col1, col2 ^int^) WITH ('test' = 'zm') AS SELECT col1 FROM b";
    sql(mixedColumns).fails("(?s).*Encountered \"int\" at line 1, column 28.*");
}

@Test
void testUnsupportedCreateTableStatementsWithColumnIdentifiers() {
    // The identifier-only schema form is tied to AS SELECT; plain CREATE TABLE
    // (with or without LIKE) must be rejected with a dedicated parser error.
    final String expectedErrorMsg =
            "Columns identifiers without types in the schema are supported on CTAS/RTAS statements only.";

    final String plainCreate =
            "CREATE TABLE t ^(a, h^) WITH ('connector' = 'kafka', 'kafka.topic' = 'log.test')";
    sql(plainCreate).fails(expectedErrorMsg);

    final String createWithLike =
            "CREATE TABLE t ^(a, h^) WITH "
                    + "('connector' = 'kafka', 'kafka.topic' = 'log.test') "
                    + "LIKE parent_table";
    sql(createWithLike).fails(expectedErrorMsg);
}

@Test
void testReplaceTableAsSelectWithColumnIdentifiers() {
    // An identifier-only schema is accepted for RTAS.
    final String identifiersOnly =
            "REPLACE TABLE t (col1) WITH ('test' = 'zm') AS SELECT col1 FROM b";
    sql(identifiersOnly).node(new ValidationMatcher().ok());

    // Mixing bare identifiers with typed columns must fail at parse time.
    final String mixedColumns =
            "REPLACE TABLE t (col1, col2 ^int^) WITH ('test' = 'zm') AS SELECT col1 FROM b";
    sql(mixedColumns).fails("(?s).*Encountered \"int\" at line 1, column 29.*");
}

@Test
void testReplaceTableAsSelect() {
// test replace table as select without options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.validate.SqlValidator;

Expand Down Expand Up @@ -136,18 +136,16 @@ public PlannerQueryOperation maybeRewriteQuery(
}
}

// if there are no new sink fields to include, then return the original query
if (assignedFields.isEmpty()) {
return origQueryOperation;
}

// rewrite query
SqlCall newSelect =
rewriterUtils.rewriteSelect(
(SqlSelect) origQueryNode,
rewriterUtils.rewriteCall(
rewriterUtils,
sqlValidator,
(SqlCall) origQueryNode,
typeFactory.buildRelNodeRowType(sinkRowType),
assignedFields,
targetPositions);
targetPositions,
() -> "Unsupported node type " + origQueryNode.getKind());

return (PlannerQueryOperation)
SqlNodeToOperationConversion.convert(flinkPlanner, catalogManager, newSelect)
Expand Down Expand Up @@ -212,6 +210,22 @@ public Schema mergeSchemas(
return schemaBuilder.build();
}

/**
 * Builds a {@link Schema} containing the columns of {@code sourceSchema} arranged in the
 * order given by the identifier list from the CREATE/REPLACE part.
 *
 * @param sqlColumnList bare column identifiers naming every source column exactly once
 * @param sourceSchema resolved schema derived from the SELECT part
 * @return the reordered schema
 */
public Schema reorderSchema(SqlNodeList sqlColumnList, ResolvedSchema sourceSchema) {
    final List<UnresolvedColumn> sourceColumns =
            Schema.newBuilder().fromResolvedSchema(sourceSchema).build().getColumns();

    final SchemaBuilder builder =
            new SchemaBuilder(
                    (FlinkTypeFactory) validator.getTypeFactory(),
                    dataTypeFactory,
                    validator,
                    escapeExpression);
    builder.reorderColumns(sqlColumnList, sourceColumns);
    return builder.build();
}

/**
* Builder class for constructing a {@link Schema} based on the rules of the {@code CREATE TABLE
* ... AS SELECT} statement.
Expand Down Expand Up @@ -311,6 +325,33 @@ private void mergeColumns(List<SqlNode> sinkCols, List<UnresolvedColumn> sourceC
columns.putAll(sourceSchemaCols);
}

/**
 * Reorders the source schema columns to follow the order of the given identifier list.
 *
 * <p>The identifier list must reference every source column exactly once: a count
 * mismatch, an unknown column name, or a duplicated column name raises a
 * {@link ValidationException}.
 *
 * @param identifiers column identifiers ({@code SqlIdentifier}) from the CREATE/REPLACE part
 * @param sourceCols physical columns derived from the SELECT part
 */
private void reorderColumns(List<SqlNode> identifiers, List<UnresolvedColumn> sourceCols) {
    Map<String, UnresolvedColumn> sourceSchemaCols = new LinkedHashMap<>();
    populateColumnsFromSource(sourceCols, sourceSchemaCols);

    if (identifiers.size() != sourceCols.size()) {
        throw new ValidationException(
                "The number of columns in the column list must match the number "
                        + "of columns in the source schema.");
    }

    Map<String, UnresolvedColumn> sinkSchemaCols = new LinkedHashMap<>();
    for (SqlNode identifier : identifiers) {
        String name = ((SqlIdentifier) identifier).getSimple();
        UnresolvedColumn sourceCol = sourceSchemaCols.get(name);
        if (sourceCol == null) {
            throw new ValidationException(
                    String.format("Column '%s' not found in the source schema.", name));
        }
        // A repeated identifier would otherwise silently overwrite its earlier map
        // entry and drop a different source column while still passing the size check.
        if (sinkSchemaCols.put(name, sourceCol) != null) {
            throw new ValidationException(
                    String.format("Duplicate column '%s' in the column list.", name));
        }
    }

    columns.clear();
    columns.putAll(sinkSchemaCols);
}

/**
* Populates the schema columns from the source schema. The source schema is expected to
* contain only physical columns.
Expand Down
Loading