Support null as a valid primary key value (#352)
raghumdani authored Sep 12, 2024
1 parent ebbd210 commit 71026ef
Showing 7 changed files with 579 additions and 14 deletions.
deltacat/aws/constants.py (1 change: 0 additions & 1 deletion)
@@ -1,7 +1,6 @@
import botocore
from typing import Set
from daft.exceptions import DaftTransientError

from deltacat.utils.common import env_integer, env_string


deltacat/compute/compactor_v2/utils/primary_key_index.py (13 changes: 9 additions & 4 deletions)
@@ -27,8 +27,11 @@ def _append_sha1_hash_to_table(table: pa.Table, hash_column: pa.Array) -> pa.Table

     result = []
     for hash_value in hash_column_np:
-        assert hash_value is not None, f"Expected non-null primary key"
-        result.append(hashlib.sha1(hash_value.encode("utf-8")).hexdigest())
+        if hash_value is None:
+            result.append(None)
+            logger.info("A primary key hash is null")
+        else:
+            result.append(hashlib.sha1(hash_value.encode("utf-8")).hexdigest())

     return sc.append_pk_hash_string_column(table, result)
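In isolation, the patched loop means a null primary key now yields a null digest instead of failing an assertion. A minimal standalone sketch of that behavior (sha1_digests is a hypothetical helper mirroring the loop above, not a DeltaCAT API):

```python
import hashlib
from typing import List, Optional

import pyarrow as pa


def sha1_digests(hash_column: pa.Array) -> List[Optional[str]]:
    """Hex SHA-1 digest per value; nulls pass through instead of raising."""
    result = []
    for hash_value in hash_column.to_pylist():
        if hash_value is None:
            result.append(None)  # propagate the null so downstream code can route it
        else:
            result.append(hashlib.sha1(hash_value.encode("utf-8")).hexdigest())
    return result


print(sha1_digests(pa.array(["pk-1", None])))
# ['<40-char hex digest>', None]
```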

@@ -191,7 +194,7 @@ def _generate_pk_hash(table: pa.Table) -> pa.Array:
         pk_columns.append(sliced_string_cast(table[pk_name]))

     pk_columns.append(PK_DELIMITER)
-    hash_column = pc.binary_join_element_wise(*pk_columns)
+    hash_column = pc.binary_join_element_wise(*pk_columns, null_handling="replace")
     return hash_column

 def _generate_uuid(table: pa.Table) -> pa.Array:
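For context on the one-line change above: pyarrow's pc.binary_join_element_wise joins its first n-1 string arrays element-wise, using the last argument as the separator. With the default null_handling="emit_null", a null in any input makes the whole joined value null; null_handling="replace" substitutes null_replacement (empty string by default), so rows with a null primary key column still produce a hashable string. A quick illustration, with "|" standing in for PK_DELIMITER:

```python
import pyarrow as pa
import pyarrow.compute as pc

pk1 = pa.array(["a", None, "c"])
pk2 = pa.array(["x", "y", None])

# Default: a null in either column nulls out the joined value.
print(pc.binary_join_element_wise(pk1, pk2, "|"))
# ["a|x", null, null]

# "replace": nulls become "" (null_replacement), so every row joins.
print(pc.binary_join_element_wise(pk1, pk2, "|", null_handling="replace"))
# ["a|x", "|y", "c|"]
```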
@@ -345,8 +348,10 @@ def hash_group_index_to_hash_bucket_indices(
     return range(hb_group, num_buckets, num_groups)


-def pk_digest_to_hash_bucket_index(digest: str, num_buckets: int) -> int:
+def pk_digest_to_hash_bucket_index(digest: Optional[str], num_buckets: int) -> int:
     """
     Generates the hash bucket index from the given digest.
     """
+    if digest is None:
+        return 0
     return int(digest, 16) % num_buckets
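Because of the new early return, every record whose primary key digest is null lands in hash bucket 0, so all null-key records are routed to the same bucket for deduplication. A short usage sketch of the function as patched:

```python
import hashlib
from typing import Optional


def pk_digest_to_hash_bucket_index(digest: Optional[str], num_buckets: int) -> int:
    if digest is None:
        return 0  # all null-PK digests collide into bucket 0 by design
    return int(digest, 16) % num_buckets  # hex SHA-1 -> int -> bucket


digest = hashlib.sha1("some-pk".encode("utf-8")).hexdigest()
print(pk_digest_to_hash_bucket_index(digest, 16))  # deterministic bucket in [0, 16)
print(pk_digest_to_hash_bucket_index(None, 16))    # -> 0
```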
@@ -848,6 +848,83 @@ def __iter__(self):
             assert_compaction_audit=None,
             num_rounds=3,
         ),
+    # 4 input deltas (3 upsert deltas, 1 delete delta), 2 rounds requested.
+    # Expect a compacted table of 10 records total:
+    # 12 upserts - 2 deletes (one of which has a null PK) = 10 records.
+    # (dropDuplicates = False)
"9-multiple-rounds-delete-deltas-with-null-pk": MultipleRoundsTestCaseParams(
primary_keys={"pk_col_1"},
sort_keys=ZERO_VALUED_SORT_KEY,
partition_keys=[PartitionKey.of("region_id", PartitionKeyType.INT)],
partition_values=["1"],
input_deltas=[
(
pa.Table.from_arrays(
[
pa.array([None, 11, 12, 13]),
pa.array(["a", "b", "c", "d"]),
],
names=["pk_col_1", "col_1"],
),
DeltaType.UPSERT,
None,
),
(
pa.Table.from_arrays(
[
pa.array([14, 15, 16, 17]),
pa.array(["e", "f", "g", "h"]),
],
names=["pk_col_1", "col_1"],
),
DeltaType.UPSERT,
None,
),
(
pa.Table.from_arrays(
[
pa.array([18, 19, 20, 21]),
pa.array(["i", "j", "k", "l"]),
],
names=["pk_col_1", "col_1"],
),
DeltaType.UPSERT,
None,
),
(
pa.Table.from_arrays(
[pa.array([None, 11]), pa.array(["a", "b"])],
names=["pk_col_1", "col_1"],
),
DeltaType.DELETE,
DeleteParameters.of(["pk_col_1", "col_1"]),
),
],
rebase_expected_compact_partition_result=pa.Table.from_arrays(
[
pa.array([i for i in range(12, 22)]),
pa.array(["c", "d", "e", "f", "g", "h", "i", "j", "k", "l"]),
],
names=["pk_col_1", "col_1"],
),
expected_terminal_compact_partition_result=pa.Table.from_arrays(
[
pa.array([i for i in range(12, 22)]),
pa.array(["c", "d", "e", "f", "g", "h", "i", "j", "k", "l"]),
],
names=["pk_col_1", "col_1"],
),
expected_terminal_exception=None,
expected_terminal_exception_message=None,
do_create_placement_group=False,
records_per_compacted_file=DEFAULT_MAX_RECORDS_PER_FILE,
hash_bucket_count=DEFAULT_HASH_BUCKET_COUNT,
read_kwargs_provider=None,
drop_duplicates=False,
skip_enabled_compact_partition_drivers=[CompactorVersion.V1],
assert_compaction_audit=None,
num_rounds=2,
),
 }

 MULTIPLE_ROUNDS_TEST_CASES = with_compactor_version_func_test_param(
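As a sanity check on the expected results above: the three upsert deltas contribute 12 rows (primary keys None, 11, ..., 21), and the delete delta removes the two rows matching (None, "a") and (11, "b") on the (pk_col_1, col_1) delete columns, leaving the 10 rows with keys 12-21. A rough standalone sketch of that set difference, assuming null-equals-null matching for illustration (not DeltaCAT's actual delete implementation):

```python
import pyarrow as pa

upserts = pa.table(
    {
        "pk_col_1": [None, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21],
        "col_1": list("abcdefghijkl"),
    }
)
# Rows named by the (pk_col_1, col_1) delete columns of the DELETE delta.
deletes = {(None, "a"), (11, "b")}

kept = [
    row
    for row in zip(upserts["pk_col_1"].to_pylist(), upserts["col_1"].to_pylist())
    if row not in deletes  # Python tuple equality treats (None, "a") as a match
]
print(kept)  # [(12, 'c'), ..., (21, 'l')] -- the 10 expected records
```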