hydro.spark
- hydro.spark.fields(df: pyspark.sql.dataframe.DataFrame) → list[str]
Returns a list of names of all fields of a DataFrame, including nested ones.
- Parameters
df – DataFrame that you want to extract all fields from
- Returns
A list of column names, all strings
Example
import hydro.spark as hs
df = spark.range(1)
hs.fields(df)
['id']
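The DataFrame above has no nested fields. As a rough sketch of the nested case (the dotted-path result in the comment is an assumption based on the description above, not verified output):
from pyspark.sql import Row

# a DataFrame with one struct column
nested_df = spark.createDataFrame([Row(id=1, nest=Row(key="val"))])
hs.fields(nested_df)
# expected to include the nested leaf as a dotted path, e.g. ['id', 'nest.key']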
- hydro.spark.fields_with_types(df: pyspark.sql.dataframe.DataFrame) → list[tuple[str, pyspark.sql.types.DataType]]
Returns a list of tuples of names and types of all fields of a DataFrame, including nested ones.
- Parameters
df – DataFrame that you want to extract all fields and types from
- Returns
A list of tuples of (column_name, type)
Example
import hydro.spark as hs
df = spark.range(1)
hs.fields_with_types(df)
[('id', LongType())]
- hydro.spark.deduplicate_dataframe(df: pyspark.sql.dataframe.DataFrame, keys: list[str] | str = None, tiebreaking_columns: list[str] = None) → pyspark.sql.dataframe.DataFrame
Removes duplicates from a Spark DataFrame.
- Parameters
df – The target DataFrame that contains duplicates.
keys – A list of column names used to distinguish rows. The order of this list does not matter. If not provided, behaves the same as DataFrame.drop_duplicates().
tiebreaking_columns – A list of column names used for ordering. The order of this list matters, with earlier elements “weighing” more than later ones. The columns will be evaluated in descending order. If not provided, behaves the same as DataFrame.drop_duplicates(keys).
- Returns
The deduplicated DataFrame
Example
Given an input DataFrame
+---+-----+-----------------------+
|id |type |ts                     |
+---+-----+-----------------------+
|1  |watch|2023-01-09 15:48:00.000|
|1  |watch|2023-01-09 15:48:00.001|
+---+-----+-----------------------+
There are two events with the same primary key id, but with a slightly different timestamp. Two rows should not share a primary key. A common way of dealing with this is to break the “tie” based on some orderable column(s).
This can be done with the following code:
import hydro.spark as hs
data = [
    {"id": 1, "type": "watch", "ts": "2023-01-09 15:48:00.000"},
    {"id": 1, "type": "watch", "ts": "2023-01-09 15:48:00.001"},
]
df = spark.createDataFrame(data)
hs.deduplicate_dataframe(df, ["id"], ["ts"])
Results in
+---+-----------------------+-----+
|id |ts                     |type |
+---+-----------------------+-----+
|1  |2023-01-09 15:48:00.001|watch|
+---+-----------------------+-----+
- hydro.spark.hash_fields(df: pyspark.sql.dataframe.DataFrame, denylist_fields: list[str] = None, algorithm: str = 'xxhash64', num_bits=256) → pyspark.sql.column.Column
Generates a hash digest of all fields.
- Parameters
df – Input dataframe that is to be hashed.
denylist_fields – Fields that will not be hashed.
algorithm – The function used to generate the hash digest. Options:
xxhash64 (default) – pyspark.sql.functions.xxhash64
md5 – pyspark.sql.functions.md5
sha1 – pyspark.sql.functions.sha1
sha2 – pyspark.sql.functions.sha2
hash – pyspark.sql.functions.hash
num_bits – Only for sha2. The desired bit length of the result.
- Returns
A column that represents the hash.
Example
Given an input DataFrame
+---+---+-----+
| id| ts| type|
+---+---+-----+
|  1|  1|watch|
|  1|  1|watch|
|  1|  2|watch|
+---+---+-----+
Row hashes are very helpful and convenient when trying to compare rows, especially rows with a lot of columns.
import hydro.spark as hs
data = [
    {"id": 1, "type": "watch", "ts": "1"},
    {"id": 1, "type": "watch", "ts": "1"},
    {"id": 1, "type": "watch", "ts": "2"},
]
df = spark.createDataFrame(data)
df.withColumn("row_hash", hs.hash_fields(df))
Results in
+---+-----+---+-------------------+
| id| type| ts|           row_hash|
+---+-----+---+-------------------+
|  1|watch|  1|8553228534919528539|
|  1|watch|  1|8553228534919528539|
|  1|watch|  2|8916583907181700702|
+---+-----+---+-------------------+
The rows with identical content have identical hash values. Rows with different content have different hash values.
This is very helpful when comparing rows: instead of typing col1 = col1 AND col2 = col2 …, a single hash = hash comparison can be used, saving developer time and keystrokes.
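If a cryptographic digest is preferred over the default, the algorithm and num_bits parameters listed above can be supplied. A minimal sketch (the resulting digest values are not shown here):
df.withColumn("row_hash", hs.hash_fields(df, algorithm="sha2", num_bits=256))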
- hydro.spark.hash_schema(df: pyspark.sql.dataframe.DataFrame, denylist_fields: list[str] = None) → pyspark.sql.column.Column
Generates a hash digest of a DataFrame’s schema. Uses the hashlib.md5 algorithm.
- Parameters
df – Input dataframe whose schema is to be hashed.
denylist_fields – Fields that will not be hashed.
- Returns
A column that represents the hash.
Example
import hydro.spark as hs
df = spark.range(1)
df.withColumn("schema_hash", hs.hash_schema(df))
+---+--------------------------------+
|id |schema_hash                     |
+---+--------------------------------+
|0  |b80bb7740288fda1f201890375a60c8f|
+---+--------------------------------+
The schema of tables can change, and sometimes it is helpful to be able to determine if a table’s schema has changed.
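For example, a sketch of detecting a schema change between two versions of a table (new_df and the comparison logic here are illustrative, not part of hydro):
# hash the current and the new schema, then compare the digests
old_hash = df.select(hs.hash_schema(df).alias("h")).first()["h"]
new_hash = new_df.select(hs.hash_schema(new_df).alias("h")).first()["h"]
if old_hash != new_hash:
    print("schema has changed")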
- hydro.spark.map_fields(df: pyspark.sql.dataframe.DataFrame, field_list: list[str], function: Callable) → pyspark.sql.dataframe.DataFrame
Applies the given function over the fields specified in field_list.
- Parameters
df – Input DataFrame.
field_list – A list of fields the function is to be applied to.
function – Any pyspark.sql.functions function or lambda function that takes a column.
- Returns
Resulting DataFrame
Example
data = [{"empty": " ", "ibm": " 🥴 ", "other": "thing"}] df = spark.createDataFrame(data) df.show()
+-----+-------+-----+
|empty|    ibm|other|
+-----+-------+-----+
|     |    🥴 |thing|
+-----+-------+-----+
hs.map_fields(df, ["empty", "ibm"], F.trim)
+-----+---+-----+
|empty|ibm|other|
+-----+---+-----+
|     | 🥴|thing|
+-----+---+-----+
A lambda function can be used to compose functions:
hs.map_fields(df, ["empty", "ibm"], lambda x: F.when(F.trim(x) == F.lit(""), None).otherwise(F.trim(x)))
+-----+---+-----+
|empty|ibm|other|
+-----+---+-----+
| null| 🥴|thing|
+-----+---+-----+
F.expr() can be used via interpolation with f-strings (it’s a little hard to read and write, though):
hs.map_fields(df, ["empty", "ibm"], lambda x: F.expr(f"nullif(trim({x}), '')"))
+-----+---+-----+
|empty|ibm|other|
+-----+---+-----+
| null| 🥴|thing|
+-----+---+-----+
- hydro.spark.map_fields_by_regex(df: pyspark.sql.dataframe.DataFrame, regex: str, function: Callable) → pyspark.sql.dataframe.DataFrame
Applies the given function over fields whose names match a regular expression.
- Parameters
df – Input DataFrame.
regex – Regular expression pattern. Uses Python’s re module.
function – Any pyspark.sql.functions function or lambda function that takes a column.
- Returns
Resulting DataFrame
See also hydro.spark.map_fields()
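No example is given here, but as a sketch, reusing the DataFrame from the map_fields example above, columns whose names contain an “m” can be trimmed in one call (the pattern and the expected behaviour are illustrative):
import pyspark.sql.functions as F

# ".*m.*" matches "empty" and "ibm" but not "other"
hs.map_fields_by_regex(df, ".*m.*", F.trim)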
- hydro.spark.map_fields_by_type(df: pyspark.sql.dataframe.DataFrame, target_type: pyspark.sql.types.DataType, function: Callable) → pyspark.sql.dataframe.DataFrame
Applies the given function over fields that have the target type.
- Parameters
df – Input DataFrame.
target_type – The DataType the function is to be applied to.
function – Any pyspark.sql.functions function or lambda function that takes a column.
- Returns
Resulting DataFrame
See also hydro.spark.map_fields()
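No example is given here either; a sketch using the same DataFrame, trimming every string field by type (illustrative, output not verified):
import pyspark.sql.functions as F
import pyspark.sql.types as T

# applies F.trim to every StringType field ("empty", "ibm", "other")
hs.map_fields_by_type(df, T.StringType(), F.trim)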
- hydro.spark.select_fields_by_type(df: pyspark.sql.dataframe.DataFrame, target_type: pyspark.sql.types.DataType)
Selects fields that have the provided type. Works with nested fields (but un-nests them).
- Parameters
df – Input DataFrame.
target_type – A DataType type that is to be selected.
- Returns
A new DataFrame with the selected fields
Example
data = [{"string1": "one", "int": 1, "string2": "two"}] df = spark.createDataFrame(data)
import hydro.spark as hs import pyspark.sql.types as T hs.select_fields_by_type(df, T.StringType())
+-------+-------+
|string1|string2|
+-------+-------+
|    one|    two|
+-------+-------+
- hydro.spark.select_fields_by_regex(df: pyspark.sql.dataframe.DataFrame, regex: str) → pyspark.sql.dataframe.DataFrame
Selects fields according to a provided regular expression pattern. Works with nested fields (but un-nests them).
- Parameters
df – Input DataFrame.
regex – Regular expression pattern. Uses Python’s re module.
- Returns
A new DataFrame with the selected fields
data = [{"string1": "one", "int": 1, "string2": "two"}] df = spark.createDataFrame(data)
import hydro.spark as hs import pyspark.sql.types as T hs.select_fields_by_regex(df, "string.*")
+-------+-------+
|string1|string2|
+-------+-------+
|    one|    two|
+-------+-------+
- hydro.spark.drop_fields(df: pyspark.sql.dataframe.DataFrame, fields_to_drop: list[str]) → pyspark.sql.dataframe.DataFrame
Drops a DataFrame’s fields, including nested fields and top-level columns.
- Parameters
df – Input DataFrame.
fields_to_drop – A list of field names that are to be dropped
- Returns
A new DataFrame without the specified fields
Example
from pyspark.sql import Row

# This is a silly way of creating a nested DataFrame, it's here for brevity
row = Row(nest=Row(key="val", society="spectacle"))
df = spark.createDataFrame([row])
df.printSchema()
And here is the schema:
root
 |-- nest: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- society: string (nullable = true)
Using hydro, we can simply drop nested fields.
import hydro.spark as hs
hs.drop_fields(df, ["nest.key"])
With the resulting schema:
root
 |-- nest: struct (nullable = true)
 |    |-- society: string (nullable = true)
- hydro.spark.infer_json_field(df: pyspark.sql.dataframe.DataFrame, target_field: str, options: dict[str, str] = None) → pyspark.sql.types.StructType
Parses a JSON string and infers its schema.
- Parameters
df – Input DataFrame.
target_field – A field that contains JSON strings that are to be inferred.
options – Standard JSON reader options. See pyspark.sql.DataFrameReader.json.
- Returns
The inferred StructType
Example
data = [{"id": 1, "payload": '{"salt": "white"}'}, {"id": 2, "payload": '{"pepper": "black"}'}] df = spark.createDataFrame(data) df.show()
Looks like:
+---+-------------------+
| id|            payload|
+---+-------------------+
|  1|  {"salt": "white"}|
|  2|{"pepper": "black"}|
+---+-------------------+
But there’s a problem: our schema doesn’t include salt and pepper.
root
 |-- id: long (nullable = true)
 |-- payload: string (nullable = true)
We can use hydro to fix this:
import hydro.spark as hs
schema = hs.infer_json_field(df, "payload")
And use the resulting schema to parse the fields:
df.withColumn("payload", F.from_json("payload", schema))
With the new schema:
root
 |-- id: long (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- pepper: string (nullable = true)
 |    |-- salt: string (nullable = true)
Notice how there are separate fields for salt and pepper. And now these are addressable leaf nodes.
- hydro.spark.infer_csv_field(df: pyspark.sql.dataframe.DataFrame, target_field: str, options: dict[str, str] = None) → pyspark.sql.types.StructType
Parses a CSV string and infers its schema.
- Parameters
df – Input DataFrame.
target_field – A field that contains CSV strings that are to be inferred.
options – Standard CSV reader options, including header. See pyspark.sql.DataFrameReader.csv.
- Returns
The inferred StructType
Example
data = {
    'id': 1,
    'payload': '''type,date
watch,2023-01-09
watch,2023-01-10
''',
}
df = spark.createDataFrame([data])
df.show(truncate=False)
+---+-----------------------------------------------+
|id |payload                                        |
+---+-----------------------------------------------+
|1  |type,date\nwatch,2023-01-09\nwatch,2023-01-10\n|
+---+-----------------------------------------------+
import hydro.spark as hs

csv_options = {'header': 'True'}
schema = hs.infer_csv_field(df, 'payload', options=csv_options)
schema.simpleString()
'struct<type:string,date:string>'