Doramagic 项目包 · 项目说明书

dagster 项目

一个用于数据资产开发、生产和观测的编排平台。

Dagster Overview & Architecture

Dagster 是一个云原生的数据管道编排器(data pipeline orchestrator),专注于整个数据资产开发生命周期的管理。根据顶层说明,它"for the whole development lifecycle, with integrated lineage and observability, a declarative programming mod...

章节 相关页面

继续阅读本节完整说明和来源证据。

Dagster 概览与架构

项目定位与核心价值

Dagster 是一个云原生的数据管道编排器(data pipeline orchestrator),专注于整个数据资产开发生命周期的管理。根据顶层说明,它"for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability",即覆盖开发、测试、部署、运行、观测的全过程,并内置血缘追踪与可观测能力 资料来源:README.md:1-50

Dagster 面向的核心对象是 数据资产(data assets),包括表、数据集、机器学习模型、报表等。用户通过 Python 函数以声明式方式定义这些资产,由 Dagster 在合适的时机执行函数并保持资产更新 资料来源:README.md:50-90。这种以资产为中心的范式区别于传统以任务(task)或作业(job)为中心的调度器。

最新稳定版本为 1.13.10(core)/ 0.29.10(libraries)。该版本修复了 DagsterInstance.get_latest_materialization_eventget_asset_records 在资产被 wipe 后仍可能返回陈旧物化事件的缺陷,表明实例层(Instance)的事件记录与资产状态机是整个系统的关键一致性边界 资料来源:README.md:release-notes。

核心编程模型

Dagster 的核心抽象由 @dg.asset 装饰器定义。如下示例展示了如何通过纯 Python 函数声明三个相互依赖的数据资产 资料来源:README.md:90-130

import dagster as dg
import pandas as pd
from sklearn.linear_model import LinearRegression

@dg.asset
def country_populations() -> pd.DataFrame:
    df = pd.read_html("https://tinyurl.com/mry64ebh")[0]
    ...

@dg.asset
def continent_change_model(country_populations: pd.DataFrame) -> LinearRegression:
    ...

@dg.asset
def continent_stats(country_populations: pd.DataFrame,
                    continent_change_model: LinearRegression) -> pd.DataFrame:
    ...

Dagster 会根据函数参数签名(如 country_populationscontinent_change_model)自动推导资产间的依赖关系,构成资产图(asset graph / DAG)。这种声明式模型也是 Dagster 强调"best-in-class testability"的基础——资产可在隔离环境中按图执行。

仓库结构与模块组织

仓库采用多包(monorepo)结构,主要由以下几类目录构成:

目录角色
python_modules/dagster核心框架包,提供 @asset@job@sensor@scheduleIOManager 等基础 API
python_modules/libraries/dagster-*第三方集成包,如 dagster-githubdagster-pagerdutydagster-papertrail
examples/端到端示例项目,覆盖 ETL、Modern Data Stack、dev→prod、ATProto Dashboard 等
js_modules/dg-docs-componentsdg docs 独立站点与 Dagster 应用内嵌文档共用的 React 组件库
python_modules/automationDocker 镜像构建与 PyPI 发布等内部自动化工具

例如 dagster-githubdagster-pagerdutydagster-papertrail 均为独立发布的轻量级集成库,统一以"docs redirect"模式指向官方文档站点 资料来源:python_modules/libraries/dagster-github/README.md:1-5python_modules/libraries/dagster-pagerduty/README.md:1-5python_modules/libraries/dagster-papertrail/README.md:1-5automation 模块则说明其角色为"a collection of tools for working with Docker images and publishing Dagster modules to PyPI" 资料来源:python_modules/automation/README.md:1-5

端到端架构与示例覆盖

仓库在 examples/ 下提供多个"由小到大"的参考实现,帮助用户建立对 Dagster 完整能力栈的认知:

综合这些示例可以归纳出典型的 Dagster 运行时架构:

flowchart LR
    A[Python 资产定义 @asset] --> B[Dagster Daemon / Webserver]
    B --> C[资产图 DAG 调度]
    C --> D[执行器: IOManager / Resource]
    D --> E[(外部系统: S3 / Snowflake / dbt / Airbyte)]
    B --> F[(DagsterInstance 事件存储)]
    F --> G[Web UI: Lineage / Activity]
    F --> H[Sensors / Schedules]
    H --> B

资产定义被加载到 Dagster Daemon 与 Webserver 中,由调度器依据 Schedules 与 Sensors 触发物化,运行期间通过 IOManager 与外部系统交互,物化与观测事件全部写入 DagsterInstance,供 UI 血缘视图与告警链路消费。

常见失败模式与可观测性

由于 Dagster 的状态强依赖于 DagsterInstance 中事件流的正确性,下列情形常引发排查需求:

  1. Wipe 后陈旧事件:当资产被 wipe 但仍有运行启动或新观测到达时,get_latest_materialization_event 可能返回已被清除的物化事件。该问题在 1.13.10 中已被修复 资料来源:README.md:release-notes。
  2. 集成库版本不一致:core 与 libraries 版本号(1.13.x / 0.29.x)解耦发布,升级时需逐一对齐,否则可能出现序列化或 API 不兼容。
  3. 环境隔离未配置:在多环境项目中若未使用 Resource 抽象与运行配置,本地执行可能覆盖生产数据 资料来源:examples/development_to_production/README.md:1-15

See Also

  • [Asset Definition Reference] — @asset 与资产图建模细节
  • [Schedules & Sensors] — 调度与事件驱动
  • [Resources & IOManagers] — 环境隔离与外部系统读写
  • [Integrations Library] — dagster-* 集成库索引

来源:https://github.com/dagster-io/dagster / 项目说明书

Core Concepts: Assets, Jobs, Ops, Graphs & Resources

Dagster 是一个面向数据流水线的编排框架,以"软件定义资产(Software-defined Assets)"作为核心抽象,辅以操作(Ops)、图(Graphs)、作业(Jobs)、资源(Resources)、调度(Schedules)与传感器(Sensors)等概念构成完整的编排模型 资料来源:README.md。在最新版本 1.13.10(core)中,框架进一步...

章节 相关页面

继续阅读本节完整说明和来源证据。

Dagster 核心概念:资产、作业、操作、图与资源

概述

Dagster 是一个面向数据流水线的编排框架,以"软件定义资产(Software-defined Assets)"作为核心抽象,辅以操作(Ops)、图(Graphs)、作业(Jobs)、资源(Resources)、调度(Schedules)与传感器(Sensors)等概念构成完整的编排模型 资料来源:README.md。在最新版本 1.13.10(core)中,框架进一步修复了 DagsterInstance.get_latest_materialization_event 在资产被擦除后仍可能返回陈旧物化事件的缺陷,这体现了核心抽象与实例存储层之间紧密的耦合关系。

下文按照由高层到低层的顺序,依次介绍资产、作业、操作与图、资源这四组核心概念,并指出它们之间的组合关系。

资产(Assets)

资产是 Dagster 中最核心的概念,代表一个可持久化、有版本且可被观测的数据对象,例如数据库表、文件、数据集分区等。资产通过 @asset 装饰器将一个普通 Python 函数声明为资产生产逻辑 资料来源:examples/quickstart_etl/README.md

from dagster import asset

@asset
def hackernews_topstories(hackernews_topstory_ids):
    ...

资产之间可以通过函数参数隐式建立上下游依赖关系。例如在 quickstart_etl 示例中,hackernews_topstories 接受 hackernews_topstory_ids 的输出作为输入,hackernews_stories_word_cloud 又以 hackernews_topstories 的结果作为输入 资料来源:examples/quickstart_etl/README.md

Dagster 支持多种资产特性以适应复杂场景:

特性用途示例场景
compute_kind为资产打标签,标识其计算来源HackerNews APIPlot 资料来源:examples/quickstart_etl/README.md
分区(Partitions)按时间或维度切分资产按小时分区的事件流 资料来源:examples/project_fully_featured/README.md
动态分区(Dynamic partitions)运行时创建新分区持续发现的新数据源 资料来源:examples/docs_projects/project_atproto_dashboard/README.md
元数据(Metadata)附加任意元数据到物化事件词云预览图 资料来源:examples/quickstart_etl/README.md
外部资产(External assets)编排 Dagster 之外的资产Snowflake Dynamic Tables 资料来源:examples/snowflake_cortex/dagster_snowflake/README.md

最近发布的 1.13.10 版本修复了资产擦除后 get_latest_materialization_event 返回陈旧数据的 Bug,开发者在编写"先擦除再重跑"的清理流水线时应升级至该版本以保证数据一致性。

操作(Ops)与图(Graphs)

当资产的生成过程需要拆分为多个离散的计算单元时,Dagster 提供了操作(Op)与图(Graph)作为更细粒度的组合单元。

  • Op:一个可独立执行、可复用、可测试的计算单元,对应一个 Python 函数。
  • Graph:将多个 Op 通过输入输出连接而成的有向无环图,描述它们之间的数据流向。

当一个资产的内部实现是一个 Graph 时,就称之为"图支撑的资产(Graph-backed asset)"。这种模式适用于"单个资产背后存在多步独立计算"或"已有现成的 Graph 需要复用"的场景 资料来源:examples/feature_graph_backed_assets/README.md

flowchart LR
    A[hackernews_topstory_ids<br/>@asset] --> B[hackernews_topstories<br/>@asset]
    B --> C[hackernews_stories_word_cloud<br/>@asset]
    subgraph 内部实现可选
        B1[Op: 拉取详情] --> B2[Op: 解析字段]
        B2 -.被组合为.-> B
    end

需要注意的是,即使资产内部使用 Ops 与 Graphs 组合,资产本身仍然是 Dagster 视图层与持久化层的主角,Ops 仅为实现细节。

作业(Jobs)

作业(Job)是一组资产或操作的可执行单元,通常对应一次完整的运行(Run)。作业是 Dagster 中"运行什么、何时运行、怎样运行"的承载体:

  • 执行单元:将若干资产或图组合为一个可被调度执行的工作单元。
  • 触发方式:可以手动触发、由 Schedule 按时间表触发,也可由 Sensor 响应外部事件触发 资料来源:examples/snowflake_cortex/dagster_snowflake/README.md
  • 环境切换:通过环境变量切换部署目标,例如 project_fully_featured 通过 DAGSTER_DEPLOYMENTprod / staging / 本地之间切换 资料来源:examples/project_fully_featured/README.md
  • 示例入口quickstart_etldefinitions.py 中定义了每日执行一次的 ETL 作业,并在 Dagster UI 中以 all_assets_job 的形式呈现 资料来源:examples/quickstart_etl/README.md

资源(Resources)

资源(Resource)是可在资产、操作或作业中注入的可复用外部依赖,例如数据库连接、HTTP 客户端、文件系统句柄、LLM 客户端等。资源与资产在概念上是正交的:资产描述"产出什么数据",资源描述"用什么去产出"。

例如 project_multi_tenant 示例将共享的 LLM 客户端封装为资源,并在多个代码位置之间复用 资料来源:examples/project_multi_tenant/README.mdproject_fully_featured 则在不同环境使用不同的存储后端:本地使用文件系统与 DuckDB,生产环境使用 S3 与 Snowflake 资料来源:examples/project_fully_featured/README.md

资源的关键价值在于:

  1. 解耦:将连接配置与业务逻辑分离,便于在不同环境中复用同一份资产代码。
  2. 可测试:在测试中注入伪造资源以隔离外部依赖。
  3. 集中治理:对凭据、连接池、超时等横切关注点统一管理。

常见模式与实践

See Also

来源:https://github.com/dagster-io/dagster / 项目说明书

Asset Materialization, Event Log & Instance API

Dagster 是一个面向数据流水线的编排框架,其核心抽象是 Asset(资产)。当一次计算生成或更新某个资产时,会产生一次 materialization(物化)事件,记录该资产在某一时刻的状态、运行来源以及附加元数据。这些事件被持久化到 Event Log(事件日志)中,并可通过 DagsterInstance API 进行查询、审计与回溯。资料来源:[README.m...

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 基本概念

继续阅读本节完整说明和来源证据。

章节 元数据与观测事件

继续阅读本节完整说明和来源证据。

章节 分区与擦除

继续阅读本节完整说明和来源证据。

Asset Materialization、Event Log 与 Instance API

概述

Dagster 是一个面向数据流水线的编排框架,其核心抽象是 Asset(资产)。当一次计算生成或更新某个资产时,会产生一次 materialization(物化)事件,记录该资产在某一时刻的状态、运行来源以及附加元数据。这些事件被持久化到 Event Log(事件日志)中,并可通过 DagsterInstance API 进行查询、审计与回溯。资料来源:README.md:1-50

在最新发布的 1.13.10(core)/ 0.29.10(libraries)版本中,针对 DagsterInstance.get_latest_materialization_eventget_asset_records 返回已擦除(wiped)资产的过期物化事件这一缺陷进行了修复。资料来源:CHANGES.md(社区上下文中的发布说明)

Asset Materialization 的工作原理

基本概念

Asset materialization 是 Dagster 用来表达"某个资产被成功生成"的核心事件。每个 @asset 装饰的函数在执行完成后,Dagster 会自动产出一条 ASSET_MATERIALIZATION 类型的事件,开发者还可以通过 MetadataValue 附加任意元数据(如记录数、数据预览、词云图片等)。资料来源:examples/quickstart_etl/README.md:50-130

例如在 quickstart_etl 示例中:

  • hackernews_topstory_ids 从 HackerNews API 拉取热门故事 ID 列表;
  • hackernews_topstories 基于上一步的输出抓取详细故事数据;
  • hackernews_stories_word_cloud 依据前一步的输出生成词云可视化。

这三者形成一条由上至下的依赖链,每次运行时都会产生对应的 ASSET_MATERIALIZATION 事件,写入 Event Log。资料来源:examples/quickstart_etl/README.md:30-80

元数据与观测事件

除了物化事件,Dagster 还支持 observation(观测)事件,用于记录资产未发生新的物化但外部状态发生了变化的情况(例如对分区执行擦除操作时)。观测事件同样写入 Event Log,并参与"最新物化"语义计算,这也是 1.13.10 修复缺陷的关键场景。资料来源:CHANGES.md(社区上下文)

分区与擦除

project_atproto_dashboard 示例中,资产按 dynamic partitions(动态分区)组织,并通过 declarative automation 驱动更新。资料来源:examples/docs_projects/project_atproto_dashboard/README.md:10-25 当某个分区的数据被擦除时,Dagster+ 在旧版本中可能错误地把已擦除分区的物化作为整个资产的"最新"记录返回,1.13.10 对此行为进行了修正。资料来源:CHANGES.md(社区上下文)

Event Log 系统

Event Log 是 Dagster 持久化所有执行历史(runs、materializations、observations、expectation results 等)的地方,UI 中的 ActivityAsset Details 页面以及 Asset Lineage 视图都直接消费 Event Log 的数据。资料来源:examples/quickstart_etl/README.md:90-130

flowchart LR
    A["@asset 函数执行"] --> B["ASSET_MATERIALIZATION<br/>事件"]
    A2["observation / wipe"] --> B2["OBSERVATION / WIPE<br/>事件"]
    B --> C[Event Log 存储]
    B2 --> C
    C --> D["DagsterInstance<br/>查询 API"]
    C --> E["UI: Activity / Lineage"]
    D --> F["用户代码 & CLI"]
    E --> G["数据工程师"]

Event Log 中的每条事件都带有一个稳定的 asset_key,查询接口(详见下一节)正是基于 asset_key 在日志中筛选和排序,从而得到"最新"或"全部"物化记录。资料来源:examples/quickstart_etl/README.md:100-130

DagsterInstance API 与常见查询

DagsterInstance 是用户与 Event Log 交互的入口对象,提供包括但不限于以下与物化相关的查询方法:

API 方法主要用途
get_latest_materialization_event获取某个资产的最新 ASSET_MATERIALIZATION 事件
get_asset_records返回一个或多个资产的物化历史记录集合
wipe_asset / 分区擦除移除资产(或其分区)的历史物化与观测事件

资料来源:CHANGES.md(社区上下文中提及的相关 API)

1.13.10 修复的行为变化

在 1.13.10 之前,如果一个资产曾被擦除(wiped),那么一旦有新的运行(run)开始针对该资产,或报告了一条 observation,get_latest_materialization_eventget_asset_records 仍可能返回擦除前的过期物化记录。在 Dagster+ 上,分区擦除也存在类似问题。该 bug 已在 1.13.10 修复,调用方应预期返回"擦除后状态"或更严格的一致性结果。资料来源:CHANGES.md(社区上下文)

在项目中的使用模式

常见失败模式与排查建议

  1. 读取到过期物化:在 1.13.10 之前可能出现。升级 core 至 1.13.10 后验证返回值是否符合预期,并检查是否有未完成的 run 导致事件乱序。资料来源:CHANGES.md(社区上下文)
  2. 分区擦除后仍能查到数据:在 Dagster+ 上若升级到 1.13.10 仍出现,应确认 wipe 操作的目标分区与 get_asset_records 的过滤条件一致。资料来源:CHANGES.md(社区上下文)
  3. 元数据过大导致查询缓慢:在 hackernews_topstories 中以 Markdown 形式记录了 DataFrame 预览,过大的 preview 会增加 Event Log 体积,建议仅记录必要列。资料来源:examples/quickstart_etl/README.md:90-130
  4. 跨代码位置混淆:多租户部署下需显式选择 DagsterInstance 的目标 code location,否则可能读到错误的物化记录。资料来源:examples/project_multi_tenant/README.md:1-50

See Also

资料来源:CHANGES.md(社区上下文中提及的相关 API)

Deployment Options & Infrastructure

Dagster 提供从本地开发到生产云端的多种部署形态,覆盖单机调试、多代码位置工作区、混合(Hybrid)部署与托管式 Dagster+ 等场景。本页基于仓库内示例与文档梳理其部署选项与基础设施组成。

章节 相关页面

继续阅读本节完整说明和来源证据。

部署形态总览

Dagster 的部署按运行位置与托管方式可划分为以下几类:

形态适用场景关键特征
本地开发(Local)个人调试、快速迭代单进程 dagster devdg dev,Webserver 与 Daemon 同进程启动
工作区(Workspace)多项目本地编排通过 workspace.yaml 聚合多个 code location
多租户(Multi-tenant)团队/企业内多业务线隔离每业务线独立 code location、共享 LLM 资源、隔离数据库
Dagster+ Serverless免运维托管官方云服务,CI/CD 与分支开箱即用
Dagster+ Hybrid数据合规场景控制面在云端,用户代码与基础设施自托管
Experimental试验性功能API 可能随时变更,资料来源:examples/experimental/README.md

下图为典型的本地/云端混合部署架构:

flowchart LR
  Dev[开发者本地] -->|dagster dev / dg dev| WS[Webserver:3000]
  WS --> Inst[(Dagster Instance\nSQLite/Postgres)]
  Daemon[Daemon\nSchedules/Sensors/Backfills] --> Inst
  CodeLoc[Code Location\nPython 包] --> WS
  CodeLoc --> Daemon
  Hybrid[Dagster+ Hybrid Agent] -.->|用户 VPC 内| CodeLoc
  Serverless[Dagster+ Serverless] -.->|官方托管| CodeLoc

本地开发与快速启动

最常见的本地启动方式是 dagster dev,它会同时拉起 Webserver 与 Daemon。quickstart_etl 示例展示了完整流程:先以 pip install -e ".[dev]" 编辑模式安装项目,再执行 dagster dev,最后访问 http://localhost:3000 资料来源:examples/quickstart_etl/README.md

新引入的 dg CLI 是组件化(Components)项目推荐的入口。project_atproto_dashboardproject_prompt_eng 均使用 dg dev 启动,资料来源:examples/docs_projects/project_atproto_dashboard/README.mdexamples/docs_projects/project_prompt_eng/README.md。两者的共同模式是:先在 dbt_project/ 中生成 manifest,再用 uv 创建虚拟环境安装项目,最后通过 dg dev 一键启动。

本地开发时的关键配置点包括:

多代码位置与多租户架构

当组织内有多个业务线时,推荐使用多 code location 部署。project_multi_tenant 演示了典型布局:beacon_hq/harbor_outfitters/summit_financial/ 各自承载报表、目录与风险评估资产,shared/ 承载共享 LLM 资源与 IO Manager,资料来源:examples/project_multi_tenant/README.md

工作区通过 workspace.yaml 注册所有 code location,部署到 Dagster+ 时则使用 dagster_cloud.yaml 进行等价配置,资料来源:examples/project_multi_tenant/README.md。这种"一个 workspace,多个 location"的模式带来三项隔离能力:

  1. 运行时隔离:通过 vendor/ 下的 marker 包(如 catalog_coach_runtime==1.4.0)按 location 安装不同依赖版本,资料来源:examples/project_multi_tenant/README.md
  2. 数据隔离:每个 location 写入独立 DuckDB 文件到 data/ 子目录,资料来源:examples/project_multi_tenant/README.md
  3. 模型隔离:通过 HARBOR_OUTFITTERS_MODELSUMMIT_FINANCIAL_MODELBEACON_HQ_MODEL 等环境变量为不同 location 指定不同 LLM,资料来源:examples/project_multi_tenant/README.md

类似的多项目结构也出现在 project_databricks_and_snowflake 中,它将 deployments/localprojects/* 分离,外部通过 dg dev 统一加载,资料来源:examples/project_databricks_and_snowflake/README.md

基础设施组件与社区关注点

一次完整的 Dagster 部署至少包含以下基础设施组件:

社区近期发布(1.13.10 / 0.29.10)修复了与 DagsterInstance.get_latest_materialization_eventget_asset_records 相关的 staleness 问题:当资产被 wipe 后,若新的 run 或 observation 启动,API 仍可能返回 wipe 前的旧 materialization 记录;Dagster+ 在 partition wipe 场景下也存在类似问题。该修复直接影响运维侧对"资产最新一次成功执行时间"的判断,部署时建议升级到 1.13.10 或更高版本以避免上报指标异常。

See Also

来源:https://github.com/dagster-io/dagster / 项目说明书

Automation: Schedules, Sensors & Declarative Automation

Dagster 的自动化(Automation)体系提供了多种触发工作流运行(run)的机制,使开发者能够按时间、按事件或按数据状态来驱动资产(asset)物化。Automation 主要由三大支柱组成:Schedules(定时调度)、Sensors(传感器) 和 Declarative Automation(声明式自动化)。三者协同覆盖了从固定时间窗口到复杂依赖触发的各种...

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 用途与定义

继续阅读本节完整说明和来源证据。

章节 关键属性

继续阅读本节完整说明和来源证据。

章节 故障模式

继续阅读本节完整说明和来源证据。

概述

Dagster 的自动化(Automation)体系提供了多种触发工作流运行(run)的机制,使开发者能够按时间、按事件或按数据状态来驱动资产(asset)物化。Automation 主要由三大支柱组成:Schedules(定时调度)Sensors(传感器)Declarative Automation(声明式自动化)。三者协同覆盖了从固定时间窗口到复杂依赖触发的各种场景。

资料来源:docs/docs/guides/automate/index.md:1-30

graph LR
    A[Automation 自动化体系] --> B[Schedules<br/>定时调度]
    A --> C[Sensors<br/>事件驱动]
    A --> D[Declarative Automation<br/>声明式自动化]
    B --> B1[cron / @schedule]
    C --> C1[SensorEvaluationContext]
    C --> C2[多资产传感器]
    D --> D1[AutomationCondition]
    D --> D2[分区分层策略]

Schedules(定时调度)

用途与定义

Schedules 用于按固定时间间隔触发作业(job)或资产图。基于 @schedule 装饰器或 ScheduleDefinition 创建,常见形式包括 cron 表达式与执行函数组合。Schedules 适合"在每天凌晨 2 点运行 ETL"这类确定性时间任务。

关键属性

属性说明
cron_schedule标准 cron 表达式,定义触发时间
execution_timezone调度所用的时区
jobselection调度的目标作业或资产选择
execution_fn自定义执行函数,返回 RunRequest
@schedule(cron_schedule="0 2 * * *", job=my_etl_job)
def daily_etl_schedule(context):
    return RunRequest()

故障模式

  • 时区错误:未设置 execution_timezone 时默认按 UTC 计算,可能导致预期外的偏移。
  • 重叠触发:当运行时间超过调度间隔时,可能出现并发启动,需要通过 max_concurrent_runs 或 tag 进行约束。

资料来源:docs/docs/guides/automate/schedules/defining-schedules.md:1-80

Sensors(事件驱动传感器)

用途与定义

Sensors 是基于事件或外部状态变化触发运行的组件。通过 @sensor 装饰器创建,每次 tick 会调用 evaluate_tick 检查是否需要触发 RunRequest。Sensors 适合"当 S3 出现新文件时立即处理"或"轮询数据库是否有新数据"等场景。

多资产传感器

@multi_asset_sensor 允许针对多个资产的 materialization 事件进行响应,并根据分区状态、事件 ID 等信息生成下游运行请求。

关键实践

  • SensorEvaluationContext 提供 cursor 属性,用于实现增量轮询,避免重复触发。
  • Sensors 应当是幂等的,重复触发同一游标不应产生错误状态。
  • 复杂逻辑建议拆分为多个 sensor 组合,而非在一个 sensor 内堆叠所有判断。

资料来源:docs/docs/guides/automate/sensors/using-sensors.md:1-120

Declarative Automation(声明式自动化)

核心概念

Declarative Automation 是 Dagster 较新的自动化层,通过 AutomationCondition 在资产级别声明"何时应当重新物化"。它将"哪些上游状态变化应当触发我"的规则以声明形式写进资产定义,而非通过 schedule/sensor 命令式地编排。

常用 AutomationCondition

条件触发语义
eager()任一上游更新即触发
missing()资产尚未被物化时触发
in_progress()上一次运行正在进行时触发
cron(...)按 cron 时间窗口触发

自定义与组合

通过 &(与)、|(或)、~(非)操作符组合多个 AutomationCondition,可表达复杂策略。例如:~missing() & cron("0 9 * * MON") 表示"周一上午 9 点运行,但仅在该资产尚未物化时"。

@asset(automation_condition=eager())
def downstream_asset(): ...

与 Schedule/Sensor 的关系

Declarative Automation 并非取代,而是补充。当资产规则较为复杂或需要跨多个资产统一策略时,优先使用 AutomationCondition;而当需要跨外部系统(如 API 轮询、定时清理)触发时,仍推荐 Sensors/Schedules。

资料来源:docs/docs/guides/automate/declarative-automation/customizing-automation-conditions.md:1-100

版本说明与相关修复

在最新发布版本(1.13.10 core / 0.29.10 libraries)中修复了一项关于 DagsterInstance.get_latest_materialization_eventget_asset_records 的 bug:当时序上 wipe 某资产后,新的 run 启动或 observation 上报时,可能错误地返回 wipe 之前的 stale materialization。该修复同时涉及 Dagster+ 中分区被擦除时分区 materialization 残留为最新值的同类问题。该修复对自动化层的影响在于:声明式自动化在判断 missing()eager() 条件时,不再会误以为资产已存在历史物化,从而避免了误判触发。

选择建议

场景推荐方式
固定时间窗口触发Schedule
外部系统事件触发Sensor
资产依赖驱动Declarative Automation
跨系统复合编排Schedule + Sensor + AutomationCondition 组合

See Also

资料来源:docs/docs/guides/automate/index.md:1-30

失败模式与踩坑日记

保留 Doramagic 在发现、验证和编译中沉淀的项目专属风险,不把社区讨论只当作装饰信息。

high 来源证据:Customize Dagster UI logo and colors / whitelabeling

可能增加新用户试用和生产接入成本。

medium 来源证据:Sqlalchemy Errors in Dagster

可能增加新用户试用和生产接入成本。

medium 来源证据:A single un-deserializable bulk_actions row breaks the entire get_backfills read (Backfills page won't load)

可能增加新用户试用和生产接入成本。

medium 来源证据:[UI] Searching for "true" or "false" in Jobs and Schedules throws an error

可能增加新用户试用和生产接入成本。

Pitfall Log / 踩坑日志

项目:dagster-io/dagster

摘要:发现 13 个潜在踩坑项,其中 1 个为 high/blocking;最高优先级:安装坑 - 来源证据:Customize Dagster UI logo and colors / whitelabeling。

1. 安装坑 · 来源证据:Customize Dagster UI logo and colors / whitelabeling

  • 严重度:high
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Customize Dagster UI logo and colors / whitelabeling
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/24989 | 来源类型 github_issue 暴露的待验证使用条件。

2. 安装坑 · 来源证据:Sqlalchemy Errors in Dagster

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Sqlalchemy Errors in Dagster
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/29674 | 来源类型 github_issue 暴露的待验证使用条件。

3. 配置坑 · 来源证据:A single un-deserializable bulk_actions row breaks the entire get_backfills read (Backfills page won't load)

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个配置相关的待验证问题:A single un-deserializable bulk_actions row breaks the entire get_backfills read (Backfills page won't load)
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/33946 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

4. 配置坑 · 来源证据:[UI] Searching for "true" or "false" in Jobs and Schedules throws an error

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个配置相关的待验证问题:[UI] Searching for "true" or "false" in Jobs and Schedules throws an error
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/33945 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

5. 能力坑 · 能力判断依赖假设

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:README/documentation is current enough for a first validation pass.
  • 对用户的影响:假设不成立时,用户拿不到承诺的能力。
  • 证据:capability.assumptions | https://github.com/dagster-io/dagster | README/documentation is current enough for a first validation pass.

6. 维护坑 · 来源证据:Add executor_def for AutomationConditionSensorDefinition

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个维护/版本相关的待验证问题:Add executor_def for AutomationConditionSensorDefinition
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/33944 | 来源类型 github_issue 暴露的待验证使用条件。

7. 维护坑 · 来源证据:Dynamic partition definitions leaking data across code locations

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个维护/版本相关的待验证问题:Dynamic partition definitions leaking data across code locations
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/29693 | 来源类型 github_issue 暴露的待验证使用条件。

8. 维护坑 · 来源证据:Enhancement - Update to tomlkit 0.15.0, Remove dagster-dg-core pin "tomlkit<0.13.3"

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个维护/版本相关的待验证问题:Enhancement - Update to tomlkit 0.15.0, Remove dagster-dg-core pin "tomlkit<0.13.3"
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 证据:community_evidence:github | https://github.com/dagster-io/dagster/issues/33943 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

9. 维护坑 · 维护活跃度未知

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:未记录 last_activity_observed。
  • 对用户的影响:新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。
  • 证据:evidence.maintainer_signals | https://github.com/dagster-io/dagster | last_activity_observed missing
  • 严重度:medium
  • 证据强度:source_linked
  • 发现:no_demo
  • 证据:downstream_validation.risk_items | https://github.com/dagster-io/dagster | no_demo; severity=medium

11. 安全/权限坑 · 存在评分风险

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:no_demo
  • 对用户的影响:风险会影响是否适合普通用户安装。
  • 证据:risks.scoring_risks | https://github.com/dagster-io/dagster | no_demo; severity=medium

12. 维护坑 · issue/PR 响应质量未知

  • 严重度:low
  • 证据强度:source_linked
  • 发现:issue_or_pr_quality=unknown。
  • 对用户的影响:用户无法判断遇到问题后是否有人维护。
  • 证据:evidence.maintainer_signals | https://github.com/dagster-io/dagster | issue_or_pr_quality=unknown

13. 维护坑 · 发布节奏不明确

  • 严重度:low
  • 证据强度:source_linked
  • 发现:release_recency=unknown。
  • 对用户的影响:安装命令和文档可能落后于代码,用户踩坑概率升高。
  • 证据:evidence.maintainer_signals | https://github.com/dagster-io/dagster | release_recency=unknown

来源:Doramagic 发现、验证与编译记录