Ingest Pancake: nguồn lấy về bằng api

Trong các loại dữ liệu phổ biển của doanh nghiệp việt nam thì pancake là ông tổ của các loại dữ liệu về cs chăm sóc khách hàng. Chúng ta không thể dương mắt nhìn đống data này chỉ có thể view trên web pancake.vn được. Chúng ta phải mang nó về xào qua các lượt và kết hợp nó với dữ liệu từ các nguồn khác của công ty để cho ra được những dashboard, những insight đẹp nhất.

Vì thế blog này ra đời ko chỉ vì lưu trữ kiến thức mà còn là vì nó vô cùng cần thiết, công ty nào mà chả phải có kênh chăm sóc khách hàng, mà đã ở Việt Nam thì kiểu gì cũng phải dùng pancake. Và công ty thể nào cũng bảo ae lấy đống data này về.

Về cách lấy trên trang chủ api docs của pancake đã nêu rất rõ cho từng loại dữ liệu. Bài viết dưới đây tôi chỉ gộp tất cả các nguồn lại và có thêm lấy gộp tất cả các bảng theo từng file và theo ngày. Có cả code sample trên lambda functions.

Rồi. Vào việc thôi !

Introduction

Cách lấy: sử dụng thư viện request trong python gửi một request thông api với các tham số được yêu cầu từ tài liệu: [link], mỗi lượt request sẽ trả về 1 đoạn json tương ứng

  • Xử lý dữ liệu khi lấy: xóa header, format lại dữ liệu theo dòng mỗi bản ghi là 1 dòng tương ứng.
  • Đối với các bảng đặc biệt như Users statatistics cần phải tách riêng 2 phần: user và user_statatistics do có 2 phần trong data lấy về.
  • Các bảng khác
  • Xử lý chuyển từng mã nhân viên vào trong data tương ứng để glue có thể format chuẩn.

hình minh họa: đưa id của nhân viên vào trong để tạo thành 1 record đẹp (Xử lý tại raw data để về sau đưa data lên athena đỡ phức tạp và dễ dàng scale khi số lượng nhân viên tăng lên)

Các bảng lấy về kèm api, tham số tương ứng

  • ads: data quảng cáo hằng ngày (thường thì chả thấy ads đâu 🙂)
  • campaign: cái này 1 năm có khi có 1 campaign nên data ít thậm chí nhiều app ko có
  • conversations: dữ liệu hội thoại của khách hàng
  • engagement_statistics: bảng này phải tách ra làm 2 bảng và lưu thành engagement_statistics và user_enggements để glue có thể hiểu và formate đúng, xử lý luôn để sau đỡ phức tạp.

Cách tách

1
2
3
data = response.json()
users_engagements = data['users_engagements']
engagement_statistics = data['statistics']
  • new_customer_statistics: thống kê khách hàng mới
  • page_customers: dữ liệu khách hàng theo trang (khách hàng tương tác với 1 trang nhất định)
  • posts: thông tin về các bài đăng trên trang
  • tags: lấy về 1 lần do ko có tham số ngày tháng trong api (lưu các thẻ (tags) được sử dụng để phân loại nội dung)
  • user_list: danh sách người quản lý trang (lấy về 1 lần do ko có tham số ngày tháng)
  • user_statistics: thống kê về người dùng, bao gồm các thông tin liên quan đến hoạt động hoặc tương tác của họ trên hệ thống
  • user: thông tin cơ bản về user
  • users_engagements: thông tin về mức độ tương tác của người dùng với các nội dung trên hệ thống

Các lỗi gặp phải: phần lớn là do sai định dạng ngày (phải theo đúng chuẩn tài liệu của pancake

  • Sửa bằng cách viết lại một hàm để convert lại định dạng đúng của nó (hiện tại trong code đang có 2 hàm vì từng api lại yêu cầu định dạng ngày khác)
  • Để ngày sai: dữ liệu có từ 9/2023 (trước đó data toàn null với 0)

Để format lại được dữ liệu được gọn và theo dòng thì phải thông qua hàm lưu data vào s3 sau (ý tưởng là data request về là 1 cục, cục đó đi qua hàm save_to_s3 để xử lý thành mỗi record trên 1 dòng.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def save_to_s3(data, filename, app_name, data_type):
date_str = datetime.now().strftime('%Y-%m-%d') # Sử dụng ngày hiện tại
s3_folder = f'pancake/{app_name}/{data_type}/' # Sử dụng app_name và data_type để tạo thư mục
s3_key = f'{s3_folder}{filename[:-5]}_{date_str}.json' # Thêm ngày hiện tại vào filename

# Convert lines to a single string separated by newlines
if isinstance(data, list):
lines = [json.dumps(item, ensure_ascii=False) for item in data]
output = '\n'.join(lines)
else:
output = json.dumps(data, ensure_ascii=False)

s3_client.put_object(
Bucket=bucket_name,
Key=s3_key,
Body=output.encode('utf-8'),
ContentType='application/json'
)

bằng việc sử dụng hàm lưu file thì dữ liệu được lưu thành các dòng đẹp đẽ

Sau khi làm ổn các bước trên thì chuyển sang viết lambda function: Ý tưởng là chỉ cần viết lại y trang code trên local rồi chuyển các hàm thực hiện chính (lambda_handler)

Các giải thích về việc tổ chức dữ liệu trong datalake

  • Dữ liệu pancake được lưu vào folder pancake
  • Tổ chức dữ liệu theo dự án, pancake//table/file_.json
  • Dữ liệu lịch sử được lấy về sẽ ko có đuôi ngày ở sau
  • Các ngày lấy về sau thì đặt tên file + ngày
  • Các dữ liệu về tag, user_list sẽ chỉ lấy 1 lần, đã comment trong code
  • Chỉnh lấy dữ liệu lịch sử ngay trong code lambda được do có chuyền vào tham số ngày … to ngày ….

Hàm lambdafunction đầy đủ:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# đang daily trươc 1 ngày
import requests
import time
from datetime import datetime, timedelta
import json
import boto3

s3_client = boto3.client('s3')
bucket_name = 'eup-datalake-bronze-zone-prod'

def save_to_s3(data, filename, app_name, data_type):
date_str = datetime.now().strftime('%Y-%m-%d') # Sử dụng ngày hiện tại
s3_folder = f'pancake/{app_name}/{data_type}/' # Sử dụng app_name và data_type để tạo thư mục
s3_key = f'{s3_folder}{filename[:-5]}_{date_str}.json' # Thêm ngày hiện tại vào filename

# Convert lines to a single string separated by newlines
if isinstance(data, list):
lines = [json.dumps(item, ensure_ascii=False) for item in data]
output = '\n'.join(lines)
else:
output = json.dumps(data, ensure_ascii=False)

s3_client.put_object(
Bucket=bucket_name,
Key=s3_key,
Body=output.encode('utf-8'),
ContentType='application/json'
)

# Function to extract specific data based on different response structures
def extract_data(response, data_field):
return response.json().get(data_field, [])

def convert_date_to_timestamp(date_str):
return int(time.mktime(datetime.strptime(date_str, '%Y-%m-%d').timetuple()))

# Fetch and save ads data
def get_ads_data(page_access_token, page_id, app_name, start_date, end_date):
since_timestamp = convert_date_to_timestamp(start_date)
until_timestamp = convert_date_to_timestamp(end_date)
url = f'https://pages.fm/api/public_api/v1/pages/{page_id}/statistics/ads'
params = {'page_access_token': page_access_token, 'since': since_timestamp, 'until': until_timestamp, 'type': 'by_time'}
response = requests.get(url, params=params)
if response.status_code == 200:
data = extract_data(response, 'data')
save_to_s3(data, 'ads.json', app_name, 'ads')
else:
print(f"Error getting ads data: {response.status_code}")

# Fetch and save campaign data
def get_campaign_data(page_access_token, page_id, app_name, start_date, end_date):
since_timestamp = convert_date_to_timestamp(start_date)
until_timestamp = convert_date_to_timestamp(end_date)
url = f'https://pages.fm/api/public_api/v1/pages/{page_id}/statistics/pages_campaigns'
params = {'page_access_token': page_access_token, 'since': since_timestamp, 'until': until_timestamp}
response = requests.get(url, params=params)
if response.status_code == 200:
data = extract_data(response, 'data')
save_to_s3(data, 'campaign.json', app_name, 'campaign')
else:
print(f"Error getting campaign data: {response.status_code}")

# Fetch and save conversation data
def get_conversations_in_chunks(page_access_token, page_id, app_name, start_date, end_date):
current_date = datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.strptime(end_date, '%Y-%m-%d')
while current_date < end_date:
next_date = current_date + timedelta(days=29)
if next_date > end_date:
next_date = end_date
since_timestamp = int(current_date.timestamp())
until_timestamp = int(next_date.timestamp())
page_number = 1
while True:
url = f"https://pages.fm/api/public_api/v1/pages/{page_id}/conversations"
params = {"page_access_token": page_access_token, "since": since_timestamp, "until": until_timestamp, "page_number": page_number}
response = requests.get(url, params=params)
if response.status_code == 200 and response.json().get("conversations"):
data = extract_data(response, 'conversations')
save_to_s3(data, f'conversation_page_{page_number}.json', app_name, 'conversations')
page_number += 1
else:
break
current_date = next_date + timedelta(days=1)

# Fetch and save new_customer_statistics
def get_new_customer_statistics(page_access_token, page_id, app_name, start_date, end_date):
def convert_date_format(date_str):
date_obj = datetime.strptime(date_str, '%Y-%m-%d')
return date_obj.strftime('%d/%m/%Y')

start_date_formatted = convert_date_format(start_date)
end_date_formatted = convert_date_format(end_date)

date_range = f"{start_date_formatted} - {end_date_formatted}"

url = f"https://pages.fm/api/public_api/v1/pages/{page_id}/statistics/customers"

params = {
'page_access_token': page_access_token,
'date_range': date_range,
'group_by': 'day'
}

response = requests.get(url, params=params)
if response.status_code == 200:
data = extract_data(response, 'data')
save_to_s3(data, 'new_customer_statistics.json', app_name, 'new_customer_statistics')
else:
print(f"Error getting new customer statistics: {response.status_code}, Message: {response.text}")

# Fetch and save page_customers
def get_page_customers(page_access_token, page_id, app_name, start_date, end_date):
since_timestamp = convert_date_to_timestamp(start_date)
until_timestamp = convert_date_to_timestamp(end_date)

url = f'https://pages.fm/api/public_api/v1/pages/{page_id}/page_customers'
params = {'page_access_token': page_access_token, 'since': since_timestamp, 'until': until_timestamp, 'page_number': 1, 'page_size': 50}

response = requests.get(url, params=params)
if response.status_code == 200:
data = extract_data(response, 'customers')
save_to_s3(data, 'page_customers.json', app_name, 'page_customers')
else:
print(f"Error getting page customers: {response.status_code}")

# Fetch and save user_list
def get_user_list(page_access_token, page_id, app_name):
url = f'https://pages.fm/api/public_api/v1/pages/{page_id}/users'
params = {'page_access_token': page_access_token}
response = requests.get(url, params=params)
if response.status_code == 200:
data = extract_data(response, 'users')
save_to_s3(data, 'user_list.json', app_name, 'user_list')
else:
print(f"Error getting user list: {response.status_code}")

# Fetch and save posts
def get_posts(page_access_token, page_id, app_name, start_date, end_date):
since_timestamp = convert_date_to_timestamp(start_date)
until_timestamp = convert_date_to_timestamp(end_date)

url = f"https://pages.fm/api/public_api/v1/pages/{page_id}/posts"
params = {'since': since_timestamp, 'until': until_timestamp, 'page_number': 1, 'page_size': 30, 'page_access_token': page_access_token}

response = requests.get(url, params=params)
if response.status_code == 200:
data = extract_data(response, 'posts')
save_to_s3(data, 'posts.json', app_name, 'posts')
else:
print(f"Error getting posts: {response.status_code}")

# Fetch and save tags
def get_tags_data(page_access_token, page_id, app_name):
url = f"https://pages.fm/api/public_api/v1/pages/{page_id}/tags"
params = {'page_access_token': page_access_token}
response = requests.get(url, params=params)
if response.status_code == 200:
data = extract_data(response, 'tags')
save_to_s3(data, 'tags.json', app_name, 'tags')
else:
print(f"Error getting tags data: {response.status_code}")

def get_user_statistics(page_access_token, page_id, start_date, end_date, app_name):
url = f'https://pages.fm/api/public_api/v1/pages/{page_id}/statistics/users'

# Tham số gửi kèm trong request
date_range = f'{start_date} - {end_date}'
params = {
'page_access_token': page_access_token,
'date_range': date_range
}

# Gửi yêu cầu GET tới API
response = requests.get(url, params=params)

if response.status_code == 200:
data = response.json()
statistics = data["data"]["statistics"]
users = data["data"]["users"]

statistics1 = []
users1 = []
for key, value in statistics.items():
for elem in value:
elem['user_id'] = key
statistics1.append(elem)

# Lưu users vào users1
for key, value in users.items():
value['user_id'] = key
users1.append(value)

save_to_s3(statistics1, 'user_statistics.json', app_name, 'user_statistics')
save_to_s3(users1, 'user.json', app_name, 'user')


# Hàm lấy dữ liệu Engagement_statistics
def get_engagement_statistics(page_access_token, page_id, start_date, end_date, app_name):
url = f'https://pages.fm/api/public_api/v1/pages/{page_id}/statistics/customer_engagements'

# Tham số truyền vào API
date_range = f'{start_date} - {end_date}'
params = {
'page_access_token': page_access_token,
'date_range': date_range,
'by_hour': False,
'user_ids': [] # Bắt buộc phải có tham số này, có thể để trống nếu không lọc
}

# Gửi request GET đến API
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
users_engagements = data['users_engagements']
engagement_statistics = data['statistics']
save_to_s3(users_engagements, 'user_engagements.json', app_name, 'users_engagements')
save_to_s3(engagement_statistics, 'engagement_statistics.json', app_name, 'engagement_statistics')

def format_date(date_str):
date_obj = datetime.strptime(date_str, '%Y-%m-%d')
return date_obj.strftime('%d/%m/%Y 00:00:00')

Hàm lambda handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def lambda_handler(event, context):
apps = []
page_ids = []
page_access_tokens = []
today = datetime.now()

# Set start_date to two days before today
start_date = (today - timedelta(days=2)).strftime('%Y-%m-%d')

# Set end_date to one day before today
end_date = (today - timedelta(days=1)).strftime('%Y-%m-%d')
start_date1 = format_date(start_date)
end_date1 = format_date(end_date)

for app, page_id, page_access_token in zip(apps, page_ids, page_access_tokens):
get_ads_data(page_access_token, page_id, app, start_date, end_date)
get_campaign_data(page_access_token, page_id, app, start_date, end_date)
get_conversations_in_chunks(page_access_token, page_id, app, start_date, end_date)
get_posts(page_access_token, page_id, app, start_date, end_date)
get_new_customer_statistics(page_access_token, page_id, app, start_date, end_date)
get_page_customers(page_access_token, page_id, app, start_date, end_date)
# get_user_list(page_access_token, page_id, app) # chỉ lấy một lần
# get_tags_data(page_access_token, page_id, app) #chỉ lấy một lần
get_user_statistics(page_access_token, page_id, start_date1, end_date1, app)
get_engagement_statistics(page_access_token, page_id, start_date1, end_date1, app)

return {
'statusCode': 200,
'body': json.dumps('Data fetched and saved to S3 successfully!')
}

SUPPORTED ENDPOINTS

HTTP method Endpoint
GET List user, static….
POST update

Status Codes

Mã phản hồi HTTP được sử dụng để chỉ ra các lớp thành công và lỗi chung.

Success Code

HTTP Status Quote Description
200 Successfully processed request.

Error Codes

Phản hồi lỗi chứa nhiều chi tiết hơn về lỗi trong nội dung phản hồi, trong "code""message".

HTTP Status Quote code message
400 invalid_json The request body could not be decoded as JSON
400 invalid_request_url This request URL is not valid.
400 invalid_request This request is not supported.
401 unauthorized The bearer token is not valid.
403 forbidden You do not have permission to access this resource.
404 not found The requested resource could not be found.
405 method_not_allowed The request method is not allowed.
429 too_many_requests You have made too many requests. Please try again later.
500 internal_server_error An error occurred on the server.
502 bad_gateway The server received an invalid response from the upstream server.
503 service_unavailable The server is temporarily unavailable. Please try again later.
504 gateway_timeout The upstream server did not respond in time.