AWS Glue PySpark Extensions

This tutorial is adapted from the Web Age Course Data Analytics on AWS.

1.1 AWS Glue and Spark

AWS Glue is based on the Apache Spark platform, extending it with Glue-specific libraries. In this tutorial, we will review only Glue's support for PySpark. As of version 2.0, Glue supports Python 3, which you should use in your development.

1.2 The DynamicFrame Object

The AWS Glue API is centered around the DynamicFrame object, which is an extension of Spark's DataFrame object.

DynamicFrame offers finer control over schema inference and some other benefits over the standard Spark DataFrame object. These benefits come from the DynamicRecord object, which represents a logical record in a DynamicFrame. A DynamicRecord is similar to a row in a Spark DataFrame, except that it is self-describing and can therefore be used for rows that do not conform to a fixed, up-front schema. Glue lets developers programmatically switch between a DynamicFrame and a DataFrame using the DynamicFrame's toDF() and fromDF() methods.

1.3 The DynamicFrame API

fromDF() / toDF()

count() – returns the number of rows in the underlying DataFrame

show(num_rows) – prints a specified number of rows from the underlying DataFrame
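
To see how these basic calls fit together, here is a minimal sketch; it assumes the GlueContext introduced in section 1.4 and the sx_db / order_csv catalog table used by the sample script in section 1.6:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(SparkContext.getOrCreate())

# Load a catalog table as a DynamicFrame (the database and table must already exist)
dyf = glueContext.create_dynamic_frame.from_catalog(database="sx_db", table_name="order_csv")

dyf.count()    # number of records in the DynamicFrame
dyf.show(5)    # print the first 5 records

df = dyf.toDF()                                      # switch to a Spark DataFrame
dyf2 = DynamicFrame.fromDF(df, glueContext, "dyf2")  # and back to a DynamicFrame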

Selected transforms with usage examples:

drop_fields

		df.drop_fields(['other_names','identifiers'])

rename_field

		df.rename_field('id', 'org_id').rename_field('name', 'org_name')

filter

		# filter takes a predicate function f that is applied to each DynamicRecord
		partitions = df.filter(f=lambda rec: rec['type'] == 'partition')

map

		# map takes a function f that receives a DynamicRecord (dict-like) and returns a DynamicRecord
		mapped = df.map(f=lambda rec: rec)

select_fields

		df.select_fields(['organization_id']).toDF().distinct().show()

For more details on AWS Glue PySpark extensions and transformations, visit https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python.html

1.4 The GlueContext Object

GlueContext is a wrapper around the SparkContext object that you need to create before you can use the Glue API.

GlueContext creation code:

glueContext = GlueContext(SparkContext.getOrCreate())
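
The GlueContext also exposes the underlying SparkSession, which is how the spark object used by the plain PySpark example in section 1.7 is typically obtained; continuing from the line above:

spark = glueContext.spark_session   # Spark SQL entry point (SparkSession)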

1.5 Glue Transforms

Transforms in Glue are classes that help you code logic for manipulating your data, including:

DropFields, DropNullFields, Filter, Join, RenameField, SelectFields, and others

Example of using the Join transform for joining two DynamicFrames:

dyf_joined = Join.apply(dyf_1, dyf_2, j_col_dyf_1, j_col_dyf_2)
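
As another illustration, here is a brief sketch of the Filter transform; the frame name orders and the field name status are assumptions made for this example, not part of the tutorial's data set:

# Keep only the records whose (hypothetical) 'status' field equals 'shipped'
shipped_orders = Filter.apply(frame=orders, f=lambda rec: rec['status'] == 'shipped')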

For more details on the available Glue transforms, visit https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-transforms.html

1.6 A Sample Glue PySpark Script


from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job 
glueContext = GlueContext(SparkContext.getOrCreate())
orders = glueContext.create_dynamic_frame.from_catalog(database="sx_db", table_name="order_csv")
# orders is of type <class 'awsglue.dynamicframe.DynamicFrame'> 
# You can get the count of records in the DynamicFrame with this command: orders.count() 
# Projections (select_fields is a DynamicFrame method; toDF()/show() switch to Spark's DataFrame API):
# orders.select_fields(['order id', 'employee id', 'customer id', 'order summary']).toDF().show(5)
# Renaming columns (fields):
# orders.rename_field("`payment type`", "pmtt").toDF().columns
order_details = glueContext.create_dynamic_frame.from_catalog(database="sx_db", table_name="order_details_csv")
# Joining two Glue DynamicFrames on the 'order id' column (field) 
dyf_joined = Join.apply(order_details, orders, 'order id',  'order id')
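
A natural next step, sketched here under the assumption that you have a writable S3 location (the bucket path below is a placeholder), is to persist the joined DynamicFrame back to S3:

# Write the joined DynamicFrame out to S3 as CSV (the path is a placeholder)
glueContext.write_dynamic_frame.from_options(
    frame=dyf_joined,
    connection_type="s3",
    connection_options={"path": "s3://your-output-bucket/joined-orders/"},
    format="csv")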

1.7 Using PySpark

# Here is how you can access S3 using PySpark
# ('spark' is the SparkSession, e.g. spark = glueContext.spark_session):
orders = spark.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", "\t") \
    .load('s3://webage-data-sets/glue-data-sets/order.csv')
# The orders object is a Spark DataFrame, which you can convert to Glue's DynamicFrame object using this code:
 
from awsglue.dynamicframe import DynamicFrame
orders_dyf = DynamicFrame.fromDF(orders, glueContext, "orders_dyf")

1.8 AWS Glue PySpark SDK

PySpark integrates with the AWS SDK via the AWS boto3 module:

import boto3
glue = boto3.client(service_name='glue', region_name='us-east-1',
                    endpoint_url='https://glue.us-east-1.amazonaws.com')
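
With the client in hand, you can call the Glue Data Catalog APIs directly; for example (the sx_db database is the one used elsewhere in this tutorial and is assumed to exist in your catalog):

# List the tables that the Glue Data Catalog holds for the 'sx_db' database
for table in glue.get_tables(DatabaseName='sx_db')['TableList']:
    print(table['Name'])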

Most of the AWS Glue functionality comes from the awsglue module. The facade API object awsglue.context.GlueContext wraps the Apache Spark SQL SQLContext object, and you create it like so:

glueContext = GlueContext(SparkContext.getOrCreate())

AWS Glue ETL code samples can be found here: https://github.com/aws-samples/aws-glue-samples

Notes:

Glue client code sample

Here is an example of a Glue client packaged as an AWS Lambda function (running on an automatically provisioned server or servers) that invokes an ETL script, passing input parameters to it. The code samples are taken and adapted from this source: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-calling.html

The lambda function code:

import boto3
from datetime import datetime, timedelta

glue_client = boto3.client('glue')

# This is the callback invoked by AWS in response to an event (e.g. a record is 
# inserted into a DynamoDB NoSQL database)
def lambda_handler(event, context):
  last_hour_date_time = datetime.now() - timedelta(hours = 1)
  day_partition_value = last_hour_date_time.strftime("%Y-%m-%d")
  hour_partition_value = last_hour_date_time.strftime("%-H")

  response = glue_client.start_job_run(
               JobName = 'my_test_Job',  
               Arguments = {      # a set of key-value pairs
                 '--day_partition_key':   'partition_0',
                 '--hour_partition_key':  'partition_1',
                 '--day_partition_value':  day_partition_value,
                 '--hour_partition_value': hour_partition_value } ) 
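
If you want the Lambda function to report on what it started, note that start_job_run returns the new run's ID, which can be passed to get_job_run to check the job's state; a brief sketch of lines you could append inside lambda_handler:

  # start_job_run returns the ID of the new job run; get_job_run reports its state
  run_id = response['JobRunId']
  state = glue_client.get_job_run(JobName='my_test_Job', RunId=run_id)['JobRun']['JobRunState']
  print(state)   # e.g. RUNNING, SUCCEEDED, FAILED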

 

The AWS Glue script:

import sys
from awsglue.utils import getResolvedOptions

# getResolvedOptions offers a reliable way to access values in the sys.argv list
args = getResolvedOptions(sys.argv,
                          ['JOB_NAME',    # 'my_test_Job'
                           'day_partition_key',
                           'hour_partition_key',
                           'day_partition_value',
                           'hour_partition_value'])
print "The day partition key is: ", args['day_partition_key']
print "and the day partition value is: ", args['day_partition_value']

Note that each of the arguments is defined as beginning with two hyphens, then referenced in the script without the hyphens. Your arguments need to follow this convention to be resolved.

1.9 Summary

In this tutorial, we have reviewed the following topics:

AWS Glue ETL Service

AWS Glue PySpark extensions and transforms
