首页
/ Dagster项目实战:实现自定义运行协调器(Run Coordinator)

Dagster项目实战:实现自定义运行协调器(Run Coordinator)

2025-05-17 12:15:44作者:秋阔奎Evelyn

概述

在Dagster数据编排平台的实际应用中,有时需要根据业务需求定制运行协调器(Run Coordinator)的行为。本文将详细介绍如何在Dagster项目中实现一个自定义的运行协调器,通过标签(tag)机制来控制作业运行的权限和并发。

项目结构设计

首先我们需要规划项目的目录结构,合理的结构有助于代码管理和维护:

项目根目录/
  |- 自定义组件目录/
  |  |- run_coordinator/
  |  |  |- __init__.py
  |  |  |- custom_run_coordinator.py
  |  |- pyproject.toml
  |
  |- 用户代码目录/
  |  |- dagster/
  |  |  |- __init__.py
  |  |  |- definitions.py
  |  |- pyproject.toml
  |
  |- 部署配置/
  |  |- values.yaml
  |
  |- Dockerfile.自定义组件
  |- Dockerfile.用户代码

核心组件实现

自定义运行协调器

我们基于Dagster的QueuedRunCoordinator类进行扩展,增加基于标签的过滤功能:

from typing import Any, List, Mapping, NamedTuple, Optional, Sequence
from typing_extensions import Self
from dagster import _check as check
from dagster._core.run_coordinator.base import SubmitRunContext
from dagster._core.run_coordinator.queued_run_coordinator import QueuedRunCoordinator
from dagster._core.storage.dagster_run import DagsterRun
from dagster._serdes import ConfigurableClassData

class 运行队列配置(NamedTuple):
    """封装运行队列的配置参数"""
    def __new__(
        cls,
        最大并发运行数: int,
        标签并发限制: Optional[Sequence[Mapping[str, Any]]],
        最大用户代码失败重试次数: int = 0,
        用户代码失败重试延迟: int = 60,
        是否阻塞操作并发限制的运行: bool = True,
        操作并发槽缓冲区: int = 0,
    ):
        # 参数校验和初始化逻辑...

    def 应用并发设置(self, 并发设置: Mapping[str, Any]) -> "运行队列配置":
        """应用新的并发设置"""
        # 实现细节...

class 自定义运行协调器(QueuedRunCoordinator):
    """扩展QueuedRunCoordinator,增加基于标签值的过滤功能"""
    
    def __init__(self, *args, **kwargs):
        """初始化方法"""
        super().__init__(*args, **kwargs)
    
    def 获取运行队列配置(self) -> 运行队列配置:
        """获取当前运行队列配置"""
        # 实现细节...
    
    def _是否允许运行(self, dagster运行: DagsterRun) -> bool:
        """基于标签检查是否允许运行"""
        运行标签 = dagster运行.tags or {}
        
        # 如果有标签并发限制,项目必须包含在内才能允许作业运行
        if self._标签并发限制:
            允许的值 = {限制["value"] for 限制 in self._标签并发限制 
                     if 限制["key"] == "foo"}
            
            if "foo" in 运行标签 and 运行标签.get("foo") in 允许的值:
                return True
            return False
        
        # 如果没有定义标签并发限制,不允许任何运行
        return False
    
    def 提交运行(self, 上下文: SubmitRunContext) -> DagsterRun:
        """重写提交运行方法,增加标签过滤"""
        dagster运行 = 上下文.dagster_run
        运行标签 = dagster运行.tags or {}
        
        if not self._是否允许运行(dagster运行):
            self._实例.报告运行已取消(
                dagster运行,
                message=f"运行被自定义运行协调器拒绝: 标签权限检查失败..."
            )
            return dagster运行
        
        # 通过所有过滤器后,将运行提交到队列
        return super().提交运行(上下文)

部署配置

在Kubernetes环境中,我们需要通过Helm chart配置自定义运行协调器:

dagsterDaemon:
  runCoordintor:
    启用: true
    类型: 自定义运行协调器
    配置:
      自定义运行协调器:
        模块: run_coordinator.custom_run_coordinator
        类: 自定义运行协调器
        配置:
          标签并发限制:
            - { key: foo, value: bar, limit: 1}

Docker容器配置

为了部署自定义组件,我们需要修改Dockerfile:

# 基础镜像
FROM dagster/dagster-k8s

# 安装自定义组件
COPY 自定义组件目录 /opt/自定义组件目录
RUN pip install -e /opt/自定义组件目录

实现原理分析

  1. 继承与扩展:我们基于Dagster内置的QueuedRunCoordinator进行扩展,保留了原有的队列管理功能,同时增加了标签过滤逻辑。

  2. 标签过滤机制

    • 通过_是否允许运行方法实现核心过滤逻辑
    • 检查运行标签是否匹配预设的并发限制配置
    • 不匹配的运行业务会被直接取消
  3. 配置管理

    • 使用运行队列配置类封装所有运行参数
    • 支持动态更新并发设置
  4. 错误处理

    • 被拒绝的运行会被标记为"已取消"状态
    • 提供清晰的拒绝原因信息

实际应用场景

这种自定义运行协调器特别适合以下场景:

  1. 多租户环境:不同团队/项目共享同一个Dagster实例时,通过标签控制运行权限

  2. 资源配额管理:为不同业务分配不同的运行配额

  3. 优先级控制:高优先级业务可以抢占低优先级业务的运行资源

  4. 环境隔离:区分测试环境和生产环境的运行

性能考量

在实现自定义运行协调器时,需要注意以下性能因素:

  1. 过滤逻辑复杂度:保持_是否允许运行方法高效,避免复杂计算

  2. 并发控制:合理设置最大并发运行数标签并发限制

  3. 错误处理开销:被拒绝的运行业务应快速失败,减少资源占用

扩展建议

  1. 动态配置:可以结合数据库或配置中心实现运行规则的动态加载

  2. 审计日志:记录运行决策的详细日志,便于问题排查

  3. 指标监控:暴露Prometheus指标,监控运行协调器的性能和工作状态

  4. 多维度过滤:除了标签,还可以考虑基于用户、时间等其他维度进行过滤

总结

通过实现自定义运行协调器,我们可以灵活控制Dagster中的作业运行行为,满足各种复杂的业务需求。本文介绍的实现方法保持了与Dagster原生组件的良好兼容性,同时提供了足够的扩展空间。在实际应用中,可以根据具体需求进一步定制过滤逻辑和运行策略。

登录后查看全文
热门项目推荐
相关项目推荐

项目优选

收起
openGauss-serveropenGauss-server
openGauss kernel ~ openGauss is an open source relational database management system
C++
136
187
RuoYi-Vue3RuoYi-Vue3
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
884
523
openHiTLSopenHiTLS
旨在打造算法先进、性能卓越、高效敏捷、安全可靠的密码套件,通过轻量级、可剪裁的软件技术架构满足各行业不同场景的多样化要求,让密码技术应用更简单,同时探索后量子等先进算法创新实践,构建密码前沿技术底座!
C
362
381
ohos_react_nativeohos_react_native
React Native鸿蒙化仓库
C++
182
264
kernelkernel
deepin linux kernel
C
22
5
nop-entropynop-entropy
Nop Platform 2.0是基于可逆计算理论实现的采用面向语言编程范式的新一代低代码开发平台,包含基于全新原理从零开始研发的GraphQL引擎、ORM引擎、工作流引擎、报表引擎、规则引擎、批处理引引擎等完整设计。nop-entropy是它的后端部分,采用java语言实现,可选择集成Spring框架或者Quarkus框架。中小企业可以免费商用
Java
7
0
CangjieCommunityCangjieCommunity
为仓颉编程语言开发者打造活跃、开放、高质量的社区环境
Markdown
1.09 K
0
note-gennote-gen
一款跨平台的 Markdown AI 笔记软件,致力于使用 AI 建立记录和写作的桥梁。
TSX
84
4
cherry-studiocherry-studio
🍒 Cherry Studio 是一款支持多个 LLM 提供商的桌面客户端
TypeScript
613
60
open-eBackupopen-eBackup
open-eBackup是一款开源备份软件,采用集群高扩展架构,通过应用备份通用框架、并行备份等技术,为主流数据库、虚拟化、文件系统、大数据等应用提供E2E的数据备份、恢复等能力,帮助用户实现关键数据高效保护。
HTML
118
78