Steps to reproduce
import org.apache.spark.sql._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.util.QueryExecutionListener
import io.delta.tables._
class MyQueryExecutionListener extends QueryExecutionListener {
  // Called when a query execution succeeds
  override def onSuccess(f: String, qe: QueryExecution, d: Long): Unit = {
    println(s"${qe.observedMetrics}")
  }
  // Called when a query execution fails
  override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {
    println(s"Execution failed for query: ${qe}")
  }
}
spark.listenerManager.register(new MyQueryExecutionListener())
spark.sql("CREATE TABLE IF NOT EXISTS db.test_table (k STRING, o STRING, op STRING) USING DELTA LOCATION '/tmp/test_table'")
var dst = DeltaTable.forPath("/tmp/test_table").as("dst")
var ds = spark.sql("SELECT CAST(1000 AS STRING) AS k, CAST(1 AS STRING) AS o, 'u' AS op ").unionAll(spark.sql("SELECT CAST(1001 AS STRING) AS k, CAST(1 AS STRING) AS o, 'u' AS op "))
var src = ds.withColumn("fields", struct(col("*"))).groupBy(col("k")).agg(max_by(col("fields"), col("o")).as("fields")).select(expr("fields.*")).observe("my_metrics", count("op").as("rows")).cache.as("src")
var mergeBuilder = dst.merge(src, "dst.`k` = src.`k`").whenMatched("src.op <> 'd'").updateAll().whenNotMatched("src.op <> 'd'").insertAll();
mergeBuilder.execute(); // This gives 0 in the metric.
mergeBuilder.execute(); // Running this a second time gives 2.
src.write.format("noop").mode("overwrite").save() // Gives 2 as expected.
Observed results
The first call to mergeBuilder.execute() reports an incorrect observed value of 0; the second call reports the correct value of 2.
Expected results
The first call to mergeBuilder.execute() should have reported 2, since the observed DataFrame contains 2 rows.
Further details
scala> var mergeBuilder = dst.merge(src, "dst.`k` = src.`k`").whenMatched("src.op <> 'd'").updateAll().whenNotMatched("src.op <> 'd'").insertAll();
mergeBuilder: io.delta.tables.DeltaMergeBuilder = io.delta.tables.DeltaMergeBuilder@59eb557e
scala> mergeBuilder.execute();
Map(my_metrics -> [0])
24/12/18 05:57:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
scala> mergeBuilder.execute
Map(my_metrics -> [2])
scala> src.write.format("noop").mode("overwrite").save() // Gives 2 as expected.
Map(my_metrics -> [2])
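To narrow down which action produces each printed map, a variant of the listener that also prints the function name passed to the callback might help. This is only a sketch and was not run as part of this report; the class name NamedMetricsListener and the output format are hypothetical, and it relies only on the same QueryExecutionListener callbacks and qe.observedMetrics used above.

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical diagnostic listener: same idea as MyQueryExecutionListener,
// but each line is tagged with the name of the action Spark reports.
class NamedMetricsListener extends QueryExecutionListener {
  // Called when a query execution succeeds
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    println(s"[$funcName] observedMetrics = ${qe.observedMetrics}")
  }
  // Called when a query execution fails
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    println(s"[$funcName] failed: ${exception.getMessage}")
  }
}
```

It would be registered the same way, with spark.listenerManager.register(new NamedMetricsListener()), before rerunning the merge and the noop write.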
Environment information
Delta Lake version: 3.1.0
Spark version: 3.5.1
Scala version: 2.12
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
Yes. I can contribute a fix for this bug independently.
Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
No. I cannot contribute a bug fix at this time.