Spetlr

A python ETL libRary (SPETLR) for Databricks powered by Apache SPark.

Enables a standardized ETL framework and integration testing for Databricks.



Why use SPETLR?

See SPETLR Documentation.

From a developer’s perspective:

From a business perspective:

SPETLR has a lot of great tools for working with ETL in Databricks. To make it easy to see why you might need SPETLR, here is a list of the core features:

Core feature: (O)ETL Framework

OETL, short for Orchestrated Extract-Transform-Load, is a pattern that builds on the ideas behind variations of the Model-View-Whatever design pattern.

A very short overview of the framework:

(Figure: ETL orchestrator)

from spetlr.etl import Extractor, Transformer, Loader, Orchestrator

(Orchestrator()
    .extract_from(Extractor())
    .transform_with(Transformer())
    .load_into(Loader())
    .execute())
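
To make the pattern concrete, here is a minimal sketch with made-up classes (MyExtractor, MyTransformer, MyLoader). An extractor implements read(), a transformer implements process(), and a loader implements save(); the Orchestrator wires them together:

from pyspark.sql import DataFrame, SparkSession

from spetlr.etl import Extractor, Transformer, Loader, Orchestrator


class MyExtractor(Extractor):
    # read() returns the DataFrame that enters the flow
    def read(self) -> DataFrame:
        spark = SparkSession.builder.getOrCreate()
        return spark.createDataFrame([(1, "a"), (2, "b")], "id int, value string")


class MyTransformer(Transformer):
    # process() receives the extracted DataFrame and returns the transformed one
    def process(self, df: DataFrame) -> DataFrame:
        return df.filter("id > 1")


class MyLoader(Loader):
    # save() receives the final DataFrame and writes it to the target
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()


(Orchestrator()
    .extract_from(MyExtractor())
    .transform_with(MyTransformer())
    .load_into(MyLoader())
    .execute())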

Behind the scenes

How does the framework handle the dataframes?

The framework handles the dataframes in a Dataset dictionary: {key: DataFrame}.

Example: The code and figure below show how data can flow through the SPETLR ETL framework:

from spetlr.etl import Extractor, Transformer, Loader, Orchestrator

(Orchestrator()
    .extract_from(ExtractorE1(dataset_key="E1"))
    .extract_from(ExtractorE2(dataset_key="E2"))
    .transform_with(TransformerT1())
    .transform_with(TransformerT2())
    .transform_with(TransformerT3())
    .load_into(LoaderL1())
    .execute())
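
The two extractors above place their output under the keys "E1" and "E2", so a transformer that needs both inputs can work on the whole dataset dictionary. A sketch of what TransformerT1 could look like (the join column "id" is made up for illustration; process_many receives the {key: DataFrame} dictionary):

from pyspark.sql import DataFrame

from spetlr.etl import Transformer


class TransformerT1(Transformer):
    # process() handles the single-input case (not used in this flow)
    def process(self, df: DataFrame) -> DataFrame:
        return df

    # process_many() receives the dataset dictionary {key: DataFrame}
    # and returns a single DataFrame that flows on to the next step
    def process_many(self, dataset: dict) -> DataFrame:
        df_e1 = dataset["E1"]
        df_e2 = dataset["E2"]
        return df_e1.join(df_e2, on="id", how="inner")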

(Figure: ETL dataset group)

Why should you use the SPETLR OETL framework?

Core feature: Integration test for Databricks

SPETLR provides a framework for creating test databases and tables before deploying to production. This is a must-have for a Lakehouse platform in order to ensure a reliable and stable data platform.

Concept: Table abstractions

Every table exists in two “places”:

(Figure: the delta schema in the storage account and the Spark SQL code)

The left part (the delta schema in a storage account) is the actual state and the right part (Spark SQL code) is the desired state.

Let us say that the ETL process is changed in a new Pull Request. How do we ensure that the new code actually works? How do we ensure that the ETL processes work on the new desired state?

The SPETLR solution is debug tables. The process works as follows:

(Figures: the Pull Request problem and the SPETLR solution, illustrated as debug-table steps 1-3)

The next question is then: How to create debug tables?

It is done by introducing dynamic table names. A delta table is not referenced by its actual name; instead, the code refers to a Configurator key:

-- spetlr.Configurator key: MyDetailsTable
CREATE TABLE IF NOT EXISTS `my_db1{ID}`.details
(
  a int,
  b int,
  c string,
  d timestamp
)
USING DELTA
COMMENT "Dummy Database 1 details"
LOCATION "/{MNT}/foo/bar/my_db1{ID}/details/";

The key “MyDetailsTable” is then used to reference the table in the SPETLR handler classes (DeltaHandle, SqlHandle, etc.).

When debugging, the variables are set to: ID = a unique suffix (for example a GUID) and MNT = "tmp".

When the code is used in production, the variables are set to: ID = "" (an empty string) and MNT = "mnt".

(NB: The MNT variable is used for Databricks workspaces that use mounting to access storage accounts. When using direct access, a variable similar to MNT can change the path to the storage account.)

Tool support

Keys need to be configured:

from spetlr import Configurator
Configurator().add_resource_path("path/to/sql_files")

All table access needs to use the abstractions:

# Don't use
df = spark.table("my_db1.details")

# but use SPETLR
from spetlr.delta import DeltaHandle

df = DeltaHandle.from_tc("MyDetailsTable").read()

The Configurator is a singleton, and the .from_tc() method accesses that singleton. The SQL code is pre-processed before execution (the variables are replaced).
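
Inside an ETL step, the same abstraction applies. As a sketch (DetailsLoader is an illustrative name), a loader can resolve its target through the Configurator key and never hard-codes the physical table name:

from pyspark.sql import DataFrame

from spetlr.delta import DeltaHandle
from spetlr.etl import Loader


class DetailsLoader(Loader):
    def __init__(self):
        super().__init__()
        # Resolved via the Configurator key, so the same loader
        # writes to the debug table in tests and the real table in production.
        self.target = DeltaHandle.from_tc("MyDetailsTable")

    # save() appends the incoming DataFrame to the Delta table
    def save(self, df: DataFrame) -> None:
        self.target.append(df)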

Test execution (debug tables)

To execute a test:

# Set the configurator in debug mode
# ID = a GUID
# MNT = "tmp"
Configurator().set_debug()

# SqlExecutor utilizes the Configurator to parse SQL files
SqlExecutor("path/to/sql_files").execute_sql_file("*")

MyEtlOrchestrator().execute()
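
On Databricks, this sequence is typically wrapped in a test case so it can run as part of an integration test suite. A sketch using unittest (MyEtlOrchestrator and MyDetailsTable are the illustrative names used above; the SqlExecutor import path is assumed here):

import unittest

from spetlr import Configurator
from spetlr.delta import DeltaHandle
from spetlr.sql import SqlExecutor


class DetailsEtlTests(unittest.TestCase):
    def test_etl_populates_details(self):
        # Debug mode: {ID} becomes a unique suffix and {MNT} becomes "tmp",
        # so the test only touches throw-away debug tables.
        Configurator().set_debug()
        SqlExecutor("path/to/sql_files").execute_sql_file("*")

        # The ETL job under test (defined elsewhere in the project)
        MyEtlOrchestrator().execute()

        # Assert against the debug table through the same abstraction
        df = DeltaHandle.from_tc("MyDetailsTable").read()
        self.assertGreater(df.count(), 0)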

Production ETL

To execute production:

# Set the configurator in production mode
# ID = ""
# MNT = "mnt"
Configurator().set_prod()

# SqlExecutor utilizes the Configurator to parse SQL files
SqlExecutor("path/to/sql_files").execute_sql_file("*")

MyEtlOrchestrator().execute()

Test improvements

The SPETLR team recommends:

Summary

SPETLR contributes the following:

Core feature: Data source handlers

SPETLR provides classes for handling reads and writes to different data sources, primarily Azure resources.

The object-oriented structure makes it easy to swap handlers in ETL flows, so different data sources can be handled with minimal code duplication.

(Figure: data source handles)

The handlers are well-tested against real Azure resources to ensure high quality and reliability. They can be used out of the box in various projects without needing extensive customization.

The handlers primarily use PySpark under the hood. They are therefore a convenient way to streamline working with multiple data sources in ETL flows.

In the code

from spetlr.cosmos import CosmosHandle
from spetlr.delta import DeltaHandle
from spetlr.etl import Extractor, Transformer, Loader, Orchestrator
from spetlr.etl.extractors import SimpleExtractor
from spetlr.sql import SqlHandle

# EventHub is not yet implemented as a Handle
from spetlr.eh import EventHubCaptureExtractor



dh = DeltaHandle.from_tc("DeltaTableId")
eh = EventHubCaptureExtractor.from_tc("EventhubCaptureTableId")
sqlh = SqlHandle.from_tc("SQLTableId")
cosmosh = CosmosHandle.from_tc("CosmosTableId")

# All handles have the same methods.
# The SimpleExtractor uses the handle's .read() method,
# so you can use the same extract step regardless
# of which kind of data source you use.

(Orchestrator()
    .extract_from(SimpleExtractor(handle=dh, dataset_key="delta"))
    .extract_from(SimpleExtractor(handle=eh, dataset_key="eventhub"))
    .extract_from(SimpleExtractor(handle=sqlh, dataset_key="sql"))
    .extract_from(SimpleExtractor(handle=cosmosh, dataset_key="cosmos"))
    ...
)
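
The same interchangeability holds on the write side. As a sketch, assuming the SimpleLoader mentioned above lives next to the SimpleExtractor and wraps a handle's write methods (MyEventsTransformer is an illustrative transformer), switching the load target is just a matter of passing a different handle:

from spetlr.etl import Orchestrator
from spetlr.etl.extractors import SimpleExtractor
from spetlr.etl.loaders import SimpleLoader

# Write the same transformed result to two different targets
# simply by handing different handles to the loader.
(Orchestrator()
    .extract_from(SimpleExtractor(handle=eh, dataset_key="events"))
    .transform_with(MyEventsTransformer())
    .load_into(SimpleLoader(handle=dh))
    .load_into(SimpleLoader(handle=cosmosh))
    .execute())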

Why should you use the data source handlers?