Phần 1. Làm quen với Airflow & tư duy DAG

“Nếu SQL giúp bạn hiểu dữ liệu, thì Airflow giúp dữ liệu tự chạy đúng lúc — không cần bạn phải thức dậy lúc 2 giờ sáng để nhấn nút Run.”

Mục tiêu bài viết

Sau khi đọc bài này, bạn sẽ:

  • Hiểu Airflow là gì và tại sao nó trở thành xương sống của mọi hệ thống dữ liệu hiện đại.
  • Nắm được tư duy DAG (Directed Acyclic Graph) — nền tảng giúp workflow trở nên dễ bảo trì, dễ mở rộng.
  • Biết cách tạo và chạy thử DAG đầu tiên, xem log, quan sát dependency trên giao diện web.
  • Hiểu cấu trúc project Airflow, các thành phần chính: task, operator, scheduler, executor, worker, webserver.

1. Apache Airflow là gì?

Apache Airflow là một nền tảng mã nguồn mở giúp bạn tự động hóa, lập lịch và giám sát các luồng xử lý dữ liệu (data workflows).

Nó được phát triển bởi Airbnb vào năm 2014 và sau đó đóng góp cho cộng đồng Apache Foundation.
Hiện nay, Airflow đã trở thành chuẩn mực cho các hệ thống Data Pipeline hiện đại (ETL/ELT).

Hãy hình dung Airflow giống như một nhạc trưởng trong dàn nhạc dữ liệu:

Mọi thứ đều được tự động hoá, có log, có retry, có alert và được lập lịch theo ngày/giờ.

2. Vì sao cần Airflow?

Trước khi có Airflow, các nhóm dữ liệu thường:

  • Chạy script Python thủ công hoặc cron job trên server.
  • Khó kiểm soát thứ tự chạy: ETL phải hoàn tất trước khi load vào Power BI.
  • Khi job lỗi, không ai biết.
  • Không có nơi nào xem toàn bộ luồng xử lý.

Airflow giải quyết tất cả:

Vấn đề cũ Airflow giải pháp
Script rời rạc Workflow có cấu trúc rõ ràng (DAG)
Cron job khó debug UI trực quan, xem log từng task
Job fail không báo Có retry và email alert
Không biết dependency Dễ dàng định nghĩa quan hệ giữa tasks
Khó bảo trì Code Python modular, dễ reuse

Airflow giúp bạn chuyển tư duy từ:

“Chạy script rời rạc” → “Thiết kế workflow có cấu trúc, có lịch, có giám sát.”

3. DAG – Linh hồn của Airflow

Định nghĩa

DAG (Directed Acyclic Graph) là một đồ thị có hướng, không chu trình, mô tả thứ tự và quan hệ giữa các bước (tasks) trong workflow.

  • Directed → có hướng (A → B nghĩa là B chỉ chạy sau khi A xong).
  • Acyclic → không được phép quay lại A → B → A.
  • Graph → tập hợp của các node (task) và cạnh (dependency).

Ví dụ đơn giản:

extract_data → transform_data → load_to_db

Cấu trúc DAG trong Airflow

DAG được viết bằng Python — rất dễ đọc và mở rộng.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract():
    print("Extracting data...")

def transform():
    print("Transforming data...")

def load():
    print("Loading to database...")

with DAG(
    dag_id='etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:
    
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract
    )
    
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform
    )
    
    load_task = PythonOperator(
        task_id='load_to_db',
        python_callable=load
    )

    extract_task >> transform_task >> load_task

Giải thích:
  • dag_id: tên duy nhất của DAG.
  • schedule_interval: tần suất chạy (daily/hourly).
  • catchup=False: chỉ chạy từ hôm nay trở đi.
  • PythonOperator: định nghĩa 1 task chạy hàm Python.
  • >>: biểu diễn thứ tự (A chạy trước B).

4. Các thành phần chính trong Airflow


Thành phần Vai trò Mô tả
Webserver Giao diện quản lý UI để xem DAG, trigger, log, Gantt chart
Scheduler Bộ lập lịch Kiểm tra DAG nào đến giờ chạy
Executor Bộ thực thi Quyết định cách task chạy (Local, Celery, Kubernetes)
Worker Máy chạy thực tế Thực thi các task
Metadata DB Bộ nhớ Lưu trạng thái DAG, log, lịch sử chạy

Chạy Airflow:

airflow webserver
airflow scheduler

Sau đó mở http://localhost:8080 để xem giao diện DAGs.

5. Thực hành: Chạy DAG đầu tiên

1. Cài đặt bằng Docker Compose

Tạo file docker-compose.yaml:

services:
  postgres:
    image: postgres:13
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
  webserver:
    image: apache/airflow:2.9.0
    depends_on:
      - postgres
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
    ports:
      - "8080:8080"
    command: webserver
  scheduler:
    image: apache/airflow:2.9.0
    depends_on:
      - webserver
    command: scheduler

Chạy lệnh:

docker compose up -d

Rồi mở localhost:8080 → bạn sẽ thấy giao diện Airflow UI.

2. Thêm DAG mới

Tạo file etl_pipeline.py trong thư mục dags/.
Dán đoạn code ví dụ ở trên → chờ vài giây → Refresh UI → DAG xuất hiện.
Click Trigger DAG → quan sát 3 ô task xanh chạy tuần tự: extract → transform → load.

6. Tư duy “Data Workflow như kỹ sư”

Đây là phần nhiều Data Analyst hay bỏ qua, nhưng nó là bí quyết để trở thành Data Engineer bán phần.

Tư duy Pipeline

Thay vì chỉ nghĩ: “Hôm nay tôi chạy file này để lấy dữ liệu,”
hãy nghĩ: “Mỗi ngày, pipeline này cần chạy tự động, không ai phải nhúng tay.”

Tư duy như vậy giúp bạn:

  • Giảm 80% thao tác thủ công (copy file, export Excel, upload BI).
  • Chủ động thiết kế workflow: biết bước nào phụ thuộc bước nào.
  • Biết chỗ nào nên log, retry, cảnh báo khi lỗi.

Ví dụ:

Một pipeline xử lý đơn hàng eCommerce có thể gồm:

  1. Extract: Lấy dữ liệu đơn hàng từ API Shopee.
  2. Transform: Chuẩn hóa schema, tính revenue.
  3. Load: Ghi vào PostgreSQL.
  4. Notify: Gửi email báo cáo sáng.

Nhờ DAG, bạn có thể mô hình hoá mọi quy trình nghiệp vụ phức tạp — và để Airflow chạy thay con người.

7. Best Practice khi viết DAG

Nguyên tắc Giải thích
1 DAG = 1 Business Flow Mỗi DAG nên gắn với 1 quy trình nghiệp vụ cụ thể
Idempotent Task có thể chạy lại nhiều lần mà không gây trùng/lỗi
Small Tasks Chia logic thành nhiều bước nhỏ dễ retry
Retry & Alert Đặt retries=2, retry_delay=5min, gửi mail khi fail
Use XCom/Variable Dùng XCom để truyền dữ liệu giữa tasks
Version Control Quản lý DAG qua Git, CI/CD deploy
Test Locally Dùng airflow tasks test để thử trước khi deploy

8. Ví dụ thực tế: DAG tính doanh thu eCommerce

with DAG(
    dag_id='daily_nmv_summary',
    schedule_interval='0 7 * * *',
    start_date=datetime(2024,1,1),
    catchup=False,
    tags=['ecommerce','report']
) as dag:

    extract_sales = PythonOperator(
        task_id='extract_sales',
        python_callable=extract_from_platform
    )

    clean_sales = PythonOperator(
        task_id='clean_sales',
        python_callable=transform_clean
    )

    upload_to_dw = PythonOperator(
        task_id='upload_to_dw',
        python_callable=load_to_postgres
    )

    notify_team = EmailOperator(
        task_id='send_report',
        to='data_team@company.com',
        subject='[Airflow] Daily NMV Summary Completed',
        html_content='Pipeline đã chạy thành công 🎉'
    )

    extract_sales >> clean_sales >> upload_to_dw >> notify_team

Pipeline này tự động:

  1. Lấy dữ liệu bán hàng từ sàn.
  2. Làm sạch, tính toán.
  3. Đẩy lên Data Warehouse.
  4. Gửi email thông báo mỗi sáng.

9. Quan sát & Debug DAG

Airflow UI có nhiều chế độ xem (Views) hữu ích:

View Chức năng Mục đích
Graph View Hiển thị quan hệ giữa tasks Quan sát tổng thể pipeline
Tree View Màu trạng thái (green=success, red=fail) Theo dõi lịch sử
Gantt Chart Biểu đồ thời gian chạy Phân tích hiệu suất
Log View Hiển thị log chi tiết Debug khi lỗi

Các lỗi thường gặp & cách xử lý

  1. DAG không hiển thị trong UI
    → Kiểm tra file .py có nằm đúng thư mục /dags.
    → Đảm bảo có biến dag trong file.
  2. Scheduler không chạy
    → Chạy lại service:
    airflow scheduler
    airflow webserver
    
    → Nếu vẫn lỗi: reset DB
    airflow db reset
    
  3. ModuleNotFoundError / ImportError
    → Cài đặt lại package trong requirements.txt.
  4. Task failed with exit code 1
    → Mở Log để xem chi tiết, thường là lỗi trong function Python.
  5. XCom quá lớn
    → Chỉ nên truyền metadata qua XCom, không truyền DataFrame nặng.
    Nếu cần, lưu file CSV/Parquet ra S3 hoặc local rồi gửi path.

Debug tốt nhất là mở tab Logs → đọc từng dòng → sửa → Clear Task → chạy lại.

10. Khi nào nên (hoặc không nên) dùng Airflow?

Nên dùng khi:

  • Workflow có >3 bước cần chạy theo thứ tự.
  • Cần tự động hóa ETL/ELT định kỳ.
  • Cần retry, alert, logging, version control.
  • Dữ liệu nằm ở nhiều nguồn (API, DB, Cloud).
  • Có nhiều team (DataOps, BI, ML) chia sẻ pipeline.

Không nên dùng khi:

  • Workflow quá nhỏ (chỉ 1 script cron job là đủ).
  • Xử lý real-time streaming (Airflow là batch-oriented).
  • Không có nhu cầu log/monitor chi tiết.

11. Tổng kết

Airflow không chỉ là công cụ automation, mà là tư duy hệ thống hóa quy trình dữ liệu.
Khi bạn viết DAG đầu tiên, bạn đang bước sang mindset của một Data Engineer thực thụ — người không chỉ phân tích, mà còn thiết kế luồng dữ liệu vận hành tự động.

“Một Data Team trưởng thành không chỉ hiểu số, mà còn làm cho dữ liệu tự vận hành.”

Tóm tắt nhanh

Khái niệm Ghi nhớ
Airflow Hệ thống lập lịch và tự động hóa workflow
DAG Đồ thị có hướng, không chu trình
Task Một hành động cụ thể (Python, SQL, API call…)
Scheduler Bộ điều phối thời gian chạy
Webserver Giao diện quan sát DAG
Executor Bộ thực thi task
Best Practice DAG nhỏ, có retry, log, version control

Hướng tiếp theo (Phần 2)

Viết PythonOperator và Sensor trong Airflow

Ở phần tới, ta sẽ học:

  • Cách truyền dữ liệu giữa các task bằng XCom.
  • Dùng Sensor để chờ file/API xuất hiện trước khi chạy tiếp.
  • Kết hợp Airflow + PostgreSQL + Power BI trong pipeline thực tế.

Tư liệu & Link tham khảo


Đăng nhận xét

0 Nhận xét