Skip to content

Commit

Permalink
raise error when the compaction requires multiple rounds (#114)
Browse files Browse the repository at this point in the history
* raise error when the compaction requires multiple rounds instead of proceeding with compaction

* remove the existing if statement
  • Loading branch information
raghumdani authored Mar 31, 2023
1 parent e0aeba8 commit 3b3cf41
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions deltacat/compute/compactor/compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,21 @@ def _execute_compaction_round(
round_completion_info = RoundCompletionInfo.of(
None, dest_delta_locator, None, 0, None
)

if last_stream_position_compacted.get(
source_partition_locator
) < last_stream_position_to_compact or (
not rebase_source_partition_locator
and last_stream_position_compacted.get(destination_partition_locator)
< previous_last_stream_position_compacted_on_destination_table
):
logger.info(
f"Compaction can not be completed in one round. Either increase cluster size or decrease input"
)
raise AssertionError(
"Multiple rounds are not supported. Please increase the cluster size and run again."
)

hb_tasks_pending = invoke_parallel(
items=uniform_deltas,
ray_task=hb.hash_bucket,
Expand Down Expand Up @@ -448,16 +463,7 @@ def _execute_compaction_round(
rcf_source_partition_locator,
new_round_completion_info,
)
if last_stream_position_compacted.get(
source_partition_locator
) < last_stream_position_to_compact or (
not rebase_source_partition_locator
and last_stream_position_compacted.get(destination_partition_locator)
< previous_last_stream_position_compacted_on_destination_table
):
logger.info(
f"Compaction can not be completed in one round. Either increase cluster size or decrease input"
)

logger.info(
f"partition-{source_partition_locator.partition_values},"
f"compacted at: {last_stream_position_compacted},"
Expand Down

0 comments on commit 3b3cf41

Please sign in to comment.