Encapsulating External Dependencies with Dagster Resources
By external dependencies, I mean databases, APIs, and queues.
Why Encapsulation Matters in Data Pipelines
In real-world pipelines, external systems (databases, APIs, queues) are stateful, unreliable, and slow. Calling them directly inside ops/assets creates:
Tight coupling: logic depends on specific implementations (e.g., Postgres vs MySQL)
Non-determinism: API latency, DB state changes, retries
Hard-to-test code: you can’t unit test without hitting real systems
Encapsulation solves this by:
Isolating side effects
Making pipelines predictable and testable
Allowing swappability (mock DB, fake API)
Think of it as:
Your pipeline should describe what to do, not how to talk to infrastructure.
Resources as Dependency Boundaries
Dagster Resources act as a formal boundary between orchestration logic and infrastructure.
Instead of:
import psycopg2
def my_op():
    conn = psycopg2.connect(...)  # connection details hard-coded inside the op
    ...
You do:
from dagster import op
@op(required_resource_keys={"db"})
def my_op(context):
    # The "db" resource is injected by Dagster at run time
    context.resources.db.execute_query(...)
What this gives you:
Inversion of control (IoC): Dagster injects dependencies
Centralized management: connection logic lives in one place
Replaceability: swap resource without touching ops
Designing Resource Interfaces for External Systems
A common mistake is exposing raw clients:
Bad:
context.resources.db.connection.cursor().execute(...)
Better:
context.resources.db.execute_query(query)
Key principles:
1. Minimal Interface
Expose only what’s needed:
execute_query
fetch_dataframe
insert_records
2. Stable Contract
Your resource API should not change frequently, even if:
The external API's version changes
The DB schema changes
3. Hide Implementation Details
No leaking ORM/session/cursor objects
No exposing raw HTTP clients
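Here is a minimal sketch of the three principles, assuming a contract written as a `typing.Protocol`. The implementation uses Python's built-in `sqlite3` as a stand-in for Postgres. The names `Database` and `SQLiteDatabase` are illustrative. `fetch_dataframe` is left out so the example needs only the standard library. Callers get plain lists of dicts back, never a cursor or connection.

```python
import sqlite3
from typing import Any, Protocol


class Database(Protocol):
    """The contract ops/assets depend on; no driver types appear in it."""

    def execute_query(self, query: str, params: tuple = ()) -> list[dict[str, Any]]: ...

    def insert_records(self, table: str, records: list[dict[str, Any]]) -> int: ...


class SQLiteDatabase:
    """Hypothetical SQLite-backed implementation of the Database contract."""

    def __init__(self, path: str = ":memory:") -> None:
        # One connection is kept so an in-memory database survives between calls.
        self._conn = sqlite3.connect(path)
        self._conn.row_factory = sqlite3.Row

    def execute_query(self, query: str, params: tuple = ()) -> list[dict[str, Any]]:
        with self._conn:  # commits on success, rolls back on exception
            rows = self._conn.execute(query, params).fetchall()
        # Convert sqlite3.Row objects to dicts so no driver type leaks out.
        return [dict(row) for row in rows]

    def insert_records(self, table: str, records: list[dict[str, Any]]) -> int:
        if not records:
            return 0
        columns = list(records[0])
        placeholders = ", ".join("?" for _ in columns)
        # Table and column names are interpolated, so they must come from trusted code.
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
        with self._conn:
            self._conn.executemany(sql, [tuple(r[c] for c in columns) for r in records])
        return len(records)
```

An op only ever sees `execute_query` and `insert_records`. Moving to Postgres means writing another class with the same two methods; the ops themselves do not change.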
Database Resources: Connection Management & Query Abstraction
Core responsibilities:
Connection lifecycle
Query execution
Transaction handling
Pattern:
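import psycopg2
class DatabaseResource:
    def __init__(self, connection_string):
        self.connection_string = connection_string
    def execute_query(self, query, params=None):
        conn = self._get_connection()
        try:
            # "with conn" commits on success and rolls back on error (it does not close)
            with conn, conn.cursor() as cur:
                cur.execute(query, params)
                # Only statements that return rows have a result set to fetch
                return cur.fetchall() if cur.description else None
        finally:
            conn.close()
    def _get_connection(self):
        return psycopg2.connect(self.connection_string)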
Read vs Write Separation:
read_db.execute_query(...)
write_db.insert(...)
This lets reads and writes scale independently, for example by pointing reads at a replica.
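Here is a sketch of the split. It again uses `sqlite3` so it runs anywhere. The read resource opens the database in SQLite's read-only URI mode (`mode=ro`), so an accidental write fails loudly instead of succeeding silently. `ReadDB`, `WriteDB`, and the file path are assumptions for illustration. With Postgres, the two resources would typically point at a replica and the primary.

```python
import sqlite3
from typing import Any


class ReadDB:
    """Hypothetical read-only resource (e.g. pointed at a replica)."""

    def __init__(self, path: str) -> None:
        self.path = path

    def execute_query(self, query: str, params: tuple = ()) -> list[tuple[Any, ...]]:
        # mode=ro makes SQLite reject any write through this connection.
        conn = sqlite3.connect(f"file:{self.path}?mode=ro", uri=True)
        try:
            return conn.execute(query, params).fetchall()
        finally:
            conn.close()


class WriteDB:
    """Hypothetical write resource (e.g. pointed at the primary)."""

    def __init__(self, path: str) -> None:
        self.path = path

    def insert(self, table: str, rows: list[tuple[Any, ...]]) -> int:
        if not rows:
            return 0
        placeholders = ", ".join("?" for _ in rows[0])
        conn = sqlite3.connect(self.path)
        try:
            with conn:  # one transaction per batch
                # The table name is interpolated, so it must come from trusted code.
                conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows)
        finally:
            conn.close()
        return len(rows)
```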
API Resources: Resilient and Configurable Integrations
APIs are inherently unreliable: your resource must handle that.
Responsibilities:
Authentication
Retries
Timeouts
Error handling
import requests
class APIResource:
    def __init__(self, base_url, token):
        self.base_url = base_url
        self.token = token
    def get(self, endpoint):
        response = requests.get(
            f"{self.base_url}/{endpoint}",
            headers={"Authorization": f"Bearer {self.token}"},
            timeout=5,  # seconds; never call an external API without a timeout
        )
        response.raise_for_status()  # turn 4xx/5xx responses into exceptions
        return response.json()
Retry logic, rate limiting, and circuit breaking belong inside the resource.
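Here is one sketch of retry logic living inside the resource: bounded retries with exponential backoff. Two things are injected to keep the example runnable offline:

- The transport is a hypothetical `fetch` callable, standing in for the `requests.get` call in `APIResource`.
- `sleep` is also injected, so tests can skip the real waits.

The sketch retries only on the built-in `ConnectionError` and `TimeoutError`. That is an assumption. With `requests` you would catch its own exception types, such as `requests.exceptions.RequestException`, which do not subclass the built-in ones.

```python
import time
from typing import Any, Callable

# Exceptions treated as transient (an assumption; adapt to your HTTP client).
RETRYABLE = (ConnectionError, TimeoutError)


class ResilientAPIResource:
    """Hypothetical API resource with retries and exponential backoff."""

    def __init__(
        self,
        fetch: Callable[[str], Any],
        max_attempts: int = 4,
        base_delay: float = 0.5,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        self.fetch = fetch
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.sleep = sleep

    def get(self, endpoint: str) -> Any:
        for attempt in range(1, self.max_attempts + 1):
            try:
                return self.fetch(endpoint)
            except RETRYABLE:
                if attempt == self.max_attempts:
                    raise  # out of attempts: surface the original error
                # Wait base_delay, 2 * base_delay, 4 * base_delay, ... between attempts.
                self.sleep(self.base_delay * 2 ** (attempt - 1))
```

Any other exception, such as a `ValueError` from bad data, propagates immediately. A programming error is not something a retry can fix. Rate limiting and a circuit breaker would slot into the same `get` method; they are left out to keep the sketch short.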
Queue Resources: Messaging and Event-Driven Pipelines
Queues enable decoupled, asynchronous architectures.
Responsibilities:
Publishing messages
Consuming messages
Acknowledgment
Pattern:
class QueueResource:
    def publish(self, topic, message):
        ...
    def consume(self, topic):
        ...
Resources allow you to:
Swap Kafka ↔ RabbitMQ without changing ops
Keep pipeline logic clean and event-driven
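Here is what swapping looks like in practice: a hypothetical in-memory implementation of the same `publish`/`consume` interface, plus explicit `ack` and `nack`. It suits local runs and tests. Nothing here is Kafka- or RabbitMQ-specific; a production resource would wrap a real client behind the same method names. Returning unacknowledged messages with `nack` loosely mirrors at-least-once delivery.

```python
import itertools
from collections import defaultdict, deque
from typing import Any, Optional


class InMemoryQueueResource:
    """Hypothetical queue resource for tests: same interface, no broker."""

    def __init__(self) -> None:
        self._topics: dict[str, deque] = defaultdict(deque)
        self._in_flight: dict[int, tuple[str, Any]] = {}
        self._ids = itertools.count(1)

    def publish(self, topic: str, message: Any) -> int:
        message_id = next(self._ids)
        self._topics[topic].append((message_id, message))
        return message_id

    def consume(self, topic: str) -> Optional[tuple[int, Any]]:
        """Return (message_id, message), or None if the topic is empty."""
        if not self._topics[topic]:
            return None
        message_id, message = self._topics[topic].popleft()
        # Track the message until it is acknowledged.
        self._in_flight[message_id] = (topic, message)
        return message_id, message

    def ack(self, message_id: int) -> None:
        # Processing succeeded: forget the message.
        self._in_flight.pop(message_id)

    def nack(self, message_id: int) -> None:
        # Processing failed: put the message back at the front of its topic.
        topic, message = self._in_flight.pop(message_id)
        self._topics[topic].appendleft((message_id, message))
```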
Composing Resources for Complex Systems
Resources can depend on other resources.
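class AuthResource:
    def get_token(self):
        ...
class APIResource:
    def __init__(self, auth):
        self.auth = auth  # another resource, injected rather than constructed here
    def get(self, endpoint):
        token = self.auth.get_token()
        ...  # build and send the request using the token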
Benefits:
Reusability (auth used across APIs)
Separation of concerns
Cleaner architecture layers
This leads to resource graphs, not just standalone objects.
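Here is a runnable version of the composed design, under a few assumptions. The token comes from a hypothetical `StaticTokenAuth`, and the HTTP call is an injected `transport` callable so the example stays offline. The point is that `APIResource` only knows it holds something with a `get_token()` method. Dagster's own `ConfigurableResource` also supports declaring one resource as a field of another, which is the framework-native way to build this kind of graph.

```python
from typing import Any, Callable, Protocol


class TokenProvider(Protocol):
    def get_token(self) -> str: ...


class StaticTokenAuth:
    """Hypothetical auth resource; a real one might call an OAuth endpoint."""

    def __init__(self, token: str) -> None:
        self._token = token

    def get_token(self) -> str:
        return self._token


# transport(url, headers) -> parsed response body; stands in for requests.get(...).json()
Transport = Callable[[str, dict[str, str]], Any]


class APIResource:
    def __init__(self, base_url: str, auth: TokenProvider, transport: Transport) -> None:
        self.base_url = base_url.rstrip("/")
        self.auth = auth
        self.transport = transport

    def get(self, endpoint: str) -> Any:
        # Ask the auth resource on every call, so token rotation stays its concern.
        token = self.auth.get_token()
        headers = {"Authorization": f"Bearer {token}"}
        return self.transport(f"{self.base_url}/{endpoint.lstrip('/')}", headers)
```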
Testing External Dependencies via Resources
Testing becomes powerful when resources are swappable.
Replace real resource:
class FakeDB:
    def execute_query(self, query):
        return [{"id": 1}]
Inject in tests:
resources = {"db": FakeDB()}
Benefits:
No real DB/API calls
Fast, deterministic tests
Full control over edge cases
Simulate failures:
class FailingAPI:
    def get(self, endpoint):
        raise Exception("API down")
You can test:
Retry logic
Failure handling
Edge cases
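Here is a sketch of the fakes at work. The pipeline step is a plain function that receives its resources as arguments, which is roughly how Dagster hands resources to an op or asset. `sync_users`, `FakeAPI`, `RecordingDB`, and `insert_records` are hypothetical names. This `FailingAPI` raises `ConnectionError` rather than a bare `Exception`, so the step can catch the failure narrowly. Two tests cover the happy path and the "API down" path without touching a real system.

```python
from typing import Any


def sync_users(api: Any, db: Any) -> int:
    """Hypothetical pipeline step: copy users from an API into a database."""
    try:
        users = api.get("users")
    except ConnectionError:
        return 0  # policy choice for this sketch: skip the write when the API is down
    db.insert_records("users", users)
    return len(users)


class FakeAPI:
    def get(self, endpoint: str) -> list[dict[str, Any]]:
        return [{"id": 1}, {"id": 2}]


class FailingAPI:
    def get(self, endpoint: str) -> list[dict[str, Any]]:
        raise ConnectionError("API down")


class RecordingDB:
    """Fake DB that remembers what it was asked to write."""

    def __init__(self) -> None:
        self.inserted: list[tuple[str, list[dict[str, Any]]]] = []

    def insert_records(self, table: str, records: list[dict[str, Any]]) -> None:
        self.inserted.append((table, records))


def test_sync_users_writes_rows() -> None:
    db = RecordingDB()
    assert sync_users(FakeAPI(), db) == 2
    assert db.inserted == [("users", [{"id": 1}, {"id": 2}])]


def test_sync_users_survives_api_outage() -> None:
    db = RecordingDB()
    assert sync_users(FailingAPI(), db) == 0
    assert db.inserted == []
```

Whether an outage should skip quietly or fail the run is a design decision. Raising instead would let Dagster mark the run as failed, and the second test would then assert on the exception.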
Common Anti-Patterns
❌ God Resource
One resource handling:
- DB + API + Queue
Break it into smaller ones
❌ Business Logic Inside Resources
def calculate_revenue(self): ...  # a business rule defined on a resource class
Resources should NOT contain domain logic
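One way to keep the boundary clean, sketched with hypothetical names: the resource only fetches rows, and the revenue rule lives in a plain function. An asset or op would call that function, so the rule can be tested without any resource at all. The refund rule itself is an assumption for illustration.

```python
from typing import Any


class OrdersDB:
    """Hypothetical resource: knows how to fetch orders, nothing about revenue."""

    def __init__(self, rows: list[dict[str, Any]]) -> None:
        self._rows = rows  # stands in for a real query result

    def fetch_orders(self) -> list[dict[str, Any]]:
        return list(self._rows)


def calculate_revenue(orders: list[dict[str, Any]]) -> float:
    """Domain rule (assumed): sum price * quantity over non-refunded orders."""
    return sum(o["price"] * o["quantity"] for o in orders if not o.get("refunded", False))
```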
❌ Hidden State
Caching without visibility
Mutable shared state
❌ Leaky Abstractions
Exposing:
raw DB cursors
HTTP clients
End-to-End Example: Encapsulating Object Storage with Dagster Resources
MinIO Resource (Encapsulation Layer)
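from dagster import ConfigurableResource
from minio import Minio
class MinioResource(ConfigurableResource):
    # Values are supplied at registration time through EnvVar, not hard-coded here
    endpoint: str
    access_key: str
    secret_key: str
    def get_client(self) -> Minio:
        return Minio(
            self.endpoint,
            access_key=self.access_key,
            secret_key=self.secret_key,
            secure=False,  # plain HTTP, e.g. a local MinIO; set True when TLS is enabled
        )
Registering the Resource
from dagster import Definitions, EnvVar
defs = Definitions(
    resources={
        "minio": MinioResource(
            endpoint=EnvVar("MINIO_ENDPOINT_RESOURCE"),
            access_key=EnvVar("MINIO_ACCESS_KEY"),
            secret_key=EnvVar("MINIO_SECRET_KEY"),
        ),
    },
    # Register ingestion_job and ingestion_sensor here too (jobs=, sensors=)
)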
Using the Resource with Sensor
A sensor that uses the MinIO resource to check for new CSV files:
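import dagster as dg
# ingestion_job and check_for_csv are defined elsewhere in the project
@dg.sensor(
    job=ingestion_job,
    minimum_interval_seconds=180,
    default_status=dg.DefaultSensorStatus.STOPPED,
)
def ingestion_sensor(minio: MinioResource):
    # Dagster injects the "minio" resource by parameter name
    sensor_response = check_for_csv(minio)
    if not sensor_response:
        yield dg.SkipReason("No new CSV files found")
        return
    yield dg.RunRequest()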
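The post does not show `check_for_csv`. Here is one possible shape for it, purely as a sketch. It lists the objects in a bucket through the resource's `get_client()` and returns the CSV keys. The bucket name `raw-data` and the function signature are assumptions. `list_objects(bucket_name, prefix=..., recursive=...)` and the `object_name` attribute come from the MinIO Python SDK; the test replaces the client with a fake.

```python
from typing import Any, Optional


def check_for_csv(minio: Any, bucket: str = "raw-data", prefix: Optional[str] = None) -> list[str]:
    """Hypothetical helper: return the keys of CSV objects in `bucket` under `prefix`."""
    client = minio.get_client()
    # recursive=True walks nested "folders" instead of returning them as prefixes.
    objects = client.list_objects(bucket_name=bucket, prefix=prefix, recursive=True)
    return sorted(obj.object_name for obj in objects if obj.object_name.lower().endswith(".csv"))
```

This version reports every CSV currently in the bucket. To react only to new files, persist the last-seen keys in the sensor's cursor, or pass a `run_key` (for example, the object name) to `RunRequest` so Dagster skips runs it has already launched for that key.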
Resources Configuration from Dagster's UI
Using EnvVar tells Dagster to read the value from the environment at runtime, and the UI shows only the environment variable's name, not its value.
Final Insight
Dagster Resources are not just helpers; they are:
The infrastructure abstraction layer that turns pipelines into clean, testable, and scalable systems.
Without them:
- Pipelines become messy scripts
With them:
- Pipelines become well-architected systems
Follow Muhammad Safiullah Khan on LinkedIn for more Data Engineering Content.
