Giới thiệu

Airflow là một công cụ hữu ích cho DataEngineer dùng để lập lịch việc này đến việc kia, ….

  • Airflow cung cấp các khái niệm như “DAG” (Directed Acyclic Graph), “Task”, “Operator”, “Sensor” để mô tả quy trình xử lý dữ liệu

DAG

Hiểu đại khái là một đồ thị có hướng không có chu trình (do điểm đầu cuối ko trùng)

Mỗi DAG được định nghĩa trong 1 file trong folder dags

Các nút trong đồ thị là các tác vụ xử lý dữ liệu

Vòng đời của 1 tác vụ bao gồm:

  • No status: chưa được xếp hàng để thực hiện
  • Scheduled: Bộ lập lịch đã xác định rằng các phụ thuộc của nhiệm vụ được đáp ứng và lên lịch cho nó chạy
  • Removed
  • Upstream failed
  • Queue
  • Running
  • Success
  • Failed
  • Up for retry

-> Quy trình lý tưởng nhất cho một tác vụ No status -> Scheduled -> Queued -> Running -> Success

Task

Đơn vị cơ bản để thực hiện một công việc nhỏ trong quy trình xử lý dữ liệu.

Mỗi task trong Airflow có các thuộc tính và phương thức sau (lưu ý vì khi code phải có đủ):

  • task_id
  • owner
  • depends_on_part
  • retries: số lần thử lại nếu task thất bại
  • retry_delay: khoảng thời gian giữa các lần thử lại
  • start_date, end_date
  • execution_timeout: thời gian tối đa cho phép để thực hiện task
  • n_failure_callback, on_success_calLback: hàm được gọi khi task thất bại hay thành công.

Operator

Operator là loại Task có chức năng cụ thể, được Airflow cung cấp sẵn.
Bạn có thể hình dung Operator là “thực thể thực hiện công việc” còn Task là “phiên bản cụ thể của Operator”.

Một số loại Operator phổ biến:

  • PythonOperator: Thực thi một hàm Python

  • BashOperator: Chạy câu lệnh Bash

  • EmailOperator: Gửi email

  • DummyOperator: Task giả để đánh dấu

  • PostgresOperator, MySqlOperator, BigQueryOperator…: Thực thi câu lệnh SQL trên hệ CSDL tương ứng

Ví dụ:

1
2
3
4
5
6
7
8
9
10
from airflow.operators.python import PythonOperator

def print_hello():
print("Hello Airflow!")

hello_task = PythonOperator(
task_id='say_hello',
python_callable=print_hello,
dag=dag
)

Sensor

Sensor là một dạng Task đặc biệt, dùng để chờ một điều kiện xảy ra trước khi tiếp tục workflow.
Ví dụ: chờ một file xuất hiện, chờ một bản ghi trong database, hoặc chờ một API trả về dữ liệu.

Cài đặt

Do máy tôi không đủ bộ nhớ nên tôi thuê luôn một con server chạy trên nền amazon linux để cài. Chạy lần lượt từng lệnh dưới đây để cài.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
sudo yum update -y

sudo yum install python3 python3-pip -y

sudo yum install gcc-c++ -y
sudo yum install postgresql-devel -y # Nếu bạn dùng PostgreSQL

python3 -m venv airflow_venv
source airflow_venv/bin/activate

pip install "apache-airflow[celery]==2.10.3" --constraint "https://raw.**githubusercontent**.com/apache/airflow/constraints-2.4.0/constraints-3.7.txt"

airflow db init

# lưu tài nguyên
source airflow_venv/bin/activate

# tạo tài khoản mật khẩu
airflow users create \
--username admin \
--firstname Firstname \
--lastname Lastname \
--role Admin \
--email example@example.com

airflow webserver -p 8080

airflow scheduler

Airflow hiện đã hỗ trợ cài chỉ bằng các lệnh pip thay vì dùng container tại here. Nhưng mà cài bằng container sẽ là tốt nhất theo recommend của họ

Tham khảo

  1. Airflow for newbie

  2. Airflow documents