Dagster项目实战:实现自定义运行协调器(Run Coordinator)
概述
在Dagster数据编排平台的实际应用中,有时需要根据业务需求定制运行协调器(Run Coordinator)的行为。本文将详细介绍如何在Dagster项目中实现一个自定义的运行协调器,通过标签(tag)机制来控制作业运行的权限和并发。
项目结构设计
首先我们需要规划项目的目录结构,合理的结构有助于代码管理和维护:
项目根目录/
|- 自定义组件目录/
| |- run_coordinator/
| | |- __init__.py
| | |- custom_run_coordinator.py
| |- pyproject.toml
|
|- 用户代码目录/
| |- dagster/
| | |- __init__.py
| | |- definitions.py
| |- pyproject.toml
|
|- 部署配置/
| |- values.yaml
|
|- Dockerfile.自定义组件
|- Dockerfile.用户代码
核心组件实现
自定义运行协调器
我们基于Dagster的QueuedRunCoordinator
类进行扩展,增加基于标签的过滤功能:
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence
from typing_extensions import Self
from dagster import _check as check
from dagster._core.run_coordinator.base import SubmitRunContext
from dagster._core.run_coordinator.queued_run_coordinator import QueuedRunCoordinator
from dagster._core.storage.dagster_run import DagsterRun
from dagster._serdes import ConfigurableClassData
class 运行队列配置(NamedTuple):
"""封装运行队列的配置参数"""
def __new__(
cls,
最大并发运行数: int,
标签并发限制: Optional[Sequence[Mapping[str, Any]]],
最大用户代码失败重试次数: int = 0,
用户代码失败重试延迟: int = 60,
是否阻塞操作并发限制的运行: bool = True,
操作并发槽缓冲区: int = 0,
):
# 参数校验和初始化逻辑...
def 应用并发设置(self, 并发设置: Mapping[str, Any]) -> "运行队列配置":
"""应用新的并发设置"""
# 实现细节...
class 自定义运行协调器(QueuedRunCoordinator):
"""扩展QueuedRunCoordinator,增加基于标签值的过滤功能"""
def __init__(self, *args, **kwargs):
"""初始化方法"""
super().__init__(*args, **kwargs)
def 获取运行队列配置(self) -> 运行队列配置:
"""获取当前运行队列配置"""
# 实现细节...
def _是否允许运行(self, dagster运行: DagsterRun) -> bool:
"""基于标签检查是否允许运行"""
运行标签 = dagster运行.tags or {}
# 如果有标签并发限制,项目必须包含在内才能允许作业运行
if self._标签并发限制:
允许的值 = {限制["value"] for 限制 in self._标签并发限制
if 限制["key"] == "foo"}
if "foo" in 运行标签 and 运行标签.get("foo") in 允许的值:
return True
return False
# 如果没有定义标签并发限制,不允许任何运行
return False
def 提交运行(self, 上下文: SubmitRunContext) -> DagsterRun:
"""重写提交运行方法,增加标签过滤"""
dagster运行 = 上下文.dagster_run
运行标签 = dagster运行.tags or {}
if not self._是否允许运行(dagster运行):
self._实例.报告运行已取消(
dagster运行,
message=f"运行被自定义运行协调器拒绝: 标签权限检查失败..."
)
return dagster运行
# 通过所有过滤器后,将运行提交到队列
return super().提交运行(上下文)
部署配置
在Kubernetes环境中,我们需要通过Helm chart配置自定义运行协调器:
dagsterDaemon:
runCoordintor:
启用: true
类型: 自定义运行协调器
配置:
自定义运行协调器:
模块: run_coordinator.custom_run_coordinator
类: 自定义运行协调器
配置:
标签并发限制:
- { key: foo, value: bar, limit: 1}
Docker容器配置
为了部署自定义组件,我们需要修改Dockerfile:
# 基础镜像
FROM dagster/dagster-k8s
# 安装自定义组件
COPY 自定义组件目录 /opt/自定义组件目录
RUN pip install -e /opt/自定义组件目录
实现原理分析
-
继承与扩展:我们基于Dagster内置的
QueuedRunCoordinator
进行扩展,保留了原有的队列管理功能,同时增加了标签过滤逻辑。 -
标签过滤机制:
- 通过
_是否允许运行
方法实现核心过滤逻辑 - 检查运行标签是否匹配预设的并发限制配置
- 不匹配的运行业务会被直接取消
- 通过
-
配置管理:
- 使用
运行队列配置
类封装所有运行参数 - 支持动态更新并发设置
- 使用
-
错误处理:
- 被拒绝的运行会被标记为"已取消"状态
- 提供清晰的拒绝原因信息
实际应用场景
这种自定义运行协调器特别适合以下场景:
-
多租户环境:不同团队/项目共享同一个Dagster实例时,通过标签控制运行权限
-
资源配额管理:为不同业务分配不同的运行配额
-
优先级控制:高优先级业务可以抢占低优先级业务的运行资源
-
环境隔离:区分测试环境和生产环境的运行
性能考量
在实现自定义运行协调器时,需要注意以下性能因素:
-
过滤逻辑复杂度:保持
_是否允许运行
方法高效,避免复杂计算 -
并发控制:合理设置
最大并发运行数
和标签并发限制
-
错误处理开销:被拒绝的运行业务应快速失败,减少资源占用
扩展建议
-
动态配置:可以结合数据库或配置中心实现运行规则的动态加载
-
审计日志:记录运行决策的详细日志,便于问题排查
-
指标监控:暴露Prometheus指标,监控运行协调器的性能和工作状态
-
多维度过滤:除了标签,还可以考虑基于用户、时间等其他维度进行过滤
总结
通过实现自定义运行协调器,我们可以灵活控制Dagster中的作业运行行为,满足各种复杂的业务需求。本文介绍的实现方法保持了与Dagster原生组件的良好兼容性,同时提供了足够的扩展空间。在实际应用中,可以根据具体需求进一步定制过滤逻辑和运行策略。
- DDeepSeek-V3.1-BaseDeepSeek-V3.1 是一款支持思考模式与非思考模式的混合模型Python00
- HHunyuan-MT-7B腾讯混元翻译模型主要支持33种语言间的互译,包括中国五种少数民族语言。00
GitCode-文心大模型-智源研究院AI应用开发大赛
GitCode&文心大模型&智源研究院强强联合,发起的AI应用开发大赛;总奖池8W,单人最高可得价值3W奖励。快来参加吧~085CommonUtilLibrary
快速开发工具类收集,史上最全的开发工具类,欢迎Follow、Fork、StarJava05GitCode百大开源项目
GitCode百大计划旨在表彰GitCode平台上积极推动项目社区化,拥有广泛影响力的G-Star项目,入选项目不仅代表了GitCode开源生态的蓬勃发展,也反映了当下开源行业的发展趋势。07GOT-OCR-2.0-hf
阶跃星辰StepFun推出的GOT-OCR-2.0-hf是一款强大的多语言OCR开源模型,支持从普通文档到复杂场景的文字识别。它能精准处理表格、图表、数学公式、几何图形甚至乐谱等特殊内容,输出结果可通过第三方工具渲染成多种格式。模型支持1024×1024高分辨率输入,具备多页批量处理、动态分块识别和交互式区域选择等创新功能,用户可通过坐标或颜色指定识别区域。基于Apache 2.0协议开源,提供Hugging Face演示和完整代码,适用于学术研究到工业应用的广泛场景,为OCR领域带来突破性解决方案。00openHiTLS
旨在打造算法先进、性能卓越、高效敏捷、安全可靠的密码套件,通过轻量级、可剪裁的软件技术架构满足各行业不同场景的多样化要求,让密码技术应用更简单,同时探索后量子等先进算法创新实践,构建密码前沿技术底座!C0381- WWan2.2-S2V-14B【Wan2.2 全新发布|更强画质,更快生成】新一代视频生成模型 Wan2.2,创新采用MoE架构,实现电影级美学与复杂运动控制,支持720P高清文本/图像生成视频,消费级显卡即可流畅运行,性能达业界领先水平Python00
- GGLM-4.5-AirGLM-4.5 系列模型是专为智能体设计的基础模型。GLM-4.5拥有 3550 亿总参数量,其中 320 亿活跃参数;GLM-4.5-Air采用更紧凑的设计,拥有 1060 亿总参数量,其中 120 亿活跃参数。GLM-4.5模型统一了推理、编码和智能体能力,以满足智能体应用的复杂需求Jinja00
Yi-Coder
Yi Coder 编程模型,小而强大的编程助手HTML013
热门内容推荐
最新内容推荐
项目优选









