Dagster & Fivetran (Component)
The dagster-fivetran library provides a FivetranAccountComponent, which you can use to represent Fivetran connectors as assets in Dagster.
FivetranAccountComponent is a state-backed component, which fetches and caches Fivetran workspace metadata. For information on managing component state, see Configuring state-backed components.
1. Prepare a Dagster project
To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:
uvx create-dagster project my-project && cd my-project/src
Activate the project virtual environment:
source ../.venv/bin/activate
Finally, add the dagster-fivetran library to the project:
uv add dagster-fivetran
2. Scaffold a Fivetran component definition
Now that you have a Dagster project, you can scaffold a Fivetran component definition. You'll need to provide your Fivetran account ID and API credentials. To keep the credentials out of the generated file, pass the API key and secret as {{ env.VAR_NAME }} references to environment variables rather than as literal values:
dg scaffold defs dagster_fivetran.FivetranAccountComponent fivetran_ingest \
--account-id test_account --api-key "{{ env.FIVETRAN_API_KEY }}" --api-secret "{{ env.FIVETRAN_API_SECRET }}"
Creating defs at /.../my-project/src/my_project/defs/fivetran_ingest.
The dg scaffold defs call will generate a defs.yaml file:
tree my_project/defs
my_project/defs
├── __init__.py
└── fivetran_ingest
    └── defs.yaml
2 directories, 2 files
YAML configuration
In its scaffolded form, the defs.yaml file contains the configuration for your Fivetran workspace:
type: dagster_fivetran.FivetranAccountComponent
attributes:
  workspace:
    account_id: test_account
    api_key: '{{ env.FIVETRAN_API_KEY }}'
    api_secret: '{{ env.FIVETRAN_API_SECRET }}'
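Because the API key and secret are stored as {{ env.NAME }} references rather than literal values, both environment variables need to be available wherever Dagster loads the component. The following is a minimal, hypothetical pre-flight check (referenced_env_vars and missing_env_vars are not part of dg or dagster-fivetran) that lists the variables referenced in a defs.yaml string and reports any that are unset. The regex assumes the simple {{ env.NAME }} form shown above.

```python
import os
import re

# Matches placeholders such as {{ env.FIVETRAN_API_KEY }} (whitespace inside the braces is optional)
ENV_TEMPLATE = re.compile(r"\{\{\s*env\.([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def referenced_env_vars(defs_yaml: str) -> list[str]:
    """Return the environment variable names referenced in a defs.yaml string, in order."""
    return ENV_TEMPLATE.findall(defs_yaml)


def missing_env_vars(defs_yaml: str, environ=os.environ) -> list[str]:
    """Return the referenced variables that are not set in `environ`."""
    return [name for name in referenced_env_vars(defs_yaml) if name not in environ]


DEFS_YAML = """\
type: dagster_fivetran.FivetranAccountComponent
attributes:
  workspace:
    account_id: test_account
    api_key: '{{ env.FIVETRAN_API_KEY }}'
    api_secret: '{{ env.FIVETRAN_API_SECRET }}'
"""

if __name__ == "__main__":
    missing = missing_env_vars(DEFS_YAML)
    if missing:
        print("Set these before running dg:", ", ".join(missing))
    else:
        print("All referenced environment variables are set.")
```

Passing environ explicitly makes the check easy to run against a dictionary, for example when validating a deployment's configuration rather than the local shell.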
3. Check the component configuration
You can check the configuration of your component with dg list defs:
dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━ ━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│ │ │ hubspot/company │ hubspot_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ hubspot/contact │ hubspot_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/campaign │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/opportunity │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/task │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼ ──────┼───────────┼─────────────┤ │
│ │ │ salesforce/user │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ └────────────────────────┴───────────────────────────┴──────┴───────────┴─────────────┘ │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────────┘
4. Select specific connectors
You can select specific Fivetran connectors to include in your component using the connector_selector key. This allows you to filter which connectors are represented as assets:
type: dagster_fivetran.FivetranAccountComponent
attributes:
  workspace:
    account_id: test_account
    api_key: "{{ env.FIVETRAN_API_KEY }}"
    api_secret: "{{ env.FIVETRAN_API_SECRET }}"
  connector_selector:
    by_name:
      - salesforce_warehouse_sync
dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│ │ │ salesforce/campaign │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/opportunity │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/task │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼─────────── ┼─────────────┤ │
│ │ │ salesforce/user │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ └────────────────────────┴───────────────────────────┴──────┴───────────┴─────────────┘ │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────────┘
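Conceptually, by_name acts as a filter over the connectors in the account: only connectors whose names are listed contribute assets, which is why the hubspot assets disappear from the output above. The sketch below models that filtering with a hypothetical Connector record populated from the example output. The real component works from the Fivetran metadata it fetches and caches, and the schema/table key scheme assumed here is only an approximation of how it builds asset keys.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Connector:
    """Hypothetical, simplified view of a Fivetran connector."""

    name: str                # connector name, as matched by by_name
    schema: str              # destination schema, used here as the asset key prefix
    tables: tuple[str, ...]  # tables the connector syncs


def select_by_name(connectors, names):
    """Keep only connectors whose name appears in the by_name list."""
    wanted = set(names)
    return [c for c in connectors if c.name in wanted]


def asset_keys(connectors):
    """Build schema/table keys like those listed by dg list defs."""
    return [f"{c.schema}/{t}" for c in connectors for t in c.tables]


CONNECTORS = [
    Connector("hubspot_warehouse_sync", "hubspot", ("company", "contact")),
    Connector(
        "salesforce_warehouse_sync",
        "salesforce",
        ("campaign", "opportunity", "task", "user"),
    ),
]

selected = select_by_name(CONNECTORS, ["salesforce_warehouse_sync"])
print(asset_keys(selected))
# ['salesforce/campaign', 'salesforce/opportunity', 'salesforce/task', 'salesforce/user']
```

Selecting by name keeps the configuration stable when new connectors are added to the account: they stay out of Dagster until you list them.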
5. Customize Fivetran assets
Properties of the assets emitted by each connector can be customized in the defs.yaml file using the translation key:
type: dagster_fivetran.FivetranAccountComponent
attributes:
  workspace:
    account_id: test_account
    api_key: "{{ env.FIVETRAN_API_KEY }}"
    api_secret: "{{ env.FIVETRAN_API_SECRET }}"
  connector_selector:
    by_name:
      - salesforce_warehouse_sync
  translation:
    group_name: fivetran_data
    description: "Loads data from Fivetran connector {{ props.name }}"
dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ salesforce/campaign │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ ├────────────────────────┼───────────────┼──────┼───────────┼────────────────────────────────────────────┤ │
│ │ │ salesforce/opportunity │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ ├────────────────────────┼───────────────┼──────┼───────────┼────────────────────────────────────────────┤ │
│ │ │ salesforce/task │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ ├────────────────────────┼───────────────┼──────┼───────────┼────────────────────────────────────────────┤ │
│ │ │ salesforce/user │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ └───────────────────── ───┴───────────────┴──────┴───────────┴────────────────────────────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
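In the output above, {{ props.name }} resolves to the connector name, and group_name replaces the default group (previously the connector name) for every asset. The sketch below mimics that overlay with a hypothetical regex-based renderer and plain dictionaries. Dagster's actual template resolution and the full set of fields available on props are not modeled here, so treat render and translate as illustrations only.

```python
import re

# Matches {{ props.<field> }} placeholders (hypothetical minimal renderer, not Dagster's)
PROPS_TEMPLATE = re.compile(r"\{\{\s*props\.(\w+)\s*\}\}")


def render(template: str, props: dict) -> str:
    """Replace each {{ props.field }} with the matching value; raises KeyError if missing."""
    return PROPS_TEMPLATE.sub(lambda m: str(props[m.group(1)]), template)


def translate(base_spec: dict, translation: dict, props: dict) -> dict:
    """Overlay translated fields onto a copy of an asset spec, rendering string templates."""
    spec = dict(base_spec)
    for field, value in translation.items():
        spec[field] = render(value, props) if isinstance(value, str) else value
    return spec


translation = {
    "group_name": "fivetran_data",
    "description": "Loads data from Fivetran connector {{ props.name }}",
}
base = {"key": "salesforce/campaign", "group_name": "salesforce_warehouse_sync"}
spec = translate(base, translation, {"name": "salesforce_warehouse_sync"})
print(spec["description"])  # Loads data from Fivetran connector salesforce_warehouse_sync
```

Because the same translation is applied to every asset, the rendered description differs only by connector, which matches the repeated description in the table.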
6. Observe externally-triggered syncs
If your Fivetran connectors run on Fivetran's scheduler, you can add a polling sensor to detect completed syncs and emit AssetMaterialization events. Set polling_sensor: true in your component configuration:
type: dagster_fivetran.FivetranAccountComponent
attributes:
  workspace:
    api_key: '{{ env.FIVETRAN_API_KEY }}'
    api_secret: '{{ env.FIVETRAN_API_SECRET }}'
    account_id: '{{ env.FIVETRAN_ACCOUNT_ID }}'
  polling_sensor: true
The sensor polls Fivetran on each tick and emits materialization events when syncs complete, allowing you to view sync history and track freshness in the Dagster UI without Dagster triggering the syncs.
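Each tick should report only syncs that finished since the previous tick, which sensors usually achieve with a cursor. The sketch below shows that pattern in plain Python, assuming a hypothetical SyncStatus record and a JSON cursor keyed by connector name (Dagster sensor cursors are stored as strings). The returned tuples stand in for AssetMaterialization events; this is not the dagster-fivetran sensor implementation.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class SyncStatus:
    """Hypothetical snapshot of a connector's most recent successful sync."""

    connector: str
    succeeded_at: Optional[datetime]  # None if the connector has never succeeded


def poll_tick(statuses, cursor_json):
    """Return (events, new_cursor_json) for syncs that finished since the last tick.

    The cursor maps connector name -> ISO timestamp of the last completion already reported.
    """
    seen = json.loads(cursor_json) if cursor_json else {}
    events = []
    for status in statuses:
        if status.succeeded_at is None:
            continue  # nothing to report yet
        last = seen.get(status.connector)
        if last is None or status.succeeded_at > datetime.fromisoformat(last):
            # Stand-in for emitting AssetMaterialization events for this connector's assets
            events.append((status.connector, status.succeeded_at))
            seen[status.connector] = status.succeeded_at.isoformat()
    return events, json.dumps(seen)


t1 = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)
first, cursor = poll_tick([SyncStatus("salesforce_warehouse_sync", t1)], None)
second, cursor = poll_tick([SyncStatus("salesforce_warehouse_sync", t1)], cursor)
print(len(first), len(second))  # 1 0
```

The second tick sees the same completion time and emits nothing, so repeated polling does not record a sync twice.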
7. Handle quota-based rescheduling
When Fivetran reschedules a sync due to quota limits, Dagster raises a RetryRequested exception by default, so the step is retried. To instead keep polling until the rescheduled sync completes, set retry_on_reschedule: false:
type: dagster_fivetran.FivetranAccountComponent
attributes:
  workspace:
    api_key: '{{ env.FIVETRAN_API_KEY }}'
    api_secret: '{{ env.FIVETRAN_API_SECRET }}'
    account_id: '{{ env.FIVETRAN_ACCOUNT_ID }}'
    retry_on_reschedule: false
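The difference between the two settings comes down to what happens when a poll observes a rescheduled sync. The sketch below models it with a hypothetical sequence of state strings and a stand-in exception class (RetryRequestedStandIn) in place of Dagster's RetryRequested, so it runs without Dagster installed. A real poller would also sleep between checks and query the Fivetran API.

```python
class RetryRequestedStandIn(Exception):
    """Hypothetical stand-in for Dagster's RetryRequested, keeping the sketch dependency-free."""


def wait_for_sync(status_feed, retry_on_reschedule=True):
    """Consume sync states until the sync succeeds and return the number of polls taken.

    status_feed yields hypothetical state strings ("syncing", "rescheduled",
    "succeeded"); these are not Fivetran's actual API values.
    """
    for polls, state in enumerate(status_feed, start=1):
        if state == "succeeded":
            return polls
        if state == "rescheduled" and retry_on_reschedule:
            # Default behavior: abandon this attempt and ask for a retry
            raise RetryRequestedStandIn("sync rescheduled by Fivetran")
        # Otherwise ("syncing", or "rescheduled" with retries disabled), keep polling
    raise RuntimeError("status feed ended before the sync succeeded")


feed = ["syncing", "rescheduled", "rescheduled", "succeeded"]
print(wait_for_sync(iter(feed), retry_on_reschedule=False))  # 4
```

Continuing to poll keeps a single run open for as long as Fivetran delays the sync, while the default retry frees the run and lets Dagster's retry policy decide when to try again.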
8. Keep Fivetran's schedule active alongside Dagster
By default, the first time Dagster triggers a Fivetran sync, it sets the connector's schedule to "manual", disabling Fivetran's auto-scheduling. To keep Fivetran's own schedule active while also triggering syncs from Dagster, set disable_schedule_on_trigger: false:
type: dagster_fivetran.FivetranAccountComponent
attributes:
  workspace:
    api_key: '{{ env.FIVETRAN_API_KEY }}'
    api_secret: '{{ env.FIVETRAN_API_SECRET }}'
    account_id: '{{ env.FIVETRAN_ACCOUNT_ID }}'
    disable_schedule_on_trigger: false
When using this mode with the polling sensor enabled, the sensor automatically deduplicates materialization events to avoid double-counting syncs that were triggered by Dagster.
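One way to picture this deduplication is as a set difference between the completions observed in Fivetran and the completions Dagster has already recorded from its own runs. The sketch below assumes each completion can be identified by a (connector name, completion timestamp) pair; how dagster-fivetran actually matches syncs is not described here, so completions_to_report is purely illustrative.

```python
from typing import Iterable, List, Set, Tuple

# (connector name, hypothetical completion identifier such as a timestamp)
Completion = Tuple[str, str]


def completions_to_report(
    observed: Iterable[Completion], recorded_by_dagster: Set[Completion]
) -> List[Completion]:
    """Return observed completions that Dagster has not already materialized, in order."""
    seen = set()
    report = []
    for item in observed:
        if item in recorded_by_dagster or item in seen:
            continue  # materialized by a Dagster-triggered run, or a repeat in this batch
        seen.add(item)
        report.append(item)
    return report


observed = [
    ("salesforce_warehouse_sync", "2024-01-01T06:00:00Z"),  # triggered by Dagster
    ("salesforce_warehouse_sync", "2024-01-01T12:00:00Z"),  # triggered by Fivetran's schedule
]
recorded = {("salesforce_warehouse_sync", "2024-01-01T06:00:00Z")}
print(completions_to_report(observed, recorded))
```

Only the schedule-triggered sync is reported, so each completed sync produces one materialization regardless of which system started it.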