Using dbt with Dagster, part three: Define assets upstream of your dbt models#

This is part three of the Using dbt with Dagster software-defined assets tutorial.

By this point, you've set up a dbt project and loaded dbt models into Dagster as assets.

However, the tables at the root of the pipeline are static: they're dbt seeds, CSVs that are hardcoded into the dbt project. In a more realistic data pipeline, these tables would typically be ingested from some external data source, for example by using a tool like Airbyte or Fivetran, or by Python code.

These ingestion steps often don't make sense to define inside dbt, but they do still make sense to define as Dagster software-defined assets. You can think of a Dagster software-defined asset as a more general version of a dbt model: a dbt model is one kind of asset, and an asset defined in Python, using Dagster's Python API, is another.

In this section, you'll replace the raw_customers dbt seed with a Dagster asset that represents it. You'll write Python code that populates this table by fetching data from the web. This will allow you to launch runs that first execute Python code to populate the raw_customers table and then invoke dbt to populate the downstream tables.

You'll:

  • Install the Pandas and DuckDB Python libraries
  • Define an upstream Dagster asset
  • Replace a seed with a source in the dbt project
  • Materialize the assets using the Dagster UI

Step 1: Install the Pandas and DuckDB Python libraries#

The Dagster asset that you write will fetch data using Pandas and write it out to your DuckDB warehouse using DuckDB's Python API. To use these, you'll need to install them:

pip install pandas duckdb

Step 2: Define an upstream Dagster asset#

To fetch the data the dbt models require, we'll write a Dagster asset for raw_customers. We'll put this asset in our definitions.py file, inside the jaffle_dagster directory. This is the file that contains the code that loads our dbt models, which we reviewed at the end of the last section. Copy and paste this code into that file, replacing everything up to dbt = DbtCliResource(project_dir=os.fspath(dbt_project_dir)):

import os
from pathlib import Path

import duckdb
import pandas as pd
from dagster_dbt import DbtCliResource, build_schedule_from_dbt_selection, dbt_assets

from dagster import Definitions, OpExecutionContext, asset

dbt_project_dir = Path(__file__).joinpath("..", "..", "..").resolve()
duckdb_database_path = dbt_project_dir.joinpath("tutorial.duckdb")


@asset
def raw_customers(context: OpExecutionContext) -> None:
    data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
    connection = duckdb.connect(os.fspath(duckdb_database_path))
    connection.execute("create schema if not exists jaffle_shop")
    connection.execute(
        "create or replace table jaffle_shop.raw_customers as select * from data"
    )

    # Log some metadata about the table we just wrote. It will show up in the UI.
    context.add_output_metadata({"num_rows": data.shape[0]})


dbt = DbtCliResource(project_dir=os.fspath(dbt_project_dir))

Let's review the changes we made:

  1. At the top, we added imports for pandas and duckdb, which we use for fetching data into a DataFrame and writing it to DuckDB.

  2. We added a duckdb_database_path variable, which holds the location of our DuckDB database. Remember that DuckDB databases are just regular files on the local filesystem. The path is the same path that we used when we configured our profiles.yml file. This variable is used in the implementation of the raw_customers asset.

  3. We added a definition for the raw_customers table by writing a function named raw_customers and decorating it with the @asset decorator. The implementation inside the function fetches data from the internet and writes it to a table in our DuckDB database. Similar to how running a dbt model executes a select statement, materializing this asset will execute this Python code.

Finally, let's update the assets argument of our Definitions object, at the bottom of the file, to include the new asset we just defined:

defs = Definitions(
    assets=[raw_customers, jaffle_shop_dbt_assets],
    schedules=schedules,
    resources={
        "dbt": dbt,
    },
)

Step 3: In the dbt project, replace a seed with a source#

  1. Because we're replacing it with a Dagster asset, we no longer need the dbt seed for raw_customers, so we can delete it:

    cd ..
    rm seeds/raw_customers.csv
    
  2. Instead, we want to tell dbt that raw_customers is a table that is defined outside of the dbt project. We can do that by defining it inside a dbt source.

    Create a file called sources.yml inside the models/ directory, and put this inside it:

    version: 2
    
    sources:
      - name: jaffle_shop
        tables:
          - name: raw_customers
            meta:
              dagster:
                asset_key: raw_customers
    

This is a standard dbt source definition, with one addition: it includes metadata, under the meta property, that specifies the Dagster asset that it corresponds to. When Dagster reads the contents of the dbt project, it reads this metadata and infers the correspondence. For any dbt model that depends on this dbt source, Dagster then knows that the Dagster asset corresponding to the dbt model should depend on the Dagster asset corresponding to the source. Without this metadata, Dagster would derive an asset key from the source and table names, which wouldn't match the raw_customers asset we defined in Python.

  3. Then, update the model that depends on the raw_customers seed to instead depend on the source. Replace the contents of models/staging/stg_customers.sql with this:

    with source as (
    
        {#-
        Use source instead of seed:
        #}
        select * from {{ source('jaffle_shop', 'raw_customers') }}
    
    ),
    
    renamed as (
    
        select
            id as customer_id,
            first_name,
            last_name
    
        from source
    
    )
    
    select * from renamed
    

Step 4: Materialize the assets using the Dagster UI#

Now let's load our data pipeline in the UI to see what we've built:

cd jaffle_dagster/

DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1 dagster dev

When you open the UI, you'll be looking at a job that contains only your dbt assets, with a link to the raw_customers asset.

dbt job with link to upstream asset

If you click on "default" under the "Asset groups" section in the left navigation pane, you'll get to a view that includes all your assets: the dbt ones and the raw_customers one.

Asset group with dbt models and Python asset

Click the Materialize all button. This will launch a run with two steps:

  • Run the raw_customers Python function to fetch data and write the raw_customers table to DuckDB.
  • Run all the dbt models using dbt build, like in the last section.

If you click to view the run, you can see a graphical representation of these steps, along with logs.

Run page for run with dbt models and Python asset

What's next?#

At this point, you've built and materialized an upstream Dagster asset, providing source data to your dbt models. In the last section of the tutorial, we'll show you how to add a downstream asset to the pipeline.