[SPARK-49273][Connect][SQL] Origin support for Spark Connect Scala client #49373

Open · wants to merge 7 commits into master
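A note on the user-visible effect (illustrative sketch only, not code from this PR; the session, column name, and call-site string below are hypothetical): with origins propagated from the Scala client, analysis errors can report the client-side call site in their DataFrame query context, which the updated DataFrameSubquerySuite expectations below check via ExpectedContext(..., callSitePattern = ...).

import org.apache.spark.SparkThrowable
import spark.implicits._

try {
  // Analysis fails on the server because the column does not exist.
  spark.range(1).select($"does_not_exist").collect()
} catch {
  case e: SparkThrowable =>
    // With JVM origin propagation, the DataFrame query context can carry a
    // client-side call site such as "app//com.example.MyJob.run(MyJob.scala:42)".
    e.getQueryContext.foreach(ctx => println(ctx.callSite()))
}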
@@ -44,7 +44,6 @@ object ColumnNodeToProtoConverter extends (ColumnNode => proto.Expression) {

private def apply(node: ColumnNode, e: Option[Encoder[_]]): proto.Expression = {
val builder = proto.Expression.newBuilder()
// TODO(SPARK-49273) support Origin in Connect Scala Client.
node match {
case Literal(value, None, _) =>
builder.setLiteral(toLiteralProtoBuilder(value))
@@ -189,6 +188,38 @@ object ColumnNodeToProtoConverter extends (ColumnNode => proto.Expression) {
case node =>
throw SparkException.internalError("Unsupported ColumnNode: " + node)
}
if (node.origin != Origin()) {
builder.setCommon(proto.ExpressionCommon.newBuilder().setOrigin(convertOrigin(node.origin)))
}
builder.build()
}

private def convertOrigin(origin: Origin): proto.Origin = {
val jvmOrigin = proto.JvmOrigin.newBuilder()
origin.line.map(jvmOrigin.setLine)
origin.startPosition.map(jvmOrigin.setStartPosition)
origin.startIndex.map(jvmOrigin.setStartIndex)
origin.stopIndex.map(jvmOrigin.setStopIndex)
origin.sqlText.map(jvmOrigin.setSqlText)
origin.objectType.map(jvmOrigin.setObjectType)
origin.objectName.map(jvmOrigin.setObjectName)

origin.stackTrace
.map(_.map(convertStackTraceElement).toSeq.asJava)
.map(jvmOrigin.addAllStackTrace)

proto.Origin.newBuilder().setJvmOrigin(jvmOrigin).build()
}

private def convertStackTraceElement(stack: StackTraceElement): proto.StackTraceElement = {
val builder = proto.StackTraceElement.newBuilder()
Option(stack.getClassLoaderName).map(builder.setClassLoaderName)
Option(stack.getModuleName).map(builder.setModuleName)
Option(stack.getModuleVersion).map(builder.setModuleVersion)
Option(stack.getClassName).map(builder.setDeclaringClass)
Option(stack.getMethodName).map(builder.setMethodName)
Option(stack.getFileName).map(builder.setFileName)
Option(stack.getLineNumber).map(builder.setLineNumber)
builder.build()
}
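For reference, a minimal sketch of the behavior added above (not part of the PR; import paths are assumed from the surrounding Spark sources, and the expression text and line number are made up):

import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.internal.SqlExpression

val node = SqlExpression("a + 1", Origin(line = Some(42), sqlText = Some("a + 1")))
val expr = ColumnNodeToProtoConverter(node)

// node.origin != Origin(), so the converter populates common.origin.jvm_origin.
val jvm = expr.getCommon.getOrigin.getJvmOrigin
assert(jvm.getLine == 42)
assert(jvm.getSqlText == "a + 1")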

@@ -143,11 +143,12 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
test("write V2") {
val df = ss.newDataFrame(_ => ()).limit(10)

val partCol = col("col99")
val builder = proto.WriteOperationV2.newBuilder()
builder
.setInput(df.plan.getRoot)
.setTableName("t1")
.addPartitioningColumns(toExpr(col("col99")))
.addPartitioningColumns(toExpr(partCol))
.setProvider("json")
.addClusteringColumns("col3")
.putTableProperties("key", "value")
@@ -160,7 +161,7 @@ class ClientDatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
.build()

df.writeTo("t1")
.partitionedBy(col("col99"))
.partitionedBy(partCol)
.clusterBy("col3")
.using("json")
.tableProperty("key", "value")
@@ -56,7 +56,8 @@ class DataFrameSubquerySuite extends QueryTest with RemoteSparkSession {
checkError(
intercept[AnalysisException](spark.range(1).select($"outer_col".outer()).collect()),
"UNRESOLVED_COLUMN.WITH_SUGGESTION",
parameters = Map("objectName" -> "`outer_col`", "proposal" -> "`id`"))
parameters = Map("objectName" -> "`outer_col`", "proposal" -> "`id`"),
context = ExpectedContext(fragment = "$", callSitePattern = getCurrentClassCallSitePattern))
}

test("simple uncorrelated scalar subquery") {
@@ -604,14 +605,18 @@ class DataFrameSubquerySuite extends QueryTest with RemoteSparkSession {
},
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_IN_EXISTS_SUBQUERY",
parameters = Map("treeNode" -> "(?s)'Unpivot.*"),
matchPVals = true)
matchPVals = true,
queryContext = Array(
ExpectedContext(fragment = "exists", callSitePattern = getCurrentClassCallSitePattern)))
checkError(
intercept[AnalysisException] {
t1.unpivot(Array($"c1"), Array(t2.exists()), "c1", "c2").collect()
},
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_IN_EXISTS_SUBQUERY",
parameters = Map("treeNode" -> "(?s)Expand.*"),
matchPVals = true)
matchPVals = true,
queryContext = Array(
ExpectedContext(fragment = "exists", callSitePattern = getCurrentClassCallSitePattern)))
}
}

@@ -30,13 +30,14 @@ class UserDefinedFunctionSuite extends ConnectFunSuite {
test("udf and encoder serialization") {
def func(x: Int): Int = x + 1

val dummyCol = Column("dummy")
val myUdf = udf(func _)
val colWithUdf = myUdf(Column("dummy"))
val colWithUdf = myUdf(dummyCol)

val udfExpr = toExpr(colWithUdf).getCommonInlineUserDefinedFunction
assert(udfExpr.getDeterministic)
assert(udfExpr.getArgumentsCount == 1)
assert(udfExpr.getArguments(0) == toExpr(Column("dummy")))
assert(udfExpr.getArguments(0) == toExpr(dummyCol))
val udfObj = udfExpr.getScalarScalaUdf

assert(!udfObj.getNullable)
@@ -412,17 +412,30 @@ class ColumnNodeToProtoConverterSuite extends ConnectFunSuite {
.setNullable(false)
.setAggregate(true)))

val result = ColumnNodeToProtoConverter.toTypedExpr(
Column(InvokeInlineUserDefinedFunction(aggregator, Nil)),
PrimitiveLongEncoder)
val expected = expr(
_.getTypedAggregateExpressionBuilder.getScalarScalaUdfBuilder
val invokeColumn = Column(InvokeInlineUserDefinedFunction(aggregator, Nil))
val result = ColumnNodeToProtoConverter.toTypedExpr(invokeColumn, PrimitiveLongEncoder)
val expected = expr { builder =>
builder.getTypedAggregateExpressionBuilder.getScalarScalaUdfBuilder
.setPayload(UdfToProtoUtils
.toUdfPacketBytes(aggregator, PrimitiveLongEncoder :: Nil, PrimitiveLongEncoder))
.addInputTypes(ProtoDataTypes.LongType)
.setOutputType(ProtoDataTypes.LongType)
.setNullable(true)
.setAggregate(true))
.setAggregate(true)
val origin = builder.getCommonBuilder.getOriginBuilder.getJvmOriginBuilder
invokeColumn.node.origin.stackTrace.map {
_.foreach { element =>
origin.addStackTrace(
proto.StackTraceElement
.newBuilder()
.setClassLoaderName(element.getClassLoaderName)
.setDeclaringClass(element.getClassName)
.setMethodName(element.getMethodName)
.setFileName(element.getFileName)
.setLineNumber(element.getLineNumber))
}
}
}
assert(result == expected)
}

@@ -434,6 +447,28 @@ class ColumnNodeToProtoConverterSuite extends ConnectFunSuite {
test("unsupported") {
intercept[SparkException](ColumnNodeToProtoConverter(Nope()))
}

test("origin") {
val origin = Origin(
line = Some(1),
sqlText = Some("lol"),
stackTrace = Some(Array(new StackTraceElement("a", "b", "c", 9))))
testConversion(
SqlExpression("1 + 1", origin),
expr { builder =>
builder.getExpressionStringBuilder.setExpression("1 + 1")
builder.getCommonBuilder.getOriginBuilder.getJvmOriginBuilder
.setLine(1)
.setSqlText("lol")
.addStackTrace(
proto.StackTraceElement
.newBuilder()
.setDeclaringClass("a")
.setMethodName("b")
.setFileName("c")
.setLineNumber(9))
})
}
}

private[internal] case class Nope(override val origin: Origin = CurrentOrigin.get)
@@ -205,7 +205,8 @@ abstract class QueryTest extends ConnectFunSuite with SQLHelper {

protected def getCurrentClassCallSitePattern: String = {
val cs = Thread.currentThread().getStackTrace()(2)
s"${cs.getClassName}\\..*\\(${cs.getFileName}:\\d+\\)"
// {classloader}//{class.name}({file_name.scala}:{line_number})
s".*//${cs.getClassName}\\..*\\(${cs.getFileName}:\\d+\\)"
}
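A hypothetical frame the updated pattern is meant to accept (the class, method, and line number are placeholders; with the Connect client, stack frames include the class loader name, hence the leading ".*//"):

// Sketch only: QueryTest.checkError and line 210 are made-up values.
val frame = "app//org.apache.spark.sql.QueryTest.checkError(QueryTest.scala:210)"
val pattern = ".*//org.apache.spark.sql.QueryTest\\..*\\(QueryTest.scala:\\d+\\)"
assert(frame.matches(pattern))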

/**
14 changes: 9 additions & 5 deletions python/pyspark/sql/connect/proto/common_pb2.py
@@ -35,7 +35,7 @@


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x1aspark/connect/common.proto\x12\rspark.connect"\xb0\x01\n\x0cStorageLevel\x12\x19\n\x08use_disk\x18\x01 \x01(\x08R\x07useDisk\x12\x1d\n\nuse_memory\x18\x02 \x01(\x08R\tuseMemory\x12 \n\x0cuse_off_heap\x18\x03 \x01(\x08R\nuseOffHeap\x12"\n\x0c\x64\x65serialized\x18\x04 \x01(\x08R\x0c\x64\x65serialized\x12 \n\x0breplication\x18\x05 \x01(\x05R\x0breplication"G\n\x13ResourceInformation\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x1c\n\taddresses\x18\x02 \x03(\tR\taddresses"\xc3\x01\n\x17\x45xecutorResourceRequest\x12#\n\rresource_name\x18\x01 \x01(\tR\x0cresourceName\x12\x16\n\x06\x61mount\x18\x02 \x01(\x03R\x06\x61mount\x12.\n\x10\x64iscovery_script\x18\x03 \x01(\tH\x00R\x0f\x64iscoveryScript\x88\x01\x01\x12\x1b\n\x06vendor\x18\x04 \x01(\tH\x01R\x06vendor\x88\x01\x01\x42\x13\n\x11_discovery_scriptB\t\n\x07_vendor"R\n\x13TaskResourceRequest\x12#\n\rresource_name\x18\x01 \x01(\tR\x0cresourceName\x12\x16\n\x06\x61mount\x18\x02 \x01(\x01R\x06\x61mount"\xa5\x03\n\x0fResourceProfile\x12\x64\n\x12\x65xecutor_resources\x18\x01 \x03(\x0b\x32\x35.spark.connect.ResourceProfile.ExecutorResourcesEntryR\x11\x65xecutorResources\x12X\n\x0etask_resources\x18\x02 \x03(\x0b\x32\x31.spark.connect.ResourceProfile.TaskResourcesEntryR\rtaskResources\x1al\n\x16\x45xecutorResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12<\n\x05value\x18\x02 \x01(\x0b\x32&.spark.connect.ExecutorResourceRequestR\x05value:\x02\x38\x01\x1a\x64\n\x12TaskResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.TaskResourceRequestR\x05value:\x02\x38\x01"X\n\x06Origin\x12\x42\n\rpython_origin\x18\x01 \x01(\x0b\x32\x1b.spark.connect.PythonOriginH\x00R\x0cpythonOriginB\n\n\x08\x66unction"G\n\x0cPythonOrigin\x12\x1a\n\x08\x66ragment\x18\x01 \x01(\tR\x08\x66ragment\x12\x1b\n\tcall_site\x18\x02 \x01(\tR\x08\x63\x61llSiteB6\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3'
b'\n\x1aspark/connect/common.proto\x12\rspark.connect"\xb0\x01\n\x0cStorageLevel\x12\x19\n\x08use_disk\x18\x01 \x01(\x08R\x07useDisk\x12\x1d\n\nuse_memory\x18\x02 \x01(\x08R\tuseMemory\x12 \n\x0cuse_off_heap\x18\x03 \x01(\x08R\nuseOffHeap\x12"\n\x0c\x64\x65serialized\x18\x04 \x01(\x08R\x0c\x64\x65serialized\x12 \n\x0breplication\x18\x05 \x01(\x05R\x0breplication"G\n\x13ResourceInformation\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x1c\n\taddresses\x18\x02 \x03(\tR\taddresses"\xc3\x01\n\x17\x45xecutorResourceRequest\x12#\n\rresource_name\x18\x01 \x01(\tR\x0cresourceName\x12\x16\n\x06\x61mount\x18\x02 \x01(\x03R\x06\x61mount\x12.\n\x10\x64iscovery_script\x18\x03 \x01(\tH\x00R\x0f\x64iscoveryScript\x88\x01\x01\x12\x1b\n\x06vendor\x18\x04 \x01(\tH\x01R\x06vendor\x88\x01\x01\x42\x13\n\x11_discovery_scriptB\t\n\x07_vendor"R\n\x13TaskResourceRequest\x12#\n\rresource_name\x18\x01 \x01(\tR\x0cresourceName\x12\x16\n\x06\x61mount\x18\x02 \x01(\x01R\x06\x61mount"\xa5\x03\n\x0fResourceProfile\x12\x64\n\x12\x65xecutor_resources\x18\x01 \x03(\x0b\x32\x35.spark.connect.ResourceProfile.ExecutorResourcesEntryR\x11\x65xecutorResources\x12X\n\x0etask_resources\x18\x02 \x03(\x0b\x32\x31.spark.connect.ResourceProfile.TaskResourcesEntryR\rtaskResources\x1al\n\x16\x45xecutorResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12<\n\x05value\x18\x02 \x01(\x0b\x32&.spark.connect.ExecutorResourceRequestR\x05value:\x02\x38\x01\x1a\x64\n\x12TaskResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.TaskResourceRequestR\x05value:\x02\x38\x01"\x93\x01\n\x06Origin\x12\x42\n\rpython_origin\x18\x01 \x01(\x0b\x32\x1b.spark.connect.PythonOriginH\x00R\x0cpythonOrigin\x12\x39\n\njvm_origin\x18\x02 \x01(\x0b\x32\x18.spark.connect.JvmOriginH\x00R\tjvmOriginB\n\n\x08\x66unction"G\n\x0cPythonOrigin\x12\x1a\n\x08\x66ragment\x18\x01 \x01(\tR\x08\x66ragment\x12\x1b\n\tcall_site\x18\x02 \x01(\tR\x08\x63\x61llSite"\xb1\x03\n\tJvmOrigin\x12\x17\n\x04line\x18\x01 \x01(\x05H\x00R\x04line\x88\x01\x01\x12*\n\x0estart_position\x18\x02 \x01(\x05H\x01R\rstartPosition\x88\x01\x01\x12$\n\x0bstart_index\x18\x03 \x01(\x05H\x02R\nstartIndex\x88\x01\x01\x12"\n\nstop_index\x18\x04 \x01(\x05H\x03R\tstopIndex\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x05 \x01(\tH\x04R\x07sqlText\x88\x01\x01\x12$\n\x0bobject_type\x18\x06 \x01(\tH\x05R\nobjectType\x88\x01\x01\x12$\n\x0bobject_name\x18\x07 \x01(\tH\x06R\nobjectName\x88\x01\x01\x12\x41\n\x0bstack_trace\x18\x08 \x03(\x0b\x32 .spark.connect.StackTraceElementR\nstackTraceB\x07\n\x05_lineB\x11\n\x0f_start_positionB\x0e\n\x0c_start_indexB\r\n\x0b_stop_indexB\x0b\n\t_sql_textB\x0e\n\x0c_object_typeB\x0e\n\x0c_object_name"\xea\x02\n\x11StackTraceElement\x12/\n\x11\x63lass_loader_name\x18\x01 \x01(\tH\x00R\x0f\x63lassLoaderName\x88\x01\x01\x12$\n\x0bmodule_name\x18\x02 \x01(\tH\x01R\nmoduleName\x88\x01\x01\x12*\n\x0emodule_version\x18\x03 \x01(\tH\x02R\rmoduleVersion\x88\x01\x01\x12\'\n\x0f\x64\x65\x63laring_class\x18\x04 \x01(\tR\x0e\x64\x65\x63laringClass\x12\x1f\n\x0bmethod_name\x18\x05 \x01(\tR\nmethodName\x12 \n\tfile_name\x18\x06 \x01(\tH\x03R\x08\x66ileName\x88\x01\x01\x12\x1f\n\x0bline_number\x18\x07 \x01(\x05R\nlineNumberB\x14\n\x12_class_loader_nameB\x0e\n\x0c_module_nameB\x11\n\x0f_module_versionB\x0c\n\n_file_nameB6\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3'
)

_globals = globals()
@@ -66,8 +66,12 @@
_globals["_RESOURCEPROFILE_EXECUTORRESOURCESENTRY"]._serialized_end = 899
_globals["_RESOURCEPROFILE_TASKRESOURCESENTRY"]._serialized_start = 901
_globals["_RESOURCEPROFILE_TASKRESOURCESENTRY"]._serialized_end = 1001
_globals["_ORIGIN"]._serialized_start = 1003
_globals["_ORIGIN"]._serialized_end = 1091
_globals["_PYTHONORIGIN"]._serialized_start = 1093
_globals["_PYTHONORIGIN"]._serialized_end = 1164
_globals["_ORIGIN"]._serialized_start = 1004
_globals["_ORIGIN"]._serialized_end = 1151
_globals["_PYTHONORIGIN"]._serialized_start = 1153
_globals["_PYTHONORIGIN"]._serialized_end = 1224
_globals["_JVMORIGIN"]._serialized_start = 1227
_globals["_JVMORIGIN"]._serialized_end = 1660
_globals["_STACKTRACEELEMENT"]._serialized_start = 1663
_globals["_STACKTRACEELEMENT"]._serialized_end = 2025
# @@protoc_insertion_point(module_scope)