Encapsulating External Dependencies with Dagster Resources

By external dependencies I mean: DBs, APIs, and Queues

Data Platform Engineer by choice.

Why Encapsulation Matters in Data Pipelines

In real-world pipelines, external systems (databases, APIs, queues) are stateful, unreliable, and slow. Calling them directly inside ops/assets creates:

  • Tight coupling: logic depends on specific implementations (e.g., Postgres vs MySQL)

  • Non-determinism: API latency, DB state changes, retries

  • Hard-to-test code: you can’t unit test without hitting real systems

Encapsulation solves this by:

  • Isolating side effects

  • Making pipelines predictable and testable

  • Allowing swappability (mock DB, fake API)

Think of it as:

Your pipeline should describe what to do, not how to talk to infrastructure.


Resources as Dependency Boundaries

Dagster Resources act as a formal boundary between orchestration logic and infrastructure.

Instead of:

def my_op():
    conn = psycopg2.connect(...)
    ...

You do:

@op(required_resource_keys={"db"})
def my_op(context):
    context.resources.db.execute_query(...)

What this gives you:

  • Inversion of control (IoC): Dagster injects dependencies

  • Centralized management: connection logic lives in one place

  • Replaceability: swap resource without touching ops
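
To make the inversion of control concrete, here is a minimal plain-Python sketch of the idea. It is not how Dagster implements injection internally; `InMemoryDB` and `run_with_resources` are hypothetical names made up for illustration. The point is that `my_op` stays the same while the resource behind the `db` key changes.

```python
from types import SimpleNamespace


class InMemoryDB:
    """Hypothetical stand-in resource that serves canned rows."""

    def __init__(self, rows):
        self._rows = rows

    def execute_query(self, query):
        # A real resource would send `query` to a database; this one ignores it.
        return list(self._rows)


def my_op(context):
    # The op only knows the resource key and its method, not the backend.
    rows = context.resources.db.execute_query("SELECT id FROM users")
    return [row["id"] for row in rows]


def run_with_resources(op_fn, resources):
    """Toy injector: builds a context exposing `context.resources.<key>`."""
    context = SimpleNamespace(resources=SimpleNamespace(**resources))
    return op_fn(context)


staging = InMemoryDB([{"id": 1}, {"id": 2}])
empty = InMemoryDB([])

print(run_with_resources(my_op, {"db": staging}))  # [1, 2]
print(run_with_resources(my_op, {"db": empty}))    # []
```

Swapping the dictionary passed to the injector is the only change between the two runs, which is the "replaceability" benefit in miniature.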


Designing Resource Interfaces for External Systems

A common mistake is exposing raw clients:

Bad:

context.resources.db.connection.cursor().execute(...)

Better:

context.resources.db.execute_query(query)

Key principles:

1. Minimal Interface

Expose only what’s needed:

  • execute_query

  • fetch_dataframe

  • insert_records

2. Stable Contract

Your resource API should stay stable even when:

  • The upstream API version changes

  • The DB schema changes

3. Hide Implementation Details

  • No leaking ORM/session/cursor objects

  • No exposing raw HTTP clients
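
The three principles can be sketched together as a small contract plus one implementation. This is an illustrative sketch, assuming Python 3.9+ and using the standard library's `sqlite3` as a stand-in backend; `DatabaseClient` and `SqliteDatabase` are hypothetical names, and `fetch_dataframe` is left out to avoid a pandas dependency.

```python
import sqlite3
from typing import Any, Iterable, Protocol


class DatabaseClient(Protocol):
    """The contract ops depend on; method names follow the list above."""

    def execute_query(self, query: str, params: tuple = ()) -> list[dict[str, Any]]: ...

    def insert_records(self, table: str, records: Iterable[dict[str, Any]]) -> int: ...


class SqliteDatabase:
    """One possible implementation; callers never see a cursor or connection."""

    def __init__(self, path: str = ":memory:"):
        self._conn = sqlite3.connect(path)
        self._conn.row_factory = sqlite3.Row

    def execute_query(self, query, params=()):
        with self._conn:  # commits on success, rolls back on error
            cursor = self._conn.execute(query, params)
            # Return plain dicts so no driver objects leak out.
            return [dict(row) for row in cursor.fetchall()]

    def insert_records(self, table, records):
        # NOTE: table and column names are interpolated into the SQL, so they
        # must be trusted identifiers; only the values are bound as parameters.
        records = list(records)
        if not records:
            return 0
        columns = list(records[0])
        placeholders = ", ".join("?" for _ in columns)
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
        with self._conn:
            self._conn.executemany(sql, [tuple(r[c] for c in columns) for r in records])
        return len(records)


db = SqliteDatabase()
db.execute_query("CREATE TABLE users (id INTEGER, name TEXT)")
db.insert_records("users", [{"id": 1, "name": "Ada"}])
print(db.execute_query("SELECT id, name FROM users"))  # [{'id': 1, 'name': 'Ada'}]
```

Because the methods accept and return plain Python data, a different backend could satisfy the same `DatabaseClient` contract without any change to calling code.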


Database Resources: Connection Management & Query Abstraction

Core responsibilities:

  • Connection lifecycle

  • Query execution

  • Transaction handling

Pattern:

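import psycopg2

class DatabaseResource:
    def __init__(self, connection_string):
        self.connection_string = connection_string

    def execute_query(self, query, params=None):
        conn = self._get_connection()
        try:
            # `with conn` commits or rolls back; it does not close the connection
            with conn, conn.cursor() as cur:
                cur.execute(query, params)
                # description is None for statements that return no rows
                return cur.fetchall() if cur.description else None
        finally:
            conn.close()

    def _get_connection(self):
        return psycopg2.connect(self.connection_string)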

Read vs Write Separation:

read_db.execute_query(...)

write_db.insert(...)

Separate read and write resources let each path scale independently, for example by pointing reads at a replica.
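
Transaction handling, the third responsibility listed above, can be sketched as follows. This is a hedged illustration that uses the standard library's `sqlite3` instead of Postgres so it runs anywhere; `SqliteResource` and `execute_atomic` are hypothetical names, not part of any library.

```python
import os
import sqlite3
import tempfile


class SqliteResource:
    def __init__(self, path):
        self._path = path

    def _connect(self):
        return sqlite3.connect(self._path)

    def execute_query(self, query, params=()):
        conn = self._connect()
        try:
            with conn:  # commit on success, rollback on exception
                return conn.execute(query, params).fetchall()
        finally:
            conn.close()  # the resource owns the connection lifecycle

    def execute_atomic(self, statements):
        """Run (sql, params) pairs in a single transaction: all or nothing."""
        conn = self._connect()
        try:
            with conn:
                for sql, params in statements:
                    conn.execute(sql, params)
        finally:
            conn.close()


path = os.path.join(tempfile.mkdtemp(), "demo.db")
db = SqliteResource(path)
db.execute_query("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")

try:
    db.execute_atomic([
        ("INSERT INTO accounts VALUES (?, ?)", (1, 100)),
        ("INSERT INTO accounts VALUES (?, ?)", (1, 999)),  # duplicate key, fails
    ])
except sqlite3.IntegrityError:
    pass

# The first insert was rolled back together with the failing one.
print(db.execute_query("SELECT COUNT(*) FROM accounts"))  # [(0,)]
```

Ops that call `execute_atomic` never have to reason about commits or rollbacks; that policy lives in one place.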


API Resources: Resilient and Configurable Integrations

APIs are inherently unreliable: your resource must handle that.

Responsibilities:

  • Authentication

  • Retries

  • Timeouts

  • Error handling

import requests


class APIResource:
    def __init__(self, base_url, token):
        self.base_url = base_url
        self.token = token

    def get(self, endpoint):
        response = requests.get(
            f"{self.base_url}/{endpoint}",
            headers={"Authorization": f"Bearer {self.token}"},
            timeout=5,  # seconds; fail fast instead of hanging the run
        )
        response.raise_for_status()  # turn 4xx/5xx responses into exceptions
        return response.json()

Retry logic, rate limiting, and circuit breaking belong inside the resource.
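
As one way to keep retries inside the resource, here is a minimal sketch of exponential backoff. It assumes the HTTP call is passed in as a `fetch` callable so the example runs without a network; `TransientAPIError`, `RetryingAPIResource`, and the injectable `sleep` parameter are hypothetical names chosen for illustration.

```python
import time


class TransientAPIError(Exception):
    """Hypothetical error type for failures worth retrying (timeouts, 5xx)."""


class RetryingAPIResource:
    def __init__(self, fetch, max_attempts=3, base_delay=0.5, sleep=time.sleep):
        self._fetch = fetch            # callable(endpoint) -> parsed response
        self._max_attempts = max_attempts
        self._base_delay = base_delay
        self._sleep = sleep            # injectable so tests do not really wait

    def get(self, endpoint):
        for attempt in range(1, self._max_attempts + 1):
            try:
                return self._fetch(endpoint)
            except TransientAPIError:
                if attempt == self._max_attempts:
                    raise  # out of attempts: surface the failure to the op
                # Exponential backoff: base, 2*base, 4*base, ...
                self._sleep(self._base_delay * 2 ** (attempt - 1))


calls = {"n": 0}


def flaky_fetch(endpoint):
    # Fails on the first two calls, succeeds on the third.
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientAPIError("temporary failure")
    return {"endpoint": endpoint, "ok": True}


delays = []
api = RetryingAPIResource(flaky_fetch, sleep=delays.append)
print(api.get("users"))  # {'endpoint': 'users', 'ok': True}
print(delays)            # [0.5, 1.0]
```

The op that calls `api.get(...)` sees either a result or a final exception; the number of attempts and the delays are resource configuration, not pipeline logic.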


Queue Resources: Messaging and Event-Driven Pipelines

Queues enable decoupled, asynchronous architectures.

Responsibilities:

  • Publishing messages

  • Consuming messages

  • Acknowledgment

Pattern:

class QueueResource:
    def publish(self, topic, message):
        ...

    def consume(self, topic):
        ...

Resources allow you to:

  • Swap Kafka ↔ RabbitMQ without changing ops

  • Keep pipeline logic clean and event-driven
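
To illustrate the swap, here is a hedged sketch of an in-process backend with the same `publish`/`consume` shape plus explicit acknowledgment. `InMemoryQueueResource`, `ack`, and `nack` are hypothetical names; a Kafka- or RabbitMQ-backed resource would expose the same methods and hide its client.

```python
from collections import defaultdict, deque
from itertools import count


class InMemoryQueueResource:
    """Hypothetical in-process backend, handy for local runs and tests."""

    def __init__(self):
        self._topics = defaultdict(deque)
        self._in_flight = {}
        self._ids = count(1)

    def publish(self, topic, message):
        self._topics[topic].append(message)

    def consume(self, topic):
        """Return (delivery_id, message), or None when the topic is empty."""
        if not self._topics[topic]:
            return None
        delivery_id = next(self._ids)
        message = self._topics[topic].popleft()
        # Hold the message until acknowledged so it can be redelivered.
        self._in_flight[delivery_id] = (topic, message)
        return delivery_id, message

    def ack(self, delivery_id):
        # Processing succeeded: forget the message.
        self._in_flight.pop(delivery_id, None)

    def nack(self, delivery_id):
        # Processing failed: put the message back at the front of its topic.
        topic, message = self._in_flight.pop(delivery_id)
        self._topics[topic].appendleft(message)


queue = InMemoryQueueResource()
queue.publish("orders", {"order_id": 42})

delivery_id, message = queue.consume("orders")
queue.nack(delivery_id)                      # simulate a failed attempt

delivery_id, message = queue.consume("orders")
queue.ack(delivery_id)                       # second attempt succeeds

print(message)                  # {'order_id': 42}
print(queue.consume("orders"))  # None
```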


Composing Resources for Complex Systems

Resources can depend on other resources.

class AuthResource:
    def get_token(self):
        ...

class APIResource:
    def __init__(self, auth):
        self.auth = auth

    def get(self, endpoint):
        token = self.auth.get_token()

Benefits:

  • Reusability (auth used across APIs)

  • Separation of concerns

  • Cleaner architecture layers

This leads to resource graphs, not just standalone objects.
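
A slightly fuller sketch of this composition shows one auth resource shared by two API resources. The `fetch_token` and `send` callables are assumptions introduced so the example runs without a network, and the example URLs are placeholders.

```python
class AuthResource:
    def __init__(self, fetch_token):
        self._fetch_token = fetch_token  # callable() -> str

    def get_token(self):
        return self._fetch_token()


class APIResource:
    def __init__(self, base_url, auth, send):
        self.base_url = base_url
        self.auth = auth       # dependency on another resource
        self._send = send      # callable(url, headers) -> response

    def get(self, endpoint):
        token = self.auth.get_token()
        return self._send(
            f"{self.base_url}/{endpoint}",
            {"Authorization": f"Bearer {token}"},
        )


sent = []


def fake_send(url, headers):
    # Records the request instead of performing it.
    sent.append((url, headers))
    return {"status": "ok"}


auth = AuthResource(lambda: "token-123")
billing = APIResource("https://billing.example", auth, fake_send)
crm = APIResource("https://crm.example", auth, fake_send)

billing.get("invoices")
crm.get("contacts")

print(sent[0])  # ('https://billing.example/invoices', {'Authorization': 'Bearer token-123'})
print(sent[1])  # ('https://crm.example/contacts', {'Authorization': 'Bearer token-123'})
```

Changing how tokens are obtained now means touching `AuthResource` only; both API resources pick up the change.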


Testing External Dependencies via Resources

Testing becomes powerful when resources are swappable.

Replace real resource:

class FakeDB:
    def execute_query(self, query):
        return [{"id": 1}]

Inject in tests:

resources = {"db": FakeDB()}

Benefits:

  • No real DB/API calls

  • Fast, deterministic tests

  • Full control over edge cases

Simulate failures:

class FailingAPI:
    def get(self, endpoint):
        raise Exception("API down")

You can test:

  • Retry logic

  • Failure handling

  • Edge cases
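
Putting the two fakes together, a failure-handling test might look like the sketch below. `enrich_users` is a hypothetical piece of pipeline logic invented for this example; it takes its resources as plain arguments so the test needs no orchestrator.

```python
class FakeDB:
    def execute_query(self, query):
        return [{"id": 1}]


class FailingAPI:
    def get(self, endpoint):
        raise Exception("API down")


def enrich_users(db, api):
    """Hypothetical op body: look up each user, tolerating API failures."""
    enriched = []
    for row in db.execute_query("SELECT id FROM users"):
        try:
            profile = api.get(f"users/{row['id']}")
        except Exception:
            profile = None  # degrade gracefully instead of failing the run
        enriched.append({**row, "profile": profile})
    return enriched


def test_enrich_users_survives_api_outage():
    result = enrich_users(FakeDB(), FailingAPI())
    assert result == [{"id": 1, "profile": None}]


test_enrich_users_survives_api_outage()
```

The test runs in milliseconds and exercises a path (a total API outage) that is awkward to reproduce against a real service.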


Common Anti-Patterns

❌ God Resource

One resource handling:

  • DB + API + Queue

Break it into smaller, single-purpose resources.

❌ Business Logic Inside Resources

def calculate_revenue(self):  # domain logic living on a resource
    ...

Resources should NOT contain domain logic.
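
One way to keep the two apart is sketched below: the resource only fetches rows, and the revenue calculation is a pure function that takes those rows as input. `FakeOrdersDB` and the column names are hypothetical.

```python
def calculate_revenue(order_rows):
    """Domain logic as a pure function: no connection, no I/O."""
    return sum(row["quantity"] * row["unit_price"] for row in order_rows)


class FakeOrdersDB:
    """Hypothetical resource stand-in that only fetches data."""

    def execute_query(self, query):
        return [
            {"quantity": 2, "unit_price": 10},
            {"quantity": 1, "unit_price": 5},
        ]


rows = FakeOrdersDB().execute_query("SELECT quantity, unit_price FROM orders")
print(calculate_revenue(rows))  # 25
```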

❌ Hidden State

  • Caching without visibility

  • Mutable shared state

❌ Leaky Abstractions

Exposing:

  • raw DB cursors

  • HTTP clients


End-to-End Example: Encapsulating Object Storage with Dagster Resources

MinIO Resource (Encapsulation Layer)

from dagster import ConfigurableResource, EnvVar
from minio import Minio


class MinioResource(ConfigurableResource):
    endpoint: str = EnvVar("MINIO_ENDPOINT_RESOURCE")
    access_key: str = EnvVar("MINIO_ACCESS_KEY")
    secret_key: str = EnvVar("MINIO_SECRET_KEY")

    def get_client(self):
        return Minio(
            self.endpoint,
            access_key=self.access_key,
            secret_key=self.secret_key,
            secure=False,
        )

Registering the Resource

from dagster import Definitions

defs = Definitions(
    resources={"minio": MinioResource()},
)

Using the Resource with a Sensor

A sensor that uses the MinIO resource to check whether any new CSV files have arrived.

import dagster as dg

# `ingestion_job` and `check_for_csv` are not shown here.
@dg.sensor(
    job=ingestion_job,
    minimum_interval_seconds=180,
    default_status=dg.DefaultSensorStatus.STOPPED,
)
def ingestion_sensor(minio: MinioResource):
    sensor_response = check_for_csv(minio)

    if not sensor_response:
        yield dg.SkipReason("No new CSV files found")
        return

    yield dg.RunRequest()
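
The article does not show `check_for_csv`, so the following is only a guess at what such a helper could look like. The `bucket` default and the `seen` parameter are hypothetical; `list_objects` and `object_name` are the names used by the MinIO Python client, and the fake classes exist solely so the sketch runs without a MinIO server.

```python
from types import SimpleNamespace


def check_for_csv(minio, bucket="landing", seen=frozenset()):
    """Return names of CSV objects in `bucket` that are not in `seen`."""
    client = minio.get_client()
    return sorted(
        obj.object_name
        for obj in client.list_objects(bucket, recursive=True)
        if obj.object_name.lower().endswith(".csv") and obj.object_name not in seen
    )


class FakeMinioClient:
    """Test double exposing only the one client method the helper uses."""

    def __init__(self, names):
        self._names = names

    def list_objects(self, bucket_name, recursive=False):
        return [SimpleNamespace(object_name=name) for name in self._names]


class FakeMinioResource:
    def __init__(self, client):
        self._client = client

    def get_client(self):
        return self._client


resource = FakeMinioResource(FakeMinioClient(["a.csv", "b.json", "c.CSV"]))
print(check_for_csv(resource, seen={"a.csv"}))  # ['c.CSV']
```

An empty list is falsy, so a helper shaped like this would fit the `if not sensor_response` check in the sensor.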

Resources Configuration from Dagster's UI

Using EnvVar not only tells Dagster to retrieve the value at runtime, but also keeps the value from being displayed in the UI.


Final Insight

Dagster Resources are not just helpers; they are:

The infrastructure abstraction layer that turns pipelines into clean, testable, and scalable systems.

Without them:

  • Pipelines become messy scripts

With them:

  • Pipelines become well-architected systems

Follow Muhammad Safiullah Khan on LinkedIn for more Data Engineering Content.
