Persisting previous delta inflation and avg record sizes (#220)
raghumdani authored Sep 18, 2023
1 parent 186813e commit 1e9fa00
Showing 7 changed files with 66 additions and 3 deletions.
12 changes: 12 additions & 0 deletions deltacat/compute/compactor/model/round_completion_info.py
@@ -47,6 +47,8 @@ def of(
hash_bucket_count: Optional[int] = None,
hb_index_to_entry_range: Optional[Dict[int, Tuple[int, int]]] = None,
compactor_version: Optional[str] = None,
input_inflation: Optional[float] = None,
input_average_record_size_bytes: Optional[float] = None,
) -> RoundCompletionInfo:

rci = RoundCompletionInfo()
@@ -62,6 +64,8 @@
rci["hashBucketCount"] = hash_bucket_count
rci["hbIndexToEntryRange"] = hb_index_to_entry_range
rci["compactorVersion"] = compactor_version
rci["inputInflation"] = input_inflation
rci["inputAverageRecordSizeBytes"] = input_average_record_size_bytes
return rci

@property
@@ -119,3 +123,11 @@ def hb_index_to_entry_range(self) -> Optional[Dict[int, Tuple[int, int]]]:
@property
def compactor_version(self) -> Optional[str]:
return self.get("compactorVersion")

@property
def input_inflation(self) -> Optional[float]:
return self.get("inputInflation")

@property
def input_average_record_size_bytes(self) -> Optional[float]:
return self.get("inputAverageRecordSizeBytes")
29 changes: 28 additions & 1 deletion deltacat/compute/compactor_v2/compaction_session.py
@@ -130,7 +130,6 @@ def _execute_compaction(
cluster_cpus = cluster_resources["CPU"]
cluster_memory = cluster_resources["memory"]
task_max_parallelism = cluster_cpus
compaction_audit.set_cluster_cpu_max(cluster_cpus)
compaction_audit.set_total_cluster_memory_bytes(cluster_memory)

# read the results from any previously completed compaction round
@@ -488,13 +487,39 @@ def merge_input_provider(index, item):
compaction_audit.set_total_memory_gb_seconds(
cluster_util.total_memory_gb_seconds
)
compaction_audit.set_cluster_cpu_max(cluster_util.max_cpu)

s3_utils.upload(
compaction_audit.audit_url,
str(json.dumps(compaction_audit)),
**params.s3_client_kwargs,
)

input_inflation = None
input_average_record_size_bytes = None
if (
compaction_audit.input_size_bytes
and compaction_audit.hash_bucket_processed_size_bytes
):
input_inflation = (
compaction_audit.hash_bucket_processed_size_bytes
/ compaction_audit.input_size_bytes
)

if (
compaction_audit.hash_bucket_processed_size_bytes
and compaction_audit.input_records
):
input_average_record_size_bytes = (
compaction_audit.hash_bucket_processed_size_bytes
/ compaction_audit.input_records
)

logger.info(
f"The inflation of input deltas={input_inflation}"
f" and average record size={input_average_record_size_bytes}"
)

new_round_completion_info = RoundCompletionInfo.of(
high_watermark=params.last_stream_position_to_compact,
compacted_delta_locator=new_compacted_delta_locator,
@@ -505,6 +530,8 @@
hash_bucket_count=params.hash_bucket_count,
hb_index_to_entry_range=hb_id_to_entry_indices_range,
compactor_version=CompactorVersion.V2.value,
input_inflation=input_inflation,
input_average_record_size_bytes=input_average_record_size_bytes,
)

logger.info(
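
Both metrics are simple ratios taken from the compaction audit: input inflation compares the bytes materialized in memory during hash bucketing against the input bytes read (typically compressed, columnar data), and the average record size divides the same in-memory bytes by the number of input records. A standalone sketch of that arithmetic with made-up audit figures (the values below are illustrative only):

from typing import Optional

# Hypothetical audit figures for illustration.
input_size_bytes = 1 * 1024**3                  # 1 GiB of input data as stored
hash_bucket_processed_size_bytes = 4 * 1024**3  # 4 GiB once decoded in memory
input_records = 10_000_000

input_inflation: Optional[float] = None
input_average_record_size_bytes: Optional[float] = None

# Guard against missing or zero values, as the session code above does.
if input_size_bytes and hash_bucket_processed_size_bytes:
    input_inflation = hash_bucket_processed_size_bytes / input_size_bytes

if hash_bucket_processed_size_bytes and input_records:
    input_average_record_size_bytes = hash_bucket_processed_size_bytes / input_records

print(input_inflation)                  # 4.0
print(input_average_record_size_bytes)  # ~429.5 bytes per record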
2 changes: 1 addition & 1 deletion deltacat/compute/compactor_v2/steps/merge.py
@@ -404,7 +404,7 @@ def _materialize(

logger.info(
f"[Merge task index {input.merge_task_index}] Merged "
f"record count: {len(table)}, took: {merge_time}s"
f"record count: {len(table)}, size={table.nbytes} took: {merge_time}s"
)

materialized_results.append(_materialize(hb_idx, [table]))
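
The only change here adds the merged table's size to the log line via PyArrow's Table.nbytes, which reports the total number of bytes consumed by the table's buffers. A quick illustration:

import pyarrow as pa

table = pa.table({"pk": [1, 2, 3], "value": ["a", "b", "c"]})
print(len(table))    # 3 records
print(table.nbytes)  # total bytes held by the table's buffers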
12 changes: 12 additions & 0 deletions deltacat/tests/compute/compactor_v2/test_hashlib.py
@@ -0,0 +1,12 @@
import hashlib


def test_hashlib_sanity():
"""
This test ensures that there is no change in hashlib behavior
across different Python versions. If there is, a rebase is required.
"""
assert (
hashlib.sha1("test".encode("utf-8")).hexdigest()
== "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3"
)
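
The test pins the SHA-1 digest of a fixed string because the compactor's hash-bucket assignments must stay stable across runs; if the digest ever changed between Python versions, records hashed by an earlier round would no longer land in the same buckets. A hedged illustration of that dependency (the modulo scheme below is for illustration only, not necessarily deltacat's exact bucketing logic):

import hashlib


def bucket_for(primary_key: str, hash_bucket_count: int) -> int:
    # Assumed for illustration: derive a bucket index from the SHA-1 digest.
    digest = hashlib.sha1(primary_key.encode("utf-8")).hexdigest()
    return int(digest, 16) % hash_bucket_count


# A stable digest guarantees the same key always maps to the same bucket.
assert bucket_for("test", 16) == bucket_for("test", 16)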
1 change: 1 addition & 0 deletions deltacat/tests/utils/test_resources.py
@@ -48,3 +48,4 @@ def test_sanity(self, ray_mock):
) # total is greater than used
self.assertIsNotNone(cu.total_memory_gb_seconds)
self.assertIsNotNone(cu.used_memory_gb_seconds)
self.assertIsNotNone(cu.max_cpu)
11 changes: 10 additions & 1 deletion deltacat/utils/pyarrow.py
@@ -368,7 +368,16 @@ def s3_partial_parquet_file_to_table(
[input_schema.field(col) for col in column_names],
metadata=input_schema.metadata,
)
return coerce_pyarrow_table_to_schema(table, input_schema)
coerced_table, coerce_latency = timed_invocation(
coerce_pyarrow_table_to_schema, table, input_schema
)

logger.debug(
f"Coercing the PyArrow table of len {len(coerced_table)} "
f"into passed schema took {coerce_latency}s"
)

return coerced_table

return table

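
The coercion call is now wrapped in timed_invocation so its latency can be logged. A minimal sketch of that helper pattern, using a hypothetical timed_invocation_sketch rather than deltacat's own implementation:

import time
from typing import Any, Callable, Tuple


def timed_invocation_sketch(func: Callable[..., Any], *args, **kwargs) -> Tuple[Any, float]:
    """Invoke func and return (result, elapsed_seconds)."""
    start = time.monotonic()
    result = func(*args, **kwargs)
    return result, time.monotonic() - start


# Usage mirroring the diff: time a call and log how long it took.
result, latency = timed_invocation_sketch(sorted, [3, 1, 2])
print(f"call took {latency}s, result={result}")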
2 changes: 2 additions & 0 deletions deltacat/utils/resources.py
@@ -76,6 +76,7 @@ def __init__(self) -> None:
self.used_vcpu_seconds = 0.0
self.total_memory_gb_seconds = 0.0
self.used_memory_gb_seconds = 0.0
self.max_cpu = 0.0

def __enter__(self) -> Any:
schedule.every().second.do(self._update_resources)
@@ -111,6 +112,7 @@ def _update_resources(self):
self.total_vcpu_seconds = self.total_vcpu_seconds + float(
str(cluster_resources["CPU"])
)
self.max_cpu = max(self.max_cpu, float(str(cluster_resources["CPU"])))

if "memory" not in cluster_resources:
return
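
max_cpu is maintained as a running maximum inside the periodically scheduled _update_resources callback, which is why the compaction session can record the peak cluster CPU count into the audit after the run finishes (the set_cluster_cpu_max call moved later in this commit). A minimal standalone sketch of the running-maximum pattern, using a plain loop instead of the schedule-driven sampling in the real class:

class PeakCpuTracker:
    """Illustrative running-maximum tracker mirroring how max_cpu is updated."""

    def __init__(self) -> None:
        self.max_cpu = 0.0

    def update(self, current_cluster_cpus: float) -> None:
        # Same idea as: self.max_cpu = max(self.max_cpu, float(str(cluster_resources["CPU"])))
        self.max_cpu = max(self.max_cpu, current_cluster_cpus)


tracker = PeakCpuTracker()
for cpus in (8.0, 16.0, 12.0):  # e.g. CPU counts reported as the cluster autoscales
    tracker.update(cpus)
assert tracker.max_cpu == 16.0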
