Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lint Rivulet Changes & DeltaCAT Transaction Store WIP #436

Merged
merged 5 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deltacat/aws/s3u.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def upload_table(
**s3_client_kwargs,
) -> ManifestEntryList:
"""
Writes the given table to 1 or more S3 files and return Redshift
Writes the given table to 1 or more S3 files and return
manifest entries describing the uploaded files.
"""
if s3_table_writer_kwargs is None:
Expand Down
2 changes: 1 addition & 1 deletion deltacat/compute/compactor_v2/deletes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def _get_delete_file_envelopes(
), "Delete type deltas are required to have delete parameters defined"
delete_columns: Optional[
List[str]
] = delete_delta.meta.entry_params.equality_column_names
] = delete_delta.meta.entry_params.equality_field_locators
assert len(delete_columns) > 0, "At least 1 delete column is required"
# delete columns should exist in underlying table
delete_dataset = params.deltacat_storage.download_delta(
Expand Down
14 changes: 13 additions & 1 deletion deltacat/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
)
from deltacat.storage.model.list_result import ListResult
from deltacat.storage.model.locator import Locator
from deltacat.storage.model.metafile import (
Metafile,
Transaction,
TransactionOperation,
)
from deltacat.storage.model.namespace import (
Namespace,
NamespaceLocator,
Expand Down Expand Up @@ -71,10 +76,12 @@
LifecycleState,
LocalDataset,
LocalTable,
NullOrder,
SchemaConsistencyType,
StreamFormat,
SortOrder,
NullOrder,
TransactionType,
TransactionOperationType,
)
from deltacat.storage.model.sort_key import (
SortKey,
Expand Down Expand Up @@ -111,6 +118,7 @@
"ManifestEntry",
"ManifestEntryList",
"ManifestMeta",
"Metafile",
"MonthTransform",
"Namespace",
"NamespaceLocator",
Expand Down Expand Up @@ -139,6 +147,10 @@
"TableVersion",
"TableVersionLocator",
"TableVersionProperties",
"Transaction",
"TransactionType",
"TransactionOperation",
"TransactionOperationType",
"Transform",
"TransformName",
"TransformParameters",
Expand Down
8 changes: 3 additions & 5 deletions deltacat/storage/iceberg/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,17 +692,15 @@ def map(
snapshot = _resolve_stream_snapshot(metadata, snapshot_id)
schema = _get_current_schema_for_meta(metadata)
partition_spec = _get_current_spec_for_meta(metadata)
parent_snapshot_bytes = (
snapshot.parent_snapshot_id.to_bytes(8, "big")
if snapshot.parent_snapshot_id
else None
parent_snapshot_str = (
str(snapshot.parent_snapshot_id) if snapshot.parent_snapshot_id else None
)
return Stream.of(
locator=StreamLocatorMapper.map(
obj, metadata_timestamp, snapshot_id, catalog_properties
),
partition_scheme=PartitionSchemeMapper.map(partition_spec, schema),
state=state,
previous_stream_id=parent_snapshot_bytes,
previous_stream_id=parent_snapshot_str,
native_object=snapshot,
)
29 changes: 19 additions & 10 deletions deltacat/storage/model/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,21 +293,28 @@ def at(
partition_scheme_id: Optional[str],
stream_position: Optional[int],
) -> DeltaLocator:
partition_locator = PartitionLocator.at(
namespace,
table_name,
table_version,
stream_id,
stream_format,
partition_values,
partition_id,
partition_scheme_id,
partition_locator = (
PartitionLocator.at(
namespace,
table_name,
table_version,
stream_id,
stream_format,
partition_values,
partition_id,
partition_scheme_id,
)
if partition_values and partition_id and partition_scheme_id
else None
)
return DeltaLocator.of(
partition_locator,
stream_position,
)

def parent(self) -> Optional[PartitionLocator]:
return self.partition_locator

@property
def partition_locator(self) -> Optional[PartitionLocator]:
val: Dict[str, Any] = self.get("partitionLocator")
Expand Down Expand Up @@ -410,6 +417,8 @@ def canonical_string(self) -> str:
for equality checks (i.e. two locators are equal if they have
the same canonical string).
"""
pl_hexdigest = self.partition_locator.hexdigest()
pl_hexdigest = (
self.partition_locator.hexdigest() if self.partition_locator else None
)
stream_position = self.stream_position
return f"{pl_hexdigest}|{stream_position}"
11 changes: 11 additions & 0 deletions deltacat/storage/model/locator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
# Allow classes to use self-referencing Type hints in Python 3.7.
from __future__ import annotations

from typing import Optional

from deltacat.utils.common import sha1_digest, sha1_hexdigest


class Locator:
def parent(self) -> Optional[Locator]:
"""
Returns the parent of this locator, if any.
"""
raise NotImplementedError()

def canonical_string(self) -> str:
"""
Returns a unique string for the given locator that can be used
Expand Down
16 changes: 10 additions & 6 deletions deltacat/storage/model/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from deltacat import logs

from deltacat.storage.model.schema import FieldLocator

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


Expand Down Expand Up @@ -60,16 +62,16 @@ class EntryParams(dict):

@staticmethod
def of(
equality_column_names: Optional[List[str]] = None,
equality_field_locators: Optional[List[FieldLocator]] = None,
) -> EntryParams:
params = EntryParams()
if equality_column_names is not None:
params["equality_column_names"] = equality_column_names
if equality_field_locators is not None:
params["equality_field_locators"] = equality_field_locators
return params

@property
def equality_column_names(self) -> Optional[List[str]]:
return self.get("equality_column_names")
def equality_field_locators(self) -> Optional[List[FieldLocator]]:
return self.get("equality_field_locators")


class Manifest(dict):
Expand Down Expand Up @@ -233,7 +235,9 @@ def of(
if credentials is not None:
manifest_meta["credentials"] = credentials
if entry_type is not None:
manifest_meta["entry_type"] = entry_type.value
manifest_meta["entry_type"] = (
entry_type.value if isinstance(entry_type, EntryType) else entry_type
)
if entry_params is not None:
manifest_meta["entry_params"] = entry_params
return manifest_meta
Expand Down
Loading
Loading