Giới thiệu
Airflow là một công cụ hữu ích cho DataEngineer dùng để lập lịch việc này đến việc kia, ….
- Airflow cung cấp các khái niệm như “DAG” (Directed Acyclic Graph), “Task”, “Operator”, “Sensor” để mô tả quy trình xử lý dữ liệu
DAG
Hiểu đại khái là một đồ thị có hướng không có chu trình (do điểm đầu cuối ko trùng)
Mỗi DAG được định nghĩa trong 1 file trong folder dags
Các nút trong đồ thị là các tác vụ xử lý dữ liệu
Vòng đời của 1 tác vụ bao gồm:
- No status: chưa được xếp hàng để thực hiện
- Scheduled: Bộ lập lịch đã xác định rằng các phụ thuộc của nhiệm vụ được đáp ứng và lên lịch cho nó chạy
- Removed
- Upstream failed
- Queue
- Running
- Success
- Failed
- Up for retry
-> Quy trình lý tưởng nhất cho một tác vụ No status -> Scheduled -> Queued -> Running -> Success
Task
Đơn vị cơ bản để thực hiện một công việc nhỏ trong quy trình xử lý dữ liệu.
Mỗi task trong Airflow có các thuộc tính và phương thức sau (lưu ý vì khi code phải có đủ):
- task_id
- owner
- depends_on_part
- retries: số lần thử lại nếu task thất bại
- retry_delay: khoảng thời gian giữa các lần thử lại
- start_date, end_date
- execution_timeout: thời gian tối đa cho phép để thực hiện task
- n_failure_callback, on_success_calLback: hàm được gọi khi task thất bại hay thành công.
Operator
Operator là loại Task có chức năng cụ thể, được Airflow cung cấp sẵn.
Bạn có thể hình dung Operator là “thực thể thực hiện công việc” còn Task là “phiên bản cụ thể của Operator”.
Một số loại Operator phổ biến:
PythonOperator: Thực thi một hàm Python
BashOperator: Chạy câu lệnh Bash
EmailOperator: Gửi email
DummyOperator: Task giả để đánh dấu
PostgresOperator, MySqlOperator, BigQueryOperator…: Thực thi câu lệnh SQL trên hệ CSDL tương ứng
Ví dụ:1
2
3
4
5
6
7
8
9
10from airflow.operators.python import PythonOperator
def print_hello():
print("Hello Airflow!")
hello_task = PythonOperator(
task_id='say_hello',
python_callable=print_hello,
dag=dag
)
Sensor
Sensor là một dạng Task đặc biệt, dùng để chờ một điều kiện xảy ra trước khi tiếp tục workflow.
Ví dụ: chờ một file xuất hiện, chờ một bản ghi trong database, hoặc chờ một API trả về dữ liệu.
Cài đặt
Do máy tôi không đủ bộ nhớ nên tôi thuê luôn một con server chạy trên nền amazon linux để cài. Chạy lần lượt từng lệnh dưới đây để cài.
1 | sudo yum update -y |
Airflow hiện đã hỗ trợ cài chỉ bằng các lệnh pip thay vì dùng container tại here. Nhưng mà cài bằng container sẽ là tốt nhất theo recommend của họ