Create an ETL job using AWS Glue

  1. Access the AWS Glue console
    • Select ETL jobs
    • Select Visual ETL
    • Select Script editor

  2. Set Engine to Spark
    • Select Create script (the job can also be created programmatically; see the boto3 sketch below)
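If you prefer to create the job from code instead of the console, a minimal boto3 sketch is shown below. This assumes the ETL script has already been uploaded to S3; the script path and role name are placeholders, not values from this lab.

import boto3

glue = boto3.client("glue")

# Placeholder script location and role name -- replace with your own values
glue.create_job(
    Name="Chplay-transform-to-gold-layer",
    Role="YOUR_GLUE_SERVICE_ROLE",
    Command={
        "Name": "glueetl",  # Spark ETL job
        "ScriptLocation": "s3://your-bucket/scripts/chplay_transform.py",
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
    WorkerType="G.1X",
    NumberOfWorkers=2,
)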

  3. In the ETL job interface:
    • Rename the job to: Chplay-transform-to-gold-layer
    • Copy the following script into the editor (a local sanity check of the parsing logic is sketched after this step):
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, udf, lower, when
from pyspark.sql.types import StringType, DoubleType

# Resolve job arguments and bootstrap the Glue/Spark contexts
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read silver-layer app reviews (JSON) from S3 into a DynamicFrame
dyf_reviews = glueContext.create_dynamic_frame.from_options(
    format_options={"multiLine": "false"},
    connection_type="s3",
    format="json",
    connection_options={
        "paths": ["s3://glutisify-datalake/chplay-silver/app_reviews/"],
        "recurse": True
    },
    transformation_ctx="Reviews_node"
)
df_reviews = dyf_reviews.toDF()

# UDFs to pull the top prediction's label and score out of sentiment_prediction
def extract_label(predictions):
    return predictions[0]["label"]

def extract_score(predictions):
    return float(predictions[0]["score"])

extract_label_udf = udf(extract_label, StringType())
extract_score_udf = udf(extract_score, DoubleType())

# Flag reviews mentioning common performance complaints (word-boundary match)
performance_keywords = ['slow', 'lag', 'crash', 'bug', 'error', 'load']
performance_regex = "|".join([f"\\b{kw}\\b" for kw in performance_keywords])

# Flatten the sentiment output and add the ads/performance flags
df_reviews_parsed = (
    df_reviews.withColumn("sentiment_label", extract_label_udf(col("sentiment_prediction")))
              .withColumn("sentiment_score", extract_score_udf(col("sentiment_prediction")))
              .drop("sentiment_prediction")
              .drop("userImage")
              .withColumn(
                  "mentions_ads",
                  when(lower(col("content")).rlike("ads?|advertis(e|ing)"), True).otherwise(False)
              )
              .withColumn(
                  "mentions_performance_issue",
                  when(lower(col("content")).rlike(performance_regex), True).otherwise(False)
              )
)

# Read app details (JSON) from S3 into a DynamicFrame
dyf_details = glueContext.create_dynamic_frame.from_options(
    format_options={"multiLine": "false"},
    connection_type="s3",
    format="json",
    connection_options={
        "paths": ["s3://glutisify-datalake/chplay/app_details/"],
        "recurse": True
    },
    transformation_ctx="Details_node"
)
df_details = dyf_details.toDF()

# Cast any all-null ("void") columns to string so they can be written to Parquet
df_details_fixed = df_details.select(
    *[
        col(c).cast(StringType()) if dict(df_details.dtypes)[c] == "void" else col(c)
        for c in df_details.columns
    ]
)

# Flatten the first category entry and drop bulky, unneeded columns
df_details_parsed = (
    df_details_fixed
        .withColumn("categories_name", col("categories")[0]["name"])
        .withColumn("categories_id", col("categories")[0]["id"])
        .drop("categories", "descriptionHTML", "icon", "headerImage", "screenshots", "histogram")
)

# Write both datasets to the gold layer as Parquet, partitioned by crawl date
(
    df_reviews_parsed
    .write
    .mode("overwrite")
    .partitionBy("crawled_at")
    .parquet("s3://glutisify-datalake/chplay-gold/app_reviews/")
)

(
    df_details_parsed
    .write
    .mode("overwrite")
    .partitionBy("crawled_at")
    .parquet("s3://glutisify-datalake/chplay-gold/app_details/")
)

job.commit()
    • Select Save
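Before running the job, the sentiment-parsing and flagging logic can be sanity-checked locally with a plain SparkSession. This is only a sketch: the sample rows are made up and merely mimic the expected shape of sentiment_prediction (an array of {label, score} records), and it requires a local pyspark installation.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lower, when
from pyspark.sql.types import (StringType, DoubleType, ArrayType,
                               StructType, StructField)

spark = SparkSession.builder.master("local[1]").appName("parse-test").getOrCreate()

schema = StructType([
    StructField("content", StringType()),
    StructField("sentiment_prediction", ArrayType(StructType([
        StructField("label", StringType()),
        StructField("score", DoubleType()),
    ]))),
])

# Made-up sample rows mimicking the silver-layer shape
rows = [
    ("Great app, but too many ads", [("POSITIVE", 0.91)]),
    ("Keeps crashing on startup",   [("NEGATIVE", 0.97)]),
]
df = spark.createDataFrame(rows, schema)

extract_label_udf = udf(lambda p: p[0]["label"], StringType())
extract_score_udf = udf(lambda p: float(p[0]["score"]), DoubleType())

out = (
    df.withColumn("sentiment_label", extract_label_udf(col("sentiment_prediction")))
      .withColumn("sentiment_score", extract_score_udf(col("sentiment_prediction")))
      .withColumn("mentions_ads",
                  when(lower(col("content")).rlike("ads?|advertis(e|ing)"), True).otherwise(False))
)
out.show(truncate=False)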

  4. Switch to the Job details tab
    • Select an IAM role with Glue service permissions and read/write access to the datalake bucket (see the sketch below)
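If no suitable role exists yet, one can be created with boto3. This is a minimal sketch, assuming the Glue service trust relationship plus S3 access to the glutisify-datalake bucket; the role and policy names are illustrative.

import json
import boto3

iam = boto3.client("iam")

# Illustrative role name -- adjust to your environment
role_name = "glue-chplay-etl-role"

# Trust policy letting the Glue service assume the role
iam.create_role(
    RoleName=role_name,
    AssumeRolePolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }),
)

# Managed policy for the Glue service itself
iam.attach_role_policy(
    RoleName=role_name,
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
)

# Inline policy granting access to the datalake bucket used by the script
iam.put_role_policy(
    RoleName=role_name,
    PolicyName="glutisify-datalake-access",
    PolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:DeleteObject"],
            "Resource": [
                "arn:aws:s3:::glutisify-datalake",
                "arn:aws:s3:::glutisify-datalake/*",
            ],
        }],
    }),
)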

  5. Run the job
    • Select Run (the run can also be started and monitored with boto3, as sketched below)
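As an alternative to the console, the run can be started and polled from code. A small sketch with boto3 (the polling interval is arbitrary):

import time
import boto3

glue = boto3.client("glue")

run = glue.start_job_run(JobName="Chplay-transform-to-gold-layer")
run_id = run["JobRunId"]

# Poll until the run reaches a terminal state
while True:
    state = glue.get_job_run(JobName="Chplay-transform-to-gold-layer",
                             RunId=run_id)["JobRun"]["JobRunState"]
    print("Job run state:", state)
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(30)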

  6. Verify that the job run completes successfully

  7. Check the data in the gold-layer S3 prefixes (a quick listing sketch follows):
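One quick check is to list the gold-layer prefixes; Parquet files should appear under crawled_at=... partition folders. A minimal sketch with boto3:

import boto3

s3 = boto3.client("s3")

for prefix in ("chplay-gold/app_reviews/", "chplay-gold/app_details/"):
    resp = s3.list_objects_v2(Bucket="glutisify-datalake", Prefix=prefix)
    for obj in resp.get("Contents", []):
        print(obj["Key"], obj["Size"])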

⇒ The data is transferred to the gold layer and stored in the optimized Parquet format