Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Invalid observation value when doing a merge write #3983

Open
1 of 8 tasks
vVv-AA opened this issue Dec 18, 2024 · 0 comments
Open
1 of 8 tasks

[BUG] Invalid observation value when doing a merge write #3983

vVv-AA opened this issue Dec 18, 2024 · 0 comments
Labels
bug Something isn't working

Comments

@vVv-AA
Copy link

vVv-AA commented Dec 18, 2024

Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

Steps to reproduce

import org.apache.spark.sql._
import org.apache.spark.sql._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.util.QueryExecutionListener;
import io.delta.tables._;


class MyQueryExecutionListener extends QueryExecutionListener {

  // Called when query execution starts
  override def onSuccess(f: String, qe: QueryExecution, d: Long): Unit = {
    println(s"${qe.observedMetrics}")
  }

  // Called when query execution ends
  override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {
    println(s"Execution finished for query: ${qe}")
  }
}

spark.listenerManager.register(new MyQueryExecutionListener())


spark.sql("CREATE TABLE IF NOT EXISTS db.test_table (k STRING, o STRING, op STRING) USING DELTA LOCATION '/tmp/test_table'")

var dst = DeltaTable.forPath("/tmp/test_table").as("dst")


var ds = spark.sql("SELECT CAST(1000 AS STRING) AS k, CAST(1 AS STRING) AS o, 'u' AS op ").unionAll(spark.sql("SELECT CAST(1001 AS STRING) AS k, CAST(1 AS STRING) AS o, 'u' AS op "))

var src = ds.withColumn("fields", struct(col("*"))).groupBy(col("k")).agg(max_by(col("fields"), col("o")).as("fields")).select(expr("fields.*")).observe("my_metrics", count("op").as("rows")).cache.as("src")

var mergeBuilder = dst.merge(src, "dst.`k` = src.`k`").whenMatched("src.op <> 'd'").updateAll().whenNotMatched("src.op <> 'd'").insertAll();
mergeBuilder.execute(); // This Gives 0 in metric
mergeBuilder.execute(); // Running this once now gives 2.
src.write.format("noop").mode("overwrite").save() // Gives 2 as expected.

Observed results

First call to mergeBuilder.execute() gives wrong observation value of 0, second calls yields correct value of 2

Expected results

mergeBuilder.execute() should have given 2 since there are 2 rows in the observed df

Further details

scala> var mergeBuilder = dst.merge(src, "dst.`k` = src.`k`").whenMatched("src.op <> 'd'").updateAll().whenNotMatched("src.op <> 'd'").insertAll();
mergeBuilder: io.delta.tables.DeltaMergeBuilder = io.delta.tables.DeltaMergeBuilder@59eb557e

scala> mergeBuilder.execute();
Map(my_metrics -> [0])
24/12/18 05:57:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

scala> mergeBuilder.execute
Map(my_metrics -> [2])


scala> src.write.format("noop").mode("overwrite").save() // Gives 2 as expected.
Map(my_metrics -> [2])                                                          

Environment information

  • Delta Lake version: 3.1.0
  • Spark version: 3.5.1
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
@vVv-AA vVv-AA added the bug Something isn't working label Dec 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

1 participant