A Python ETL library (SPETLR) for Databricks, powered by Apache Spark.
Enables a standardized ETL framework and integration testing for Databricks.
In Databricks there is one standard way to refer to a table, the
“database.table” reference. In our data platform projects we have often
found a need for a more abstracted notion of a table reference,
one that allows for injection of test versions of tables and that allows
unified reference across different environments (dev, staging, prod).
The Configurator provides such a further abstraction level.
The Configurator is a singleton class that you need to instantiate
and configure once in your project. Make sure this code is always executed
before any reference to tables is made. Example:
from spetlr import Configurator
tc = Configurator()
tc.register('ENV','myenv')
tc.add_resource_path(my.resource.module)
The Configurator can be configured with json or yaml files. The following is an example of a configuration file showing the expected structure:
MyFirst:
  name: first{ID}
  path: /{MNT}/my/first{ID}
  # MNT and ID are default replacements.
  # Using .set_prod(), MNT becomes 'mnt', while in debug it becomes 'tmp'.
  # This helps when using mount point locations for tables.

MyAlias:
  alias: MyFirst

MyForked:
  release:
    alias: MyFirst
  debug:
    name: another
    path: /my/temp/path
  # Using release and debug is an alternative way of differentiating the cases instead
  # of using {ID}. Note that {ID} has the advantage of separating concurrent test runs.

MyRecursing:
  name: recursing{ID}
  path: /{MNT}/tables/{MyRecursing_name}
  # The notation '_name' refers back to the 'name' property of that dictionary.
  # Alias and release/debug will be resolved before the property is accessed.
You can optionally either provide ‘release’ and ‘debug’ versions of each table,
or include the structure {ID} in the name and path. This is a special
replacement string that will be replaced with an empty string for
production tables, or with a “__{GUID}” string when debug is selected.
The guid construction allows for non-colliding parallel testing.
Beyond the resource definitions, the Configurator needs to be
configured to return production or test versions of tables. This is done
at the start of your code: in your jobs you need to call Configurator().set_prod(),
whereas your unit-tests should call Configurator().set_debug().
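For illustration, here is a minimal sketch of how the two modes might be selected; the module name my.resource.module, the function name and the test class are assumptions for the example:

import unittest

from spetlr import Configurator
import my.resource.module


def setup_production():
    tc = Configurator()
    tc.add_resource_path(my.resource.module)
    tc.set_prod()  # table names resolve to their production versions


class MyTableTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        tc = Configurator()
        tc.add_resource_path(my.resource.module)
        tc.set_debug()  # table names resolve to non-colliding test versions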
As was already seen in the example above, all strings can contain python format-string placeholders that reference other strings known to the configurator. The following rules apply:
- A string is only returned once all {} substitutions have been applied; { or } can never be returned as part of a resolved value.
- {MyRecursing} will resolve to either the entity itself (if it is a string), or to the name key of the fully resolved resource details.
- {MyRecursing_name} will consider the part after the underscore to be a dictionary key and will return the fully resolved value.
- References to other parts of the same resource are allowed.

In Spark SQL, there are database create statements and table create statements that follow a fixed pattern, reproduced here:
CREATE {DATABASE | SCHEMA} [ IF NOT EXISTS ] database_name
[ COMMENT database_comment ]
[ LOCATION database_directory ]
[ WITH DBPROPERTIES (property_name=property_value [ , ...]) ];
CREATE TABLE [ IF NOT EXISTS ] table_identifier
[ ( col_name1 col_type1 [ COMMENT col_comment1 ], ... ) ]
USING data_source
[ OPTIONS ( key1=val1, key2=val2, ... ) ]
[ PARTITIONED BY ( col_name1, col_name2, ... ) ]
[ CLUSTERED BY ( col_name3, col_name4, ... )
[ SORTED BY ( col_name [ ASC | DESC ], ... ) ]
INTO num_buckets BUCKETS ]
[ LOCATION path ]
[ COMMENT table_comment ]
[ TBLPROPERTIES ( key1=val1, key2=val2, ... ) ]
[ AS select_statement ]
The logic in this library expects the information in the above statements to be reproduced in yaml in the following structure:
MySpetlrDbReference:
  name: database_name
  comment: database_comment
  path: database_directory
  format: "db"
  dbproperties:
    property_name: property_value

MySpetlrTableReference:
  name: table_identifier
  path: path
  format: data_source
  schema:
    sql: |
      col_name1 col_type1 [ COMMENT col_comment1 ], ...
  options:
    key1: val1
    key2: val2
  partitioned_by:
    - col_name1
    - col_name2
  clustered_by:
    cols:
      - col_name3
      - col_name4
    buckets: num_buckets
    sorted_by:
      - name: col_name
        ordering: "ASC" or "DESC"
  comment: table_comment
  tblproperties:
    key1: val1
    key2: val2
Note that the form CREATE TABLE table_identifier AS select_statement
is not supported.
For alternate ways to specify the schema, see the documentation for the schema manager.
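For illustration, a concrete table specification following the structure above might look as follows; the resource key, table name, path and properties are assumptions made up for this example:

MySalesTable:
  name: sales{ID}.transactions
  path: /{MNT}/datalake/sales{ID}/transactions
  format: delta
  schema:
    sql: |
      TransactionId STRING,
      Amount DOUBLE,
      CreatedAt TIMESTAMP
  partitioned_by:
    - CreatedAt
  comment: All sales transactions
  tblproperties:
    delta.autoOptimize.optimizeWrite: true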
When using sql statements to create and manage tables, a developer may desire to keep all information referring to a table within one file. Therefore, the configurator employs a sql parsing library to extract table details directly from the CREATE statements. It can be used like this:
from . import my_sql_folder
Configurator().add_sql_resource_path(my_sql_folder)
A sql CREATE statement already contains all the information that the configurator needs,
except the key (aka. table_id) that shall be used to refer to the table.
This key needs to be added to the sql in the form of a comment with the magic prefix
"-- spetlr.Configurator " (note the spaces; not case-sensitive). See the following
example:
-- spetlr.Configurator key: MySparkDb
CREATE DATABASE IF NOT EXISTS `my_db1{ID}`
COMMENT "Dummy Database 1"
LOCATION "/tmp/foo/bar/my_db1/";
-- spetlr.Configurator key: MyDetailsTable
CREATE TABLE IF NOT EXISTS `my_db1{ID}.details`
(
{MyAlias_schema},
another int
-- comment with ;
)
USING DELTA
COMMENT "Dummy Database 1 details"
LOCATION "/{MNT}/foo/bar/my_db1/details/";
-- pure configurator magic in this statement
-- spetlr.Configurator key: MyAlias
-- spetlr.Configurator alias: MySqlTable
;
-- SPETLR.CONFIGURATOR key: MySqlTable
-- spetlr.Configurator delete_on_delta_schema_mismatch: true
CREATE TABLE IF NOT EXISTS `{MySparkDb}.tbl1`
(
a int,
b int,
c string,
d timestamp
)
USING DELTA
COMMENT "Dummy Database 1 table 1"
LOCATION "/{MNT}/foo/bar/my_db1/tbl1/";
The example is quite complex and demonstrates a number of features:
- The magic comment prefix is not case-sensitive, so -- SPETLR.CONFIGURATOR key: MyKey works as well.
- The part after the prefix is parsed as a .yaml document, so more complicated structures are also possible.

When using the configurator to parse sql code, the in-memory structure will be as
described in the section on hive-table specification.
Once configured, the table configurator is often not used directly. Other classes in the spetlr framework use it when configured with a resource ID. You can find examples in the eventhub unittests:
from spetlr.eh import EventHubCapture
EventHubCapture.from_tc("SpetlrEh")
or in the delta handle unit-tests:
from spetlr.delta import DeltaHandle
DeltaHandle.from_tc("MyTbl")
But sometimes you still need to call the table configurator methods e.g. when constructing your own sql:
from spetlr.config_master import Configurator
f"MERGE INTO {Configurator().table_name('MyTbl')} AS target ..."
‘MNT’ is another special replacement, similar to “ID”. In production, it is replaced with the string ‘mnt’, while in debug it is replaced with ‘tmp’. The intended usage is in paths where production tables are mounted on external storage, typically under “/mnt”, whereas test tables should be written to “/tmp”. You can use it as in this example:
MyTable:
  name: mydb{ID}.data_table
  path: /{MNT}/somestorage{ENV}/mydb{ID}/data_table
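As a sketch of how these replacements resolve (assuming the MyTable resource above has been loaded with add_resource_path and 'ENV' has been registered as 'myenv'; the debug guid will vary per run):

from spetlr import Configurator

tc = Configurator()
tc.register('ENV', 'myenv')

tc.set_prod()
tc.table_name('MyTable')    # -> 'mydb.data_table'
# the path would resolve to '/mnt/somestoragemyenv/mydb/data_table'

tc.set_debug()
tc.table_name('MyTable')    # -> something like 'mydb__1234abcd.data_table'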
Extra details are now deprecated. Simply register your extras as simple string resources.
from spetlr.config_master import Configurator
tc = Configurator()
tc.register('ENV', 'prod')
The table configurations available to the configurator can be useful when executing
actions on the command line. See below for individual commands. To expose the
configurator command line interface, you need to call the .cli() method after
you have initialized the configurator with your project details. You therefore need
to expose a command line script as shown below.
In the file that initializes your configurator:
def init_my_configurator():
    c = Configurator()
    c.add_resource_path(my_yaml_module)
    c.register('ENV', config.my_env_name)
    return c

if __name__ == "__main__":
    init_my_configurator().cli()
Now, all the functionality below will become available on the command line.
When using an IDE to develop python code, a useful feature is auto-completion and linting. Such features are however not available when using string keys from yaml files. It can therefore be useful to extract the keys from the yaml configurations and make them available as python objects.
Call the following command to generate such a keys file from your initialized configurator.
$> my_config generate-keys-file -o keys.py
This will create the file keys.py
with the following example contents:
# AUTO GENERATED FILE.
# contains all spetlr.Configurator keys
MyFirst = "MyFirst"
MySecond = "MySecond"
MyAlias = "MyAlias"
MyForked = "MyForked"
MyRecursing = "MyRecursing"
You can now use the keys file to auto-complete and validate your yaml keys:
DeltaHandle.from_tc("MyFirst")
becomes
from keys import MyFirst
DeltaHandle.from_tc(MyFirst)
which also supports flake8 linting for correct spelling.
If you want to check that you did not forget to update the keys file as part of your
CICD pipeline, running the same command with the additional option --check-only
will return an exit code of 0 if the file was already up-to-date and 1 otherwise.
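For example, a CI/CD step could verify the keys file like this (my_config is the command line script from the earlier example):

$> my_config generate-keys-file -o keys.py --check-only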
The Configurator contains logic to distinguish between production and
debug tables. To make full use of this functionality when reading and writing
delta tables, two convenience classes, DeltaHandle and DbHandle, have
been provided. Use the classes like this:
from spetlr import Configurator
from spetlr.delta import DeltaHandle, DbHandle
tc = Configurator()
tc.add_resource_path('/my/config/files')
# name is mandatory,
# path is optional
# format is optional. Must equal 'db' if provided
db = DbHandle.from_tc('MyDb')
# quickly create the database
db.create()
# name is mandatory,
# path is optional
# format is optional. Must equal 'delta' if provided
dh = DeltaHandle.from_tc('MyTblId')
# quickly create table without schema
dh.create_hive_table()
df = dh.read()
dh.overwrite(df)
This code assumes that there exists a file /my/config/files/stuff.yml
like:
MyDb:
  name: TestDb{ID}
  path: /tmp/testdb{ID}

MyTblId:
  name: TestDb{ID}.testTbl
  path: /tmp/testdb{ID}/testTbl
The {ID}
parts are either replaced with an empty string (production) or with a uuid
if tc.reset(debug=True)
has been set.
The method upserts (updates or inserts) a databricks dataframe into a target delta table.
def upsert(
    self,
    df: DataFrame,
    join_cols: List[str],
) -> Union[DataFrame, None]:
    ...
Usage example:
target_dh.upsert(df_new, ["Id"])
The following queries create a test table containing guitar data. Let’s assume that the Configurator is configured as in the section DeltaHandle and DbHandle, and that testTbl has the following schema:
%sql
(
  Id STRING,
  Brand STRING,
  Model STRING
)

INSERT INTO TestDb.testTbl values ("2","Gibson","Les Paul");

select * from TestDb.testTbl

+---+------+--------+
| Id| Brand|   Model|
+---+------+--------+
|  2|Gibson|Les Paul|
+---+------+--------+
The following dataframe has one row that will be merged with Id=2, and the other rows are going to be inserted:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

df_new = spark.createDataFrame(
    spark.sparkContext.parallelize([
        ("1", "Fender", "Jaguar"),
        ("2", "Gibson", "Starfire"),
        ("3", "Ibanez", "RG"),
    ]),
    StructType([
        StructField("Id", StringType(), False),
        StructField("Brand", StringType(), True),
        StructField("Model", StringType(), True),
    ]))
Use the upsert method to upsert data into the test delta table:
target_dh = DeltaHandle.from_tc("MyTblId")
target_dh.upsert(df_new, ["Id"])
%sql
select * from TestDb.testTbl order by Id

+---+------+--------+
| Id| Brand|   Model|
+---+------+--------+
|  1|Fender|  Jaguar|
|  2|Gibson|Starfire|
|  3|Ibanez|      RG|
+---+------+--------+
As one can see, the row with Id=2 is now upserted such that the model went from “Les Paul” to “Starfire”. The two other rows were inserted.
The TaskEntryPoint
is an abstract base class that acts as an entry point for tasks. This class is meant to be subclassed and extended with specific implementation details. The subclass must implement a method task()
which serves as the entry point for the task logic.
To use TaskEntryPoint
, create a subclass that implements the task()
method:
class MyTask(TaskEntryPoint):
    @classmethod
    def task(cls) -> None:
        # implement the task logic here
        ...
The subclass MyTask now implements the task() method, and we therefore have an entry point at MyTask.task(). This entry point can be added to the python wheel either manually or via automatic discovery.

This module contains components for implementing elegant ETL operations using the OETL Design Pattern.
OETL, short for Orchestrated Extract-Transform-Load, is a pattern that takes the ideas behind variations of the Model-View-Whatever design pattern.
The Orchestrator is responsible for conducting the interactions between the Extractor -> Transformer -> Loader.
The Orchestrator reads data from the Extractor, then uses the result as a parameter when calling the Transformer, and saves the transformed result into the Loader. The Transformer can be optional, as there are scenarios where data transformation is not needed (e.g. raw data ingestion to a landing zone).
Each layer may have a single or multiple implementations, and this is handled automatically in the Orchestrator.
This library provides common simple implementations and base classes for implementing the OETL design pattern.
To simplify object construction, we provide the Orchestrator fluent interface from spetlr.etl
from spetlr.etl import Extractor, Transformer, Loader, Orchestrator
(Orchestrator()
.extract_from(Extractor())
.transform_with(Transformer())
.load_into(Loader())
.execute())
All ETL classes, Orchestrator, Extractor, Transformer, TransformerNC and Loader, are ETL objects.
This means that they have a method etl(self, inputs: dataset_group) -> dataset_group
(where dataset_group = Dict[str, DataFrame]) that transforms a set of inputs to a set of
outputs. The special properties of each type are described below.
The use case for the TransformerNC comes when there is a need to keep previously extracted (or transformed) dataframes after a transformation step. When working with these classes it is crucial to set dataset input keys and dataset output keys. This ensures that the transformer and/or loader has explicit information on which dataframe(s) to handle.
The special case of the Orchestrator is that it takes all its steps and executes them
in sequence on its inputs. Running in the default execute()
method, the inputs are empty,
but an orchestrator can also be added as part of another orchestrator with the step
method.
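As a hedged sketch of the latter, reusing the GuitarExtractor and NoopLoader classes from the examples below, and assuming that step() registers any ETL step (including another orchestrator) and returns the outer orchestrator for chaining:

from spetlr.etl import Orchestrator

# inner orchestrator: no extractors of its own, it operates on the inputs it receives
inner = Orchestrator().load_into(NoopLoader())

# outer orchestrator: extracts data and runs the inner orchestrator as one of its steps
outer = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .step(inner)
)
outer.execute()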
For the most general case of a many-to-many transformation, implement your step by inheriting
from the EtlBase
class.
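As a minimal sketch of such a many-to-many step (the import location of EtlBase and the key names "left", "right" and "joined" are assumptions for the example), an EtlBase subclass receives the full dictionary of dataframes and returns an updated one:

from pyspark.sql import DataFrame
from spetlr.etl import EtlBase  # assumed import location
from spetlr.etl.types import dataset_group


class JoinStep(EtlBase):
    def etl(self, inputs: dataset_group) -> dataset_group:
        # join two upstream dataframes and add the result under a new key,
        # keeping the existing entries available for later steps
        joined: DataFrame = inputs["left"].join(inputs["right"], on="id")
        inputs["joined"] = joined
        return inputs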
Here are some example usages and implementations of the ETL classes provided.
Here’s an example of reading data from a single location, transforming it once and saving to a single destination. This is the simplest ETL case, and it will be used as the basis for the more complex examples below.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType
from spetlr.etl import Extractor, Transformer, Loader, Orchestrator
from spetlr.spark import Spark
class GuitarExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[
("1", "Fender", "Telecaster", "1950"),
("2", "Gibson", "Les Paul", "1959"),
("3", "Ibanez", "RG", "1987"),
]
),
"""
id STRING,
brand STRING,
model STRING,
year STRING
""",
)
class BasicTransformer(Transformer):
def process(self, df: DataFrame) -> DataFrame:
print("Current DataFrame schema")
df.printSchema()
df = df.withColumn("id", f.col("id").cast(IntegerType()))
df = df.withColumn("year", f.col("year").cast(IntegerType()))
print("New DataFrame schema")
df.printSchema()
return df
class NoopLoader(Loader):
def save(self, df: DataFrame) -> None:
df.write.format("noop").mode("overwrite").save()
df.printSchema()
df.show()
print("ETL Orchestrator using a single simple transformer")
etl = (
Orchestrator()
.extract_from(GuitarExtractor())
.transform_with(BasicTransformer())
.load_into(NoopLoader())
)
etl.execute()
The code above produces the following output:
Original DataFrame schema
root
|-- id: string (nullable = true)
|-- brand: string (nullable = true)
|-- model: string (nullable = true)
|-- year: string (nullable = true)
New DataFrame schema
root
|-- id: integer (nullable = true)
|-- brand: string (nullable = true)
|-- model: string (nullable = true)
|-- year: integer (nullable = true)
+---+------+----------+----+
| id| brand| model|year|
+---+------+----------+----+
| 1|Fender|Telecaster|1950|
| 2|Gibson| Les Paul|1959|
| 3|Ibanez| RG|1987|
+---+------+----------+----+
Here’s an example of having multiple instances of a Transformer implementation that is reused to change the datatype of a given column,
where the column name is parameterized.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from spetlr.etl import Extractor, Transformer, Loader, Orchestrator
from spetlr.spark import Spark
class GuitarExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[
("1", "Fender", "Telecaster", "1950"),
("2", "Gibson", "Les Paul", "1959"),
("3", "Ibanez", "RG", "1987"),
]
),
StructType(
[
StructField("id", StringType()),
StructField("brand", StringType()),
StructField("model", StringType()),
StructField("year", StringType()),
]
),
)
class IntegerColumnTransformer(Transformer):
def __init__(self, col_name: str):
super().__init__()
self.col_name = col_name
def process(self, df: DataFrame) -> DataFrame:
df = df.withColumn(self.col_name, f.col(self.col_name).cast(IntegerType()))
return df
class NoopLoader(Loader):
def save(self, df: DataFrame) -> None:
df.write.format("noop").mode("overwrite").save()
df.printSchema()
df.show()
print("ETL Orchestrator using multiple transformers")
etl = (
Orchestrator()
.extract_from(GuitarExtractor())
.transform_with(IntegerColumnTransformer("id"))
.transform_with(IntegerColumnTransformer("year"))
.load_into(NoopLoader())
)
etl.execute()
Here’s an example of having multiple Extractor implementations and applying transformations using
the process_many method.
The read() function in Extractor will return a dictionary that uses the type name of the Extractor
as the key and a DataFrame as its value; the key used can be overridden in the constructor.
Transformer provides the function process_many(dataset: {}) and returns a single DataFrame.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType
from spetlr.etl import Extractor, Loader, Orchestrator, Transformer
from spetlr.spark import Spark
class AmericanGuitarExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[
("1", "Fender", "Telecaster", "1950"),
("2", "Gibson", "Les Paul", "1959"),
]
),
StructType(
[
StructField("id", StringType()),
StructField("brand", StringType()),
StructField("model", StringType()),
StructField("year", StringType()),
]
),
)
class JapaneseGuitarExtractor(Extractor):
def __init__(self):
super().__init__(dataset_key="japanese")
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[("3", "Ibanez", "RG", "1987"), ("4", "Takamine", "Pro Series", "1959")]
),
StructType(
[
StructField("id", StringType()),
StructField("brand", StringType()),
StructField("model", StringType()),
StructField("year", StringType()),
]
),
)
class CountryOfOriginTransformer(Transformer):
def process_many(self, dataset: {}) -> DataFrame:
usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", f.lit("USA"))
jap_df = dataset["japanese"].withColumn("country", f.lit("Japan"))
return usa_df.union(jap_df)
class NoopLoader(Loader):
def save(self, df: DataFrame) -> None:
df.write.format("noop").mode("overwrite").save()
df.printSchema()
df.show()
print("ETL Orchestrator using multiple extractors")
etl = (
Orchestrator()
.extract_from(AmericanGuitarExtractor())
.extract_from(JapaneseGuitarExtractor())
.transform_with(CountryOfOriginTransformer())
.load_into(NoopLoader())
)
etl.execute()
The code above produces the following output:
root
|-- id: string (nullable = true)
|-- brand: string (nullable = true)
|-- model: string (nullable = true)
|-- year: string (nullable = true)
|-- country: string (nullable = false)
+---+--------+----------+----+-------+
| id| brand| model|year|country|
+---+--------+----------+----+-------+
| 1| Fender|Telecaster|1950| USA|
| 2| Gibson| Les Paul|1959| USA|
| 3| Ibanez| RG|1987| Japan|
| 4|Takamine|Pro Series|1959| Japan|
+---+--------+----------+----+-------+
Here’s an example of raw data ingestion without applying any transformations.
from pyspark.sql import DataFrame
from spetlr.etl import Extractor, Loader, Orchestrator
from spetlr.spark import Spark
class GuitarExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[
("1", "Fender", "Telecaster", "1950"),
("2", "Gibson", "Les Paul", "1959"),
("3", "Ibanez", "RG", "1987"),
]
),
"""id STRING, brand STRING, model STRING, year STRING""",
)
class NoopLoader(Loader):
def save(self, df: DataFrame) -> None:
df.write.format("noop").mode("overwrite").save()
df.printSchema()
df.show()
print("ETL Orchestrator with no transformations")
etl = Orchestrator().extract_from(GuitarExtractor()).load_into(NoopLoader())
etl.execute()
Here’s an example of having multiple Loader implementations that write the transformed data into multiple destinations.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from spetlr.etl import Extractor, Transformer, Loader, Orchestrator
from spetlr.spark import Spark
class GuitarExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[
("1", "Fender", "Telecaster", "1950"),
("2", "Gibson", "Les Paul", "1959"),
("3", "Ibanez", "RG", "1987"),
]
),
StructType(
[
StructField("id", StringType()),
StructField("brand", StringType()),
StructField("model", StringType()),
StructField("year", StringType()),
]
),
)
class BasicTransformer(Transformer):
def process(self, df: DataFrame) -> DataFrame:
print("Current DataFrame schema")
df.printSchema()
df = df.withColumn("id", f.col("id").cast(IntegerType()))
df = df.withColumn("year", f.col("year").cast(IntegerType()))
print("New DataFrame schema")
df.printSchema()
return df
class NoopSilverLoader(Loader):
def save(self, df: DataFrame) -> None:
df.write.format("noop").mode("overwrite").save()
class NoopGoldLoader(Loader):
def save(self, df: DataFrame) -> None:
df.write.format("noop").mode("overwrite").save()
df.printSchema()
df.show()
print("ETL Orchestrator using multiple loaders")
etl = (
Orchestrator()
.extract_from(GuitarExtractor())
.transform_with(BasicTransformer())
.load_into(NoopSilverLoader())
.load_into(NoopGoldLoader())
)
etl.execute()
Using Example-2, Example-3 and Example-5 as reference,
any combination of single/multiple implementations of Extractor, Transformer or Loader
can be created.
Here’s an example of having both multiple Extractor
, Transformer
and Loader
implementations.
It is important that the first transformer is a MultiInputTransformer
when having multiple extractors.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from spetlr.etl import Extractor, Transformer, Loader, Orchestrator
from spetlr.spark import Spark
class AmericanGuitarExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[
("1", "Fender", "Telecaster", "1950"),
("2", "Gibson", "Les Paul", "1959"),
]
),
StructType(
[
StructField("id", StringType()),
StructField("brand", StringType()),
StructField("model", StringType()),
StructField("year", StringType()),
]
),
)
class JapaneseGuitarExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[("3", "Ibanez", "RG", "1987"), ("4", "Takamine", "Pro Series", "1959")]
),
StructType(
[
StructField("id", StringType()),
StructField("brand", StringType()),
StructField("model", StringType()),
StructField("year", StringType()),
]
),
)
class CountryOfOriginTransformer(Transformer):
def process_many(self, dataset: {}) -> DataFrame:
usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", f.lit("USA"))
jap_df = dataset["JapaneseGuitarExtractor"].withColumn(
"country", f.lit("Japan")
)
return usa_df.union(jap_df)
class BasicTransformer(Transformer):
def process(self, df: DataFrame) -> DataFrame:
print("Current DataFrame schema")
df.printSchema()
df = df.withColumn("id", f.col("id").cast(IntegerType()))
df = df.withColumn("year", f.col("year").cast(IntegerType()))
print("New DataFrame schema")
df.printSchema()
return df
class NoopSilverLoader(Loader):
def save(self, df: DataFrame) -> None:
df.write.format("noop").mode("overwrite").save()
class NoopGoldLoader(Loader):
def save(self, df: DataFrame) -> None:
df.write.format("noop").mode("overwrite").save()
df.printSchema()
df.show()
print("ETL Orchestrator using multiple loaders")
etl = (
Orchestrator()
.extract_from(AmericanGuitarExtractor())
.extract_from(JapaneseGuitarExtractor())
.transform_with(CountryOfOriginTransformer())
.transform_with(BasicTransformer())
.load_into(NoopSilverLoader())
.load_into(NoopGoldLoader())
)
etl.execute()
This example illustrates the use of TransformerNC.
The job here is to join the two extracted dataframes - an employees dataframe and a birthdays dataframe.
But, before the birthdays can be joined onto the employees, the employees dataframe requires a transformation step.
As the transformation step of employees is handled by a TransformerNC, it does not consume the other inputs from the dataset_group.
Hence, birthdays is still available from the inputs - even after the transformation of employees.
Then both frames can be joined and the final dataframe saved via a Loader.
When working with TransformerNC it is important to mind that dataset keys are crucial.
Setting both input and output dataset key(s) ensures that the Transformers and Loaders handle the intended dataframes.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
from spetlr.etl import Extractor, Loader, Orchestrator, TransformerNC
from spetlr.etl.types import dataset_group
from spetlr.spark import Spark
class OfficeEmployeeExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[
("1", "Michael Scott", "Regional Manager"),
("2", "Dwight K. Schrute", "Assistant to the Regional Manager"),
("3", "Jim Halpert", "Salesman"),
("4", "Pam Beesly", "Receptionist"),
]
),
StructType(
[
StructField("id", StringType()),
StructField("name", StringType()),
StructField("position", StringType()),
]
),
)
class OfficeBirthdaysExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[
(1, "March 15"),
(2, "January 20"),
(3, "October 1"),
(4, "March 25"),
]
),
StructType(
[
StructField("id", IntegerType()),
StructField("birthday", StringType()),
]
),
)
class IntegerTransformerNC(TransformerNC):
def process(self, df: DataFrame) -> DataFrame:
return df.withColumn("id", f.col("id").cast(IntegerType()))
class JoinTransformerNC(TransformerNC):
def process_many(self, dataset: dataset_group) -> DataFrame:
df_employee = dataset["df_employee_transformed"]
df_birthdays = dataset["df_birthdays"]
return df_employee.join(other=df_birthdays, on="id")
class NoopLoader(Loader):
def save(self, df: DataFrame) -> None:
df.write.format("noop").mode("overwrite").save()
df.printSchema()
df.show()
print("ETL Orchestrator using two non consuming transformers")
etl = (
Orchestrator()
.extract_from(OfficeEmployeeExtractor(dataset_key="df_employee"))
.extract_from(OfficeBirthdaysExtractor(dataset_key="df_birthdays"))
.transform_with(
IntegerTransformerNC(
dataset_input_keys="df_employee",
dataset_output_key="df_employee_transformed",
)
)
.transform_with(
JoinTransformerNC(
dataset_input_keys=["df_employee_transformed", "df_birthdays"],
dataset_output_key="df_final",
)
)
.load_into(NoopLoader(dataset_input_key="df_final"))
)
etl.execute()
This example illustrates the use of an orchestrator as just another ETL step. The principle is called composite orchestration:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame
from spetlr.etl import Extractor, Transformer, Loader, Orchestrator, dataset_group
from spetlr.spark import Spark
class AmericanGuitarExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[
("1", "Fender", "Telecaster", "1950"),
("2", "Gibson", "Les Paul", "1959"),
]
),
T.StructType(
[
T.StructField("id", T.StringType()),
T.StructField("brand", T.StringType()),
T.StructField("model", T.StringType()),
T.StructField("year", T.StringType()),
]
),
)
class JapaneseGuitarExtractor(Extractor):
def read(self) -> DataFrame:
return Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize(
[
("3", "Ibanez", "RG", "1987"),
("4", "Takamine", "Pro Series", "1959"),
]
),
T.StructType(
[
T.StructField("id", T.StringType()),
T.StructField("brand", T.StringType()),
T.StructField("model", T.StringType()),
T.StructField("year", T.StringType()),
]
),
)
class CountryOfOriginTransformer(Transformer):
def process_many(self, dataset: dataset_group) -> DataFrame:
usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", F.lit("USA"))
jap_df = dataset["JapaneseGuitarExtractor"].withColumn("country", F.lit("Japan"))
return usa_df.union(jap_df)
class OrchestratorLoader(Loader):
def __init__(self, orchestrator: Orchestrator):
super().__init__()
self.orchestrator = orchestrator
def save_many(self, datasets: dataset_group) -> None:
self.orchestrator.execute(datasets)
class NoopLoader(Loader):
def save(self, df: DataFrame) -> None:
df.write.format("noop").mode("overwrite").save()
df.printSchema()
df.show()
print("ETL Orchestrator using composit inner orchestrator")
etl_inner = (
Orchestrator()
.transform_with(CountryOfOriginTransformer())
.load_into(NoopLoader())
)
etl_outer = (
Orchestrator()
.extract_from(AmericanGuitarExtractor())
.extract_from(JapaneseGuitarExtractor())
.load_into(OrchestratorLoader(etl_inner))
)
etl_outer.execute()
This page documents all the spetlr extractors, following the OETL pattern.
Extractors in spetlr:
This extractor reads data from an Azure eventhub and returns a structured streaming dataframe.
Under the hood, the spark azure eventhub connector (a maven library) is used.
from spetlr.etl import Extractor
class EventhubStreamExtractor(Extractor):
    def __init__(self,
                 consumerGroup: str,
                 connectionString: str = None,
                 namespace: str = None,
                 eventhub: str = None,
                 accessKeyName: str = None,
                 accessKey: str = None,
                 maxEventsPerTrigger: int = 10000):
        ...
Usage example with connection string:
eventhubStreamExtractor = EventhubStreamExtractor(
consumerGroup="TestConsumerGroup",
connectionString="TestSecretConnectionString",
maxEventsPerTrigger = 100000
)
Usage example without connection string:
eventhubStreamExtractor = EventhubStreamExtractor(
consumerGroup="TestConsumerGroup",
namespace="TestNamespace",
eventhub="TestEventhub",
accessKeyName="TestAccessKeyName",
accessKey="TestSecretAccessKey",
maxEventsPerTrigger = 100000
)
Usage example with defining start timestamp:
from datetime import datetime

eventhubStreamExtractor = EventhubStreamExtractor(
    consumerGroup="TestConsumerGroup",
    connectionString="TestSecretConnectionString",
    maxEventsPerTrigger=100000,
    startEnqueuedTime=datetime.utcnow()
)
This section elaborates on how the EventhubStreamExtractor
extractor works and how to use it in the OETL pattern.
from pyspark.sql import DataFrame
import pyspark.sql.types as T
import pyspark.sql.functions as F
from spetlr.etl import Transformer, Loader, Orchestrator
from spetlr.extractors import EventhubStreamExtractor
from spetlr.functions import init_dbutils
class BasicTransformer(Transformer):
def process(self, df: DataFrame) -> DataFrame:
print('Current DataFrame schema')
df.printSchema()
df = df.withColumn('body', F.col('body').cast(T.StringType()))
print('New DataFrame schema')
df.printSchema()
return df
class NoopLoader(Loader):
def save(self, df: DataFrame) -> DataFrame:
df.write.format('noop').mode('overwrite').save()
return df
print('ETL Orchestrator using EventhubStreamExtractor')
etl = (Orchestrator()
.extract_from(EventhubStreamExtractor(
consumerGroup="TestConsumerGroup",
connectionString=init_dbutils().secrets.get(scope = "TestScope", key = "TestSecretConnectionString"),
maxEventsPerTrigger = 100000
))
.transform_with(BasicTransformer())
.load_into(NoopLoader())
)
result = etl.execute()
This extractor only selects the newest data from the source by comparing with a target table.
What is extracted?
"""
Source has the following data:
|id| stringcol | timecol |
|--|--------------|------------------|
|1 | "string1" | 01.01.2021 10:50 |
|22| "string2inc" | 01.01.2021 10:56 |
|3 | "string3" | 01.01.2021 11:00 |
Target has the following data
|id| stringcol | timecol |
|--|--------------|------------------|
|1 | "string1" | 01.01.2021 10:50 |
|2| "string2" | 01.01.2021 10:55 |
So data from after 01.01.2021 10:55 should be read
|id| stringcol | timecol |
|--|--------------|------------------|
|22| "string2inc" | 01.01.2021 10:56 |
|3 | "string3" | 01.01.2021 11:00 |
"""
How to use it:
from pyspark.sql import DataFrame
import pyspark.sql.functions as f
from spetlr.etl.extractors import IncrementalExtractor
from spetlr.delta import DeltaHandle
from spetlr.etl import Transformer, Loader, Orchestrator
class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        df = df.withColumn('idAsString', f.col('id').cast("string"))
        return df

class NoopLoader(Loader):
    def save(self, df: DataFrame) -> DataFrame:
        df.write.format('noop').mode('overwrite').save()
        return df
etl = (Orchestrator()
       .extract_from(IncrementalExtractor(
           handle_source=DeltaHandle.from_tc("SourceId"),
           handle_target=DeltaHandle.from_tc("TargetId"),
           time_col_source="TimeColumn",
           time_col_target="TimeColumn",
           dataset_key="source"
       ))
       .transform_with(BasicTransformer())
       .load_into(NoopLoader())
       )
result = etl.execute()
Functions in spetlr:
The function drops a databricks database table and deletes the directory associated with the table.
def drop_table_cascade(DBDotTableName: str) -> None:
    ...
Usage example:
drop_table_cascade("test.testTable")
This section elaborates on how the drop_table_cascade
function works with a small example.
Create a test database:
CREATE DATABASE IF NOT EXISTS test
COMMENT "A test database"
LOCATION "/tmp/test/"
Create a test table:
CREATE TABLE IF NOT EXISTS test.testTable(
Id STRING,
sometext STRING,
someinteger INT
)
USING DELTA
COMMENT "Contains test data"
LOCATION "/tmp/test/testTable"
Insert dummy data into the test table:
insert into test.testTable values ("ID1","hello",1)
Even if the table is dropped using drop table test.testTable, the table files are not necessarily deleted - see the Databricks documentation.
Try to run the following code and see for yourself:
drop table test.testTable;
CREATE TABLE IF NOT EXISTS test.testTable(
Id STRING,
sometext STRING,
someinteger INT
)
USING DELTA
COMMENT "Contains test data"
LOCATION "/tmp/test/testTable";
select * from test.testTable;
You will notice that, when creating the table using files from the same path as previously, no files were deleted by drop table.
If you instead use:
drop_table_cascade("test.testTable")
The testTable is dropped and the files (directory) are deleted.
The orchestrators package contains complete orchestration steps that extract and transform from a set of sources and load the result to a target.
A very common pattern in data platforms is that json documents are published to an azure eventhub with capture enabled. The dataplatform wants to ingest that data into a delta table with a given schema before carrying out further transformations.
The class EhJsonToDeltaOrchestrator
has been designed to carry out this task with
minimal configuration required.
The arguments to this orchestrator consist of:
- an EventHubCaptureExtractor, or the key to a TableConfigurator item from which it can be initialized
- a DeltaHandle

All important configurations follow from the schema and partitioning of the delta table.
There may be cases where only a subset of rows is desired to be extracted in the
process. Here the orchestrator offers the method .filter_with
which allows
additional transformation steps to be injected before the rows are finally appended
to the delta table.
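As a hedged sketch of how this orchestrator might be wired up (assuming it is importable from spetlr.orchestrators and accepts an EventHubCaptureExtractor and a DeltaHandle analogously to the bronze orchestrator shown later; "eh_id" and "dh_id" are placeholder resource keys, MyRowFilter is a hypothetical transformer, and .filter_with is assumed to accept such a step):

from pyspark.sql import DataFrame

from spetlr.delta import DeltaHandle
from spetlr.eh.EventHubCaptureExtractor import EventHubCaptureExtractor
from spetlr.etl import Transformer
from spetlr.orchestrators import EhJsonToDeltaOrchestrator  # assumed import location


class MyRowFilter(Transformer):
    # hypothetical filtering step: keep only rows that have a Body
    def process(self, df: DataFrame) -> DataFrame:
        return df.filter(df["Body"].isNotNull())


eh = EventHubCaptureExtractor.from_tc("eh_id")
dh = DeltaHandle.from_tc("dh_id")

orchestrator = EhJsonToDeltaOrchestrator(eh=eh, dh=dh)  # assumed constructor arguments
orchestrator.filter_with(MyRowFilter())
orchestrator.execute()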
“A medallion architecture is a data design pattern used to logically organize data in a lakehouse, with the goal of incrementally and progressively improving the structure and quality of data as it flows through each layer of the architecture:
(from Bronze ⇒ Silver ⇒ Gold layer tables).
Medallion architectures are sometimes also referred to as “multi-hop” architectures.”
The class EhToDeltaBronzeOrchestrator has been designed to carry out the task of ingesting eventhub data into a bronze layer. Data is always appended to the bronze table. By utilizing the EhJsonToDeltaExtractor from the previous section, data is extracted incrementally.
“The Bronze layer is where we land all the data from external source systems. The table structures in this layer correspond to the source system table structures “as-is,” along with any additional metadata columns that capture the load date/time, process ID, etc. The focus in this layer is quick Change Data Capture and the ability to provide an historical archive of source (cold storage), data lineage, auditability, reprocessing if needed without rereading the data from the source system.”
The schema of the captured eventhub data can be found here: Exploring captured Avro files in Azure Event Hubs. Explanation of some of the columns can be found here and here.
Using the orchestrator without adding any filtering (.filter_with), the output schema is the following:
Column Name | Data type | Explanation |
---|---|---|
EventhubRowId | Long | An ID generated to give a unique id for row in the bronze table. Calculated based on sha2 hashing the Body and EnqueuedTimestamp. NB: There is a possibility for non-uniqueness. |
BodyId | Long | An ID generated to give a unique id for each unique Body message. Calculated based on sha2 hashing the Body. Can be used for identify rows with same Body. |
Body | String | The eventhub body casted as a string - for readability and searchability. Transformed version of the binary body. |
EnqueuedTimestamp | Timestamp | The enqueueded time of the eventhub row. This is a transformation of the EnqueuedTime, which is the date and time, in UTC, of when the event was enqueued in the Event Hub partition. |
StreamingTime | Timestamp | A timestamp added in the moment the orchestrator processed eventhub data. |
SequenceNumber | Long | Gets the logical sequence number of the event within the partition stream of the Event Hub. |
Offset | String | Gets the offset of the data relative to the Event Hub partition stream. The offset is a marker or identifier for an event within the Event Hubs stream. The identifier is unique within a partition of the Event Hubs stream. |
SystemProperties | String | The set of free-form event properties which were provided by the Event Hubs service to pass metadata associated with the event or associated Event Hubs operation. |
Properties | String | The set of free-form properties which may be used for associating metadata with the event that is meaningful within the application context. |
pdate | Timestamp | A transformation of the eventhub partitioning set to a timestamp. See previous section. |
The bronze schema should “at least” contain the following schema. The columns of this schema are asserted in the bronze transformer step.
from pyspark.sql.types import (
LongType,
StringType,
StructField,
StructType,
TimestampType,
)
schema_bronze = StructType(
[
StructField("EventhubRowId", LongType(), True),
StructField("BodyId", LongType(), True),
StructField("Body", StringType(), True),
StructField("EnqueuedTimestamp", TimestampType(), True),
StructField("StreamingTime", TimestampType(), True),
StructField("SequenceNumber", LongType(), True),
StructField("Offset", StringType(), True),
StructField("SystemProperties", StringType(), True),
StructField("Properties", StringType(), True),
StructField("pdate", TimestampType(), True),
]
)
The arguments to this orchestrator consist of:
- an EventHubCaptureExtractor, or the key to a TableConfigurator item from which it can be initialized
- a DeltaHandle
There may be cases where only a subset of rows is desired to be extracted in the
process. Here the orchestrator offers the method .filter_with
which allows
additional transformation steps to be injected before the rows are finally appended to the delta table.
from spetlr.delta import DeltaHandle
from spetlr.eh.EventHubCaptureExtractor import EventHubCaptureExtractor
from spetlr.orchestrators import EhToDeltaBronzeOrchestrator
eh=EventHubCaptureExtractor.from_tc("eh_id")
dh=DeltaHandle.from_tc("dh_id")
orchestrator = EhToDeltaBronzeOrchestrator(eh=eh, dh=dh)
orchestrator.execute()
The class EhToDeltaSilverOrchestrator
has been designed to carry out the task of unpacking and transforming bronze eventhub data to the silver layer.
“In the Silver layer of the lakehouse, the data from the Bronze layer is matched, merged, conformed and cleansed (“just-enough”) so that the Silver layer can provide an “Enterprise view” of all its key business entities, concepts and transactions. (e.g. master customers, stores, non-duplicated transactions and cross-reference tables).”
Data is by default incrementally upserted to the silver table. By utilizing the EhJsonToDeltaTransformer from the previous section, the schema of dh_target is used for unpacking the eventhub bronze schema. It is therefore only necessary to define the target delta schema. There may be cases where only a subset of rows is desired to be extracted in the process. Here the orchestrator offers the method .filter_with, which allows additional transformation steps to be injected before the rows are finally upserted to the delta table.
Note: It is possible to choose either append or overwrite instead of upsert. Keep in mind that the extractor will in these cases use the SimpleExtractor and SimpleLoader for extracting/loading data.
The arguments to this orchestrator consist of:
- dh_source: a DeltaHandle for the bronze eventhub table
- dh_target: a DeltaHandle for the silver eventhub table

The schema of dh_target defines how the eventhub data is unpacked.
from spetlr.delta import DeltaHandle
from spetlr.orchestrators import EhToDeltaSilverOrchestrator
dh_source=DeltaHandle.from_tc("dh_source_id")
dh_target=DeltaHandle.from_tc("dh_target_id")
orchestrator = EhToDeltaSilverOrchestrator(dh_source=dh_source, dh_target=dh_target)
orchestrator.execute()
What about the gold layer? Since the gold layer is often associated with custom-made business logic, no orchestrator is implemented for that purpose.
“Data in the Gold layer of the lakehouse is typically organized in consumption-ready “project-specific” databases. The Gold layer is for reporting and uses more de-normalized and read-optimized data models with fewer joins. The final layer of data transformations and data quality rules are applied here.”
The Schema Manager class is the intended way to retrieve schemas in spetlr. It can be desirable to retrieve the schemas of dataframes at various times, for example to ensure that the data has the correct form before being written to a table or to be able to reason on the columns included in incoming data.
The SchemaManager
class is a singleton class that can be invoked anywhere to get any schema defined in a dataplatform using offline methods.
This section will briefly cover the included methods and their use, the following sections go into slightly more detail with how they are implemented. The main methods are:
- register_schema
- get_schema
- get_all_schemas
Schemas written in pyspark must be registered with a given name to be accessible. The schema of any table can be retrieved by invoking the get_schema
method with either the given name of the schema or the name of the table the schema is attributed to in a .yaml file.
Lastly, all defined schemas can be retrieved in a single dictionary. This includes those defined in separate files as well as in .yaml files.

Schemas can be defined using various types and in various locations in the platform. Currently, the schema manager supports schemas written directly to the .yaml files under the schema attribute, as well as pyspark schemas, which are defined in separate files. For pyspark schemas to be available, they must be registered in the Schema Manager under a name with the register_schema function.
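As a small sketch of registering and retrieving a pyspark schema (the import path of SchemaManager and the argument order of register_schema are assumptions; the schema itself is only an example):

import pyspark.sql.types as T
from spetlr.schema_manager import SchemaManager  # assumed import path

my_schema = T.StructType([
    T.StructField("Id", T.StringType()),
    T.StructField("Amount", T.DoubleType()),
])

# register the pyspark schema under a name...
SchemaManager().register_schema("my_schema", my_schema)  # assumed argument order

# ...so that it can later be retrieved anywhere by that name
schema = SchemaManager().get_schema("my_schema")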
Schemas defined directly in .yaml files are made available through the configurator. The get_schema-method of the schema manager works as follows: given a name, it returns either a schema that was registered directly under that name, or the schema read from the schema attribute of a table with the given name. Similarly, the get_all_schemas-method iterates over the Configurator's table_details to find all tables that have a schema attribute.
SQL methods overview:
Let’s say you have an Azure SQL server called yourservername with an associated database yourdatabase. The username is explicitly defined here as customusername and the password as [REDACTED]. Variables related to security, such as passwords, should preferably be kept in e.g. databricks secrets. Here is a usage example:
from spetlr.sql import SqlServer
class ExampleSqlServer(SqlServer):
    def __init__(
        self,
        database: str = "yourdatabase",
        hostname: str = None,
        username: str = None,
        password: str = None,
        port: str = "1433",
    ):
        self.hostname = "yourservername.database.windows.net" if hostname is None else hostname
        self.username = "customusername" if username is None else username
        # The password should of course be collected from e.g. databricks secrets
        self.password = "[REDACTED]" if password is None else password
        self.database = database
        self.port = port
        super().__init__(
            self.hostname, self.database, self.username, self.password, self.port
        )
If you are using a Service Principal to create the connection to the server/database, you should instead set spnid = "yourspnid" and spnpassword = "[REDACTED]". The SqlServer class ensures a proper connection via the JDBC/ODBC drivers. Note: you must use either SQL user credentials or SPN credentials - never both.
The method upserts (updates or inserts) a databricks dataframe into a target sql table.
def upsert_to_table_by_name(
    self,
    df_source: DataFrame,
    table_name: str,
    join_cols: List[str],
    filter_join_cols: bool = True,
    overwrite_if_target_is_empty: bool = True,
):
    ...
If ‘filter_join_cols’ is True, the dataframe is filtered so that the join columns contain no NULL values in the sql merge operation. This can be set to False to save compute if this check is already handled.
If ‘overwrite_if_target_is_empty’ is True, the first row of the target table is read, and if the table is empty the dataframe is written to the table with an overwrite instead of a merge. This can be set to False to save compute if the programmer knows the target will never be empty.
Usage example:
sql_server.upsert_to_table_by_name(df_new, "tableName", ["Id"])
This nice class can help parse and execute sql-files. It can be used for executing both Spark and Azure sql queries.
NB: The parser uses the semicolon character (;) to split the queries. If the character is used for purposes other than closing a query, there might be execution issues. In that case, please use Spark.get().sql() or SqlServer.sql() instead.
In the example below the SqlExecutor is inherited and your sql server is used (see the SQL Server Class section). Furthermore, the module containing the sql-files to execute is provided via the base_module variable.
from tests.cluster.sql import extras
from tests.cluster.sql.DeliverySqlServer import DeliverySqlServer
from spetlr.sql import SqlExecutor

class DeliverySqlExecutor(SqlExecutor):
    def __init__(self):
        super().__init__(base_module=extras, server=DeliverySqlServer())
If one needs to execute sql queries in Databricks using Spark, there is no need to provide a server. By default, the class uses the spetlr Spark class.
from spetlr.sql import SqlExecutor

class SparkSqlExecutor(SqlExecutor):
    def __init__(self):
        super().__init__(base_module=extras)
In the setup job, one could consider creating all delivery SQL tables:
from spetlr.configurator import Configurator
from tests.cluster.delta.SparkExecutor import SparkSqlExecutor
from tests.cluster.sql.DeliverySqlExecutor import DeliverySqlExecutor

# In setup.py
def setup_production_tables():
    Configurator().set_prod()
    SparkSqlExecutor().execute_sql_file("*")
    DeliverySqlExecutor().execute_sql_file("*")
It can be useful to have sql files among the resources that are only applied in special
cases. The SqlExecutor therefore allows you to exclude these files with the
exclude_pattern parameter. Any file whose name contains this pattern will not
be included in the execution.
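For example, files intended only for debugging could be skipped like this (the "debug" pattern and the assumption that execute_sql_file accepts exclude_pattern are made for illustration):

# executes all sql files in the base module, except those whose name contains "debug"
SparkSqlExecutor().execute_sql_file("*", exclude_pattern="debug")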
Transformations in spetlr:
UPDATE: Pyspark has an equivalent implementation, .unionByName(df, allowMissingColumns=False); see the documentation for more information.
The transformation unions dataframes by appending the dataframes to each other and keeps all columns.
from pyspark.sql import DataFrame
from typing import List

def concat_dfs(dfs: List[DataFrame]) -> DataFrame:
    ...
Usage example:
concat_dfs([df1,df2,df3])
This section elaborates on how the concat_dfs
function works with a small example.
Create three test datasets:
df1 = Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize([
('1', 'Fender', 'Telecaster', '1950'),
('2', 'Gibson', 'Les Paul', '1959'),
('3', 'Ibanez', 'RG', '1987')
]),
StructType([
StructField('id', StringType()),
StructField('brand', StringType()),
StructField('model', StringType()),
StructField('year', StringType()),
]))
df2 = Spark.get().createDataFrame(
Spark.get().sparkContext.parallelize([
('1', 'Fender', 'Stratocaster', 'Small'),
('2', 'Gibson', 'Les Paul Junior', 'Medium'),
('3', 'Ibanez', 'JPM', 'Large')
]),
StructType([
StructField('id', StringType()),
StructField('brand', StringType()),
StructField('model', StringType()),
StructField('size', StringType()),
]))
Concatenate (union) the two dataframes:
# SPETLR's "concat_dfs"
result = concat_dfs([df1,df2])
# pyspark's unionByName
result = df1.unionByName(df2, allowMissingColumns=True)
Print the dataframe:
result.show()
The output is then:
+------+---+---------------+------+----+
| brand| id| model| size|year|
+------+---+---------------+------+----+
|Fender| 1| Telecaster| null|1950|
|Gibson| 2| Les Paul| null|1959|
|Ibanez| 3| RG| null|1987|
|Fender| 1| Stratocaster| Small|null|
|Gibson| 2|Les Paul Junior|Medium|null|
|Ibanez| 3| JPM| Large|null|
+------+---+---------------+------+----+
See that the columns “brand”, “id”, “model”, “size” (from df2) and “year” (from df1) are added to the dataframe consisting of the union of df1 and df2.
The FuzzySelectTransformer
is an ETL transformer that can process a single dataframe. Its purpose is to help create
short concise select code that is somewhat insensitive to source columns that are misspelled
or use different capitalization.
To use, construct the FuzzySelectTransformer
with the following arguments:
columns
The list of column names in the final dataframe in order.match_cutoff
A cutoff quality in the range [0,1] below which matches will not be accepted.
See difflib arguments for details.Under the hood, difflib is used to find a suitable unique mapping from source to target columns. All column names are converted to lower case before matching.
The association of target to source columns is required to be unique. If the algorithm identifies multiple matching source columns to a target name, an exception will be raised.
Given a dataframe df
, this code renames all columns:
>>> df.show()
+----+-----+------+
|inex|count|lables|
+----+-----+------+
| 1| 2| foo|
| 3| 4| bar|
+----+-----+------+
>>> from spetlr.transformers.fuzzy_select import FuzzySelectTransformer
>>> ft = FuzzySelectTransformer(["Index", "Count", "Label"])
>>> ft.process(df).show()
+-----+-----+-----+
|Index|Count|Label|
+-----+-----+-----+
| 1| 2| foo|
| 3| 4| bar|
+-----+-----+-----+
The transformation merges a databricks dataframe into a target database table.
def merge_df_into_target(df: DataFrame,
                         table_name: str,
                         database_name: str,
                         join_cols: List[str]) -> None:
    ...
Usage example:
merge_df_into_target(df_new, "testTarget", "test", ["Id"])
The following queries create a test table containing guitar data:
CREATE DATABASE IF NOT EXISTS test
COMMENT "A test database"
LOCATION "/tmp/test/";
CREATE TABLE IF NOT EXISTS test.testTarget(
Id STRING,
Brand STRING,
Model STRING
)
USING DELTA
COMMENT "Contains merge test target rows"
LOCATION "/tmp/test/testTarget";
insert into test.testTarget values ("2","Gibson","Les Paul");
select * from test.testTarget
+---+------+--------+
| Id| Brand|   Model|
+---+------+--------+
|  2|Gibson|Les Paul|
+---+------+--------+
The following dataframe has one row that will be merged with Id=2, and the other rows are going to be inserted:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
df_new=spark.createDataFrame(
spark.sparkContext.parallelize([
("1", "Fender", "Jaguar"),
("2", "Gibson","Starfire"),
("3", "Ibanez", "RG")
]),
StructType([
StructField("Id", StringType(), False),
StructField("Brand", StringType(), True),
StructField("Model", StringType(), True),
]))
Use the transformation to merge data into the test delta table:
merge_df_into_target(df_new, "testTarget", "test", ["Id"])
%sql
select * from test.testTarget order by Id
+---+------+--------+
| Id| Brand|   Model|
+---+------+--------+
|  1|Fender|  Jaguar|
|  2|Gibson|Starfire|
|  3|Ibanez|      RG|
+---+------+--------+
As one can see, the row with Id=2 is now merged such that the model went from “Les Paul” to “Starfire”. The two other rows were inserted.
This transformation helps to drop duplicates based on time. If there are multiple duplicates, only the newest row remains. In the example below, a dataframe has several duplicates, since a unique record is defined by a combination of a guitar id, model and brand. As time goes by, the amount of guitars available in a store changes. Let’s assume that we only want the newest record, dropping the older duplicates:
from spetlr.utils.DropOldestDuplicates import DropOldestDuplicates
data =
| id| model| brand|amount| timecolumn|
+---+------+----------+------+-------------------+
| 1|Fender|Telecaster| 5|2021-07-01 10:00:00|
| 1|Fender|Telecaster| 4|2021-07-01 11:00:00|
| 2|Gibson| Les Paul| 27|2021-07-01 11:00:00|
| 3|Ibanez| RG| 22|2021-08-01 11:00:00|
| 3|Ibanez| RG| 26|2021-09-01 11:00:00|
| 3|Ibanez| RG| 18|2021-10-01 11:00:00|
+---+------+----------+------+-------------------+
df = DropOldestDuplicatesTransformer(
cols=["id", "model", "brand"],
orderByColumn="timecolumn"
).process(data)
df.show()
| id| model| brand|amount| timecolumn|
+---+------+----------+------+-------------------+
| 1|Fender|Telecaster| 4|2021-07-01 11:00:00|
| 2|Gibson| Les Paul| 27|2021-07-01 11:00:00|
| 3|Ibanez| RG| 18|2021-10-01 11:00:00|
+---+------+----------+------+-------------------+
Notice, the oldest duplicates are dropped.
This transformation uses latitude and longitude values to determine the timezone of a specific location. The example below shows how to apply the transformer to an input DataFrame to get a column with timezones. Notice, when either the latitude or longitude value is None, the returned timezone will also be None.
from spetlr.transformers import TimeZoneTransformer
data =
| latitude| longitude|
+-----------+----------+
| 51.519487 | -0.083069|
| 55.6761 | 12.5683|
| None | None|
| None | -0.083069|
| 51.519487 | None|
+-----------+----------+
df = TimeZoneTransformer(
latitude_col="latitude",
longitude_col="longitude",
column_output_name="timezone"
).process(data)
df.show()
| latitude| longitude| timezone|
+-----------+----------+--------------------+
| 51.519487 | -0.083069| "Europe/London"|
| 55.6761 | 12.5683| "Europe/Copenhagen"|
| None | None| None|
| None | -0.083069| None|
| 51.519487 | None| None|
+-----------+----------+--------------------+
This transformation selects and casts columns in a dataframe based on a pyspark schema. If case-insensitive matching is desired, caseInsensitiveMatching can be set to True.
from spetlr.transformers import SelectAndCastColumnsTransformer
data =
| id| number| value|
+-----------+----------+----------+
| 1 | 42 | 1 |
| 2 | 355 | 0 |
+-----------+----------+----------+
desired_schema = T.StructType(
[
T.StructField("id", T.StringType(), True),
T.StructField("value", T.BooleanType(), True),
]
)
df = SelectAndCastColumnsTransformer(
schema=desired_schema,
caseInsensitiveMatching=False
).process(data)
df.show()
| id| value|
+-----------+----------+
| "1" | True |
| "2" | False |
+-----------+----------+
This transformer introduces Slowly Changing Dimension 2 (SCD2) columns to a dataframe. The three introduced SCD2 columns are: ValidFrom, ValidTo and IsCurrent. The logic builds the SCD2 history based on a time-formatted column (the parameter time_col). One can easily extract only the active (current) data by applying .filter("iscurrent=1") on the dataframe.
Disclaimer: Use only on “full loading” / overwrite.
Usage example:
from spetlr.transformers.ValidFromToTransformer import ValidFromToTransformer
data =
| id| model| brand|amount| timecolumn|
+---+------+----------+------+-------------------+
| 1|Fender|Telecaster| 5|2021-07-01 10:00:00|
| 1|Fender|Telecaster| 5|2021-07-01 10:00:00|
| 1|Fender|Telecaster| 4|2021-07-01 11:00:00|
| 2|Gibson| Les Paul| 27|2021-07-01 11:00:00|
| 3|Ibanez| RG| 22|2021-08-01 11:00:00|
| 3|Ibanez| RG| 26|2021-09-01 11:00:00|
| 3|Ibanez| RG| 18|2021-10-01 11:00:00|
+---+------+----------+------+-------------------+
df = (
    ValidFromToTransformer(
        time_col="timecolumn",
        wnd_cols=["id", "model", "brand"]
    )
    .process(data)
    .drop("timecolumn")
    .orderBy(f.col("ValidFrom").asc(), f.col("ValidTo").asc())
)
df.show()
| id| model| brand|amount| validfrom| validto|iscurrent|
+---+------+----------+------+-------------------+-------------------+---------+
| 1|Fender|Telecaster| 5|2021-07-01 10:00:00|2021-07-01 11:00:00| false|
| 1|Fender|Telecaster| 4|2021-07-01 11:00:00|2262-04-11 00:00:00| true|
| 2|Gibson| Les Paul| 27|2021-07-01 11:00:00|2262-04-11 00:00:00| true|
| 3|Ibanez| RG| 22|2021-08-01 11:00:00|2021-09-01 11:00:00| false|
| 3|Ibanez| RG| 26|2021-09-01 11:00:00|2021-10-01 11:00:00| false|
| 3|Ibanez| RG| 18|2021-10-01 11:00:00|2262-04-11 00:00:00| true|
+---+------+----------+------+-------------------+-------------------+---------+
# Select only the active (current) rows in the dataframe
df.filter("iscurrent=1").show()
+---+------+----------+------+-------------------+-------------------+---------+
| id| model| brand|amount| validfrom| validto|iscurrent|
+---+------+----------+------+-------------------+-------------------+---------+
| 1|Fender|Telecaster| 4|2021-07-01 11:00:00|2262-04-11 00:00:00| true|
| 2|Gibson| Les Paul| 27|2021-07-01 11:00:00|2262-04-11 00:00:00| true|
| 3|Ibanez| RG| 18|2021-10-01 11:00:00|2262-04-11 00:00:00| true|
+---+------+----------+------+-------------------+-------------------+---------+
Utilities in spetlr:
Using the method spetlr.db_auto.getDbApi()
gives access to a
DatabricksAPI
instance that has been pre-configured for the
current databricks instance. See databricks-api
for usage documentation.
Under the hood the function uses the job context to get the host and token
when on the cluster. When using spetlr
with databricks-connect, the databricks-cli
is
called to configure the client. Thus, the function works without further configuration
in all contexts.
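As a brief usage sketch (the jobs service call shown is one example of the surface exposed by the databricks-api package; check its documentation for the full set of services):

from spetlr.db_auto import getDbApi

db = getDbApi()
# for example, list the jobs defined in the current workspace
jobs = db.jobs.list_jobs()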
The DataframeCreator
is a helper class to assist in writing concise unittests.
Unittests typically take a dataframe, often created with spark.createDataFrame
and transform it.
The function createDataFrame
requires all data fields to be assigned a value, even if the given unittest is only concerned with a small subset of them.
This class allows the user to specify which columns she wants to give values for. All other columns will be assigned null values.
from spetlr.utils import DataframeCreator
from spetlr.schema_manager.schema import get_schema
df = DataframeCreator.make_partial(
schema=get_schema("""
Id INTEGER,
measured DOUBLE,
customer STRUCT<
name:STRING,
address:STRING
>,
product_nos ARRAY<STRUCT<
no:INTEGER,
name:STRING
>>
"""),
columns=[
"Id",
# of the customer structure, only specify the name
("customer", ["name"]),
# of the products array of structures, only specify the 'no' field in each row
("product_nos", ["no"])
],
data=[
(1, ("otto",), [(1,), (2,)]),
(2, ("max",), []),
],
)
df.show()
Result:
| Id|measured| customer| product_nos|
+---+--------+------------+--------------------+
| 1| null|{otto, null}|[{1, null}, {2, n...|
| 2| null| {max, null}| []|
+---+--------+------------+--------------------+
A set of standard git hooks is included to provide useful functionality: files ending in .py will be formatted with the black code formatter.
To use the hooks, they can be installed in any repository by executing this command from a path inside the repository:
spetlr-git-hooks
To uninstall the hooks, simply run this command
spetlr-git-hooks uninstall