DolphinScheduler API文档:完整接口参考手册
2026-02-04 04:44:35作者:盛欣凯Ernestine
概述
Apache DolphinScheduler是一个现代化的数据编排平台,提供完整的RESTful API接口,支持通过编程方式管理任务调度和工作流。本文档提供DolphinScheduler所有API接口的详细参考,帮助开发者快速集成和使用。
API基础信息
认证方式
DolphinScheduler API支持多种认证方式:
| 认证方式 | 说明 | 适用场景 |
|---|---|---|
| Token认证 | 使用Access Token进行身份验证 | 自动化脚本、第三方集成 |
| Session认证 | 基于Cookie的会话认证 | Web UI交互 |
| Basic认证 | 用户名密码基础认证 | 简单集成场景 |
请求格式
- Content-Type:
application/json - 响应格式: JSON
- 编码: UTF-8
基础URL
http://{host}:{port}/dolphinscheduler/api
核心API接口分类
1. 项目管理API
项目管理API用于创建、查询、更新和删除项目。
// 项目控制器基础路径
@RestController
@RequestMapping("/v2/projects")
public class ProjectV2Controller {
@ApiOperation("创建项目")
@PostMapping
public Result createProject(@RequestBody ProjectCreateRequest createRequest) {
// 实现逻辑
}
@ApiOperation("查询项目列表")
@GetMapping
public Result<PageInfo<ProjectVO>> queryProjectList(
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
// 实现逻辑
}
}
主要接口:
POST /v2/projects- 创建新项目GET /v2/projects- 查询项目列表GET /v2/projects/{projectCode}- 查询项目详情PUT /v2/projects/{projectCode}- 更新项目信息DELETE /v2/projects/{projectCode}- 删除项目
2. 工作流定义API
工作流定义API用于管理工作流的创建、版本控制和部署。
@RestController
@RequestMapping("projects/{projectCode}/workflow-definition")
public class WorkflowDefinitionController {
@ApiOperation("创建工作流定义")
@PostMapping
public Result createWorkflowDefinition(
@PathVariable("projectCode") long projectCode,
@RequestBody WorkflowDefinitionCreateRequest createRequest) {
// 实现逻辑
}
@ApiOperation("查询工作流定义列表")
@GetMapping
public Result queryWorkflowDefinitionList(
@PathVariable("projectCode") long projectCode,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "userId", required = false) Integer userId,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
// 实现逻辑
}
}
主要接口:
POST /projects/{projectCode}/workflow-definition- 创建工作流GET /projects/{projectCode}/workflow-definition- 查询工作流列表PUT /projects/{projectCode}/workflow-definition/{code}- 更新工作流DELETE /projects/{projectCode}/workflow-definition/{code}- 删除工作流POST /projects/{projectCode}/workflow-definition/{code}/release- 发布工作流
3. 任务定义API
任务定义API用于管理具体任务的配置和执行。
@RestController
@RequestMapping("projects/{projectCode}/task-definition")
public class TaskDefinitionController {
@ApiOperation("创建任务定义")
@PostMapping
public Result createTaskDefinition(
@PathVariable("projectCode") long projectCode,
@RequestBody TaskDefinitionCreateRequest createRequest) {
// 实现逻辑
}
@ApiOperation("查询任务定义列表")
@GetMapping
public Result queryTaskDefinitionList(
@PathVariable("projectCode") long projectCode,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "taskType", required = false) String taskType,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
// 实现逻辑
}
}
主要接口:
POST /projects/{projectCode}/task-definition- 创建任务GET /projects/{projectCode}/task-definition- 查询任务列表GET /projects/{projectCode}/task-definition/{code}- 查询任务详情PUT /projects/{projectCode}/task-definition/{code}- 更新任务DELETE /projects/{projectCode}/task-definition/{code}- 删除任务
4. 工作流实例API
工作流实例API用于监控和管理运行中的工作流实例。
@RestController
@RequestMapping("/v2/workflow-instances")
public class WorkflowInstanceV2Controller {
@ApiOperation("查询工作流实例列表")
@GetMapping
public Result queryWorkflowInstanceList(
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "executorName", required = false) String executorName,
@RequestParam(value = "host", required = false) String host,
@RequestParam(value = "stateType", required = false) String stateType,
@RequestParam(value = "startDate", required = false) String startDate,
@RequestParam(value = "endDate", required = false) String endDate,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
// 实现逻辑
}
@ApiOperation("启停工作流实例")
@PostMapping("/{workflowInstanceId}/execute")
public Result executeWorkflowInstance(
@PathVariable("workflowInstanceId") Integer workflowInstanceId,
@RequestParam("executeType") ExecuteType executeType) {
// 实现逻辑
}
}
主要接口:
GET /v2/workflow-instances- 查询工作流实例列表GET /v2/workflow-instances/{id}- 查询工作流实例详情POST /v2/workflow-instances/{id}/execute- 执行工作流实例操作DELETE /v2/workflow-instances/{id}- 删除工作流实例
5. 任务实例API
任务实例API用于监控和管理具体的任务执行情况。
@RestController
@RequestMapping("/v2/projects/{projectCode}/task-instances")
public class TaskInstanceV2Controller {
@ApiOperation("查询任务实例列表")
@GetMapping
public Result queryTaskInstanceList(
@PathVariable("projectCode") long projectCode,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "executorName", required = false) String executorName,
@RequestParam(value = "host", required = false) String host,
@RequestParam(value = "stateType", required = false) String stateType,
@RequestParam(value = "startDate", required = false) String startDate,
@RequestParam(value = "endDate", required = false) String endDate,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
// 实现逻辑
}
}
主要接口:
GET /v2/projects/{projectCode}/task-instances- 查询任务实例列表GET /v2/projects/{projectCode}/task-instances/{id}- 查询任务实例详情POST /v2/projects/{projectCode}/task-instances/{id}/rerun- 重跑任务实例POST /v2/projects/{projectCode}/task-instances/{id}/kill- 杀死任务实例
6. 数据源管理API
数据源API用于管理各种类型的数据源连接。
@RestController
@RequestMapping("datasources")
public class DataSourceController {
@ApiOperation("创建数据源")
@PostMapping
public Result createDataSource(@RequestBody DataSourceCreateRequest createRequest) {
// 实现逻辑
}
@ApiOperation("查询数据源列表")
@GetMapping
public Result queryDataSourceList(
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "type", required = false) String type,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
// 实现逻辑
}
}
支持的数据源类型:
- MySQL、PostgreSQL、Oracle
- Hive、Spark、Flink
- ClickHouse、Doris、StarRocks
- Kafka、Redis、Elasticsearch
7. 用户和权限API
用户权限API用于管理系统用户、角色和权限控制。
@RestController
@RequestMapping("/users")
public class UsersController {
@ApiOperation("创建用户")
@PostMapping
public Result createUser(@RequestBody UserCreateRequest createRequest) {
// 实现逻辑
}
@ApiOperation("查询用户列表")
@GetMapping
public Result queryUserList(
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
// 实现逻辑
}
}
主要接口:
POST /users- 创建用户GET /users- 查询用户列表PUT /users/{id}- 更新用户信息DELETE /users/{id}- 删除用户POST /users/{id}/grant-project- 授予项目权限
8. 监控和统计API
监控API提供系统运行状态和性能指标的查询功能。
@RestController
@RequestMapping("/v2/statistics")
public class StatisticsV2Controller {
@ApiOperation("查询任务状态统计")
@GetMapping("/task-state-count")
public Result queryTaskStateCount(
@RequestParam(value = "startDate") String startDate,
@RequestParam(value = "endDate") String endDate,
@RequestParam(value = "projectCode", required = false) Long projectCode) {
// 实现逻辑
}
@ApiOperation("查询工作流状态统计")
@GetMapping("/workflow-state-count")
public Result queryWorkflowStateCount(
@RequestParam(value = "startDate") String startDate,
@RequestParam(value = "endDate") String endDate,
@RequestParam(value = "projectCode", required = false) Long projectCode) {
// 实现逻辑
}
}
监控指标:
- 任务执行状态统计
- 工作流运行状态统计
- 系统资源使用情况
- 队列任务堆积情况
API使用示例
示例1:创建项目
curl -X POST "http://localhost:12345/dolphinscheduler/api/v2/projects" \
-H "Content-Type: application/json" \
-H "token: your-access-token" \
-d '{
"projectName": "数据分析平台",
"description": "用于大数据分析任务调度",
"userName": "admin"
}'
响应示例:
{
"code": 0,
"msg": "success",
"data": {
"code": 1000001,
"name": "数据分析平台",
"description": "用于大数据分析任务调度",
"userName": "admin",
"createTime": "2024-01-15 10:30:00",
"updateTime": "2024-01-15 10:30:00"
}
}
示例2:创建工作流
curl -X POST "http://localhost:12345/dolphinscheduler/api/projects/1000001/workflow-definition" \
-H "Content-Type: application/json" \
-H "token: your-access-token" \
-d '{
"name": "每日数据ETL流程",
"description": "每日定时执行的数据抽取转换加载流程",
"globalParams": "[{\"prop\":\"bizDate\",\"value\":\"${system.datetime}\"}]",
"tasks": [
{
"name": "数据抽取",
"taskType": "SQL",
"description": "从源数据库抽取数据",
"params": {
"type": "MYSQL",
"datasource": 1,
"sql": "SELECT * FROM source_table WHERE biz_date = '${bizDate}'"
}
},
{
"name": "数据转换",
"taskType": "SPARK",
"description": "使用Spark进行数据清洗转换",
"params": {
"programType": "SQL",
"mainClass": "",
"sparkVersion": "SPARK3",
"deployMode": "cluster",
"appResource": "hdfs://path/to/etl-job.jar",
"mainArgs": "--date ${bizDate}"
},
"preTasks": ["数据抽取"]
}
]
}'
示例3:查询工作流实例
curl -X GET "http://localhost:12345/dolphinscheduler/api/v2/workflow-instances? \
pageNo=1&pageSize=10&stateType=RUNNING&startDate=2024-01-15&endDate=2024-01-16" \
-H "token: your-access-token"
错误码参考
DolphinScheduler使用统一的错误码体系:
| 错误码 | 说明 | 解决方案 |
|---|---|---|
| 0 | 成功 | - |
| 10000 | 参数错误 | 检查请求参数格式 |
| 10001 | 数据库错误 | 检查数据库连接状态 |
| 10002 | 重复操作 | 避免重复提交相同请求 |
| 10003 | 权限不足 | 检查用户权限设置 |
| 10004 | 资源不存在 | 检查资源ID是否正确 |
| 10005 | 系统繁忙 | 稍后重试或扩容系统 |
最佳实践
1. 认证管理
// 获取Access Token
public String getAccessToken(String username, String password) {
String auth = Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
// 调用认证接口获取token
return token;
}
// 使用Token发起请求
public Result callApiWithToken(String endpoint, Object request) {
HttpHeaders headers = new HttpHeaders();
headers.set("token", accessToken);
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Object> entity = new HttpEntity<>(request, headers);
return restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, entity, Result.class);
}
2. 错误处理
public void handleApiResponse(Result result) {
if (result.getCode() == 0) {
// 成功处理
processSuccess(result.getData());
} else {
// 错误处理
switch (result.getCode()) {
case 10000:
throw new IllegalArgumentException("参数错误: " + result.getMsg());
case 10003:
throw new SecurityException("权限不足: " + result.getMsg());
default:
throw new RuntimeException("API调用失败: " + result.getMsg());
}
}
}
3. 批量操作优化
// 批量创建任务避免频繁调用
public void batchCreateTasks(List<TaskDefinition> tasks) {
int batchSize = 50;
for (int i = 0; i < tasks.size(); i += batchSize) {
List<TaskDefinition> batch = tasks.subList(i, Math.min(i + batchSize, tasks.size()));
createTaskBatch(batch);
Thread.sleep(100); // 避免请求过于频繁
}
}
性能优化建议
- 连接池配置: 使用HTTP连接池减少连接建立开销
- 请求合并: 对批量操作使用批量接口
- 缓存策略: 对频繁查询的数据添加本地缓存
- 异步处理: 对耗时操作使用异步调用方式
- 重试机制: 实现指数退避的重试策略
版本兼容性
DolphinScheduler API保持向后兼容性,但建议:
- 使用最新的API版本(v2)
- 在升级时测试现有集成功能
- 关注官方发布的版本变更说明
总结
DolphinScheduler提供了完整且强大的API体系,覆盖了任务调度的全生命周期管理。通过合理使用这些API,开发者可以构建高度自动化的数据调度平台,实现与现有系统的无缝集成。
建议在实际使用前:
- 仔细阅读相关接口的详细文档
- 进行充分的测试验证
- 实现适当的错误处理和重试机制
- 监控API调用性能和稳定性
通过本文档的指导,您应该能够快速上手并使用DolphinScheduler的API功能来满足您的业务需求。
登录后查看全文
热门项目推荐
相关项目推荐
GLM-5智谱 AI 正式发布 GLM-5,旨在应对复杂系统工程和长时域智能体任务。Jinja00
GLM-5-w4a8GLM-5-w4a8基于混合专家架构,专为复杂系统工程与长周期智能体任务设计。支持单/多节点部署,适配Atlas 800T A3,采用w4a8量化技术,结合vLLM推理优化,高效平衡性能与精度,助力智能应用开发Jinja00
jiuwenclawJiuwenClaw 是一款基于openJiuwen开发的智能AI Agent,它能够将大语言模型的强大能力,通过你日常使用的各类通讯应用,直接延伸至你的指尖。Python0187- QQwen3.5-397B-A17BQwen3.5 实现了重大飞跃,整合了多模态学习、架构效率、强化学习规模以及全球可访问性等方面的突破性进展,旨在为开发者和企业赋予前所未有的能力与效率。Jinja00
AtomGit城市坐标计划AtomGit 城市坐标计划开启!让开源有坐标,让城市有星火。致力于与城市合伙人共同构建并长期运营一个健康、活跃的本地开发者生态。01
snackjson新一代高性能 Jsonpath 框架。同时兼容 `jayway.jsonpath` 和 IETF JSONPath (RFC 9535) 标准规范(支持开放式定制)。Java00
热门内容推荐
最新内容推荐
5个实战技巧:用langchaingo构建企业级对话系统的全流程指南解锁模块化编辑:Milkdown框架的可扩展开发指南[技术专题] OpenWeChat消息处理:从核心原理到高级实践Dapr集群部署失败?5步实战指南助你快速定位并解决问题小爱音箱AI升级定制指南:从零开始的设备改造与功能扩展Vanna AI训练数据效率提升实战指南:从数据准备到模型优化全流程解析打造现代界面新范式:Glass Liquid设计理念与实践指南PandaWiki部署实战:从环境准备到系统优化全指南4个步骤掌握Claude AI应用容器化部署:claude-quickstarts项目Docker实践指南4个高效步骤:Pixelle-Video API集成与开发实战指南
项目优选
收起
deepin linux kernel
C
27
12
OpenHarmony documentation | OpenHarmony开发者文档
Dockerfile
599
4.03 K
Ascend Extension for PyTorch
Python
437
530
本项目是CANN提供的数学类基础计算算子库,实现网络在NPU上加速计算。
C++
920
764
暂无简介
Dart
844
204
React Native鸿蒙化仓库
JavaScript
320
373
🔥LeetCode solutions in any programming language | 多种编程语言实现 LeetCode、《剑指 Offer(第 2 版)》、《程序员面试金典(第 6 版)》题解
Java
69
21
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
1.46 K
821
openEuler内核是openEuler操作系统的核心,既是系统性能与稳定性的基石,也是连接处理器、设备与服务的桥梁。
C
368
247
昇腾LLM分布式训练框架
Python
130
156