Doramagic 项目包 · 项目说明书
dagster 项目
一个用于数据资产开发、生产和观测的编排平台。
Dagster Overview & Architecture
Dagster 是一个云原生的数据管道编排器(data pipeline orchestrator),专注于整个数据资产开发生命周期的管理。根据顶层说明,它"for the whole development lifecycle, with integrated lineage and observability, a declarative programming mod...
继续阅读本节完整说明和来源证据。
Dagster 概览与架构
项目定位与核心价值
Dagster 是一个云原生的数据管道编排器(data pipeline orchestrator),专注于整个数据资产开发生命周期的管理。根据顶层说明,它"for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability",即覆盖开发、测试、部署、运行、观测的全过程,并内置血缘追踪与可观测能力 资料来源:README.md:1-50。
Dagster 面向的核心对象是 数据资产(data assets),包括表、数据集、机器学习模型、报表等。用户通过 Python 函数以声明式方式定义这些资产,由 Dagster 在合适的时机执行函数并保持资产更新 资料来源:README.md:50-90。这种以资产为中心的范式区别于传统以任务(task)或作业(job)为中心的调度器。
最新稳定版本为 1.13.10(core)/ 0.29.10(libraries)。该版本修复了 DagsterInstance.get_latest_materialization_event 与 get_asset_records 在资产被 wipe 后仍可能返回陈旧物化事件的缺陷,表明实例层(Instance)的事件记录与资产状态机是整个系统的关键一致性边界 资料来源:README.md:release-notes。
核心编程模型
Dagster 的核心抽象由 @dg.asset 装饰器定义。如下示例展示了如何通过纯 Python 函数声明三个相互依赖的数据资产 资料来源:README.md:90-130:
import dagster as dg
import pandas as pd
from sklearn.linear_model import LinearRegression
@dg.asset
def country_populations() -> pd.DataFrame:
df = pd.read_html("https://tinyurl.com/mry64ebh")[0]
...
@dg.asset
def continent_change_model(country_populations: pd.DataFrame) -> LinearRegression:
...
@dg.asset
def continent_stats(country_populations: pd.DataFrame,
continent_change_model: LinearRegression) -> pd.DataFrame:
...
Dagster 会根据函数参数签名(如 country_populations、continent_change_model)自动推导资产间的依赖关系,构成资产图(asset graph / DAG)。这种声明式模型也是 Dagster 强调"best-in-class testability"的基础——资产可在隔离环境中按图执行。
仓库结构与模块组织
仓库采用多包(monorepo)结构,主要由以下几类目录构成:
| 目录 | 角色 |
|---|---|
python_modules/dagster | 核心框架包,提供 @asset、@job、@sensor、@schedule、IOManager 等基础 API |
python_modules/libraries/dagster-* | 第三方集成包,如 dagster-github、dagster-pagerduty、dagster-papertrail 等 |
examples/ | 端到端示例项目,覆盖 ETL、Modern Data Stack、dev→prod、ATProto Dashboard 等 |
js_modules/dg-docs-components | dg docs 独立站点与 Dagster 应用内嵌文档共用的 React 组件库 |
python_modules/automation | Docker 镜像构建与 PyPI 发布等内部自动化工具 |
例如 dagster-github、dagster-pagerduty、dagster-papertrail 均为独立发布的轻量级集成库,统一以"docs redirect"模式指向官方文档站点 资料来源:python_modules/libraries/dagster-github/README.md:1-5、python_modules/libraries/dagster-pagerduty/README.md:1-5、python_modules/libraries/dagster-papertrail/README.md:1-5。automation 模块则说明其角色为"a collection of tools for working with Docker images and publishing Dagster modules to PyPI" 资料来源:python_modules/automation/README.md:1-5。
端到端架构与示例覆盖
仓库在 examples/ 下提供多个"由小到大"的参考实现,帮助用户建立对 Dagster 完整能力栈的认知:
- quickstart_etl:每日 ETL 起步模板,覆盖资产定义、UI 物化、调度、监控与元数据查看 资料来源:examples/quickstart_etl/README.md:1-30。
- assets_modern_data_stack:将 Software-Defined Assets 与 Airbyte、dbt 等现代数据栈工具组合 资料来源:examples/assets_modern_data_stack/README.md:1-20。该示例同时演示了 Dagster 通过环境变量管理连接与 Secret 的方式。
- project_fully_featured:综合演示 SDA、Schedules、Sensors、IOManagers、Resources、dbt/S3/PySpark 集成及单元测试 资料来源:examples/project_fully_featured/README.md:1-30。
- development_to_production:通过可替换的 Resource 与运行配置实现本地/生产环境隔离 资料来源:examples/development_to_production/README.md:1-15。
- project_atproto_dashboard:结合动态分区(Dynamic partitions)、Declarative Automation、并发限制、dbt 与 Power BI 展示端到端实时分析 资料来源:examples/docs_projects/project_atproto_dashboard/README.md:1-25。
- project_ask_ai_dagster:基于 OpenAI 与 Pinecone 向量库的 RAG 应用,展示 Dagster 在 AI 数据管线中的用法 资料来源:examples/docs_projects/project_ask_ai_dagster/README.md:1-20。
综合这些示例可以归纳出典型的 Dagster 运行时架构:
flowchart LR
A[Python 资产定义 @asset] --> B[Dagster Daemon / Webserver]
B --> C[资产图 DAG 调度]
C --> D[执行器: IOManager / Resource]
D --> E[(外部系统: S3 / Snowflake / dbt / Airbyte)]
B --> F[(DagsterInstance 事件存储)]
F --> G[Web UI: Lineage / Activity]
F --> H[Sensors / Schedules]
H --> B资产定义被加载到 Dagster Daemon 与 Webserver 中,由调度器依据 Schedules 与 Sensors 触发物化,运行期间通过 IOManager 与外部系统交互,物化与观测事件全部写入 DagsterInstance,供 UI 血缘视图与告警链路消费。
常见失败模式与可观测性
由于 Dagster 的状态强依赖于 DagsterInstance 中事件流的正确性,下列情形常引发排查需求:
- Wipe 后陈旧事件:当资产被 wipe 但仍有运行启动或新观测到达时,
get_latest_materialization_event可能返回已被清除的物化事件。该问题在 1.13.10 中已被修复 资料来源:README.md:release-notes。 - 集成库版本不一致:core 与 libraries 版本号(1.13.x / 0.29.x)解耦发布,升级时需逐一对齐,否则可能出现序列化或 API 不兼容。
- 环境隔离未配置:在多环境项目中若未使用 Resource 抽象与运行配置,本地执行可能覆盖生产数据 资料来源:examples/development_to_production/README.md:1-15。
See Also
- [Asset Definition Reference] —
@asset与资产图建模细节 - [Schedules & Sensors] — 调度与事件驱动
- [Resources & IOManagers] — 环境隔离与外部系统读写
- [Integrations Library] —
dagster-*集成库索引
来源:https://github.com/dagster-io/dagster / 项目说明书
Core Concepts: Assets, Jobs, Ops, Graphs & Resources
Dagster 是一个面向数据流水线的编排框架,以"软件定义资产(Software-defined Assets)"作为核心抽象,辅以操作(Ops)、图(Graphs)、作业(Jobs)、资源(Resources)、调度(Schedules)与传感器(Sensors)等概念构成完整的编排模型 资料来源:README.md。在最新版本 1.13.10(core)中,框架进一步...
继续阅读本节完整说明和来源证据。
Dagster 核心概念:资产、作业、操作、图与资源
概述
Dagster 是一个面向数据流水线的编排框架,以"软件定义资产(Software-defined Assets)"作为核心抽象,辅以操作(Ops)、图(Graphs)、作业(Jobs)、资源(Resources)、调度(Schedules)与传感器(Sensors)等概念构成完整的编排模型 资料来源:README.md。在最新版本 1.13.10(core)中,框架进一步修复了 DagsterInstance.get_latest_materialization_event 在资产被擦除后仍可能返回陈旧物化事件的缺陷,这体现了核心抽象与实例存储层之间紧密的耦合关系。
下文按照由高层到低层的顺序,依次介绍资产、作业、操作与图、资源这四组核心概念,并指出它们之间的组合关系。
资产(Assets)
资产是 Dagster 中最核心的概念,代表一个可持久化、有版本且可被观测的数据对象,例如数据库表、文件、数据集分区等。资产通过 @asset 装饰器将一个普通 Python 函数声明为资产生产逻辑 资料来源:examples/quickstart_etl/README.md。
from dagster import asset
@asset
def hackernews_topstories(hackernews_topstory_ids):
...
资产之间可以通过函数参数隐式建立上下游依赖关系。例如在 quickstart_etl 示例中,hackernews_topstories 接受 hackernews_topstory_ids 的输出作为输入,hackernews_stories_word_cloud 又以 hackernews_topstories 的结果作为输入 资料来源:examples/quickstart_etl/README.md。
Dagster 支持多种资产特性以适应复杂场景:
| 特性 | 用途 | 示例场景 |
|---|---|---|
compute_kind | 为资产打标签,标识其计算来源 | HackerNews API、Plot 资料来源:examples/quickstart_etl/README.md |
| 分区(Partitions) | 按时间或维度切分资产 | 按小时分区的事件流 资料来源:examples/project_fully_featured/README.md |
| 动态分区(Dynamic partitions) | 运行时创建新分区 | 持续发现的新数据源 资料来源:examples/docs_projects/project_atproto_dashboard/README.md |
| 元数据(Metadata) | 附加任意元数据到物化事件 | 词云预览图 资料来源:examples/quickstart_etl/README.md |
| 外部资产(External assets) | 编排 Dagster 之外的资产 | Snowflake Dynamic Tables 资料来源:examples/snowflake_cortex/dagster_snowflake/README.md |
最近发布的 1.13.10 版本修复了资产擦除后 get_latest_materialization_event 返回陈旧数据的 Bug,开发者在编写"先擦除再重跑"的清理流水线时应升级至该版本以保证数据一致性。
操作(Ops)与图(Graphs)
当资产的生成过程需要拆分为多个离散的计算单元时,Dagster 提供了操作(Op)与图(Graph)作为更细粒度的组合单元。
- Op:一个可独立执行、可复用、可测试的计算单元,对应一个 Python 函数。
- Graph:将多个 Op 通过输入输出连接而成的有向无环图,描述它们之间的数据流向。
当一个资产的内部实现是一个 Graph 时,就称之为"图支撑的资产(Graph-backed asset)"。这种模式适用于"单个资产背后存在多步独立计算"或"已有现成的 Graph 需要复用"的场景 资料来源:examples/feature_graph_backed_assets/README.md。
flowchart LR
A[hackernews_topstory_ids<br/>@asset] --> B[hackernews_topstories<br/>@asset]
B --> C[hackernews_stories_word_cloud<br/>@asset]
subgraph 内部实现可选
B1[Op: 拉取详情] --> B2[Op: 解析字段]
B2 -.被组合为.-> B
end需要注意的是,即使资产内部使用 Ops 与 Graphs 组合,资产本身仍然是 Dagster 视图层与持久化层的主角,Ops 仅为实现细节。
作业(Jobs)
作业(Job)是一组资产或操作的可执行单元,通常对应一次完整的运行(Run)。作业是 Dagster 中"运行什么、何时运行、怎样运行"的承载体:
- 执行单元:将若干资产或图组合为一个可被调度执行的工作单元。
- 触发方式:可以手动触发、由 Schedule 按时间表触发,也可由 Sensor 响应外部事件触发 资料来源:examples/snowflake_cortex/dagster_snowflake/README.md。
- 环境切换:通过环境变量切换部署目标,例如
project_fully_featured通过DAGSTER_DEPLOYMENT在prod/staging/ 本地之间切换 资料来源:examples/project_fully_featured/README.md。 - 示例入口:
quickstart_etl在definitions.py中定义了每日执行一次的 ETL 作业,并在 Dagster UI 中以all_assets_job的形式呈现 资料来源:examples/quickstart_etl/README.md。
资源(Resources)
资源(Resource)是可在资产、操作或作业中注入的可复用外部依赖,例如数据库连接、HTTP 客户端、文件系统句柄、LLM 客户端等。资源与资产在概念上是正交的:资产描述"产出什么数据",资源描述"用什么去产出"。
例如 project_multi_tenant 示例将共享的 LLM 客户端封装为资源,并在多个代码位置之间复用 资料来源:examples/project_multi_tenant/README.md。project_fully_featured 则在不同环境使用不同的存储后端:本地使用文件系统与 DuckDB,生产环境使用 S3 与 Snowflake 资料来源:examples/project_fully_featured/README.md。
资源的关键价值在于:
- 解耦:将连接配置与业务逻辑分离,便于在不同环境中复用同一份资产代码。
- 可测试:在测试中注入伪造资源以隔离外部依赖。
- 集中治理:对凭据、连接池、超时等横切关注点统一管理。
常见模式与实践
- 本地开发:使用
dg dev或dagster dev启动本地 Web 界面,修改代码后点击 "Reload definition" 即可应用最新定义 资料来源:examples/quickstart_etl/README.md。 - 快速脚手架:通过
dagster project from-example --name my-project --example quickstart_etl创建新项目 资料来源:examples/quickstart_etl/README.md。 - 多代码位置工作区:使用
workspace.yaml同时加载多个代码位置,并配合dagster_cloud.yaml进行部署配置 资料来源:examples/project_multi_tenant/README.md。 - 声明式自动化(Declarative Automation):在 ATProto 仪表盘示例中,通过声明条件让 Dagster 自动决定何时运行资产 资料来源:examples/docs_projects/project_atproto_dashboard/README.md。
- RAG 应用编排:在
project_ask_ai_dagster中,Dagster 同时编排 OpenAI、向量数据库与数据预处理等多个外部依赖的资产 资料来源:examples/docs_projects/project_ask_ai_dagster/README.md。
See Also
来源:https://github.com/dagster-io/dagster / 项目说明书
Asset Materialization, Event Log & Instance API
Dagster 是一个面向数据流水线的编排框架,其核心抽象是 Asset(资产)。当一次计算生成或更新某个资产时,会产生一次 materialization(物化)事件,记录该资产在某一时刻的状态、运行来源以及附加元数据。这些事件被持久化到 Event Log(事件日志)中,并可通过 DagsterInstance API 进行查询、审计与回溯。资料来源:[README.m...
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
Asset Materialization、Event Log 与 Instance API
概述
Dagster 是一个面向数据流水线的编排框架,其核心抽象是 Asset(资产)。当一次计算生成或更新某个资产时,会产生一次 materialization(物化)事件,记录该资产在某一时刻的状态、运行来源以及附加元数据。这些事件被持久化到 Event Log(事件日志)中,并可通过 DagsterInstance API 进行查询、审计与回溯。资料来源:README.md:1-50
在最新发布的 1.13.10(core)/ 0.29.10(libraries)版本中,针对 DagsterInstance.get_latest_materialization_event 和 get_asset_records 返回已擦除(wiped)资产的过期物化事件这一缺陷进行了修复。资料来源:CHANGES.md(社区上下文中的发布说明)
Asset Materialization 的工作原理
基本概念
Asset materialization 是 Dagster 用来表达"某个资产被成功生成"的核心事件。每个 @asset 装饰的函数在执行完成后,Dagster 会自动产出一条 ASSET_MATERIALIZATION 类型的事件,开发者还可以通过 MetadataValue 附加任意元数据(如记录数、数据预览、词云图片等)。资料来源:examples/quickstart_etl/README.md:50-130
例如在 quickstart_etl 示例中:
hackernews_topstory_ids从 HackerNews API 拉取热门故事 ID 列表;hackernews_topstories基于上一步的输出抓取详细故事数据;hackernews_stories_word_cloud依据前一步的输出生成词云可视化。
这三者形成一条由上至下的依赖链,每次运行时都会产生对应的 ASSET_MATERIALIZATION 事件,写入 Event Log。资料来源:examples/quickstart_etl/README.md:30-80
元数据与观测事件
除了物化事件,Dagster 还支持 observation(观测)事件,用于记录资产未发生新的物化但外部状态发生了变化的情况(例如对分区执行擦除操作时)。观测事件同样写入 Event Log,并参与"最新物化"语义计算,这也是 1.13.10 修复缺陷的关键场景。资料来源:CHANGES.md(社区上下文)
分区与擦除
在 project_atproto_dashboard 示例中,资产按 dynamic partitions(动态分区)组织,并通过 declarative automation 驱动更新。资料来源:examples/docs_projects/project_atproto_dashboard/README.md:10-25 当某个分区的数据被擦除时,Dagster+ 在旧版本中可能错误地把已擦除分区的物化作为整个资产的"最新"记录返回,1.13.10 对此行为进行了修正。资料来源:CHANGES.md(社区上下文)
Event Log 系统
Event Log 是 Dagster 持久化所有执行历史(runs、materializations、observations、expectation results 等)的地方,UI 中的 Activity、Asset Details 页面以及 Asset Lineage 视图都直接消费 Event Log 的数据。资料来源:examples/quickstart_etl/README.md:90-130
flowchart LR
A["@asset 函数执行"] --> B["ASSET_MATERIALIZATION<br/>事件"]
A2["observation / wipe"] --> B2["OBSERVATION / WIPE<br/>事件"]
B --> C[Event Log 存储]
B2 --> C
C --> D["DagsterInstance<br/>查询 API"]
C --> E["UI: Activity / Lineage"]
D --> F["用户代码 & CLI"]
E --> G["数据工程师"]Event Log 中的每条事件都带有一个稳定的 asset_key,查询接口(详见下一节)正是基于 asset_key 在日志中筛选和排序,从而得到"最新"或"全部"物化记录。资料来源:examples/quickstart_etl/README.md:100-130
DagsterInstance API 与常见查询
DagsterInstance 是用户与 Event Log 交互的入口对象,提供包括但不限于以下与物化相关的查询方法:
| API 方法 | 主要用途 |
|---|---|
get_latest_materialization_event | 获取某个资产的最新 ASSET_MATERIALIZATION 事件 |
get_asset_records | 返回一个或多个资产的物化历史记录集合 |
wipe_asset / 分区擦除 | 移除资产(或其分区)的历史物化与观测事件 |
资料来源:CHANGES.md(社区上下文中提及的相关 API)
1.13.10 修复的行为变化
在 1.13.10 之前,如果一个资产曾被擦除(wiped),那么一旦有新的运行(run)开始针对该资产,或报告了一条 observation,get_latest_materialization_event 与 get_asset_records 仍可能返回擦除前的过期物化记录。在 Dagster+ 上,分区擦除也存在类似问题。该 bug 已在 1.13.10 修复,调用方应预期返回"擦除后状态"或更严格的一致性结果。资料来源:CHANGES.md(社区上下文)
在项目中的使用模式
- 在
project_ask_ai_dagster中,RAG 摄取流水线利用 Dagster 调度数据写入与向量化过程,每一步都对应可被 Event Log 追踪的物化事件。资料来源:examples/docs_projects/project_ask_ai_dagster/README.md:1-30 project_fully_featured展示了按小时分区的活动分析资产,其最新物化时间点是新鲜度(freshness)策略评估与告警的依据。资料来源:examples/project_fully_featured/README.md:1-30snowflake_cortex/dagster_snowflake中通过AI_AGG生成的daily_story_summary等聚合资产,其物化元数据可通过 Instance API 检索,用于成本与查询历史的回溯。资料来源:examples/snowflake_cortex/dagster_snowflake/README.md:1-30project_multi_tenant演示了多代码位置(multi-code-location)部署下,每个 code location 可拥有独立的事件日志后端(默认 DuckDB),物化记录按代码位置隔离。资料来源:examples/project_multi_tenant/README.md:1-50
常见失败模式与排查建议
- 读取到过期物化:在 1.13.10 之前可能出现。升级 core 至 1.13.10 后验证返回值是否符合预期,并检查是否有未完成的 run 导致事件乱序。资料来源:CHANGES.md(社区上下文)
- 分区擦除后仍能查到数据:在 Dagster+ 上若升级到 1.13.10 仍出现,应确认 wipe 操作的目标分区与
get_asset_records的过滤条件一致。资料来源:CHANGES.md(社区上下文) - 元数据过大导致查询缓慢:在
hackernews_topstories中以 Markdown 形式记录了 DataFrame 预览,过大的preview会增加 Event Log 体积,建议仅记录必要列。资料来源:examples/quickstart_etl/README.md:90-130 - 跨代码位置混淆:多租户部署下需显式选择
DagsterInstance的目标 code location,否则可能读到错误的物化记录。资料来源:examples/project_multi_tenant/README.md:1-50
See Also
- Asset Materialization 概念(quickstart 示例)
- Declarative Automation 与动态分区(atproto dashboard)
- 多代码位置 / 多租户部署
- Dagster 1.13.10 发布说明(CHANGES.md)
资料来源:CHANGES.md(社区上下文中提及的相关 API)
Deployment Options & Infrastructure
Dagster 提供从本地开发到生产云端的多种部署形态,覆盖单机调试、多代码位置工作区、混合(Hybrid)部署与托管式 Dagster+ 等场景。本页基于仓库内示例与文档梳理其部署选项与基础设施组成。
继续阅读本节完整说明和来源证据。
部署形态总览
Dagster 的部署按运行位置与托管方式可划分为以下几类:
| 形态 | 适用场景 | 关键特征 |
|---|---|---|
| 本地开发(Local) | 个人调试、快速迭代 | 单进程 dagster dev 或 dg dev,Webserver 与 Daemon 同进程启动 |
| 工作区(Workspace) | 多项目本地编排 | 通过 workspace.yaml 聚合多个 code location |
| 多租户(Multi-tenant) | 团队/企业内多业务线隔离 | 每业务线独立 code location、共享 LLM 资源、隔离数据库 |
| Dagster+ Serverless | 免运维托管 | 官方云服务,CI/CD 与分支开箱即用 |
| Dagster+ Hybrid | 数据合规场景 | 控制面在云端,用户代码与基础设施自托管 |
| Experimental | 试验性功能 | API 可能随时变更,资料来源:examples/experimental/README.md |
下图为典型的本地/云端混合部署架构:
flowchart LR Dev[开发者本地] -->|dagster dev / dg dev| WS[Webserver:3000] WS --> Inst[(Dagster Instance\nSQLite/Postgres)] Daemon[Daemon\nSchedules/Sensors/Backfills] --> Inst CodeLoc[Code Location\nPython 包] --> WS CodeLoc --> Daemon Hybrid[Dagster+ Hybrid Agent] -.->|用户 VPC 内| CodeLoc Serverless[Dagster+ Serverless] -.->|官方托管| CodeLoc
本地开发与快速启动
最常见的本地启动方式是 dagster dev,它会同时拉起 Webserver 与 Daemon。quickstart_etl 示例展示了完整流程:先以 pip install -e ".[dev]" 编辑模式安装项目,再执行 dagster dev,最后访问 http://localhost:3000 资料来源:examples/quickstart_etl/README.md。
新引入的 dg CLI 是组件化(Components)项目推荐的入口。project_atproto_dashboard 与 project_prompt_eng 均使用 dg dev 启动,资料来源:examples/docs_projects/project_atproto_dashboard/README.md 与 examples/docs_projects/project_prompt_eng/README.md。两者的共同模式是:先在 dbt_project/ 中生成 manifest,再用 uv 创建虚拟环境安装项目,最后通过 dg dev 一键启动。
本地开发时的关键配置点包括:
DAGSTER_HOME:指定实例元数据存储位置(默认./.dagster_home),资料来源:examples/project_multi_tenant/README.md。.env文件:存放凭据与后端选择,例如LLM_BACKEND=embedded启用本地确定性响应,资料来源:examples/project_multi_tenant/README.md。- 代码变更后需在 UI 点击 Reload definition 以加载最新定义,资料来源:examples/quickstart_etl/README.md。
多代码位置与多租户架构
当组织内有多个业务线时,推荐使用多 code location 部署。project_multi_tenant 演示了典型布局:beacon_hq/、harbor_outfitters/、summit_financial/ 各自承载报表、目录与风险评估资产,shared/ 承载共享 LLM 资源与 IO Manager,资料来源:examples/project_multi_tenant/README.md。
工作区通过 workspace.yaml 注册所有 code location,部署到 Dagster+ 时则使用 dagster_cloud.yaml 进行等价配置,资料来源:examples/project_multi_tenant/README.md。这种"一个 workspace,多个 location"的模式带来三项隔离能力:
- 运行时隔离:通过
vendor/下的 marker 包(如catalog_coach_runtime==1.4.0)按 location 安装不同依赖版本,资料来源:examples/project_multi_tenant/README.md。 - 数据隔离:每个 location 写入独立 DuckDB 文件到
data/子目录,资料来源:examples/project_multi_tenant/README.md。 - 模型隔离:通过
HARBOR_OUTFITTERS_MODEL、SUMMIT_FINANCIAL_MODEL、BEACON_HQ_MODEL等环境变量为不同 location 指定不同 LLM,资料来源:examples/project_multi_tenant/README.md。
类似的多项目结构也出现在 project_databricks_and_snowflake 中,它将 deployments/local 与 projects/* 分离,外部通过 dg dev 统一加载,资料来源:examples/project_databricks_and_snowflake/README.md。
基础设施组件与社区关注点
一次完整的 Dagster 部署至少包含以下基础设施组件:
- Webserver:提供 UI 与 GraphQL API,默认监听
127.0.0.1:3000,资料来源:examples/project_multi_tenant/README.md。 - Daemon:运行 schedules、sensors、backfills、declarative automation 等后台循环,资料来源:examples/docs_projects/project_atproto_dashboard/README.md。
- Dagster Instance:持久化 run、event log、asset materialization 等元数据,资料来源:python_modules/dagster/README.md。
- Code Location:用户定义的 Python 包,通过
Definitions对象暴露资产/作业/调度/传感器,资料来源:README.md。
社区近期发布(1.13.10 / 0.29.10)修复了与 DagsterInstance.get_latest_materialization_event 及 get_asset_records 相关的 staleness 问题:当资产被 wipe 后,若新的 run 或 observation 启动,API 仍可能返回 wipe 前的旧 materialization 记录;Dagster+ 在 partition wipe 场景下也存在类似问题。该修复直接影响运维侧对"资产最新一次成功执行时间"的判断,部署时建议升级到 1.13.10 或更高版本以避免上报指标异常。
See Also
来源:https://github.com/dagster-io/dagster / 项目说明书
Automation: Schedules, Sensors & Declarative Automation
Dagster 的自动化(Automation)体系提供了多种触发工作流运行(run)的机制,使开发者能够按时间、按事件或按数据状态来驱动资产(asset)物化。Automation 主要由三大支柱组成:Schedules(定时调度)、Sensors(传感器) 和 Declarative Automation(声明式自动化)。三者协同覆盖了从固定时间窗口到复杂依赖触发的各种...
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Dagster 的自动化(Automation)体系提供了多种触发工作流运行(run)的机制,使开发者能够按时间、按事件或按数据状态来驱动资产(asset)物化。Automation 主要由三大支柱组成:Schedules(定时调度)、Sensors(传感器) 和 Declarative Automation(声明式自动化)。三者协同覆盖了从固定时间窗口到复杂依赖触发的各种场景。
资料来源:docs/docs/guides/automate/index.md:1-30
graph LR
A[Automation 自动化体系] --> B[Schedules<br/>定时调度]
A --> C[Sensors<br/>事件驱动]
A --> D[Declarative Automation<br/>声明式自动化]
B --> B1[cron / @schedule]
C --> C1[SensorEvaluationContext]
C --> C2[多资产传感器]
D --> D1[AutomationCondition]
D --> D2[分区分层策略]Schedules(定时调度)
用途与定义
Schedules 用于按固定时间间隔触发作业(job)或资产图。基于 @schedule 装饰器或 ScheduleDefinition 创建,常见形式包括 cron 表达式与执行函数组合。Schedules 适合"在每天凌晨 2 点运行 ETL"这类确定性时间任务。
关键属性
| 属性 | 说明 |
|---|---|
cron_schedule | 标准 cron 表达式,定义触发时间 |
execution_timezone | 调度所用的时区 |
job 或 selection | 调度的目标作业或资产选择 |
execution_fn | 自定义执行函数,返回 RunRequest |
@schedule(cron_schedule="0 2 * * *", job=my_etl_job)
def daily_etl_schedule(context):
return RunRequest()
故障模式
- 时区错误:未设置
execution_timezone时默认按 UTC 计算,可能导致预期外的偏移。 - 重叠触发:当运行时间超过调度间隔时,可能出现并发启动,需要通过
max_concurrent_runs或 tag 进行约束。
资料来源:docs/docs/guides/automate/schedules/defining-schedules.md:1-80
Sensors(事件驱动传感器)
用途与定义
Sensors 是基于事件或外部状态变化触发运行的组件。通过 @sensor 装饰器创建,每次 tick 会调用 evaluate_tick 检查是否需要触发 RunRequest。Sensors 适合"当 S3 出现新文件时立即处理"或"轮询数据库是否有新数据"等场景。
多资产传感器
@multi_asset_sensor 允许针对多个资产的 materialization 事件进行响应,并根据分区状态、事件 ID 等信息生成下游运行请求。
关键实践
SensorEvaluationContext提供cursor属性,用于实现增量轮询,避免重复触发。- Sensors 应当是幂等的,重复触发同一游标不应产生错误状态。
- 复杂逻辑建议拆分为多个 sensor 组合,而非在一个 sensor 内堆叠所有判断。
资料来源:docs/docs/guides/automate/sensors/using-sensors.md:1-120
Declarative Automation(声明式自动化)
核心概念
Declarative Automation 是 Dagster 较新的自动化层,通过 AutomationCondition 在资产级别声明"何时应当重新物化"。它将"哪些上游状态变化应当触发我"的规则以声明形式写进资产定义,而非通过 schedule/sensor 命令式地编排。
常用 AutomationCondition
| 条件 | 触发语义 |
|---|---|
eager() | 任一上游更新即触发 |
missing() | 资产尚未被物化时触发 |
in_progress() | 上一次运行正在进行时触发 |
cron(...) | 按 cron 时间窗口触发 |
自定义与组合
通过 &(与)、|(或)、~(非)操作符组合多个 AutomationCondition,可表达复杂策略。例如:~missing() & cron("0 9 * * MON") 表示"周一上午 9 点运行,但仅在该资产尚未物化时"。
@asset(automation_condition=eager())
def downstream_asset(): ...
与 Schedule/Sensor 的关系
Declarative Automation 并非取代,而是补充。当资产规则较为复杂或需要跨多个资产统一策略时,优先使用 AutomationCondition;而当需要跨外部系统(如 API 轮询、定时清理)触发时,仍推荐 Sensors/Schedules。
资料来源:docs/docs/guides/automate/declarative-automation/customizing-automation-conditions.md:1-100
版本说明与相关修复
在最新发布版本(1.13.10 core / 0.29.10 libraries)中修复了一项关于 DagsterInstance.get_latest_materialization_event 与 get_asset_records 的 bug:当时序上 wipe 某资产后,新的 run 启动或 observation 上报时,可能错误地返回 wipe 之前的 stale materialization。该修复同时涉及 Dagster+ 中分区被擦除时分区 materialization 残留为最新值的同类问题。该修复对自动化层的影响在于:声明式自动化在判断 missing() 或 eager() 条件时,不再会误以为资产已存在历史物化,从而避免了误判触发。
选择建议
| 场景 | 推荐方式 |
|---|---|
| 固定时间窗口触发 | Schedule |
| 外部系统事件触发 | Sensor |
| 资产依赖驱动 | Declarative Automation |
| 跨系统复合编排 | Schedule + Sensor + AutomationCondition 组合 |
See Also
失败模式与踩坑日记
保留 Doramagic 在发现、验证和编译中沉淀的项目专属风险,不把社区讨论只当作装饰信息。
可能增加新用户试用和生产接入成本。
可能增加新用户试用和生产接入成本。
可能增加新用户试用和生产接入成本。
可能增加新用户试用和生产接入成本。
Pitfall Log / 踩坑日志
项目:dagster-io/dagster
摘要:发现 13 个潜在踩坑项,其中 1 个为 high/blocking;最高优先级:安装坑 - 来源证据:Customize Dagster UI logo and colors / whitelabeling。
1. 安装坑 · 来源证据:Customize Dagster UI logo and colors / whitelabeling
- 严重度:high
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Customize Dagster UI logo and colors / whitelabeling
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/24989 | 来源类型 github_issue 暴露的待验证使用条件。
2. 安装坑 · 来源证据:Sqlalchemy Errors in Dagster
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Sqlalchemy Errors in Dagster
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/29674 | 来源类型 github_issue 暴露的待验证使用条件。
3. 配置坑 · 来源证据:A single un-deserializable bulk_actions row breaks the entire get_backfills read (Backfills page won't load)
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个配置相关的待验证问题:A single un-deserializable bulk_actions row breaks the entire get_backfills read (Backfills page won't load)
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/33946 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
4. 配置坑 · 来源证据:[UI] Searching for "true" or "false" in Jobs and Schedules throws an error
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个配置相关的待验证问题:[UI] Searching for "true" or "false" in Jobs and Schedules throws an error
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/33945 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
5. 能力坑 · 能力判断依赖假设
- 严重度:medium
- 证据强度:source_linked
- 发现:README/documentation is current enough for a first validation pass.
- 对用户的影响:假设不成立时,用户拿不到承诺的能力。
- 证据:capability.assumptions | https://github.com/dagster-io/dagster | README/documentation is current enough for a first validation pass.
6. 维护坑 · 来源证据:Add executor_def for AutomationConditionSensorDefinition
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个维护/版本相关的待验证问题:Add executor_def for AutomationConditionSensorDefinition
- 对用户的影响:可能影响升级、迁移或版本选择。
- 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/33944 | 来源类型 github_issue 暴露的待验证使用条件。
7. 维护坑 · 来源证据:Dynamic partition definitions leaking data across code locations
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个维护/版本相关的待验证问题:Dynamic partition definitions leaking data across code locations
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/29693 | 来源类型 github_issue 暴露的待验证使用条件。
8. 维护坑 · 来源证据:Enhancement - Update to tomlkit 0.15.0, Remove dagster-dg-core pin "tomlkit<0.13.3"
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个维护/版本相关的待验证问题:Enhancement - Update to tomlkit 0.15.0, Remove dagster-dg-core pin "tomlkit<0.13.3"
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/33943 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
9. 维护坑 · 维护活跃度未知
- 严重度:medium
- 证据强度:source_linked
- 发现:未记录 last_activity_observed。
- 对用户的影响:新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。
- 证据:evidence.maintainer_signals | https://github.com/dagster-io/dagster | last_activity_observed missing
- 严重度:medium
- 证据强度:source_linked
- 发现:no_demo
- 证据:downstream_validation.risk_items | https://github.com/dagster-io/dagster | no_demo; severity=medium
11. 安全/权限坑 · 存在评分风险
- 严重度:medium
- 证据强度:source_linked
- 发现:no_demo
- 对用户的影响:风险会影响是否适合普通用户安装。
- 证据:risks.scoring_risks | https://github.com/dagster-io/dagster | no_demo; severity=medium
12. 维护坑 · issue/PR 响应质量未知
- 严重度:low
- 证据强度:source_linked
- 发现:issue_or_pr_quality=unknown。
- 对用户的影响:用户无法判断遇到问题后是否有人维护。
- 证据:evidence.maintainer_signals | https://github.com/dagster-io/dagster | issue_or_pr_quality=unknown
13. 维护坑 · 发布节奏不明确
- 严重度:low
- 证据强度:source_linked
- 发现:release_recency=unknown。
- 对用户的影响:安装命令和文档可能落后于代码,用户踩坑概率升高。
- 证据:evidence.maintainer_signals | https://github.com/dagster-io/dagster | release_recency=unknown
来源:Doramagic 发现、验证与编译记录