[SPARK-50675][SQL] Table and view level collations support
### What changes were proposed in this pull request?

This change introduces table- and view-level collation support in Spark SQL, allowing the CREATE TABLE, ALTER TABLE and CREATE VIEW commands to specify a DEFAULT COLLATION. For the CREATE commands, the default collation applies to all columns added as part of the table/view creation. For the ALTER TABLE command, it applies only to columns created afterwards; existing columns are not affected, i.e. their collation remains the same.
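
As a rough illustration of the semantics described above, the following is a minimal, self-contained Scala sketch. It is not Spark's implementation: `Column`, `Table` and `TableCollationModel` are hypothetical names introduced here, and the model assumes every column is a string column declared without an explicit collation.

```scala
// Hypothetical model of the described semantics; not part of this change.
final case class Column(name: String, collation: String)

final case class Table(defaultCollation: String, columns: Vector[Column]) {
  // A column added later picks up the table's current default collation.
  def addColumn(name: String): Table =
    copy(columns = columns :+ Column(name, defaultCollation))

  // Changing the default leaves the existing columns untouched.
  def alterDefaultCollation(collation: String): Table =
    copy(defaultCollation = collation)
}

object TableCollationModel {
  // At creation time, every column receives the table's default collation.
  def create(defaultCollation: String, columnNames: Seq[String]): Table =
    Table(defaultCollation, columnNames.map(Column(_, defaultCollation)).toVector)

  def main(args: Array[String]): Unit = {
    val created = create("UTF8_LCASE", Seq("c1"))
    val altered = created.alterDefaultCollation("UNICODE").addColumn("c2")
    altered.columns.foreach(c => println(s"${c.name}: ${c.collation}"))
    // prints:
    // c1: UTF8_LCASE
    // c2: UNICODE
  }
}
```

Here `c1` keeps the collation it received at creation time, while `c2`, added after the ALTER, picks up the new default.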

This PR is modelled after the original changes made by stefankandic in #48090. It covers table- and view-level collations; a follow-up PR will cover schema-level collations.

This PR adds/extends the corresponding DDL commands for specifying the table/view-level collation. A separate follow-up PR will use the table/view collation to determine the default collations for the input queries of DML commands.

### Why are the changes needed?

Based on feedback from our internal users, many people would like to specify a collation for a whole object instead of for each individual column. This change adds support for table- and view-level collations; subsequent changes will add support for other objects, such as schemas.

### Does this PR introduce _any_ user-facing change?

Yes. The change follows the agreed syntax additions for collation support.

The following syntax is now supported (**bold** parts denote additions):

{ { [CREATE OR] REPLACE TABLE | CREATE [EXTERNAL] TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ USING data_source ]
  [ table_clauses ]
  [ AS query ] }

table_specification
  ( { column_identifier column_type [ column_properties ] } [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { OPTIONS clause |
    PARTITIONED BY clause |
    CLUSTER BY clause |
    clustered_by_clause |
    LOCATION path [ WITH ( CREDENTIAL credential_name ) ] |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    **DEFAULT COLLATION table_collation_name |**
    WITH { ROW FILTER clause } } [...]

CREATE [ OR REPLACE ] [ TEMPORARY ] VIEW [ IF NOT EXISTS ] view_name
    [ column_list ]
    [ schema_binding |
      COMMENT view_comment |
      TBLPROPERTIES clause |
      **DEFAULT COLLATION collation_name** ] [...]
    AS query

ALTER TABLE table_name
   { ADD COLUMN clause |
     ALTER COLUMN clause |
     DROP COLUMN clause |
     RENAME COLUMN clause |
     **DEFAULT COLLATION clause | …**
   }
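
For illustration, here are a few statements that fit the grammar above, written as a small Scala sketch. The table, view and column names are hypothetical; `UTF8_LCASE`, `UNICODE` and `UNICODE_CI` are assumed to be available as collation names in the target build.

```scala
object DefaultCollationExamples {
  // Hypothetical object names, used only to exercise the new clauses.
  val statements: Seq[String] = Seq(
    // Table-level default collation at creation time.
    "CREATE TABLE t1 (c1 STRING, c2 INT) USING parquet DEFAULT COLLATION UTF8_LCASE",
    // View-level default collation.
    "CREATE VIEW v1 DEFAULT COLLATION UNICODE AS SELECT c1 FROM t1",
    // Change the default used for columns added from now on.
    "ALTER TABLE t1 DEFAULT COLLATION UNICODE_CI"
  )

  def main(args: Array[String]): Unit = statements.foreach(println)
}
```

On a build that includes this change, each string could presumably be passed to `spark.sql(...)` on an existing `SparkSession`; that step is left out so the sketch stays free of dependencies.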

### How was this patch tested?

Tests for the new syntax/functionality were added as part of the change. Also, some of the existing tests were extended/amended to cover the new DEFAULT COLLATION for table/view objects.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49084 from dejankrak-db/object-level-collations.

Authored-by: Dejan Krakovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
dejankrak-db authored and cloud-fan committed Dec 26, 2024
1 parent 7a4114c commit 92948e7
Showing 33 changed files with 257 additions and 38 deletions.
@@ -236,13 +236,15 @@ statement
 | ALTER TABLE identifierReference RECOVER PARTITIONS #recoverPartitions
 | ALTER TABLE identifierReference
 (clusterBySpec | CLUSTER BY NONE) #alterClusterBy
+| ALTER TABLE identifierReference collationSpec #alterTableCollation
 | DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable
 | DROP VIEW (IF EXISTS)? identifierReference #dropView
 | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
 VIEW (IF errorCapturingNot EXISTS)? identifierReference
 identifierCommentList?
 (commentSpec |
 schemaBinding |
+collationSpec |
 (PARTITIONED ON identifierList) |
 (TBLPROPERTIES propertyList))*
 AS query #createView
@@ -528,6 +530,7 @@ createTableClauses
 createFileFormat |
 locationSpec |
 commentSpec |
+collationSpec |
 (TBLPROPERTIES tableProps=propertyList))*
 ;

@@ -1232,6 +1235,10 @@ colPosition
 : position=FIRST | position=AFTER afterCol=errorCapturingIdentifier
 ;

+collationSpec
+: DEFAULT COLLATION collationName=identifier
+;
+
 collateClause
 : COLLATE collationName=multipartIdentifier
 ;
@@ -67,6 +67,11 @@ public interface TableCatalog extends CatalogPlugin {
 */
 String PROP_COMMENT = "comment";

+/**
+* A reserved property to specify the collation of the table.
+*/
+String PROP_COLLATION = "collation";
+
 /**
 * A reserved property to specify the provider of the table.
 */
@@ -92,6 +92,7 @@ object ResolveTableSpec extends Rule[LogicalPlan] {
 options = newOptions.toMap,
 location = u.location,
 comment = u.comment,
+collation = u.collation,
 serde = u.serde,
 external = u.external)
 withNewSpec(newTableSpec)
@@ -350,6 +350,7 @@ case class CatalogTable(
 stats: Option[CatalogStatistics] = None,
 viewText: Option[String] = None,
 comment: Option[String] = None,
+collation: Option[String] = None,
 unsupportedFeatures: Seq[String] = Seq.empty,
 tracksPartitionsInCatalog: Boolean = false,
 schemaPreservesCase: Boolean = true,
@@ -546,6 +547,7 @@ case class CatalogTable(
 provider.foreach(map.put("Provider", _))
 bucketSpec.foreach(map ++= _.toLinkedHashMap)
 comment.foreach(map.put("Comment", _))
+collation.foreach(map.put("Collation", _))
 if (tableType == CatalogTableType.VIEW) {
 viewText.foreach(map.put("View Text", _))
 viewOriginalText.foreach(map.put("View Original Text", _))
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils, SparkParserUtils}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, DateTimeUtils, IntervalUtils, SparkParserUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog, TableWritePrivilege}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -3869,6 +3869,16 @@ class AstBuilder extends DataTypeAstBuilder
 ctx.asScala.headOption.map(visitCommentSpec)
 }

+protected def visitCollationSpecList(
+ctx: java.util.List[CollationSpecContext]): Option[String] = {
+ctx.asScala.headOption.map(visitCollationSpec)
+}
+
+override def visitCollationSpec(ctx: CollationSpecContext): String = withOrigin(ctx) {
+val collationName = ctx.identifier.getText
+CollationFactory.fetchCollation(collationName).collationName
+}
+
 /**
 * Create a [[BucketSpec]].
 */
@@ -4000,6 +4010,7 @@ class AstBuilder extends DataTypeAstBuilder
 * - options
 * - location
 * - comment
+* - collation
 * - serde
 * - clusterBySpec
 *
@@ -4008,8 +4019,8 @@ class AstBuilder extends DataTypeAstBuilder
 * types like `i INT`, which should be appended to the existing table schema.
 */
 type TableClauses = (
-Seq[Transform], Seq[ColumnDefinition], Option[BucketSpec], Map[String, String],
-OptionList, Option[String], Option[String], Option[SerdeInfo], Option[ClusterBySpec])
+Seq[Transform], Seq[ColumnDefinition], Option[BucketSpec], Map[String, String], OptionList,
+Option[String], Option[String], Option[String], Option[SerdeInfo], Option[ClusterBySpec])

 /**
 * Validate a create table statement and return the [[TableIdentifier]].
@@ -4296,6 +4307,10 @@ class AstBuilder extends DataTypeAstBuilder
 throw QueryParsingErrors.cannotCleanReservedTablePropertyError(
 PROP_EXTERNAL, ctx, "please use CREATE EXTERNAL TABLE")
 case (PROP_EXTERNAL, _) => false
+case (PROP_COLLATION, _) if !legacyOn =>
+throw QueryParsingErrors.cannotCleanReservedTablePropertyError(
+PROP_COLLATION, ctx, "please use the DEFAULT COLLATION clause to specify it")
+case (PROP_COLLATION, _) => false
 // It's safe to set whatever table comment, so we don't make it a reserved table property.
 case (PROP_COMMENT, _) => true
 case (k, _) =>
@@ -4475,6 +4490,7 @@ class AstBuilder extends DataTypeAstBuilder
 checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
 checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
 checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
+checkDuplicateClauses(ctx.collationSpec(), "DEFAULT COLLATION", ctx)
 checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
 checkDuplicateClauses(ctx.clusterBySpec(), "CLUSTER BY", ctx)
 checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
@@ -4493,6 +4509,7 @@ class AstBuilder extends DataTypeAstBuilder
 val location = visitLocationSpecList(ctx.locationSpec())
 val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
 val comment = visitCommentSpecList(ctx.commentSpec())
+val collation = visitCollationSpecList(ctx.collationSpec())
 val serdeInfo =
 getSerdeInfo(ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
 val clusterBySpec = ctx.clusterBySpec().asScala.headOption.map(visitClusterBySpec)
@@ -4507,7 +4524,7 @@ class AstBuilder extends DataTypeAstBuilder
 }

 (partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, newLocation, comment,
-serdeInfo, clusterBySpec)
+collation, serdeInfo, clusterBySpec)
 }

 protected def getSerdeInfo(
@@ -4567,6 +4584,7 @@ class AstBuilder extends DataTypeAstBuilder
 * ]
 * [LOCATION path]
 * [COMMENT table_comment]
+* [DEFAULT COLLATION collation_name]
 * [TBLPROPERTIES (property_name=property_value, ...)]
 *
 * partition_fields:
@@ -4580,8 +4598,8 @@ class AstBuilder extends DataTypeAstBuilder

 val columns = Option(ctx.colDefinitionList()).map(visitColDefinitionList).getOrElse(Nil)
 val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
-val (partTransforms, partCols, bucketSpec, properties, options, location,
-comment, serdeInfo, clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
+val (partTransforms, partCols, bucketSpec, properties, options, location, comment,
+collation, serdeInfo, clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())

 if (provider.isDefined && serdeInfo.isDefined) {
 invalidStatement(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
@@ -4599,7 +4617,7 @@ class AstBuilder extends DataTypeAstBuilder
 clusterBySpec.map(_.asTransform)

 val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
-serdeInfo, external)
+collation, serdeInfo, external)

 Option(ctx.query).map(plan) match {
 case Some(_) if columns.nonEmpty =>
@@ -4648,6 +4666,7 @@ class AstBuilder extends DataTypeAstBuilder
 * ]
 * [LOCATION path]
 * [COMMENT table_comment]
+* [DEFAULT COLLATION collation_name]
 * [TBLPROPERTIES (property_name=property_value, ...)]
 *
 * partition_fields:
@@ -4657,8 +4676,8 @@ class AstBuilder extends DataTypeAstBuilder
 */
 override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
 val orCreate = ctx.replaceTableHeader().CREATE() != null
-val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo,
-clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
+val (partTransforms, partCols, bucketSpec, properties, options, location, comment, collation,
+serdeInfo, clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
 val columns = Option(ctx.colDefinitionList()).map(visitColDefinitionList).getOrElse(Nil)
 val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)

@@ -4672,7 +4691,7 @@ class AstBuilder extends DataTypeAstBuilder
 clusterBySpec.map(_.asTransform)

 val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
-serdeInfo, external = false)
+collation, serdeInfo, external = false)

 Option(ctx.query).map(plan) match {
 case Some(_) if columns.nonEmpty =>
@@ -5078,6 +5097,21 @@ class AstBuilder extends DataTypeAstBuilder
 }
 }

+/**
+* Parse a [[AlterTableCollation]] command.
+*
+* For example:
+* {{{
+* ALTER TABLE table1 DEFAULT COLLATION name
+* }}}
+*/
+override def visitAlterTableCollation(ctx: AlterTableCollationContext): LogicalPlan =
+withOrigin(ctx) {
+val table = createUnresolvedTable(
+ctx.identifierReference, "ALTER TABLE ... DEFAULT COLLATION")
+AlterTableCollation(table, visitCollationSpec(ctx.collationSpec()))
+}
+
 /**
 * Parse [[SetViewProperties]] or [[SetTableProperties]] commands.
 *
@@ -261,3 +261,15 @@ case class AlterTableClusterBy(

 protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
 }
+
+/**
+* The logical plan of the ALTER TABLE ... DEFAULT COLLATION name command.
+*/
+case class AlterTableCollation(
+table: LogicalPlan, collation: String) extends AlterTableCommand {
+override def changes: Seq[TableChange] = {
+Seq(TableChange.setProperty(TableCatalog.PROP_COLLATION, collation))
+}
+
+protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
+}
@@ -1338,6 +1338,7 @@ case class CreateView(
 child: LogicalPlan,
 userSpecifiedColumns: Seq[(String, Option[String])],
 comment: Option[String],
+collation: Option[String],
 properties: Map[String, String],
 originalText: Option[String],
 query: LogicalPlan,
@@ -1486,6 +1487,7 @@ trait TableSpecBase {
 def provider: Option[String]
 def location: Option[String]
 def comment: Option[String]
+def collation: Option[String]
 def serde: Option[SerdeInfo]
 def external: Boolean
 }
@@ -1496,6 +1498,7 @@ case class UnresolvedTableSpec(
 optionExpression: OptionList,
 location: Option[String],
 comment: Option[String],
+collation: Option[String],
 serde: Option[SerdeInfo],
 external: Boolean) extends UnaryExpression with Unevaluable with TableSpecBase {

@@ -1541,10 +1544,11 @@ case class TableSpec(
 options: Map[String, String],
 location: Option[String],
 comment: Option[String],
+collation: Option[String],
 serde: Option[SerdeInfo],
 external: Boolean) extends TableSpecBase {
 def withNewLocation(newLocation: Option[String]): TableSpec = {
-TableSpec(properties, provider, options, newLocation, comment, serde, external)
+TableSpec(properties, provider, options, newLocation, comment, collation, serde, external)
 }
 }

@@ -53,6 +53,7 @@ private[sql] object CatalogV2Util {
 */
 val TABLE_RESERVED_PROPERTIES =
 Seq(TableCatalog.PROP_COMMENT,
+TableCatalog.PROP_COLLATION,
 TableCatalog.PROP_LOCATION,
 TableCatalog.PROP_PROVIDER,
 TableCatalog.PROP_OWNER,
@@ -459,7 +460,7 @@ private[sql] object CatalogV2Util {
 def convertTableProperties(t: TableSpec): Map[String, String] = {
 val props = convertTableProperties(
 t.properties, t.options, t.serde, t.location, t.comment,
-t.provider, t.external)
+t.collation, t.provider, t.external)
 withDefaultOwnership(props)
 }

@@ -469,6 +470,7 @@ private[sql] object CatalogV2Util {
 serdeInfo: Option[SerdeInfo],
 location: Option[String],
 comment: Option[String],
+collation: Option[String],
 provider: Option[String],
 external: Boolean = false): Map[String, String] = {
 properties ++
@@ -478,6 +480,7 @@ private[sql] object CatalogV2Util {
 (if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++
 provider.map(TableCatalog.PROP_PROVIDER -> _) ++
 comment.map(TableCatalog.PROP_COMMENT -> _) ++
+collation.map(TableCatalog.PROP_COLLATION -> _) ++
 location.map(TableCatalog.PROP_LOCATION -> _)
 }

@@ -85,6 +85,7 @@ private[sql] object V1Table {
 TableCatalog.OPTION_PREFIX + key -> value } ++
 v1Table.provider.map(TableCatalog.PROP_PROVIDER -> _) ++
 v1Table.comment.map(TableCatalog.PROP_COMMENT -> _) ++
+v1Table.collation.map(TableCatalog.PROP_COLLATION -> _) ++
 v1Table.storage.locationUri.map { loc =>
 TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(loc)
 } ++
@@ -30,7 +30,7 @@ import org.apache.spark.util.ArrayImplicits._

 class CreateTablePartitioningValidationSuite extends AnalysisTest {
 val tableSpec =
-UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false)
+UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, None, false)
 test("CreateTableAsSelect: fail missing top-level column") {
 val plan = CreateTableAsSelect(
 UnresolvedIdentifier(Array("table_name").toImmutableArraySeq),