hydro.delta
- hydro.delta.scd(delta_table: delta.tables.DeltaTable, source: pyspark.sql.dataframe.DataFrame, keys: list[str] | str, effective_field: str, end_field: str = None, scd_type: int = 2) → delta.tables.DeltaTable
Executes a slowly changing dimensions transformation and merge.
Supports Type 1 and Type 2 SCD.
- Parameters
delta_table – The target Delta Lake table that is to be updated. See hydro.delta.bootstrap_scd2 if you need to create an SCD2 table.
source – The source data that will be used to merge with delta_table
keys – Column name(s) that identify unique rows. Can be a single column name as a string, or a list of strings, where order of the list does not matter.
effective_field – The name of the existing column that will be used as the “start” or “effective” indicator for a given entity. The column can be of any orderable type, including timestamp, date, string, and numeric types.
end_field – Only required for type 2. The name of the non-existing column that will be used as the “end” indicator for a given entity. Its type will match the type of effective_field.
scd_type – The type of SCD to perform: 1 or 2. Defaults to 2.
- Returns
The same delta_table
Example
For SCD Type 2 (https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row), given a Delta Lake table:
+---+-----------+----------+----------+
| id|   location|      date|  end_date|
+---+-----------+----------+----------+
|  1|      Kochi|2018-01-01|2019-01-01|
|  1|Lake Forest|2019-01-01|      null|
+---+-----------+----------+----------+
And a source DataFrame:
+---+--------+----------+
| id|location|      date|
+---+--------+----------+
|  1|  Irvine|2020-01-01|
|  2|  Venice|2022-01-01|
+---+--------+----------+
import hydro.delta as hd
hd.scd(delta_table, df, ["id"], effective_field="date", end_field="end_date")
Results in:
+---+-----------+----------+----------+
| id|   location|      date|  end_date|
+---+-----------+----------+----------+
|  1|      Kochi|2018-01-01|2019-01-01|
|  1|Lake Forest|2019-01-01|2020-01-01|
|  1|     Irvine|2020-01-01|      null|
|  2|     Venice|2022-01-01|      null|
+---+-----------+----------+----------+
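The Type 2 behaviour in this example can be approximated in plain Python. The sketch below is only an illustration of the semantics, not hydro's implementation: `scd2_merge` is a hypothetical helper that works on lists of dicts, and it assumes every incoming row is newer than the open row for its key.

```python
def scd2_merge(target, source, keys, effective_field, end_field):
    """Illustrative SCD Type 2 merge over lists of dicts (not hydro's code)."""
    key_of = lambda row: tuple(row[k] for k in keys)
    # Copy the target rows so the caller's data is left untouched.
    result = [dict(row) for row in target]
    # Process incoming rows oldest-first so each one closes its predecessor.
    for incoming in sorted(source, key=lambda r: r[effective_field]):
        for row in result:
            # Close the open row (end_field is None) that shares the same key.
            if key_of(row) == key_of(incoming) and row[end_field] is None:
                row[end_field] = incoming[effective_field]
        # The incoming row becomes the new open row for its key.
        result.append({**incoming, end_field: None})
    return sorted(result, key=lambda r: (key_of(r), r[effective_field]))


target = [
    {"id": 1, "location": "Kochi", "date": "2018-01-01", "end_date": "2019-01-01"},
    {"id": 1, "location": "Lake Forest", "date": "2019-01-01", "end_date": None},
]
source = [
    {"id": 1, "location": "Irvine", "date": "2020-01-01"},
    {"id": 2, "location": "Venice", "date": "2022-01-01"},
]
merged = scd2_merge(target, source, ["id"], "date", "end_date")
for row in merged:
    print(row)
```

The existing open row for id 1 is closed with the incoming effective value, and each incoming row is appended with an empty end value, which matches the table above.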
- hydro.delta.bootstrap_scd2(source_df: pyspark.sql.dataframe.DataFrame, keys: list[str] | str, effective_field: str, end_field: str, table_properties: dict[str, str] = None, partition_columns: list[str] = None, comment: str = None, path: str = None, table_identifier: str = None) → delta.tables.DeltaTable
Creates an SCD2-ready Delta Lake table from a source DataFrame.
- Parameters
source_df – Source data that will populate the final Delta Lake table
keys – Column name(s) that identify unique rows. Can be a single column name as a string, or a list of strings, where order of the list does not matter.
effective_field – The name of the existing column that will be used as the “start” or “effective” timestamp for a given entity. The column can be of any orderable type, including timestamp, date, string, and numeric types.
end_field – The name of the non-existing column that will be used as the “end” timestamp for a given entity. Its type will match the type of effective_field.
table_properties – A set of [Delta Lake table properties](https://docs.delta.io/latest/table-properties.html) or custom properties.
partition_columns – A set of column names that will be used to partition the resulting Delta Lake table.
comment – Comment that describes the table.
path – Specify the path to the directory where table data is stored, which could be a path on distributed storage.
table_identifier – The table name, optionally qualified with a catalog and database name: [catalog_name.][database_name.]table_name.
- Returns
The resulting DeltaTable object
Example
Given a DataFrame:
+---+-----------+----------+
| id|   location|      date|
+---+-----------+----------+
|  1|      Kochi|2018-01-01|
|  1|Lake Forest|2019-01-01|
+---+-----------+----------+
Run bootstrap_scd2:
import hydro.delta as hd
delta_table = hd.bootstrap_scd2(
    df,
    keys=["id"],
    effective_field="date",
    end_field="end_date",
    path="/path/to/delta/table",
)
Results in the below data being persisted to the given path, in this case, /path/to/delta/table:
+---+-----------+----------+----------+
| id|   location|      date|  end_date|
+---+-----------+----------+----------+
|  1|      Kochi|2018-01-01|2019-01-01|
|  1|Lake Forest|2019-01-01|      null|
+---+-----------+----------+----------+
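The way the end column is derived in this example can be sketched without Spark. The helper below, `bootstrap_rows`, is hypothetical and only mirrors the result shown above: within each key, a row's end value is taken from the next row's effective value, and the latest row is left open. How hydro computes this internally is not shown in this reference.

```python
from itertools import groupby


def bootstrap_rows(rows, keys, effective_field, end_field):
    """Illustrative derivation of an SCD2 end column (not hydro's code)."""
    key_of = lambda row: tuple(row[k] for k in keys)
    # Sort by key, then by effective value, so groupby sees contiguous keys.
    ordered = sorted(rows, key=lambda r: (key_of(r), r[effective_field]))
    out = []
    for _, group in groupby(ordered, key=key_of):
        group = list(group)
        # Pair each row with its successor; the last row has no successor.
        for current, following in zip(group, group[1:] + [None]):
            end = following[effective_field] if following is not None else None
            out.append({**current, end_field: end})
    return out


rows = [
    {"id": 1, "location": "Lake Forest", "date": "2019-01-01"},
    {"id": 1, "location": "Kochi", "date": "2018-01-01"},
]
bootstrapped = bootstrap_rows(rows, ["id"], "date", "end_date")
for row in bootstrapped:
    print(row)
```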
- hydro.delta.partial_update_set(delta_frame: pyspark.sql.dataframe.DataFrame | delta.tables.DeltaTable, source_alias: str = 'source', target_alias: str = 'target') → pyspark.sql.functions.col
Generates an update set for a Delta Lake MERGE operation where the source data provides partial updates.
Partial updates in this case are updates where some columns in the source data are NULL, but the NULLs are meant to be non-destructive or carry no semantic meaning.
- Parameters
delta_frame – A DeltaTable or DataFrame that describes a source MERGE dataframe
source_alias – A temporary name given to the source data of the MERGE
target_alias – A temporary name given to the target Delta Table of the MERGE
- Returns
A dictionary that describes non-destructive updates for all fields in delta_frame in {key: coalesce(source.key, target.key)} form
Example
Given a Delta Lake table:
+---+----------+------+
| id|  location|status|
+---+----------+------+
|  1|california|  null|
+---+----------+------+
And some source data that will partially update the Delta Lake table:
+---+--------+------+
| id|location|status|
+---+--------+------+
|  1|    null|active|
+---+--------+------+
Perform the following MERGE:
import hydro.delta as hd
delta_table.alias("target").merge(
    df.alias("source"), "source.id = target.id"
).whenNotMatchedInsertAll().whenMatchedUpdate(set=hd.partial_update_set(df)).execute()
With the resulting Delta Lake table:
+---+----------+------+
| id|  location|status|
+---+----------+------+
|  1|california|active|
+---+----------+------+
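To make the {key: coalesce(source.key, target.key)} form concrete, here is a minimal plain-Python sketch. Both helpers are hypothetical: `coalesce_exprs` renders the expressions as SQL strings purely for illustration (the real function may well return Column objects), and `apply_coalesce` mimics what those expressions do to a single matched row.

```python
def coalesce_exprs(columns, source_alias="source", target_alias="target"):
    """Render one coalesce expression per column as a SQL string (illustrative)."""
    return {
        c: f"coalesce({source_alias}.{c}, {target_alias}.{c})"
        for c in columns
    }


def apply_coalesce(target_row, source_row):
    """Mimic coalesce(source, target): keep the target value when source is None."""
    return {
        c: source_row[c] if source_row.get(c) is not None else target_row[c]
        for c in target_row
    }


exprs = coalesce_exprs(["id", "location", "status"])
updated = apply_coalesce(
    {"id": 1, "location": "california", "status": None},
    {"id": 1, "location": None, "status": "active"},
)
print(exprs["location"])
print(updated)
```

Because the source value wins only when it is not NULL, the NULL location in the source leaves "california" in place while the status is filled in.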
- hydro.delta.file_stats(delta_table: delta.tables.DeltaTable) → pyspark.sql.dataframe.DataFrame
Returns detailed information about the files of the current snapshot of a Delta Lake table, including (per-file):
name of file
size of file
partition values
modification time
is data change
statistics (min, max, and null counts)
tags
This is done by scanning the table’s transaction log, so it is fast, cheap, and scalable.
- Returns
A DataFrame that describes the physical files that compose a given Delta Lake table
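As a rough illustration of why a log scan is enough, the sketch below replays Delta transaction log actions in plain Python. It is not hydro's implementation: `replay_log` and `make_add` are hypothetical helpers, the input is assumed to be JSON action lines in commit order, and checkpoints and other action types are ignored.

```python
import json


def replay_log(action_lines):
    """Replay add/remove actions in commit order; return live files by path."""
    live = {}
    for line in action_lines:
        action = json.loads(line)
        if "add" in action:
            add = action["add"]
            stats = add.get("stats")
            live[add["path"]] = {
                "size": add["size"],
                "partitionValues": add["partitionValues"],
                "modificationTime": add["modificationTime"],
                "dataChange": add["dataChange"],
                # In the Delta log, stats is itself a JSON-encoded string.
                "stats": json.loads(stats) if stats else None,
                "tags": add.get("tags"),
            }
        elif "remove" in action:
            # A removed file is no longer part of the current snapshot.
            live.pop(action["remove"]["path"], None)
    return live


def make_add(path, size, num_records):
    """Build a sample add action line (hypothetical values for this demo)."""
    return json.dumps({"add": {
        "path": path,
        "partitionValues": {},
        "size": size,
        "modificationTime": 1700000000000,
        "dataChange": True,
        "stats": json.dumps({"numRecords": num_records}),
    }})


log = [
    make_add("part-0.parquet", 1024, 10),
    make_add("part-1.parquet", 2048, 20),
    json.dumps({"remove": {"path": "part-0.parquet", "dataChange": True}}),
]
snapshot = replay_log(log)
print(snapshot)
```

No data file is opened at any point; everything comes from the log entries, which is what keeps this kind of scan cheap.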
- hydro.delta.partition_stats(delta_table: delta.tables.DeltaTable) → pyspark.sql.dataframe.DataFrame
Returns detailed information about the partitions of the current snapshot of a Delta Lake table including (per-partition):
total size in bytes
file byte size quantiles (0, .25, .5, .75, 1.0) - where 0 is min and 1.0 is max
total number of records
total number of files
oldest and newest timestamps
This is done by scanning the table’s transaction log, so it is fast, cheap, and scalable.
- Returns
A DataFrame that describes the size of all partitions in the table
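The per-partition figures listed above can be pictured as a group-by over per-file entries. The following is a sketch under stated assumptions, not hydro's implementation: `summarize_partitions` and `nearest_rank` are hypothetical helpers, the file entries are made up, the quantiles use a simple nearest-rank rule (Spark's quantile functions may produce different values), and the timestamps are assumed to be file modification times.

```python
import math
from collections import defaultdict


def nearest_rank(sorted_values, q):
    """Nearest-rank quantile; q=0 gives the minimum and q=1 the maximum."""
    index = max(0, math.ceil(q * len(sorted_values)) - 1)
    return sorted_values[index]


def summarize_partitions(files):
    """Group per-file entries by partition values and aggregate them."""
    groups = defaultdict(list)
    for f in files:
        groups[tuple(sorted(f["partitionValues"].items()))].append(f)
    summary = {}
    for partition, members in groups.items():
        sizes = sorted(m["size"] for m in members)
        times = [m["modificationTime"] for m in members]
        summary[partition] = {
            "total_bytes": sum(sizes),
            "num_files": len(sizes),
            "num_records": sum(m["numRecords"] for m in members),
            "size_quantiles": [
                nearest_rank(sizes, q) for q in (0, 0.25, 0.5, 0.75, 1.0)
            ],
            "oldest": min(times),
            "newest": max(times),
        }
    return summary


files = [
    {"partitionValues": {"date": "2024-01-01"}, "size": s, "numRecords": 10,
     "modificationTime": 1000 + s}
    for s in (400, 100, 300, 200)
] + [
    {"partitionValues": {"date": "2024-01-02"}, "size": 50, "numRecords": 5,
     "modificationTime": 2000},
]
stats = summarize_partitions(files)
print(stats)
```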
- hydro.delta.zordering_stats(delta_table: delta.tables.DeltaTable) → pyspark.sql.dataframe.DataFrame
Returns a DataFrame that describes the Z-Ordering that has been applied to the table.
The resulting DataFrame has schema of zOrderBy, count.
- Returns
A DataFrame with schema zOrderBy, count.
- hydro.delta.detail(delta_table: delta.tables.DeltaTable) → dict[str, Any]
Returns details about a Delta Lake table including:
table created timestamp
description
table format
table id
table last modified
location
minimum reader version
minimum writer version
table name
total number of records of current snapshot
total number of files of current snapshot
partition columns
total number of partitions of the current snapshot
properties
total data size of the current snapshot
percentage collected stats of the current snapshot
snapshot version
- Returns
A dictionary representing enhanced details of a Delta Lake table
- hydro.delta.summarize_all_files(delta_table: delta.tables.DeltaTable, humanize: bool = True) → dict[str, str]
Lists and summarizes all of the contents of a Delta Lake table’s data directory.
The directory will contain:
data files that are part of the current snapshot
data files that are “tombstoned” and not part of the current snapshot
Returns summary statistics including:
the total number of files
the total size of the files
the oldest timestamp of the files
- Parameters
humanize – Whether the results should be formatted for human readability. Set this to False if you need to do calculations on the raw metrics.
- Returns
A dictionary containing summary statistics about all of the data files under the Delta Lake table’s location
- hydro.delta.idempotency_markers(delta_table: delta.tables.DeltaTable) → str
Exposes a Delta Lake table’s idempotency markers, i.e., txnAppId and txnVersion.
Currently this returns a string representation of a Scala map.
- Returns
A string that represents a Scala map of the idempotency markers. This map is in the form Map(key -> value), where key is a given txnAppId and value is the associated version.
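Since the return value is a string, callers who want a dictionary have to parse it themselves. The helper below, `parse_scala_map`, is a hypothetical sketch and not part of hydro. It assumes the usual Scala rendering of a map, that no txnAppId contains a comma or "->", and that versions are integers.

```python
def parse_scala_map(text):
    """Parse a string like 'Map(app1 -> 3, app2 -> 7)' into a Python dict."""
    text = text.strip()
    if not (text.startswith("Map(") and text.endswith(")")):
        raise ValueError(f"not a Scala map string: {text!r}")
    body = text[len("Map("):-1].strip()
    if not body:
        return {}
    markers = {}
    for entry in body.split(","):
        # Split on the last arrow so the key may be any arrow-free text.
        key, arrow, value = entry.rpartition("->")
        if not arrow:
            raise ValueError(f"malformed entry: {entry!r}")
        markers[key.strip()] = int(value.strip())
    return markers


markers = parse_scala_map("Map(app1 -> 3, app2 -> 7)")
print(markers)
```

Whitespace around the arrow is stripped, so both "key -> value" and "key-> value" spellings are accepted.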