This tutorial is adapted from the Web Age Course Data Analytics on AWS.

1.1 AWS Glue and Spark

AWS Glue is based on the Apache Spark platform, extending it with Glue-specific libraries. In this tutorial, we review only Glue's support for PySpark. As of version 2.0, Glue supports Python 3, which you should use in your development.

1.2 The DynamicFrame Object

The AWS Glue API is centered around the DynamicFrame object, which is an extension of Spark's DataFrame object.

DynamicFrame offers finer control over schema inference and some other benefits over the standard Spark DataFrame object. These benefits come from the DynamicRecord object, which represents a logical record in a DynamicFrame. A DynamicRecord is similar to a row in a Spark DataFrame, except that it is self-describing and can be used for rows that do not conform to a fixed schema, so no up-front schema definition is required. Glue's creators allow developers to programmatically switch between the DynamicFrame and DataFrame using the DynamicFrame's toDF() and fromDF() methods.
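As a conceptual sketch in plain Python (not the Glue API), self-describing records can be pictured as dicts that each carry their own fields, so records in one collection need not share a schema:

```python
# Conceptual sketch only: each "record" carries its own fields, the way a
# DynamicRecord carries its own schema; records need not agree on shape.
records = [
    {"id": 1, "name": "orders"},                        # two fields
    {"id": 2, "name": "returns", "reason": "damaged"},  # an extra field is fine
]

# A fixed-schema DataFrame would force every row into one schema up front;
# here the overall "schema" is simply the union of the fields observed.
fields = sorted({key for rec in records for key in rec})
# → ['id', 'name', 'reason']
```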

1.3 The DynamicFrame API

fromDF() / toDF() – create a DynamicFrame from a Spark DataFrame, and convert a DynamicFrame back to a DataFrame, respectively

count() – returns the number of rows in the underlying DataFrame

show(num_rows) – prints a specified number of rows from the underlying DataFrame

Selected transforms with usage examples:

rename_field() – renames a field:

    df.rename_field('id', 'org_id').rename_field('name', 'org_name')

filter() – selects the records that satisfy a predicate function:

    partitions = df.filter(lambda row: row['type'] == 'partition')

map() – applies a function to every record in the DynamicFrame (shown here with an identity function for illustration):

    mapped = df.map(lambda row: row)

For more details on AWS Glue PySpark extensions and transformations, see the AWS Glue Developer Guide.

1.4 The GlueContext Object

GlueContext is a wrapper around the SparkContext object that you need to create before you can use the Glue API.

GlueContext creation code:

glueContext = GlueContext(SparkContext.getOrCreate())

1.5 Glue Transforms

Transforms in Glue are classes that help you code logic for manipulating your data, including:

DropFields, DropNullFields, Filter, Join, RenameField, SelectFields, and others

Example of using the Join transform for joining two DynamicFrames:

dyf_joined = Join.apply(dyf_1, dyf_2, j_col_dyf_1, j_col_dyf_2)
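To make the join semantics concrete, here is a minimal plain-Python sketch (not the Glue API) of what an inner join on a key column does; the sample records are invented for illustration:

```python
# Hypothetical sample records, keyed on 'order id' as in the Join.apply call above.
orders = [{"order id": 1, "customer": "a"},
          {"order id": 2, "customer": "b"}]
order_details = [{"order id": 1, "sku": "x"},
                 {"order id": 1, "sku": "y"}]

# Inner join: keep every pair of records whose join columns match,
# merging the matched records' fields.
dyf_joined = [{**d, **o} for d in order_details for o in orders
              if d["order id"] == o["order id"]]
# Both detail records match order id 1, so the result has two records.
```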

For more details on the available Glue transforms, see the AWS Glue Developer Guide.

1.6 A Sample Glue PySpark Script

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

# The table names below are placeholders; substitute tables from your Data Catalog.
orders = glueContext.create_dynamic_frame.from_catalog(database="sx_db",
                                                       table_name="orders")
# orders is of type <class 'awsglue.dynamicframe.DynamicFrame'>
# You can get the count of records in the DynamicFrame with this command: orders.count()
# Projections (using PySpark's DataFrame API):
# orders.select_fields(['order id']).toDF().show(5)
# Renaming columns (fields):
# orders.rename_field("`payment type`", "pmtt").toDF().columns
order_details = glueContext.create_dynamic_frame.from_catalog(database="sx_db",
                                                              table_name="order_details")
# Joining two Glue DynamicFrames on the 'order id' column (field)
dyf_joined = Join.apply(order_details, orders, 'order id', 'order id')

1.7 Using PySpark

# Here is how you can access S3 using PySpark; the S3 path is a placeholder, and
# spark is assumed to be a SparkSession (e.g. glueContext.spark_session):
orders = spark.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", "\t") \
    .load("s3://your-bucket/orders.csv")
# orders is Spark's DataFrame object, which you can convert to Glue's DynamicFrame object using this code:
from awsglue.dynamicframe import DynamicFrame
orders_dyf = DynamicFrame.fromDF(orders, glueContext, "orders_dyf")

1.8 AWS Glue PySpark SDK

PySpark integrates with the AWS SDK via the boto3 module:

import boto3
glue = boto3.client(service_name='glue', region_name='us-east-1')

Most of AWS Glue's functionality comes from the awsglue module. The facade API object awsglue.context.GlueContext wraps the Apache Spark SQL SQLContext object, and you create it like so:

glueContext = GlueContext(SparkContext.getOrCreate())

AWS Glue ETL code samples can be found in the aws-glue-samples repository on GitHub.


Glue client code sample

Here is an example of a Glue client packaged as an AWS Lambda function (running on automatically provisioned servers) that invokes an ETL script to process input parameters; the code samples below are taken and adapted from an external source.

The lambda function code:

import boto3
from datetime import datetime, timedelta

glue_client = boto3.client('glue')

# This is the callback invoked by AWS in response to an event (e.g. a record is
# inserted into a DynamoDB NoSQL database)
def lambda_handler(event, context):
  last_hour_date_time = datetime.now() - timedelta(hours = 1)
  day_partition_value = last_hour_date_time.strftime("%Y-%m-%d")
  hour_partition_value = last_hour_date_time.strftime("%-H")

  response = glue_client.start_job_run(
               JobName = 'my_test_Job',  
               Arguments = {      # a set of key-value pairs
                 '--day_partition_key':   'partition_0',
                 '--hour_partition_key':  'partition_1',
                 '--day_partition_value':  day_partition_value,
                 '--hour_partition_value': hour_partition_value } )
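The partition values above come from simple datetime arithmetic. This small, self-contained sketch (the helper name is ours, not from the source) shows the computation and its day-boundary behavior:

```python
from datetime import datetime, timedelta

def partition_values(now):
    # Mirror the lambda handler: take the previous hour and format its
    # day and hour partition values. '%-H' (no zero padding) is a
    # glibc/Linux strftime extension, which is what AWS Lambda runs on.
    last_hour = now - timedelta(hours=1)
    return last_hour.strftime("%Y-%m-%d"), last_hour.strftime("%-H")

# Just after midnight, the previous hour belongs to the previous day.
day, hour = partition_values(datetime(2023, 5, 1, 0, 30))
# → ('2023-04-30', '23')
```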

The AWS Glue script:

import sys
from awsglue.utils import getResolvedOptions

# getResolvedOptions offers a reliable way to access values in the sys.argv list
args = getResolvedOptions(sys.argv,
                          ['JOB_NAME',              # 'my_test_Job'
                           'day_partition_key',
                           'hour_partition_key',
                           'day_partition_value',
                           'hour_partition_value'])
print("The day partition key is:", args['day_partition_key'])
print("and the day partition value is:", args['day_partition_value'])

Note that each of the arguments is defined as beginning with two hyphens, then referenced in the script without the hyphens. Your arguments need to follow this convention to be resolved.
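The hyphen convention can be illustrated with a simplified stand-in for getResolvedOptions (this sketch is ours; the real awsglue.utils version also performs validation and supports reserved arguments):

```python
def resolve_options(argv, option_names):
    # Each option is passed as '--name value' on the command line but is
    # looked up without the hyphens inside the script, mirroring the
    # convention getResolvedOptions expects.
    resolved = {}
    for name in option_names:
        flag = "--" + name
        if flag in argv:
            resolved[name] = argv[argv.index(flag) + 1]
    return resolved

argv = ["job.py", "--day_partition_key", "partition_0",
        "--day_partition_value", "2023-04-30"]
args = resolve_options(argv, ["day_partition_key", "day_partition_value"])
# → {'day_partition_key': 'partition_0', 'day_partition_value': '2023-04-30'}
```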

1.9 Summary

In this tutorial, we have reviewed the following topics:

AWS Glue ETL Service

AWS Glue PySpark extensions and transforms