Skip to content

Commit

Permalink
Lint Rivulet Changes & DeltaCAT Transaction Store WIP (#436)
Browse files Browse the repository at this point in the history
* Add SerDe support for all metafile properties.

* Initial draft of hierarchical durable storage.

* Start recording completed transactions.

* Lint Rivulet changes.

* Remove incorrect auto-generated method docs.
  • Loading branch information
pdames authored Dec 26, 2024
1 parent 53b3d1c commit 0df432c
Show file tree
Hide file tree
Showing 67 changed files with 2,137 additions and 793 deletions.
2 changes: 1 addition & 1 deletion deltacat/aws/s3u.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def upload_table(
**s3_client_kwargs,
) -> ManifestEntryList:
"""
Writes the given table to 1 or more S3 files and return Redshift
Writes the given table to 1 or more S3 files and return
manifest entries describing the uploaded files.
"""
if s3_table_writer_kwargs is None:
Expand Down
2 changes: 1 addition & 1 deletion deltacat/compute/compactor_v2/deletes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def _get_delete_file_envelopes(
), "Delete type deltas are required to have delete parameters defined"
delete_columns: Optional[
List[str]
] = delete_delta.meta.entry_params.equality_column_names
] = delete_delta.meta.entry_params.equality_field_locators
assert len(delete_columns) > 0, "At least 1 delete column is required"
# delete columns should exist in underlying table
delete_dataset = params.deltacat_storage.download_delta(
Expand Down
14 changes: 13 additions & 1 deletion deltacat/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
)
from deltacat.storage.model.list_result import ListResult
from deltacat.storage.model.locator import Locator
from deltacat.storage.model.metafile import (
Metafile,
Transaction,
TransactionOperation,
)
from deltacat.storage.model.namespace import (
Namespace,
NamespaceLocator,
Expand Down Expand Up @@ -71,10 +76,12 @@
LifecycleState,
LocalDataset,
LocalTable,
NullOrder,
SchemaConsistencyType,
StreamFormat,
SortOrder,
NullOrder,
TransactionType,
TransactionOperationType,
)
from deltacat.storage.model.sort_key import (
SortKey,
Expand Down Expand Up @@ -111,6 +118,7 @@
"ManifestEntry",
"ManifestEntryList",
"ManifestMeta",
"Metafile",
"MonthTransform",
"Namespace",
"NamespaceLocator",
Expand Down Expand Up @@ -139,6 +147,10 @@
"TableVersion",
"TableVersionLocator",
"TableVersionProperties",
"Transaction",
"TransactionType",
"TransactionOperation",
"TransactionOperationType",
"Transform",
"TransformName",
"TransformParameters",
Expand Down
8 changes: 3 additions & 5 deletions deltacat/storage/iceberg/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,17 +692,15 @@ def map(
snapshot = _resolve_stream_snapshot(metadata, snapshot_id)
schema = _get_current_schema_for_meta(metadata)
partition_spec = _get_current_spec_for_meta(metadata)
parent_snapshot_bytes = (
snapshot.parent_snapshot_id.to_bytes(8, "big")
if snapshot.parent_snapshot_id
else None
parent_snapshot_str = (
str(snapshot.parent_snapshot_id) if snapshot.parent_snapshot_id else None
)
return Stream.of(
locator=StreamLocatorMapper.map(
obj, metadata_timestamp, snapshot_id, catalog_properties
),
partition_scheme=PartitionSchemeMapper.map(partition_spec, schema),
state=state,
previous_stream_id=parent_snapshot_bytes,
previous_stream_id=parent_snapshot_str,
native_object=snapshot,
)
29 changes: 19 additions & 10 deletions deltacat/storage/model/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,21 +293,28 @@ def at(
partition_scheme_id: Optional[str],
stream_position: Optional[int],
) -> DeltaLocator:
partition_locator = PartitionLocator.at(
namespace,
table_name,
table_version,
stream_id,
stream_format,
partition_values,
partition_id,
partition_scheme_id,
partition_locator = (
PartitionLocator.at(
namespace,
table_name,
table_version,
stream_id,
stream_format,
partition_values,
partition_id,
partition_scheme_id,
)
if partition_values and partition_id and partition_scheme_id
else None
)
return DeltaLocator.of(
partition_locator,
stream_position,
)

def parent(self) -> Optional[PartitionLocator]:
return self.partition_locator

@property
def partition_locator(self) -> Optional[PartitionLocator]:
val: Dict[str, Any] = self.get("partitionLocator")
Expand Down Expand Up @@ -410,6 +417,8 @@ def canonical_string(self) -> str:
for equality checks (i.e. two locators are equal if they have
the same canonical string).
"""
pl_hexdigest = self.partition_locator.hexdigest()
pl_hexdigest = (
self.partition_locator.hexdigest() if self.partition_locator else None
)
stream_position = self.stream_position
return f"{pl_hexdigest}|{stream_position}"
11 changes: 11 additions & 0 deletions deltacat/storage/model/locator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
# Allow classes to use self-referencing Type hints in Python 3.7.
from __future__ import annotations

from typing import Optional

from deltacat.utils.common import sha1_digest, sha1_hexdigest


class Locator:
def parent(self) -> Optional[Locator]:
"""
Returns the parent of this locator, if any.
"""
raise NotImplementedError()

def canonical_string(self) -> str:
"""
Returns a unique string for the given locator that can be used
Expand Down
16 changes: 10 additions & 6 deletions deltacat/storage/model/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from deltacat import logs

from deltacat.storage.model.schema import FieldLocator

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


Expand Down Expand Up @@ -60,16 +62,16 @@ class EntryParams(dict):

@staticmethod
def of(
equality_column_names: Optional[List[str]] = None,
equality_field_locators: Optional[List[FieldLocator]] = None,
) -> EntryParams:
params = EntryParams()
if equality_column_names is not None:
params["equality_column_names"] = equality_column_names
if equality_field_locators is not None:
params["equality_field_locators"] = equality_field_locators
return params

@property
def equality_column_names(self) -> Optional[List[str]]:
return self.get("equality_column_names")
def equality_field_locators(self) -> Optional[List[FieldLocator]]:
return self.get("equality_field_locators")


class Manifest(dict):
Expand Down Expand Up @@ -233,7 +235,9 @@ def of(
if credentials is not None:
manifest_meta["credentials"] = credentials
if entry_type is not None:
manifest_meta["entry_type"] = entry_type.value
manifest_meta["entry_type"] = (
entry_type.value if isinstance(entry_type, EntryType) else entry_type
)
if entry_params is not None:
manifest_meta["entry_params"] = entry_params
return manifest_meta
Expand Down
Loading

0 comments on commit 0df432c

Please sign in to comment.