Spetlr

A Python ETL library (SPETLR) for Databricks, powered by Apache Spark.

Enables a standardized ETL framework and integration testing for Databricks.



SPETLR Documentation


Table Configurator

Motivation

In Databricks there is one standard way to refer to a table: the “database.table” reference. In our data platform projects we have often found a need for a more abstract notion of a table reference, one that allows injection of test versions of tables and that allows unified references across different environments (dev, staging, prod). The Configurator provides such a further abstraction level.

Setting up configuration

The Configurator is a singleton class that you need to instantiate and configure once in your project. Make sure this code is always executed before any reference to tables is made. Example:

from spetlr import Configurator
tc = Configurator()
tc.register('ENV','myenv')
tc.add_resource_path(my.resource.module)

Configuration from yaml or json

The Configurator can be configured with json or yaml files. The following is an example of a configuration file:

MyFirst:
  name: first{ID}
  path: /{MNT}/my/first{ID}
# MNT and ID are default replacements
# using .set_prod(), MNT becomes 'mnt', while in debug it becomes tmp
# This helps when using mount point locations for tables.

MyAlias:
  alias: MyFirst

MyForked:
  release:
    alias: MyFirst
  debug:
    name: another
    path: /my/temp/path
# using release and debug is an alternative way of differentiating the cases instead
# of using {ID}, note that {ID} has the advantage of separating concurrent test runs

MyRecursing:
  name: recursing{ID}
  path: /{MNT}/tables/{MyRecursing_name}
# using the notation '_name' refers back to the 'name' property of that dictionary.
# alias and release/debug will be resolved before the property is accessed

You either provide ‘release’ and ‘debug’ versions of each table, or you include the structure {ID} in the name and path. This is a special replacement string that will be replaced with an empty string for production tables, or with a “__{GUID}” string when debug is selected. The guid construction allows for non-colliding parallel testing.

Beyond the resource definitions, the Configurator needs to be configured to return either production or test versions of tables. This is done at the start of your code. In your jobs you need to call Configurator().set_prod(), whereas your unit tests should call Configurator().set_debug().
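A minimal sketch of the two call sites, using the MyFirst resource from the example above (the resolved names follow the {ID} rules described; the guid is illustrative):

from spetlr import Configurator

# in a production job
Configurator().set_prod()
Configurator().table_name('MyFirst')   # resolves to 'first'

# in a unit test
Configurator().set_debug()
Configurator().table_name('MyFirst')   # resolves to 'first__<guid>'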

String substitutions

As was already seen in the example above, all strings can contain python format-string placeholders that reference other strings known to the configurator. The following rules apply:

Describing Hive tables

In Spark SQL, there are database create statements and table create statements that follow a fixed pattern, reproduced here:

CREATE {DATABASE | SCHEMA} [ IF NOT EXISTS ] database_name
  [ COMMENT database_comment ]
  [ LOCATION database_directory ]
  [ WITH DBPROPERTIES (property_name=property_value [ , ...]) ];

CREATE TABLE [ IF NOT EXISTS ] table_identifier
    [ ( col_name1 col_type1 [ COMMENT col_comment1 ], ... ) ]
    USING data_source
    [ OPTIONS ( key1=val1, key2=val2, ... ) ]
    [ PARTITIONED BY ( col_name1, col_name2, ... ) ]
    [ CLUSTERED BY ( col_name3, col_name4, ... ) 
        [ SORTED BY ( col_name [ ASC | DESC ], ... ) ] 
        INTO num_buckets BUCKETS ]
    [ LOCATION path ]
    [ COMMENT table_comment ]
    [ TBLPROPERTIES ( key1=val1, key2=val2, ... ) ]
    [ AS select_statement ]

The logic in this library expects the information in the above statements to be reproduced in yaml in the following structure:

MySpetlrDbReference:
  name: database_name
  comment: database_comment
  path: database_directory
  format: "db"
  dbproperties:
    property_name: property_value

MySpetlrTableReference:
  name: table_identifier
  path: path
  format: data_source
  schema:
    sql: |
      col_name1 col_type1 [ COMMENT col_comment1 ], ... 
  options:
    key1: val1
    key2: val2
  partitioned_by:
    - col_name1
    - col_name2
  clustered_by:
    cols:
      - col_name3
      - col_name4 
    buckets: num_buckets
    sorted_by:
      - name: col_name 
        ordering: "ASC" or "DESC"
  comment: table_comment
  tblproperties:
    key1: val1
    key2: val2

Note that the form CREATE TABLE table_identifier AS select_statement is not supported. For alternate ways to specify the schema, see the documentation for the schema manager.

Configuration from sql

When using SQL statements to create and manage tables, a developer may desire to keep all information referring to a table within one file. Therefore, the Configurator employs a SQL parsing library to extract table details directly from the CREATE statements. It can be used like this:

from . import my_sql_folder
Configurator().add_sql_resource_path(my_sql_folder)

A SQL CREATE statement already contains all the information that the Configurator needs, except the key (aka table_id) that shall be used to refer to the table. This key needs to be added to the SQL in the form of a comment with the magic prefix "-- spetlr.Configurator " (note the spaces; not case-sensitive). See the following example:

-- spetlr.Configurator key: MySparkDb
CREATE DATABASE IF NOT EXISTS `my_db1{ID}`
COMMENT "Dummy Database 1"
LOCATION "/tmp/foo/bar/my_db1/";

-- spetlr.Configurator key: MyDetailsTable
CREATE TABLE IF NOT EXISTS `my_db1{ID}.details`
(
  {MyAlias_schema},
  another int
  -- comment with ;
)
USING DELTA
COMMENT "Dummy Database 1 details"
LOCATION "/{MNT}/foo/bar/my_db1/details/";

-- pure configurator magic in this statement
-- spetlr.Configurator key: MyAlias
-- spetlr.Configurator alias: MySqlTable
;


-- SPETLR.CONFIGURATOR key: MySqlTable
-- spetlr.Configurator delete_on_delta_schema_mismatch: true
CREATE TABLE IF NOT EXISTS `{MySparkDb}.tbl1`
(
  a int,
  b int,
  c string,
  d timestamp
)
USING DELTA
COMMENT "Dummy Database 1 table 1"
LOCATION "/{MNT}/foo/bar/my_db1/tbl1/";

The example is quite complex and demonstrates a number of features:

When using the configurator to parse sql code, the in-memory structure will be as described in the section on hive-table specification.

Using the Configurator

Once configured, the Configurator is often not used directly. Other classes in the spetlr framework use it when configured with a resource ID. You can find examples in the eventhub unit tests:

from spetlr.eh import EventHubCapture
EventHubCapture.from_tc("SpetlrEh")

or in the delta handle unit-tests:

from spetlr.delta import DeltaHandle
DeltaHandle.from_tc("MyTbl")

But sometimes you still need to call the Configurator methods directly, e.g. when constructing your own SQL:

from spetlr.config_master import Configurator
f"MERGE INTO {Configurator().table_name('MyTbl')} AS target ..."

Further Features

MNT key

‘MNT’ is another special replacement, similar to “ID”. In production it is replaced with the string ‘mnt’, while in debug it is replaced with ‘tmp’. The intended usage is in paths: production tables are typically mounted on external storage under “/mnt”, whereas test tables should be written to “/tmp”. You can use it as in this example:

MyTable:
  name: mydb{ID}.data_table
  path: /{MNT}/somestorage{ENV}/mydb{ID}/data_table

Extra details

Extra details are now deprecated. Simply register your extras as simple string resources.

from spetlr.config_master import Configurator
tc = Configurator()
tc.register('ENV', 'prod')

The Configurator Command Line Interface (CLI)

The table configurations available to the Configurator can be useful when executing actions on the command line. See below for individual commands. To expose the Configurator command line interface, you need to call the .cli() method after you have initialized the Configurator with your project details. You therefore need to expose a command line script as shown below.

In the file that initializes your configurator:

def init_my_configurator():
    c = Configurator()
    c.add_resource_path(my_yaml_module)
    c.register('ENV', config.my_env_name)
    return c

if __name__ == "__main__":
    init_my_configurator().cli()

Now, all the functionality below will become available on the command line.

Generated Keys File

When using an IDE to develop python code, a useful feature is auto-completion and linting. Such features are however not available when using string keys from yaml files. It can therefore be useful to extract the keys from the yaml configurations and make them available as python objects.

Call the following command to generate such a keys file from your initialized configurator.

$> my_config generate-keys-file -o keys.py

This will create the file keys.py with the following example contents:

# AUTO GENERATED FILE.
# contains all spetlr.Configurator keys

MyFirst = "MyFirst"
MySecond = "MySecond"
MyAlias = "MyAlias"
MyForked = "MyForked"
MyRecursing = "MyRecursing"

You can now use the keys file to auto-complete and validate your yaml keys:

DeltaHandle.from_tc("MyFirst")

becomes

from keys import MyFirst
DeltaHandle.from_tc(MyFirst)

which also supports flake8 linting for correct spelling.

If you want to check that you did not forget to update the keys file as part of your CICD pipeline, running the same command with the additional option --check-only will return an exit code of 0 if the file was already up-to-date and 1 otherwise.
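For example, as a CICD check:

$> my_config generate-keys-file -o keys.py --check-only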


DeltaHandle and DbHandle

The Configurator contains logic to distinguish between production and debug tables. To make full use of this functionality when reading and writing delta tables, two convenience classes, DeltaHandle and DbHandle, have been provided. Use the classes like this:

from spetlr import Configurator
from spetlr.delta import DeltaHandle, DbHandle

tc = Configurator()
tc.add_resource_path('/my/config/files')

# name is mandatory,
# path is optional
# format is optional. Must equal 'db' if provided
db = DbHandle.from_tc('MyDb')

# quickly create the database
db.create()

# name is mandatory,
# path is optional
# format is optional. Must equal 'delta' if provided
dh = DeltaHandle.from_tc('MyTblId')

# quickly create table without schema
dh.create_hive_table()
df = dh.read()
dh.overwrite(df)

This code assumes that there exists a file /my/config/files/stuff.yml like:

MyDb:
  name: TestDb{ID}
  path: /tmp/testdb{ID}

MyTblId:
  name: TestDb{ID}.testTbl
  path: /tmp/testdb{ID}/testTbl

The {ID} parts are either replaced with an empty string (production) or with a uuid if tc.reset(debug=True) has been set.

DeltaHandle Upsert

The method upserts (updates or inserts) a Databricks dataframe into a target delta table.

def upsert(
        self,
        df: DataFrame,
        join_cols: List[str],
    ) -> Union[DataFrame, None]:   
    ...

Usage example:

target_dh.upsert(df_new, ["Id"])

Example

The following queries create a test table containing guitar data. Let’s assume that the Configurator is configured as in the section DeltaHandle and DbHandle, but that testTbl has the following schema:

%sql
(
  Id STRING,
  Brand STRING,
  Model STRING
)

INSERT INTO TestDb.testTbl VALUES ("2", "Gibson", "Les Paul");

SELECT * FROM TestDb.testTbl;

+---+------+--------+
| Id| Brand|   Model|
+---+------+--------+
|  2|Gibson|Les Paul|
+---+------+--------+

The following dataframe has one row that will be merged with Id=2, and the other rows are going to be inserted:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
df_new=spark.createDataFrame(
        spark.sparkContext.parallelize([
            ("1", "Fender", "Jaguar"),
            ("2", "Gibson","Starfire"),
            ("3", "Ibanez", "RG")
        ]),
        StructType([
            StructField("Id", StringType(), False),
            StructField("Brand", StringType(), True),
          StructField("Model", StringType(), True),
        ]))

Use the upsert method to upsert data into the test delta table:

target_dh = DeltaHandle.from_tc("MyTblId")

target_dh.upsert(df_new, ["Id"])

%sql
select * from TestDb.testTbl order by Id

+---+------+--------+
| Id| Brand|   Model|
+---+------+--------+
|  1|Fender|  Jaguar|
|  2|Gibson|Starfire|
|  3|Ibanez|      RG|
+---+------+--------+

As one can see, the row with Id=2 is now upserted such that the model went from “Les Paul” to “Starfire”. The two other rows were inserted.


Entry points

The TaskEntryPoint is an abstract base class that acts as an entry point for tasks. This class is meant to be subclassed and extended with specific implementation details. The subclass must implement a method task() which serves as the entry point for the task logic.

Usage

To use TaskEntryPoint, create a subclass that implements the task() method:

class MyTask(TaskEntryPoint):
    @classmethod
    def task(cls) -> None:
        # implement the task logic here
        ...
It is now ensured that the subclass MyTask has a task() method, and we therefore have an entry point at MyTask.task(). This entry point can be added to the python wheel either manually or via automatic discovery.
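As an illustration of the manual route, here is a sketch of a setuptools declaration (the package name, module path and entry point group are assumptions for illustration, not spetlr requirements):

# setup.py (illustrative)
from setuptools import setup

setup(
    name="my_package",
    # ... other package metadata ...
    entry_points={
        # point the entry point at the task() classmethod
        "console_scripts": [
            "my_task = my_package.my_task:MyTask.task",
        ],
    },
)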

ETL Orchestrator

Introduction

This module contains components for implementing elegant ETL operations using the OETL Design Pattern.

OETL

Short for Orchestrated Extract-Transform-Load, OETL is a pattern that takes the ideas behind variations of the Model-View-Whatever design pattern.

Orchestrated ETL

The Orchestrator is responsible for conducting the interactions between the Extractor -> Transformer -> Loader.

The Orchestrator reads data from the Extractor, then uses the result as a parameter when calling the Transformer, and saves the transformed result into the Loader. The Transformer can be optional, as there are scenarios where data transformation is not needed (i.e. raw data ingestion to a landing zone).

Each layer may have a single or multiple implementations, and this is handled automatically in the Orchestrator.

Orchestration Fluent Interface

This library provides common simple implementations and base classes for implementing the OETL design pattern. To simplify object construction, we provide the Orchestrator fluent interface from spetlr.etl:

from spetlr.etl import Extractor, Transformer, Loader, Orchestrator

(Orchestrator()
    .extract_from(Extractor())
    .transform_with(Transformer())
    .load_into(Loader())
    .execute())

Principles

All ETL classes, Orchestrator, Extractor, Transformer, TransformerNC and Loader, are ETL objects. This means that they have a method etl(self, inputs: dataset_group) -> dataset_group (where dataset_group = Dict[str, DataFrame]) that transforms a set of inputs to a set of outputs. The special properties of each type are

The use case for the TransformerNC arises when there is a need to keep previously extracted (or transformed) dataframes available after a transformation step. When working with these classes it is crucial to set dataset input keys and dataset output keys. This ensures that the transformer and/or loader has explicit information on which dataframe(s) to handle.

The special case of the Orchestrator is that it takes all its steps and executes them in sequence on its inputs. Running in the default execute() method, the inputs are empty, but an orchestrator can also be added as part of another orchestrator with the step method.

For the most general case of a many-to-many transformation, implement your step by inheriting from the EtlBase class.
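A minimal sketch of such a step (assuming EtlBase can be imported from spetlr.etl; the import location is an assumption):

from spetlr.etl import EtlBase  # assumed import location
from spetlr.etl.types import dataset_group


class RekeyStep(EtlBase):
    """Illustrative many-to-many step: passes every input through under a new key."""

    def etl(self, inputs: dataset_group) -> dataset_group:
        # return a new dataset_group; here every dataframe is simply re-keyed
        return {f"renamed_{key}": df for key, df in inputs.items()}

Such a step can then be added to an orchestrator alongside the other steps.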

Usage examples:

Here are some example usages and implementations of the ETL classes provided.

Example-1

Here’s an example of reading data from a single location, transforming it once and saving to a single destination. This is the simplest ETL case, and it will be used as the basis for the more complex examples below.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType

from spetlr.etl import Extractor, Transformer, Loader, Orchestrator
from spetlr.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            """
            id STRING,
            brand STRING,
            model STRING,
            year STRING
            """,
        )


class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using a single simple transformer")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(BasicTransformer())
    .load_into(NoopLoader())
)
etl.execute()

The code above produces the following output:

Current DataFrame schema
root
 |-- id: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)

New DataFrame schema
root
 |-- id: integer (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: integer (nullable = true)

+---+------+----------+----+
| id| brand|     model|year|
+---+------+----------+----+
|  1|Fender|Telecaster|1950|
|  2|Gibson|  Les Paul|1959|
|  3|Ibanez|        RG|1987|
+---+------+----------+----+

Example-2

Here’s an example of multiple Transformer instances of the same implementation being reused to change the datatype of given columns, where the column name is parameterized.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from spetlr.etl import Extractor, Transformer, Loader, Orchestrator
from spetlr.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class IntegerColumnTransformer(Transformer):
    def __init__(self, col_name: str):
        super().__init__()
        self.col_name = col_name

    def process(self, df: DataFrame) -> DataFrame:
        df = df.withColumn(self.col_name, f.col(self.col_name).cast(IntegerType()))
        return df


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple transformers")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(IntegerColumnTransformer("id"))
    .transform_with(IntegerColumnTransformer("year"))
    .load_into(NoopLoader())
)
etl.execute()

Example-3

Here’s an example of having multiple Extractor implementations and applying transformations using the process_many method.

The read() function in Extractor will return a dictionary that uses the type name of the Extractor as the key and a DataFrame as its value; the key used can be overridden in the constructor.

Transformer provides the function process_many(dataset: {}) and returns a single DataFrame.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType

from spetlr.etl import Extractor, Loader, Orchestrator, Transformer
from spetlr.spark import Spark


class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class JapaneseGuitarExtractor(Extractor):
    def __init__(self):
        super().__init__(dataset_key="japanese")

    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [("3", "Ibanez", "RG", "1987"), ("4", "Takamine", "Pro Series", "1959")]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: {}) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", f.lit("USA"))
        jap_df = dataset["japanese"].withColumn("country", f.lit("Japan"))
        return usa_df.union(jap_df)


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple extractors")
etl = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .transform_with(CountryOfOriginTransformer())
    .load_into(NoopLoader())
)
etl.execute()

The code above produces the following output:

root
 |-- id: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)
 |-- country: string (nullable = false)

+---+--------+----------+----+-------+
| id|   brand|     model|year|country|
+---+--------+----------+----+-------+
|  1|  Fender|Telecaster|1950|    USA|
|  2|  Gibson|  Les Paul|1959|    USA|
|  3|  Ibanez|        RG|1987|  Japan|
|  4|Takamine|Pro Series|1959|  Japan|
+---+--------+----------+----+-------+

Example-4

Here’s an example of raw data ingestion without applying any transformations.

from pyspark.sql import DataFrame

from spetlr.etl import Extractor, Loader, Orchestrator
from spetlr.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            """id STRING, brand STRING, model STRING, year STRING""",
        )


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator with no transformations")
etl = Orchestrator().extract_from(GuitarExtractor()).load_into(NoopLoader())
etl.execute()

Example-5

Here’s an example of having multiple Loader implementations that write the transformed data into multiple destinations.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from spetlr.etl import Extractor, Transformer, Loader, Orchestrator
from spetlr.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df


class NoopSilverLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()


class NoopGoldLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple loaders")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(BasicTransformer())
    .load_into(NoopSilverLoader())
    .load_into(NoopGoldLoader())
)
etl.execute()

Example-6

Using Example-2, Example-3 and Example-5 as reference, any combination of single/multiple implementations of Extractor, Transformer or Loader can be created.

Here’s an example of having both multiple Extractor, Transformer and Loader implementations.

It is important that the first transformer is a MultiInputTransformer when having multiple extractors.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from spetlr.etl import Extractor, Transformer, Loader, Orchestrator
from spetlr.spark import Spark


class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class JapaneseGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [("3", "Ibanez", "RG", "1987"), ("4", "Takamine", "Pro Series", "1959")]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: {}) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", f.lit("USA"))
        jap_df = dataset["JapaneseGuitarExtractor"].withColumn(
            "country", f.lit("Japan")
        )
        return usa_df.union(jap_df)


class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df


class NoopSilverLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()


class NoopGoldLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple loaders")
etl = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .transform_with(CountryOfOriginTransformer())
    .transform_with(BasicTransformer())
    .load_into(NoopSilverLoader())
    .load_into(NoopGoldLoader())
)
etl.execute()

Example-7

This example illustrates the use of TransformerNC. The job here is to join the two extracted dataframes, an employees dataframe and a birthdays dataframe. But before the birthdays can be joined onto the employees, the employees dataframe requires a transformation step. As the transformation step of employees is handled by a TransformerNC, it does not consume the other inputs from the dataset_group. Hence, birthdays is still available from the inputs, even after the transformation of employees. Then both frames can be joined and the final dataframe saved via a Loader. When working with TransformerNC, keep in mind that dataset keys are crucial. Setting both input and output dataset key(s) ensures that the Transformers and Loaders handle the intended dataframes.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from spetlr.etl import Extractor, Loader, Orchestrator, TransformerNC
from spetlr.etl.types import dataset_group
from spetlr.spark import Spark


class OfficeEmployeeExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Michael Scott", "Regional Manager"),
                    ("2", "Dwight K. Schrute", "Assistant to the Regional Manager"),
                    ("3", "Jim Halpert", "Salesman"),
                    ("4", "Pam Beesly", "Receptionist"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("name", StringType()),
                    StructField("position", StringType()),
                ]
            ),
        )


class OfficeBirthdaysExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    (1, "March 15"),
                    (2, "January 20"),
                    (3, "October 1"),
                    (4, "March 25"),
                ]
            ),
            StructType(
                [
                    StructField("id", IntegerType()),
                    StructField("birthday", StringType()),
                ]
            ),
        )


class IntegerTransformerNC(TransformerNC):
    def process(self, df: DataFrame) -> DataFrame:
        return df.withColumn("id", f.col("id").cast(IntegerType()))


class JoinTransformerNC(TransformerNC):
    def process_many(self, dataset: dataset_group) -> DataFrame:

        df_employee = dataset["df_employee_transformed"]
        df_birthdays = dataset["df_birthdays"]

        return df_employee.join(other=df_birthdays, on="id")


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using two non consuming transformers")
etl = (
    Orchestrator()
    .extract_from(OfficeEmployeeExtractor(dataset_key="df_employee"))
    .extract_from(OfficeBirthdaysExtractor(dataset_key="df_birthdays"))
    .transform_with(
        IntegerTransformerNC(
            dataset_input_keys="df_employee",
            dataset_output_key="df_employee_transformed",
        )
    )
    .transform_with(
        JoinTransformerNC(
            dataset_input_keys=["df_employee_transformed", "df_birthdays"],
            dataset_output_key="df_final",
        )
    )
    .load_into(NoopLoader(dataset_input_key="df_final"))
)
etl.execute()

Example-8

This example illustrates the use of an orchestrator as just another ETL step. The principle is called composite orchestration:

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame

from spetlr.etl import Extractor, Transformer, Loader, Orchestrator, dataset_group
from spetlr.spark import Spark


class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            T.StructType(
                [
                    T.StructField("id", T.StringType()),
                    T.StructField("brand", T.StringType()),
                    T.StructField("model", T.StringType()),
                    T.StructField("year", T.StringType()),
                ]
            ),
        )


class JapaneseGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("3", "Ibanez", "RG", "1987"),
                    ("4", "Takamine", "Pro Series", "1959"),
                ]
            ),
            T.StructType(
                [
                    T.StructField("id", T.StringType()),
                    T.StructField("brand", T.StringType()),
                    T.StructField("model", T.StringType()),
                    T.StructField("year", T.StringType()),
                ]
            ),
        )


class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: dataset_group) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", F.lit("USA"))
        jap_df = dataset["JapaneseGuitarExtractor"].withColumn("country", F.lit("Japan"))
        return usa_df.union(jap_df)


class OrchestratorLoader(Loader):
    def __init__(self, orchestrator: Orchestrator):
        super().__init__()
        self.orchestrator = orchestrator

    def save_many(self, datasets: dataset_group) -> None:
        self.orchestrator.execute(datasets)


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using composit inner orchestrator")
etl_inner = (
    Orchestrator()
    .transform_with(CountryOfOriginTransformer())
    .load_into(NoopLoader())
)

etl_outer = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .load_into(OrchestratorLoader(etl_inner))
)

etl_outer.execute()


Extractors documentation

This page documents all the spetlr extractors, following the OETL pattern.

Extractors in spetlr:

Eventhub stream extractor

This extractor reads data from an Azure eventhub and returns a structured streaming dataframe.

Under the hood, the Azure Event Hubs connector for Apache Spark (a Maven library) is used.

from spetlr.etl import Extractor

class EventhubStreamExtractor(Extractor):
    def __init__(self, 
                 consumerGroup: str,
                 connectionString: str = None,
                 namespace: str = None,
                 eventhub: str = None,
                 accessKeyName: str = None,
                 accessKey: str = None,
                 maxEventsPerTrigger: int = 10000):
        ...

Usage example with connection string:

eventhubStreamExtractor = EventhubStreamExtractor(
    consumerGroup="TestConsumerGroup",
    connectionString="TestSecretConnectionString",
    maxEventsPerTrigger = 100000
)

Usage example without connection string:

eventhubStreamExtractor = EventhubStreamExtractor(
    consumerGroup="TestConsumerGroup",
    namespace="TestNamespace",
    eventhub="TestEventhub",
    accessKeyName="TestAccessKeyName",
    accessKey="TestSecretAccessKey",
    maxEventsPerTrigger = 100000
)

Usage example with defining start timestamp:

eventhubStreamExtractor = EventhubStreamExtractor(
    consumerGroup="TestConsumerGroup",
    connectionString="TestSecretConnectionString",
    maxEventsPerTrigger = 100000,
    startEnqueuedTime = datetime.utcnow()
)

Example

This section elaborates on how the EventhubStreamExtractor extractor works and how to use it in the OETL pattern.

from pyspark.sql import DataFrame
import pyspark.sql.types as T
import pyspark.sql.functions as F

from spetlr.etl import Transformer, Loader, Orchestrator
from spetlr.extractors import EventhubStreamExtractor
from spetlr.functions import init_dbutils

class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print('Current DataFrame schema')
        df.printSchema()

        df = df.withColumn('body', F.col('body').cast(T.StringType()))

        print('New DataFrame schema')
        df.printSchema()
        return df


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> DataFrame:
        df.write.format('noop').mode('overwrite').save()
        return df


print('ETL Orchestrator using EventhubStreamExtractor')
etl = (Orchestrator()
        .extract_from(EventhubStreamExtractor(
            consumerGroup="TestConsumerGroup",
            connectionString=init_dbutils().secrets.get(scope = "TestScope", key = "TestSecretConnectionString"),
            maxEventsPerTrigger = 100000
        ))
        .transform_with(BasicTransformer())
        .load_into(NoopLoader())
        )
result = etl.execute()

Incremental extractor

This extractor only selects the newest data from the source, by comparing with a target table.

Example

What is extracted?

"""
Source has the following data:

|id| stringcol    | timecol          |
|--|--------------|------------------|
|1 | "string1"    | 01.01.2021 10:50 |
|22| "string2inc" | 01.01.2021 10:56 |
|3 | "string3"    | 01.01.2021 11:00 |

Target has the following data

|id| stringcol    | timecol          |
|--|--------------|------------------|
|1 | "string1"    | 01.01.2021 10:50 |
|2| "string2"     | 01.01.2021 10:55 |

So data from after 01.01.2021 10:55 should be read

|id| stringcol    | timecol          |
|--|--------------|------------------|
|22| "string2inc" | 01.01.2021 10:56 |
|3 | "string3"    | 01.01.2021 11:00 |
"""

How to use it:

from pyspark.sql import DataFrame
import pyspark.sql.functions as f
from spetlr.etl.extractors import IncrementalExtractor
from spetlr.delta import DeltaHandle
from spetlr.etl import Transformer, Loader, Orchestrator

class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        df = df.withColumn('idAsString', f.col('id').cast("string"))
        return df

class NoopLoader(Loader):
    def save(self, df: DataFrame) -> DataFrame:
        df.write.format('noop').mode('overwrite').save()
        return df

etl = (Orchestrator()
        .extract_from(IncrementalExtractor(
            handle_source=DeltaHandle.from_tc("SourceId"),
            handle_target=DeltaHandle.from_tc("TargetId"),
            time_col_source="TimeColumn",
            time_col_target="TimeColumn",
            dataset_key="source"
        ))
        .transform_with(BasicTransformer())
        .load_into(NoopLoader())
        )
result = etl.execute()

Functions documentation

Functions in spetlr:

Drop table cascade

The function drops a Databricks database table and deletes the directory associated with the table.

def drop_table_cascade(DBDotTableName: str) -> None:    
    ...

Usage example:

drop_table_cascade("test.testTable")

Example

This section elaborates on how the drop_table_cascade function works with a small example.

Create a test database:


CREATE DATABASE IF NOT EXISTS test
COMMENT "A test database"
LOCATION "/tmp/test/"

Create a test table:

CREATE TABLE IF NOT EXISTS test.testTable(
  Id STRING,
  sometext STRING,
  someinteger INT
)
USING DELTA
COMMENT "Contains test data"
LOCATION "/tmp/test/testTable"

Insert dummy data into the test table:

insert into test.testTable values ("ID1","hello",1)

Even if the table is dropped using drop table test.testTable, the table files are not necessarily deleted - see the Databricks documentation. Try to run the following code and see for yourself:

drop table test.testTable; 

CREATE TABLE IF NOT EXISTS test.testTable(
  Id STRING,
  sometext STRING,
  someinteger INT
)
USING DELTA
COMMENT "Contains test data"
LOCATION "/tmp/test/testTable";

select * from test.testTable;

You will notice that when creating the table using files from the same path as before, no files were deleted by drop table. If you instead use:

drop_table_cascade("test.testTable")

The testTable is dropped and the files (directory) are deleted.


Orchestrators documentation

The orchestrators package contains complete orchestration steps that extract and transform from a set of sources and load the result to a target.

EventHub to Delta

A very common pattern in data platforms is that json documents are published to an Azure eventhub with capture enabled. The data platform wants to ingest that data into a delta table with a given schema before carrying out further transformations.

The class EhJsonToDeltaOrchestrator has been designed to carry out this task with minimal configuration required.

The arguments to this orchestrator consist of

All important configurations follow from the schema and partitioning of the delta table.

There may be cases where only a subset of rows is desired to be extracted in the process. Here the orchestrator offers the method .filter_with which allows additional transformation steps to be injected before the rows are finally appended to the delta table.

Eventhub to medallion architecture

“A medallion architecture is a data design pattern used to logically organize data in a lakehouse, with the goal of incrementally and progressively improving the structure and quality of data as it flows through each layer of the architecture:

(from Bronze ⇒ Silver ⇒ Gold layer tables).

Medallion architectures are sometimes also referred to as “multi-hop” architectures.”

Source: Databricks.com

EventHub to Bronze

The class EhToDeltaBronzeOrchestrator has been designed to carry out the task of ingesting eventhub data into a bronze layer. Data is always appended to the bronze table. By utilizing the EhJsonToDeltaExtractor from the previous section, data is extracted incrementally.

“The Bronze layer is where we land all the data from external source systems. The table structures in this layer correspond to the source system table structures “as-is,” along with any additional metadata columns that capture the load date/time, process ID, etc. The focus in this layer is quick Change Data Capture and the ability to provide an historical archive of source (cold storage), data lineage, auditability, reprocessing if needed without rereading the data from the source system.”

Source: Databricks.com

The schema of the captured eventhub data can be found here: Exploring captured Avro files in Azure Event Hubs. Explanation of some of the columns can be found here and here.

Using the orchestrator without adding any filtering (.filter_with), the output schema is the following:

| Column Name | Data type | Explanation |
|---|---|---|
| EventhubRowId | Long | An ID generated to give a unique id for each row in the bronze table. Calculated based on sha2 hashing the Body and EnqueuedTimestamp. NB: There is a possibility of non-uniqueness. |
| BodyId | Long | An ID generated to give a unique id for each unique Body message. Calculated based on sha2 hashing the Body. Can be used to identify rows with the same Body. |
| Body | String | The eventhub body cast as a string - for readability and searchability. Transformed version of the binary body. |
| EnqueuedTimestamp | Timestamp | The enqueued time of the eventhub row. This is a transformation of the EnqueuedTime, which is the date and time, in UTC, of when the event was enqueued in the Event Hub partition. |
| StreamingTime | Timestamp | A timestamp added at the moment the orchestrator processed the eventhub data. |
| SequenceNumber | Long | The logical sequence number of the event within the partition stream of the Event Hub. |
| Offset | String | The offset of the data relative to the Event Hub partition stream. The offset is a marker or identifier for an event within the Event Hubs stream. The identifier is unique within a partition of the Event Hubs stream. |
| SystemProperties | String | The set of free-form event properties which were provided by the Event Hubs service to pass metadata associated with the event or associated Event Hubs operation. |
| Properties | String | The set of free-form properties which may be used for associating metadata with the event that is meaningful within the application context. |
| pdate | Timestamp | A transformation of the eventhub partitioning set to a timestamp. See previous section. |

The bronze table schema should at least contain the following columns. The presence of these columns is asserted in the bronze transformer step.

from pyspark.sql.types import (
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

schema_bronze = StructType(
        [
            StructField("EventhubRowId", LongType(), True),
            StructField("BodyId", LongType(), True),
            StructField("Body", StringType(), True),
            StructField("EnqueuedTimestamp", TimestampType(), True),
            StructField("StreamingTime", TimestampType(), True),
            StructField("SequenceNumber", LongType(), True),
            StructField("Offset", StringType(), True),
            StructField("SystemProperties", StringType(), True),
            StructField("Properties", StringType(), True),
            StructField("pdate", TimestampType(), True),
        ]
    )

Example

The arguments to this orchestrator consist of

There may be cases where only a subset of rows is desired to be extracted in the process. Here the orchestrator offers the method .filter_with which allows additional transformation steps to be injected before the rows are finally appended to the delta table.

from spetlr.delta import DeltaHandle
from spetlr.eh.EventHubCaptureExtractor import EventHubCaptureExtractor
from spetlr.orchestrators import EhToDeltaBronzeOrchestrator

eh=EventHubCaptureExtractor.from_tc("eh_id")
dh=DeltaHandle.from_tc("dh_id")

orchestrator = EhToDeltaBronzeOrchestrator(eh=eh, dh=dh)
orchestrator.execute()
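If only a subset of rows should be appended, a filter step can be injected before execution. A sketch follows (MyRowFilter is a hypothetical Transformer; passing a transformer to .filter_with is an assumption based on the description above):

import pyspark.sql.functions as f
from pyspark.sql import DataFrame

from spetlr.etl import Transformer


class MyRowFilter(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        # keep only rows with a non-empty Body (illustrative condition)
        return df.filter(f.col("Body").isNotNull())


orchestrator = EhToDeltaBronzeOrchestrator(eh=eh, dh=dh)
orchestrator.filter_with(MyRowFilter())
orchestrator.execute()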

Eventhub to Silver

The class EhToDeltaSilverOrchestrator has been designed to carry out the task of unpacking and transforming bronze eventhub data to the silver layer.

“In the Silver layer of the lakehouse, the data from the Bronze layer is matched, merged, conformed and cleansed (“just-enough”) so that the Silver layer can provide an “Enterprise view” of all its key business entities, concepts and transactions. (e.g. master customers, stores, non-duplicated transactions and cross-reference tables).”

Source: Databricks.com

Data is per default incrementally upserted to the silver table. By utilizing the EhJsonToDeltaTransformer from the previous section, the schema of dh_target is used for unpacking the eventhub bronze schema. It is therefore only necessary to define the target delta schema. There may be cases where only a subset of rows is desired to be extracted in the process. Here the orchestrator offers the method .filter_with, which allows additional transformation steps to be injected before the rows are finally upserted to the delta table.

Note: It is possible to choose either append or overwrite instead of upsert. Keep in mind that the extractor will in these cases use the SimpleExtractor and SimpleLoader for extracting/loading data.

Example

The arguments to this orchestrator consist of

The schema of dh_target defines how the eventhub data is unpacked.

from spetlr.delta import DeltaHandle
from spetlr.orchestrators import EhToDeltaSilverOrchestrator

dh_source=DeltaHandle.from_tc("dh_source_id")
dh_target=DeltaHandle.from_tc("dh_target_id")

orchestrator = EhToDeltaSilverOrchestrator(dh_source=dh_source, dh_target=dh_target)

orchestrator.execute()

Eventhub to gold

What about the gold layer? Since the gold layer is often associated with custom business logic, no orchestrator is implemented for that purpose.

“Data in the Gold layer of the lakehouse is typically organized in consumption-ready “project-specific” databases. The Gold layer is for reporting and uses more de-normalized and read-optimized data models with fewer joins. The final layer of data transformations and data quality rules are applied here.”

Source: Databricks.com


The Schema Manager Class

The Schema Manager class is the intended way to retrieve schemas in spetlr. It can be desirable to retrieve the schemas of dataframes at various times, for example to ensure that the data has the correct form before being written to a table or to be able to reason on the columns included in incoming data.

The SchemaManager class is a singleton class that can be invoked anywhere to get any schema defined in a dataplatform using offline methods.

Summary

This section will briefly cover the included methods and their use, the following sections go into slightly more detail with how they are implemented. The main methods are:

Schema Types

Schemas can be defined using various types and in various locations in the platform. Currently, the schema manager supports:

Schemas are written directly to the .yaml files under the schema attribute, with the exception of pyspark schemas, which are defined in separate files. For pyspark schemas to be available, they must be registered in the Schema Manager under a name with the register_schema function.

Communication with the Configurator

Schemas defined directly in .yaml files are made available through the configurator. The get_schema-method of the schema manager works as follows:

  1. Look for the given name in the registered schemas.
  2. If not found, query the Configurator for the schema attribute of a table with the given name.
  3. If the result is a string, it must be referencing a pyspark schema, which should be registered.
  4. Otherwise, it must be a dict with the schema defined in the .yaml file and both the schema and its type can be found here.

Similarly, the get_all_schemas-method iterates over the Configurator’s table_details to find all tables that have a schema attribute.
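A minimal sketch of the registration and lookup flow (the import path and exact method signatures are assumptions based on the method names above):

from pyspark.sql.types import StringType, StructField, StructType

from spetlr.schema_manager import SchemaManager  # assumed import path

# register a pyspark schema under a name, so that yaml resources can reference it
my_schema = StructType([StructField("id", StringType())])
SchemaManager().register_schema("my_schema", my_schema)  # assumed signature

# look up a schema, whether registered directly or defined under a 'schema' attribute in yaml
schema = SchemaManager().get_schema("MySpetlrTableReference")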


Spetlr SQL documentation

SQL methods overview:

SQL Server Class

Let’s say you have an Azure SQL server called yourservername with an associated database yourdatabase. The username is explicitly defined here as customusername and the password as [REDACTED]. Security-sensitive values like passwords should be stored in e.g. databricks secrets. Here is a usage example:

from spetlr.sql import SqlServer
class ExampleSqlServer(SqlServer):
    def __init__(
        self,
        database: str = "yourdatabase",
        hostname: str = None,
        username: str = None,
        password: str = None,
        port: str = "1433",
    ):

        self.hostname = "yourservername.database.windows.net" if hostname is None else hostname
        self.username = "customusername" if username is None else username
        
        # The password should of course be collected from e.g. databricks secrets
        self.password ="[REDACTED]" if password is None else password 
        
        self.database = database
        self.port = port
        super().__init__(
            self.hostname, self.database, self.username, self.password, self.port
        )

Using SPN to connect

If you are using a Service Principal to create the connection to the server/database, you should set spnid = "yourspnid" and spnpassword = "[REDACTED]" instead. The SqlServer class ensures that the connection is made properly via the JDBC/ODBC drivers. Note: you must use either SQL user credentials or SPN credentials - never both.
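A hedged sketch of the SPN variant (whether spnid and spnpassword are accepted as constructor arguments in this exact form is an assumption based on the names above):

from spetlr.sql import SqlServer

sql_server = SqlServer(
    hostname="yourservername.database.windows.net",
    database="yourdatabase",
    spnid="yourspnid",
    # the SPN password should of course be collected from e.g. databricks secrets
    spnpassword="[REDACTED]",
)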

SQL Upsert

The method upserts (updates or inserts) a Databricks dataframe into a target SQL table.

def upsert_to_table_by_name(
        self,
        df_source: DataFrame,
        table_name: str,
        join_cols: List[str],
        filter_join_cols: bool = True,
        overwrite_if_target_is_empty: bool = True,
    ):
    ...

If ‘filter_join_cols’ is True, the dataframe is filtered so that the join columns used in the SQL merge operation contain no NULL values. This can be set to False to save compute if this check is already handled.

If ‘overwrite_if_target_is_empty’ is True, the first row of the target table is read; if the table is empty, the dataframe is written with an overwrite instead of a merge. This can be set to False to save compute if the programmer knows the target will never be empty.

Usage example:

sql_server.upsert_to_table_by_name(df_new, "tableName", ["Id"])
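If the join columns are already known to be free of nulls and the target is known to be non-empty, both checks can be skipped to save compute:

sql_server.upsert_to_table_by_name(
    df_new,
    "tableName",
    ["Id"],
    filter_join_cols=False,
    overwrite_if_target_is_empty=False,
)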

SqlExecutor

This class helps parse and execute sql-files. It can be used for executing both Spark and Azure SQL queries.

NB: The parser uses the semicolon character (;) to split the queries. If the character is used for purposes other than closing a query, execution issues may arise. In such cases, please use Spark.get().sql() or SqlServer.sql() instead.

In the example below, the SqlExecutor is inherited and your SQL server is used (see SQL Server Class). Furthermore, the module containing the sql-files to be executed is provided via the base_module variable.

from tests.cluster.sql import extras
from tests.cluster.sql.DeliverySqlServer import DeliverySqlServer
from spetlr.sql import SqlExecutor

class DeliverySqlExecutor(SqlExecutor):
    def __init__(self):
        super().__init__(base_module=extras, server=DeliverySqlServer())

If one needs to execute sql queries in Databricks using Spark, there is no need to provide a server. By default, the class uses the spetlr Spark class.

from spetlr.sql import SqlExecutor
from tests.cluster.sql import extras

class SparkSqlExecutor(SqlExecutor):
    def __init__(self):
        super().__init__(base_module=extras)

In the setup job, one could consider creating all delivery SQL tables:

from spetlr.configurator import Configurator
from tests.cluster.delta.SparkExecutor import SparkSqlExecutor
from tests.cluster.sql.DeliverySqlExecutor import DeliverySqlExecutor

# In setup.py
def setup_production_tables():
    Configurator().set_prod()
    SparkSqlExecutor().execute_sql_file("*")
    DeliverySqlExecutor().execute_sql_file("*")

It can be useful to have sql files among the resources that are only applied in special cases. The SqlExecutor therefore allows excluding these files with the exclude_pattern parameter. Any file whose name contains this pattern will not be included in the execution.
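For example (a sketch assuming the pattern is passed to execute_sql_file; the pattern value is illustrative):

SparkSqlExecutor().execute_sql_file("*", exclude_pattern="debug")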


Transformations documentation

Transformations in spetlr:

Concatenate data frames

UPDATE: PySpark has an equivalent implementation, .unionByName(df, allowMissingColumns=False); see the documentation for more information.

The transformation unions dataframes by appending the dataframes to each other, keeping all columns.

from pyspark.sql import DataFrame
from typing import List

def concat_dfs(dfs: List[DataFrame]) -> DataFrame:   
    ...

Usage example:

concat_dfs([df1,df2,df3])

Example

This section elaborates on how the concat_dfs function works with a small example.

Create two test datasets:

from pyspark.sql.types import StructType, StructField, StringType

from spetlr.spark import Spark

df1 = Spark.get().createDataFrame(
        Spark.get().sparkContext.parallelize([
            ('1', 'Fender', 'Telecaster', '1950'),
            ('2', 'Gibson', 'Les Paul', '1959'),
            ('3', 'Ibanez', 'RG', '1987')
        ]),
        StructType([
            StructField('id', StringType()),
            StructField('brand', StringType()),
            StructField('model', StringType()),
            StructField('year', StringType()),
        ]))

df2 = Spark.get().createDataFrame(
        Spark.get().sparkContext.parallelize([
            ('1', 'Fender', 'Stratocaster', 'Small'),
            ('2', 'Gibson', 'Les Paul Junior', 'Medium'),
            ('3', 'Ibanez', 'JPM', 'Large')
        ]),
        StructType([
            StructField('id', StringType()),
            StructField('brand', StringType()),
            StructField('model', StringType()),
            StructField('size', StringType()),
        ]))

Concatenate (union) the two dataframes:

 # SPETLR's "concat_dfs"
 result = concat_dfs([df1,df2])
 
 # pyspark's unionByName
 result = df1.unionByName(df2, allowMissingColumns=True)

Print the dataframe:

 result.show()

The output is then:

+------+---+---------------+------+----+
| brand| id|          model|  size|year|
+------+---+---------------+------+----+
|Fender|  1|     Telecaster|  null|1950|
|Gibson|  2|       Les Paul|  null|1959|
|Ibanez|  3|             RG|  null|1987|
|Fender|  1|   Stratocaster| Small|null|
|Gibson|  2|Les Paul Junior|Medium|null|
|Ibanez|  3|            JPM| Large|null|
+------+---+---------------+------+----+

Note that the result of the union of df1 and df2 contains all columns: “brand”, “id”, “model” and “size” (from df2) and “year” (from df1).

Fuzzy Select Transformer

The FuzzySelectTransformer is an ETL transformer that can process a single dataframe. Its purpose is to help create short concise select code that is somewhat insensitive to source columns that are misspelled or use different capitalization.

To use, construct the FuzzySelectTransformer with the following arguments:

Under the hood, difflib is used to find a suitable unique mapping from source to target columns. All column names are converted to lower case before matching.

The association of target to source columns is required to be unique. If the algorithm identifies multiple matching source columns to a target name, an exception will be raised.

Example

Given a dataframe df, this code renames all columns:

>>> df.show()
+----+-----+------+
|inex|count|lables|
+----+-----+------+
|   1|    2|   foo|
|   3|    4|   bar|
+----+-----+------+
>>> from spetlr.transformers.fuzzy_select import FuzzySelectTransformer
>>> ft = FuzzySelectTransformer(["Index", "Count", "Label"])
>>> ft.process(df).show()
+-----+-----+-----+
|Index|Count|Label|
+-----+-----+-----+
|    1|    2|  foo|
|    3|    4|  bar|
+-----+-----+-----+

Merge df into target

The transformation merges a databricks dataframe into a target database table.

def merge_df_into_target(df: DataFrame,
                         table_name: str,
                         database_name: str,
                         join_cols: List[str]) -> None:
    ...

Usage example:

merge_df_into_target(df_new, "testTarget", "test", ["Id"])

Example

The following queries create a test table containing guitar data:

CREATE DATABASE IF NOT EXISTS test
COMMENT "A test database"
LOCATION "/tmp/test/";

CREATE TABLE IF NOT EXISTS test.testTarget(
  Id STRING,
  Brand STRING,
  Model STRING
)
USING DELTA
COMMENT "Contains merge test target rows"
LOCATION "/tmp/test/testTarget";

insert into test.testTarget values ("2","Gibson","Les Paul");

select * from test.testTarget

+---+------+--------+
| Id| Brand|   Model|
+---+------+--------+
|  2|Gibson|Les Paul|
+---+------+--------+

The following dataframe has one row that will be merged with Id=2, and the other rows are going to be inserted:

from pyspark.sql.types import StructType, StructField, StringType

df_new = spark.createDataFrame(
        spark.sparkContext.parallelize([
            ("1", "Fender", "Jaguar"),
            ("2", "Gibson", "Starfire"),
            ("3", "Ibanez", "RG")
        ]),
        StructType([
            StructField("Id", StringType(), False),
            StructField("Brand", StringType(), True),
            StructField("Model", StringType(), True),
        ]))

Use the transformation to merge data into the test delta table:

merge_df_into_target(df_new, "testTarget", "test", ["Id"])

%sql

select * from test.testTarget order by Id

+---+------+--------+
| Id| Brand|   Model|
+---+------+--------+
|  1|Fender|  Jaguar|
|  2|Gibson|Starfire|
|  3|Ibanez|      RG|
+---+------+--------+

As one can see, the row with Id=2 is now merged such that the model went from “Les Paul” to “Starfire”. The two other rows were inserted.

DropOldestDuplicates

This transformation helps to drop duplicates based on time. If there are multiple duplicates, only the newest row remains. In the example below, a dataframe has several duplicates, since a unique record is defined by the combination of guitar id, model and brand. As time goes by, the number of guitars available in a store changes. Let’s assume that we only want to keep the newest record, dropping the oldest duplicates:

from spetlr.utils.DropOldestDuplicates import DropOldestDuplicates
data =

| id| model|     brand|amount|         timecolumn|
+---+------+----------+------+-------------------+
|  1|Fender|Telecaster|     5|2021-07-01 10:00:00|
|  1|Fender|Telecaster|     4|2021-07-01 11:00:00|
|  2|Gibson|  Les Paul|    27|2021-07-01 11:00:00|
|  3|Ibanez|        RG|    22|2021-08-01 11:00:00|
|  3|Ibanez|        RG|    26|2021-09-01 11:00:00|
|  3|Ibanez|        RG|    18|2021-10-01 11:00:00|
+---+------+----------+------+-------------------+

df = DropOldestDuplicates(
            cols=["id", "model", "brand"],
            orderByColumn="timecolumn"
            ).process(data)
df.show()

| id| model|     brand|amount|         timecolumn|
+---+------+----------+------+-------------------+
|  1|Fender|Telecaster|     4|2021-07-01 11:00:00|
|  2|Gibson|  Les Paul|    27|2021-07-01 11:00:00|
|  3|Ibanez|        RG|    18|2021-10-01 11:00:00|
+---+------+----------+------+-------------------+

Notice that the oldest duplicates are dropped.

TimeZoneTransformer

This transformation uses latitude and longitude values to determine the timezone of a specific location. The example below shows how to apply the transformer to an input DataFrame to get a column with timezones. Notice that when either the latitude or the longitude value is None, the returned timezone will also be None.

from spetlr.transformers import TimeZoneTransformer
data =

|   latitude| longitude|
+-----------+----------+
| 51.519487 | -0.083069|
| 55.6761   |   12.5683|
| None      |      None|
| None      | -0.083069|
| 51.519487 |      None|
+-----------+----------+

df = TimeZoneTransformer( 
            latitude_col="latitude",
            longitude_col="longitude",
            column_output_name="timezone"
        ).process(data)
df.show()

|   latitude| longitude|            timezone|
+-----------+----------+--------------------+
| 51.519487 | -0.083069|     "Europe/London"|
| 55.6761   |   12.5683| "Europe/Copenhagen"|
| None      |      None|                None|
| None      | -0.083069|                None|
| 51.519487 |      None|                None|
+-----------+----------+--------------------+

SelectAndCastColumnsTransformer

This transformation selects and casts columns in a dataframe based on a pyspark schema. If case-insensitive matching is desired, caseInsensitiveMatching can be set to True.

from pyspark.sql import types as T
from spetlr.transformers import SelectAndCastColumnsTransformer
data =

|         id|    number|     value|
+-----------+----------+----------+
|         1 |       42 |        1 |
|         2 |      355 |        0 |
+-----------+----------+----------+

desired_schema = T.StructType(
    [
        T.StructField("id", T.StringType(), True),
        T.StructField("value", T.BooleanType(), True),
    ]
)

df = SelectAndCastColumnsTransformer( 
      schema=desired_schema,
      caseInsensitiveMatching=False
  ).process(data)
df.show()

|         id|     value|
+-----------+----------+
|       "1" |     True |
|       "2" |    False |
+-----------+----------+

ValidFromToTransformer

This transformer introduces Slowly Changing Dimension 2 (SCD2) columns to a dataframe. The three introduced SCD2 columns are: ValidFrom, ValidTo and IsCurrent. The logic builds the SCD2 history based on a time-formatted column (the parameter time_col). One can easily extract only the active (current) data by applying .filter(“iscurrent=1”) to the dataframe.

Disclaimer: Use only with full loads / overwrites.

Usage example:

from spetlr.transformers.ValidFromToTransformer import ValidFromToTransformer
data =

| id| model|     brand|amount|         timecolumn|
+---+------+----------+------+-------------------+
|  1|Fender|Telecaster|     5|2021-07-01 10:00:00|
|  1|Fender|Telecaster|     5|2021-07-01 10:00:00|
|  1|Fender|Telecaster|     4|2021-07-01 11:00:00|
|  2|Gibson|  Les Paul|    27|2021-07-01 11:00:00|
|  3|Ibanez|        RG|    22|2021-08-01 11:00:00|
|  3|Ibanez|        RG|    26|2021-09-01 11:00:00|
|  3|Ibanez|        RG|    18|2021-10-01 11:00:00|
+---+------+----------+------+-------------------+


from pyspark.sql import functions as f

df = (
    ValidFromToTransformer(
        time_col="timecolumn",
        wnd_cols=["id", "model", "brand"]
    )
    .process(data)
    .drop("timecolumn")
    .orderBy(f.col("ValidFrom").asc(), f.col("ValidTo").asc())
)

df.show()

| id| model|     brand|amount|          validfrom|            validto|iscurrent|
+---+------+----------+------+-------------------+-------------------+---------+
|  1|Fender|Telecaster|     5|2021-07-01 10:00:00|2021-07-01 11:00:00|    false|
|  1|Fender|Telecaster|     4|2021-07-01 11:00:00|2262-04-11 00:00:00|     true|
|  2|Gibson|  Les Paul|    27|2021-07-01 11:00:00|2262-04-11 00:00:00|     true|
|  3|Ibanez|        RG|    22|2021-08-01 11:00:00|2021-09-01 11:00:00|    false|
|  3|Ibanez|        RG|    26|2021-09-01 11:00:00|2021-10-01 11:00:00|    false|
|  3|Ibanez|        RG|    18|2021-10-01 11:00:00|2262-04-11 00:00:00|     true|
+---+------+----------+------+-------------------+-------------------+---------+


# Select only the active (current) rows in the dataframe

df.filter("iscurrent=1").show()

+---+------+----------+------+-------------------+-------------------+---------+
| id| model|     brand|amount|          validfrom|            validto|iscurrent|
+---+------+----------+------+-------------------+-------------------+---------+
|  1|Fender|Telecaster|     4|2021-07-01 11:00:00|2262-04-11 00:00:00|     true|
|  2|Gibson|  Les Paul|    27|2021-07-01 11:00:00|2262-04-11 00:00:00|     true|
|  3|Ibanez|        RG|    18|2021-10-01 11:00:00|2262-04-11 00:00:00|     true|
+---+------+----------+------+-------------------+-------------------+---------+

Utilities documentation

Utilities in spetlr:

Api Auto Config

Using the method spetlr.db_auto.getDbApi() gives access to a DatabricksAPI instance that has been pre-configured for the current databricks instance. See databricks-api for usage documentation.

Under the hood the function uses the job context to get the host and token when on the cluster. When using spetlr with databricks-connect, the databricks-cli is called to configure the client. Thus, the function works without further configuration in all contexts.
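A minimal sketch of how the pre-configured client might be used; the list_clusters call follows the databricks-api package and is only illustrative:

from spetlr.db_auto import getDbApi

db = getDbApi()

# Illustrative: list the clusters of the current workspace
# via the pre-configured databricks-api client.
for cluster in db.cluster.list_clusters().get("clusters", []):
    print(cluster["cluster_name"])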

Test Utilities

DataframeCreator

The DataframeCreator is a helper class to assist in writing concise unittests.

Unittests typically take a dataframe, often created with spark.createDataFrame, and transform it. The function createDataFrame requires all data fields to be assigned a value, even if the given unittest is only concerned with a small subset of them.

This class allows the user to specify which columns she wants to give values for. All other columns will be assigned null values.

Usage:

from spetlr.utils import DataframeCreator
from spetlr.schema_manager.schema import get_schema

df = DataframeCreator.make_partial(
    schema=get_schema("""
                Id INTEGER,
                measured DOUBLE,
                customer STRUCT<
                    name:STRING,
                    address:STRING
                >,
                product_nos ARRAY<STRUCT<
                    no:INTEGER,
                    name:STRING
                >>
            """),
    columns=[
        "Id",
        # of the customer structure, only specify the name
        ("customer", ["name"]),
        # of the products array of structures, only specify the 'no' field in each row
        ("product_nos", ["no"])
    ],
    data=[
        (1, ("otto",), [(1,), (2,)]),
        (2, ("max",), []),
    ],
)
df.show()

Result:

| Id|measured|    customer|         product_nos|
+---+--------+------------+--------------------+
|  1|    null|{otto, null}|[{1, null}, {2, n...|
|  2|    null| {max, null}|                  []|
+---+--------+------------+--------------------+

Git Hooks

A set of standard git hooks is included to provide useful functionality.

To use the hooks, they can be installed in any repository by executing this command from a path inside the repository:

spetlr-git-hooks

To uninstall the hooks, simply run this command:

spetlr-git-hooks uninstall