Skillsskill-addax
S

skill-addax

- **名称**:Addax - **类型**:通用开源 ETL 工具(Extract–Transform–Load) - **起源**:基于阿里巴巴 DataX 的 fork 与演进 - **目标**:在多种异构数据源之间,提供稳定、高效、可扩展的“离线数据同步”能力

wgzhao
1.4k stars
27.9k downloads
Updated 5d ago

Readme

skill-addax follows the SKILL.md standard. Use the install command to add it to your agent stack.

# SKILL: Addax 项目知识

## 1. 项目整体认识

### 1.1 项目定位

- **名称**:Addax  
- **类型**:通用开源 ETL 工具(Extract–Transform–Load)  
- **起源**:基于阿里巴巴 DataX 的 fork 与演进  
- **目标**:在多种异构数据源之间,提供稳定、高效、可扩展的“离线数据同步”能力

### 1.2 核心价值

- 支持 **20+ SQL/NoSQL/文件/时序/大数据** 数据源
- 使用 **JSON 任务配置** 即可完成复杂同步,无需写代码
- 插件化架构,Reader / Writer / Transformer 解耦,可自由扩展
- 提供 **数据质量监控、速率控制、错误容忍、脏数据探测** 等生产级能力
- 既可命令行运行,也可通过 **Server 模块 HTTP 接口** 异步提交和管理任务
- 有配套的 **addax-admin / addax-ui** 项目做 Web 管控

---

## 2. 概念与架构模型

### 2.1 核心业务概念

在与用户讨论 / 理解需求时,应优先按以下抽象模型理解:

- **Job(作业)**
  - 一次完整的数据同步任务,从一个源到一个目标
  - 通过一个 JSON 文件描述:数据源 reader、目标端 writer、变换规则、速率控制、错误阈值等
  - Job 是业务上的最小单位,如 “从 MySQL 表 A 同步到 PostgreSQL 表 B”

- **Task(子任务)**
  - 为提升性能,将一个 Job 拆分为多个 Task 并发执行
  - 每个 Task 负责同步一部分数据(如若干分表、某一范围分片)

- **TaskGroup**
  - 一组 Task 的集合,由框架统一调度执行
  - 每个 TaskGroup 内有若干通道(channel),每个 channel 负责一条 `Reader → Channel → Writer` 流水线

- **Reader 插件**
  - 数据采集模块,负责从“源数据源”读取数据,发送给框架
  - 只关心“如何正确读”,不关注类型转换、指标统计等通用问题

- **Writer 插件**
  - 数据写入模块,负责从框架拿数据写入“目标端”
  - 只关心“如何正确写”,通用逻辑由框架处理

- **Transformer(数据转换)**
  - 可选模块,在 Reader 和 Writer 之间对数据进行转换
  - 支持内置 UDF:`dx_substr` / `dx_pad` / `dx_replace` / `dx_filter` / `dx_groovy`
  - 可以做脱敏、字段裁剪、补全、过滤、自定义 Groovy 脚本转换等

- **Channel(通道)**
  - Reader 到 Writer 之间的数据通路和缓冲队列
  - 决定并发度 & 流量控制(基于字节数、记录数、通道数)

### 2.2 架构概览

- **整体框架**:Framework + 插件(Reader / Writer / Transformer)
- 数据通路(简化):

  - **源端 → Reader → Framework(Channel) → Writer → 目标端**

- 作业生命周期(JobContainer 内部):
  1. `preHandler()` – 作业前置处理
  2. `init()` – 初始化 reader/writer 插件
  3. `prepare()` – 源端和目标端的准备工作
  4. `split()` – 按并发度拆分成多个 Task
  5. `schedule()` – 将 Task 组织为 TaskGroup,并发执行
  6. `post()` – 全局后置收尾(如 rename 影子表)
  7. `postHandler()` – 作业后置处理

- Task 执行:
  - 每个 Task 固定以 `Reader → Channel → Writer` 的线程模型执行
  - Channel 内以 Record/Column 为单位传输数据

---

## 3. SKILL:与用户交互时的“领域语言”

### 3.1 如何理解/解释一个 Job JSON

Job JSON 顶层结构:

```json
{
  "job": {
    "settings": {},
    "content": {
      "reader": {},
      "writer": {},
      "transformer": []
    }
  }
}
```

- `job.settings`
  - 控制本次任务的全局行为
  - 重点字段:
    - `speed.byte`:每秒允许的最大字节数(Bps),`-1` 表示不限制
    - `speed.record`:每秒允许的最大记录数
    - `speed.channel`:通道数(影响 Task 数量)
    - `errorLimit.record`:允许错误记录总数
    - `errorLimit.percentage`:允许错误记录占比

- `job.content.reader`
  - 必填,描述数据源及其读取方式
  - 核心字段(以关系型数据库为例):
    ```json
    {
      "name": "mysqlreader",
      "parameter": {
        "username": "",
        "password": "",
        "column": [],
        "autoPk": false,
        "splitPk": "",
        "connection": [
          {
            "jdbcUrl": [],
            "table": []
          }
        ],
        "where": ""
      }
    }
    ```

- `job.content.writer`
  - 必填,描述落地目标及写入策略
  - 核心字段(以关系库为例):
    ```json
    {
      "name": "mysqlwriter",
      "parameter": {
        "username": "",
        "password": "",
        "writeMode": "",
        "column": [],
        "session": [],
        "preSql": [],
        "postSql": [],
        "connection": [
          {
            "jdbcUrl": "",
            "table": []
          }
        ]
      }
    }
    ```

- `job.content.transformer`
  - 可选,列表形式,每个元素为一个转换规则
  - 典型片段(内置函数):
    ```json
    {
      "transformer": [
        {
          "name": "dx_substr",
          "parameter": { "idx": 1, "pos": 0, "length": 3 }
        }
      ]
    }
    ```

AI 在阅读/生成 Job 时,应显式区分:
- 全局控制(settings) vs 数据流定义(reader/writer) vs 转换规则(transformer)

### 3.2 任务拆分与并发推导逻辑

- 用户设定并发度:`job.settings.speed.channel = N`
- 框架内部:
  1. Reader 的 `split()` 按源端特性拆成若干 Task(如按分表、分片、主键范围等)
  2. Writer 的 `split()` 需与 Reader 的 Task 数量 **1:1 对齐**
  3. Scheduler 根据 `taskGroup.channel`(`conf/core.json` 中配置)决定 TaskGroup 数量:
     - `taskGroupCount = speed.channel / taskGroup.channel`
- 与用户讨论“为什么任务这么慢/这么多连接”时,应从:
  - `speed.channel`
  - Reader 拆分策略(是否按 `splitPk` / 分区表)
  - 目标端写入瓶颈(Writer 能力、批量大小等)
  入手解释。

### 3.3 数据质量与错误处理

Addax 在数据质量方面的关键点:

- **类型不丢失/不失真**
  - 内部抽象了统一的 Column 类型:`Long / Double / String / Date / Timestamp / Bool / Bytes`
  - 每个插件有自己的类型转换策略,保证最小损失

- **错误控制**
  - 通过 `errorLimit.record` 和 `errorLimit.percentage` 控制“可容忍错误”
  - 超过阈值即认为任务失败

- **脏数据(Dirty Data)**
  - 概念:传输过程中因各种原因(例如类型不匹配)导致出错的记录
  - 能够过滤、识别、收集与展示脏数据,并统计数量和字节数
  - Transformer 层若抛出异常/返回 null,也会影响成功/失败/过滤计数

AI 在帮助用户排错时:
- 应主动询问/检查:错误是否集中在类型转换、特定列、特定插件
- 建议合理设置 `errorLimit`,在保障数据质量和任务稳定之间平衡

---

## 4. 使用方式与运行环境

### 4.1 安装与运行

- **运行时环境**
  - Java:JDK 17
  - Python 2.7+ / 3.7+(仅 Windows 使用本地脚本时需要)

- **三种典型使用方式**

1)Docker 运行示例:
```bash
docker pull quay.io/wgzhao/addax:latest
docker run -ti --rm --name addax \
  quay.io/wgzhao/addax:latest \
  /opt/addax/bin/addax.sh /opt/addax/job/job.json
```

2)一键安装脚本(Linux / macOS):
```bash
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/wgzhao/Addax/master/install.sh)"
```
- 安装目录:
  - macOS:`/usr/local/addax`
  - Linux:`/opt/addax`

3)源码编译:
```bash
git clone https://github.com/wgzhao/addax.git
cd addax
mvn clean package
mvn package -Pdistribution        # 或 assembly:single
# 产出目录示例:target/addax-<version>
```

- **首次运行任务**
  ```bash
  bin/addax.sh job/job.json
  ```

### 4.2 命令行工具 addax.sh

基本用法:
```bash
bin/addax.sh <job_file> [options]
```

关键参数(AI 在给终端命令建议时要正确使用):

- `-h, --help`:帮助
- `-v, --version`:版本
- `-l, --log`:指定日志文件路径
- `-d, --debug`:开启调试模式(IDEA 远程调试会用到)
- `-L, --log-level`:`DEBUG | INFO | WARN | ERROR`
- `-j, --jvm`:追加 JVM 参数
- `-p, --params`:向 Job 传入动态参数(`-Dkey=value` 形式)

示例(动态参数):
```bash
bin/addax.sh job/test.json \
  -p "-Dusername=root -Dpassword=123456 -Dparam1=value1 -Dparam2=value2"
```

Job JSON 中可以通过 `${param}` 访问这些值,比如 `${username}`。

内置时间变量(示例时间 `2025-07-16 12:13:14`):
- `${curr_date_short}` → `20250716`
- `${curr_date_dash}` → `2025-07-16`
- `${curr_datetime_short}` → `20250716121314`
- `${curr_datetime_dash}` → `2025-07-16 12:13:14`
- `${biz_date_short}` / `${biz_date_dash}` / `${biz_datetime_*}` 等

### 4.3 Server 模块(HTTP 提交任务)

- 用途:通过 HTTP 提交 Job JSON 并异步执行,可查询进度和结果
- 启动脚本:`core/src/main/bin/addax-server.sh`

启动示例:
```bash
./addax-server.sh start               # 默认并发上限 30
./addax-server.sh start -p 50 --daemon  # 最大并发 50,后台运行
./addax-server.sh stop
```

并发配置优先级:
1. 命令行 `-p` / `--parallel`
2. 环境变量 `ADDAX_SERVER_PARALLEL`
3. 默认 30

HTTP 接口:

1)提交任务
- URL: `/api/submit?k1=v1&k2=v2`
- Method: POST
- Body: Job JSON

示例:
```bash
curl 'http://localhost:10601/api/submit?jobName=example-job' \
  -H 'Content-Type: application/json' \
  -d @job/job.json
```

响应:
```json
{ "taskId": "xxxx-xxxx-xxxx" }
```
或并发达到上限时:
```json
{ "error": "ERROR: Maximum number of concurrent tasks reached." }
```

2)查询任务状态
- URL: `/api/status?taskId={taskId}`
- Method: GET
- 响应:
```json
{
  "taskId": "xxxx-xxxx-xxxx",
  "status": "SUCCESS",
  "result": "Job example-job executed.",
  "error": null
}
```

AI 在设计自动化系统(如调度、工作流)时,可建议用户使用 Server 模块通过 HTTP 集成。

---

## 5. 插件系统与二次开发

### 5.1 支持的数据源(典型)

Reader / Writer 插件覆盖的常见系统包括但不限于:

- 关系库:MySQL, PostgreSQL, Oracle, SQLServer, SQLite, Greenplum, DB2, Sybase, Doris, StarRocks 等
- NoSQL / KV:Cassandra, Redis, MongoDB, HBase(1.x, 2.x,多种模式)
- 大数据 / 文件:HDFS, Hive, Kudu, Iceberg, Paimon, S3/MinIO, FTP, 本地文件(txt/dbf/excel/json)
- 时序 / 流:InfluxDB/InfluxDB2, TDengine, Kafka, streamreader/streamwriter
- 其它:Access, SAP HANA, ClickHouse, Databend 等

当用户问“是否支持 XXX 数据源”时,应:
- 优先在 docs/reader 与 docs/writer 列表中查找对应 `xxxreader`/`xxxwriter`
- 若暂不支持,可建议走 **插件开发** 路线,说明难度和接口模型

### 5.2 插件开发模型

- 插件 = 一个 Java 模块 + `plugin.json` 描述 + 若干 jar 依赖
- 入口类需继承:
  - `Reader` 或 `Writer` 抽象类
  - 内部包含两个静态内部类:`Job` 和 `Task`
- 插件目录结构(约定):
  ```text
  ${ADDAX_HOME}/plugin
    ├── reader
    │   └── <plugin_name>
    │       ├── <plugin_name>-<version>.jar
    │       ├── libs/ -> 指向 shared 依赖目录的符号链接
    │       ├── plugin.json
    │       └── plugin_job_template.json
    └── writer
        └── ...
  ```

`plugin.json` 示例:
```json
{
  "name": "mysqlwriter",
  "class": "com.wgzhao.addax.plugin.writer.mysqlwriter.MysqlWriter",
  "description": "Use Jdbc connect to database, execute insert sql.",
  "developer": "wgzhao"
}
```
注意:
- 目录名必须与 `plugin.json` 中的 `name` 一致
- 框架通过 `name` 找插件,通过 `class`(完全限定名)反射加载

### 5.3 Job / Task 接口职责(用于代码分析/生成)

以 Reader 为例:

```java
public class SomeReader extends Reader {
    public static class Job extends Reader.Job {
        public void init() { }
        public void prepare() { }
        public List<Configuration> split(int adviceNumber) { return null; }
        public void post() { }
        public void destroy() { }
    }

    public static class Task extends Reader.Task {
        public void init() { }
        public void prepare() { }
        public void startRead(RecordSender recordSender) { }
        public void post() { }
        public void destroy() { }
    }
}
```

- `Job` 级别:
  - 负责读取插件配置:`super.getPluginJobConf()`
  - 全局准备/收尾:如建表/清表、校验权限等
  - `split(adviceNumber)`:按并发建议数拆分 Configuration 列表(每个对应一个 Task)

- `Task` 级别:
  - 用 `super.getPluginJobConf()` 获取本 Task 的配置
  - 在 `startRead()` / `startWrite()` 中进行实际 I/O
  - 使用 `RecordSender` / `RecordReceiver` 和 `Record`/`Column` 抽象进行传输

重要约束:
- Job 和 Task 之间禁止使用共享变量,只能通过配置(Configuration)传递信息
- `prepare` / `post` 在 Job 和 Task 层都有,需根据场景选择合适层级实现

### 5.4 Configuration DSL

Addax 提供 `Configuration` 类和路径 DSL 来读取 JSON 配置:

- 路径规则:
  - 子对象:`a.b.c`
  - 数组元素:`a.f[2].g`
- 示例 JSON:
  ```json
  {
    "a": {
      "b": { "c": 2 },
      "f": [1, 2, { "g": true }]
    },
    "x": 4
  }
  ```
- 示例路径与结果:
  - `x` → `4`
  - `a.b.c` → `2`
  - `a.b.f[2].g` → `true`

AI 在帮用户写插件代码时,可直接给出 `Configuration.get("a.b.c")` 等示例。

---

## 6. 调试、监控与结果上报

### 6.1 运行日志与调试

- 本地/远程调试模式:`bin/addax.sh -d job/job.json`
  - 程序会以 `Listening for transport dt_socket at address: 9999` 形式暴露 JVM 调试端口
  - 可用 IntelliJ IDEA 的 Remote JVM Debug 挂载到指定 host:port

本地调试典型配置:
- Main class:`com.wgzhao.addax.core.Engine`
- VM Options:
  - `-Daddax.home=/opt/app/addax/4.0.3`
  - 如需加载本地 lib 依赖:`-classpath .:/opt/app/addax/4.0.3/lib/*`
- Program arguments: `-job job/job.json`
- Working directory: `/opt/app/addax/4.0.3`

### 6.2 任务运行统计和 Transformer 计量

Addax 会在日志中输出类似:

```text
Total 1000000 records, 22000000 bytes | 
Transform 100000 records(in), 10000 records(out) | 
Speed 2.10MB/s, 100000 records/s | 
Error 0 records, 0 bytes | Percentage 100.00%
```

以及最终汇总:

```text
任务启动时刻                    : 2015-03-10 17:34:21
任务结束时刻                    : 2015-03-10 17:34:31
任务总计耗时                    :                 10s
任务平均流量                    :            2.10MB/s
记录写入速度                    :         100000rec/s
转换输入总数                    :             1000000
转换输出总数                    :             1000000
读取出记录总数                  :             1000000
同步失败总数                    :                   0
```

Transformer 维度统计:
- 输入记录数/字节数
- 输出记录数/字节数
- 脏数据记录数/字节数
- 总耗时

AI 在分析性能问题 / 瓶颈时,应从:
- 读/写 QPS
- Transform 前后记录量变化
- 错误/脏数据数量
- 任务耗时与并发度
这些维度提示用户。

### 6.3 任务结果上报(Stats Report)

- 用途:将 Job 执行的统计结果通过 HTTP POST 报告到外部服务(如监控平台 / 管理端)
- 配置位置:`$ADDAX_HOME/conf/core.json` → `core.server.address`

示例:
```json
{
  "core": {
    "server": {
      "address": "http://localhost:9090/api/v1/addax/jobReport",
      "timeout": 5
    }
  }
}
```

上报数据结构(JSON):
```json
{
  "jobName": "test",
  "startTimeStamp": 1587971621,
  "endTimeStamp": 1587971621,
  "totalCosts": 10,
  "totalBytes": 330,
  "byteSpeedPerSecond": 33,
  "recordSpeedPerSecond": 1,
  "totalReadRecords": 6,
  "totalErrorRecords": 0,
  "jobContent": { "配置内容省略": "此处为实际任务配置" }
}
```

AI 可建议用户:
- 使用简单 Flask/Node/Java 服务接收并入库统计数据
- 或与现有监控系统集成

---

## 7. 与 Addax 相关的生态项目

- **addax-admin**(后端)
  - 仓库:`https://github.com/wgzhao/addax-admin`
  - 提供 Web 管理界面和 API,用于管理 Addax 任务

- **addax-ui**(前端)
  - 仓库:`https://github.com/wgzhao/addax-ui`
  - 为 addax-admin 提供前端展示

AI 在回答“有没有 Web 管理界面 / 调度平台”时,可推荐这两个项目。

---

## 8. 版本与贡献

- **版本规范**:遵循 SemVer (`x.y.z`)
  - `z` Patch:兼容修复、性能优化
  - `y` Minor:新增特性或兼容性风险较小的修改
  - `x` Major:重大变更,通常不向后兼容

- **开发规范(概略)**
  - 使用 IntelliJ + Airlift 代码风格
  - 异常使用 `AddaxException`,并区分错误类型
  - 谨慎使用 Stream API,避免在性能敏感路径中滥用
  - 避免复杂三元表达式
  - 所有文件需包含 Apache 2.0 许可证头
  - Commit message 参考 <https://cbea.ms/git-commit/>

AI 在生成 PR/代码建议时,应尽量贴合以上风格。

Install

Requires askill CLI v1.0+

Metadata

LicenseUnknown
Version-
Updated5d ago
Publisherwgzhao

Tags

apici-cddatabasegithubtesting