Functional Programming in Python

This tutorial is adapted from the Web Age course Introduction to Python Programming.

1.1 What is Functional Programming?

Functional programming reduces problems to a set of function calls.

The functions used, referred to as Pure functions, follow these rules:

  • Only produce a result
  • Do not modify the parameters that were passed in
  • Do not produce any side effects
  • Always produce the same result when called with the same parameters

Another condition of functional programming is that all data be immutable.
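
For example, here is a minimal sketch contrasting a pure function with an impure one (the function names are illustrative, not part of the course code):

def add_item_pure(items, item):
  """Pure: builds and returns a new list, leaving the argument untouched."""
  return items + [item]

def add_item_impure(items, item):
  """Impure: mutates the list that was passed in, which is a side effect."""
  items.append(item)
  return items

The pure version always produces the same result for the same inputs and never modifies its parameters, which is exactly what the rules above require.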

1.2 Benefits of Functional Programming

Solutions implemented using functional programming have several advantages:

  • Pure functions are easier to test
  • Programs follow a predictable flow
  • Debugging is easier
  • Functional building blocks allow for high-level programming
  • Parallel execution is easier to implement

1.3 Functions as Data

In Python, functions can be assigned to variables and passed around like any other data. This allows for:

  • Passing functions as parameters to other functions
  • Returning functions as the result of a function

Built-in Python functions such as map() and filter(), as well as the list.sort() method, take functions as parameters. Functions that accept other functions as parameters or return functions as their result are referred to as higher-order functions.
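
A minimal sketch of both ideas (apply_twice and make_adder are illustrative names, not built-ins):

def apply_twice(func, value):
  """Higher-order: accepts a function as a parameter."""
  return func(func(value))

def make_adder(n):
  """Higher-order: returns a new function as its result."""
  def adder(x):
    return x + n
  return adder

print(apply_twice(make_adder(3), 10))
# prints: 16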

1.4 Using Map Function

The map() function applies a specified function to each item in an iterable data type (list, string, dictionary, set).

map( transform_function, iterable )

The transform_function passed to map takes an item from the iterable as a parameter and returns data related to the item but modified in some way.

def toUpper(item):
  return item.upper()

The following map function call converts all items to upper case characters:

states = ['Arizona', 'Georgia', 'New York', 'Texas']
map_result = map(toUpper, states)
list_result = list(map_result)
print(list_result)
# prints: ['ARIZONA', 'GEORGIA', 'NEW YORK', 'TEXAS']

map() returns a map object, which is then converted to a list using the list() constructor.

1.5 Using Filter Function

The filter() function checks each item in an iterable (list, string, dictionary, set) against a condition and outputs a new iterable that only includes items that pass the check.

filter( check_function, iterable )

The check_function passed to filter takes an item from the iterable as a parameter and returns True or False depending on whether the item conforms to the given condition.

def lengthCheck(item, maxlen=7):
  return len(item) <= maxlen

The following filter() call keeps only the states whose names are at most seven characters long:

states = ['Arizona', 'Oklahoma', 'Utah', 'Texas']
filter_result = filter(lengthCheck, states)
list_result = list(filter_result)
print(list_result)
# prints: ['Arizona', 'Utah', 'Texas']

filter() returns a filter object, which is then converted to a list using the list() constructor.

1.6 Lambda expressions

Lambda expressions in Python:

  • Are a special syntax for creating anonymous functions
  • Are limited to a single expression (rather than a block of statements)
  • Return the result of that expression automatically
  • Are typically defined in-line where they will be used

Some example lambda expressions:
lambda x : x * x     # multiply parameter by itself
lambda y : y * 2     # multiply parameter by 2 
lambda z : z['name'] # get value from dict with 
                     # key = 'name'

Normally a lambda expression is placed in code where it will be used:

list(map(lambda x:x*x, [1,2,3]))
# outputs [1, 4, 9]

Lambda expressions can be tested, as shown here, by wrapping the lambda in parentheses and supplying a parameter:

(lambda x : x * x)(5) # returns 25

1.7 List.sort() Using Lambda Expression

list.sort() takes two optional keyword parameters:

list.sort(key=..., reverse=...)
    • key = a function that returns the value to sort on
    • reverse = True/False, where True sorts in descending order

In the code below a lambda expression is used to provide the function for the ‘key’ parameter:

list1 = [{'name':'Jack', 'age':25},
         {'name':'Cindy', 'age':17},
         {'name':'Stuart', 'age':20}]

list1.sort(key=lambda item: item['name'],  
           reverse=False)
print(list1)
# outputs: [{'name': 'Cindy', 'age': 17}, 
# {'name': 'Jack', 'age': 25}, 
# {'name': 'Stuart', 'age': 20}]

The lambda expression returns the value of each dictionary's 'name' key, which is the value the list is sorted on.

Notes

Python’s sorted() function is similar to list.sort(), except that the list is passed in as the first parameter and sorted() returns a new sorted list (leaving the original unchanged), which must then be assigned to a variable. As with list.sort(), key and reverse are keyword arguments:

list_sorted = sorted(list1, key=lambda item: item['name'], reverse=False)
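
For example, reusing list1 from above, the following returns a new list sorted by age in descending order (a quick sketch):

list_sorted = sorted(list1, key=lambda item: item['age'], reverse=True)
print(list_sorted)
# outputs: [{'name': 'Jack', 'age': 25},
# {'name': 'Stuart', 'age': 20},
# {'name': 'Cindy', 'age': 17}]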

1.8 Difference Between Simple Loops and map/filter Type Functions

Loops intended to modify an iterable's items often do so by mutating the original iterable:

# loop over and mutate iterable
list1 = [ 'c', 'f', 'a', 'e', 'b' ]
for i in range(len(list1)):
  list1[i] = list1[i].upper()

The problems with this are:

  • The original list is no longer available as it has been mutated
  • This code would not work with immutable sequences like tuples

The equivalent code using map() fixes both of these issues:

# map creates a new list based on the original
list1 = [ 'c', 'f', 'a', 'e', 'b' ]
list2 = list(map(lambda x: x.upper(), list1))

Here is an example of the same map function being used with a tuple:

tup1 = ( 'c', 'f', 'a', 'e', 'b' )
tup2 = tuple(map(lambda x:x.upper(), tup1))


1.9 Additional Functions

Python includes many functions developers can use out of the box rather than having to hand-code them:

any(seq) # iterate boolean seq, returns True if at least one item is true

all(seq) # iterate boolean seq, returns True if all items are true

max(seq) # iterate sequence and calculate max value

min(seq) # iterate sequence and calculate min value

sum(list) # iterate sequence and calculate sum

len(seq) # return length of strings, # items in list, etc.

input() # get input from user

randint(0,10) # generate a random number between 0 and 10

Counter(seq) # create dictionary with freq of each seq member

Notes:

randint() is part of the random module which must be imported:
import random as r

r.randint(0,10)

Counter is part of the collections module and must be imported:

import collections as c

c.Counter('asdffg')
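
The short sketch below exercises several of these functions together (the sample values are chosen only for illustration):

import random as r
import collections as c

nums = [3, 1, 4, 1, 5]
print(any(n > 4 for n in nums))  # True, because 5 is greater than 4
print(all(n > 0 for n in nums))  # True, every item is positive
print(max(nums), min(nums))      # 5 1
print(sum(nums), len(nums))      # 14 5
print(r.randint(0, 10))          # a random integer between 0 and 10
print(c.Counter('asdffg'))       # Counter({'f': 2, 'a': 1, 's': 1, 'd': 1, 'g': 1})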

1.10 General Rules for Creating Functions

Since creating functions is a big part of functional programming, we will reiterate here some of the rules covered earlier:

  • Name functions appropriately
  • Limit the function to a single responsibility
  • Include a docstring
  • Always return a value
  • Limit functions to 50 lines or less
  • Make the function ‘idempotent’ and ‘pure’ if possible
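
As a small sketch, the function below follows these rules (the name and conversion formula are just an example):

def fahrenheit_to_celsius(temp_f):
  """Convert a temperature from degrees Fahrenheit to degrees Celsius."""
  return (temp_f - 32) * 5 / 9

print(fahrenheit_to_celsius(212))
# prints: 100.0

It has a descriptive name, a single responsibility, a docstring, always returns a value, and is pure: it never touches anything outside its own parameters.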

1.11 Summary

In this tutorial, we covered:

  • What is Functional Programming
  • Functional Programming Benefits
  • Using Map Function
  • Using Filter Function
  • Lambda Expressions
  • List.sort() using Lambda
  • Loops vs. map/filter
  • Additional Functions
  • General Rules for Creating Functions

Building Data Pipelines in Kafka

This tutorial is adapted from Web Age course Kafka for Application Developers Training.

1.1 Building Data Pipelines

Data pipelines can involve various use cases:

  • Building a data pipeline where Apache Kafka is one of the two endpoints. For example, getting data from Kafka to S3 or getting data from MongoDB into Kafka. 
  • Building a pipeline between two different systems but using Kafka as an intermediary. For example, getting data from Twitter to Elasticsearch by sending the data first from Twitter to Kafka and then from Kafka to Elasticsearch. 

The main value Kafka provides to data pipelines is its ability to serve as a very large, reliable buffer between various stages in the pipeline, effectively decoupling producers and consumers of data within the pipeline. This decoupling, combined with reliability, security, and efficiency, makes Kafka a good fit for most data pipelines.

1.2 Considerations When Building Data Pipelines

  • Timeliness
  • Reliability
  • High and varying throughput
  • Data formats
  • Transformations
  • Security
  • Failure handling
  • Coupling and agility

1.3 Timeliness

Good data integration systems can support different timeliness requirements for different pipelines. Kafka also makes it easier to migrate between different timetables as business requirements change. Kafka is a scalable and reliable streaming data platform that can be used to support anything from near-real-time pipelines to hourly batches. Producers can write to Kafka as frequently as needed, and consumers can read and deliver the latest events as they arrive. Consumers can also work in batches when required, for example running every hour, connecting to Kafka, and reading the events that accumulated during the previous hour. Kafka acts as a buffer that decouples the time-sensitivity requirements of producers and consumers: producers can write events in real time while consumers process batches of events, or vice versa. The consumption rate is driven entirely by the consumers.
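
As a rough sketch of the hourly-batch pattern, using the third-party kafka-python package (the broker address, topic name, and group id below are assumptions made for illustration):

import json
from kafka import KafkaConsumer

# Connect, read whatever has accumulated, then stop once the topic goes quiet.
consumer = KafkaConsumer(
  'events',                             # assumed topic name
  bootstrap_servers='localhost:9092',   # assumed broker address
  group_id='hourly-batch-job',          # assumed consumer group
  auto_offset_reset='earliest',
  consumer_timeout_ms=5000,             # stop iterating after 5s with no new records
  value_deserializer=lambda b: json.loads(b.decode('utf-8')),
)

batch = [record.value for record in consumer]
consumer.close()
print('processed', len(batch), 'events in this run')

A scheduler such as cron could run this script once an hour; the same pipeline could later move to near-real-time processing simply by keeping the consumer running.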

1.4 Reliability

System failure for more than a few seconds can be hugely disruptive, especially when the timeliness requirement is closer to the few-milliseconds end of the spectrum. Data integration systems should avoid single points of failure and allow for fast and automatic recovery from all sorts of failure events. Data pipelines are often the way data arrives in business-critical systems. Another important consideration for reliability is delivery guarantees; Kafka offers reliable, guaranteed delivery.

1.5 High and Varying Throughput

Data pipelines should be able to scale to very high throughput, and they should be able to adapt if throughput suddenly increases or decreases. With Kafka acting as a buffer between producers and consumers, we no longer need to couple consumer throughput to producer throughput. If producer throughput exceeds that of the consumer, data will accumulate in Kafka until the consumer can catch up. Kafka's ability to add consumers or producers independently allows us to scale either side of the pipeline dynamically to match the changing requirements. Kafka is a high-throughput distributed system capable of processing hundreds of megabytes per second on even modest clusters. Kafka also focuses on parallelizing the work, not just scaling it out: parallelizing means it allows data sources and sinks to split the work between multiple threads of execution and use the available CPU resources even when running on a single machine. Kafka also supports several types of compression, allowing users and admins to control the use of network and storage resources as the throughput requirements increase.

1.6 Data Formats

A good data integration platform allows for and reconciles different data formats and data types. The data types supported vary among different databases and other storage systems. For example, you may be loading XML and relational data into Kafka and then need to convert the data to JSON when writing it out. Kafka itself and the Connect APIs are completely agnostic when it comes to data formats. Producers and consumers can use any serializer to represent data in any format that works for you. Kafka Connect has its own in-memory objects that include data types and schemas, but it allows for pluggable converters so these records can be stored in any format. Many sources and sinks have a schema; we can read the schema from the source along with the data, store it, and use it to validate compatibility or even update the schema in the sink database. For example, if someone added a column in MySQL, a pipeline can make sure the column gets added to Hive too as new data is loaded into it. When writing data from Kafka to external systems, sink connectors are responsible for the format in which the data is written to the external system. Some connectors choose to make this format pluggable. For example, the HDFS connector allows a choice between Avro and Parquet formats.
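
As an illustration of the "any serializer" point, here is a minimal sketch with the third-party kafka-python package, where the producer plugs in a JSON serializer for message values (the topic name and broker address are assumptions):

import json
from kafka import KafkaProducer

producer = KafkaProducer(
  bootstrap_servers='localhost:9092',                            # assumed broker address
  value_serializer=lambda obj: json.dumps(obj).encode('utf-8'),  # objects go out as JSON bytes
)
producer.send('people', {'name': 'Jack', 'age': 25})             # assumed topic name
producer.flush()

# A consumer would supply a matching value_deserializer to turn the bytes back into objects.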

1.7 Transformations

There are generally two schools of building data pipelines:

  • ETL (Extract-Transform-Load)
  • ELT (Extract-Load-Transform)

ETL: This approach means the data pipeline is responsible for making modifications to the data as it passes through. It has the perceived benefit of saving time and storage because you don't need to store the data, modify it, and store it again. It shifts the burden of computation and storage to the data pipeline itself, which may or may not be desirable. The transformations that happen to the data in the pipeline also tie the hands of those who wish to process the data farther down the pipe: if users require access to fields that were dropped, the pipeline needs to be rebuilt and historical data will require reprocessing (assuming it is available).

ELT: This approach means the data pipeline does only minimal transformation (mostly around data type conversion), with the goal of making sure the data that arrives at the target is as similar as possible to the source data. These are also called high-fidelity pipelines or data-lake architectures. In these systems, the target system collects "raw data" and all required processing is done at the target system. Users of the target system have access to all the data. These systems also tend to be easier to troubleshoot, since all data processing is limited to one system rather than split between the pipeline and additional applications. The downside is that the transformations consume CPU and storage resources at the target system.

1.8 Security

In terms of data pipelines, the main security concerns are:

  • Encryption – the data going through the pipe should be encrypted. This is mainly a concern for data pipelines that cross datacenter boundaries.
  • Authorization – Who is allowed to make modifications to the pipelines?
  • Authentication – If the data pipeline needs to read or write from access-controlled locations, can it authenticate properly?

Kafka allows encrypting data on the wire as it is piped from sources to Kafka and from Kafka to sinks. It also supports authentication (via SASL) and authorization, which helps ensure that sensitive data can't be piped into less secured systems by someone unauthorized. Kafka also provides an audit log to track access, both unauthorized and authorized. With some extra coding, it is also possible to track where the events in each topic came from and who modified them, so you can provide the entire lineage for each record.

1.9 Failure Handling

It is important to plan for failure handling in advance, such as:

  • Can we prevent faulty records from ever making it into the pipeline?
  • Can we recover from records that cannot be parsed?
  • Can bad records get fixed (perhaps by a human) and reprocessed?
  • What if the bad event looks exactly like a normal event and you only discover the problem a few days later?

Because Kafka stores all events for long periods of time, it is possible to go back in time and recover from errors when needed.
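
A rough sketch of this "go back in time" idea with the third-party kafka-python package: rewind a consumer to the offsets that correspond to a timestamp three days in the past (the topic name, broker address, and time window are assumptions):

import time
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='replay-job')
partitions = [TopicPartition('events', p) for p in consumer.partitions_for_topic('events')]
consumer.assign(partitions)

three_days_ago_ms = int((time.time() - 3 * 24 * 3600) * 1000)
offsets = consumer.offsets_for_times({tp: three_days_ago_ms for tp in partitions})

for tp, offset_and_time in offsets.items():
  if offset_and_time is not None:   # None means no records at or after that timestamp
    consumer.seek(tp, offset_and_time.offset)

# Iterating the consumer from here re-reads events from roughly three days ago onward.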

1.10 Coupling and Agility

One of the most important goals of data pipelines is to decouple the data sources and data targets.

There are multiple ways accidental coupling can happen:

  • Ad-hoc pipelines
  • Loss of metadata
  • Extreme processing

1.11 Ad-hoc Pipelines

Some companies end up building a custom pipeline for each pair of applications they want to connect.

For example:

  • Use Logstash to dump logs to Elasticsearch
  • Use Flume to dump logs to HDFS
  • Use GoldenGate to get data from Oracle to HDFS
  • Use Informatica to get data from MySQL and XMLs to Oracle

This tightly couples the data pipeline to the specific endpoints and creates a mess of integration points that requires significant effort to deploy, maintain, and monitor. Data pipelines should only be planned for systems where it’s really required.

1.12 Loss of Metadata

If the data pipeline doesn't preserve schema metadata and does not allow for schema evolution, you end up tightly coupling the software producing the data at the source and the software that uses it at the destination. Without schema information, both software products need to include information on how to parse the data and interpret it. For example, if data flows from Oracle to HDFS and a DBA adds a new field in Oracle without the pipeline preserving schema information and allowing schema evolution, either every app that reads data from HDFS will break or all the developers will need to upgrade their applications at the same time. Neither option is agile. With support for schema evolution in the pipeline, each team can modify their applications at their own pace without worrying that things will break down the line.

1.13 Extreme Processing

Some processing and transformation of data is inherent to data pipelines, but too much processing ties all the downstream systems to decisions made when building the pipelines: for example, which fields to preserve or how to aggregate data. This often leads to constant changes to the pipeline as the requirements of downstream applications change, which isn't agile, efficient, or safe. The more agile approach is to preserve as much of the raw data as possible and allow downstream apps to make their own decisions regarding data processing and aggregation.

1.15 Kafka Connect Versus Producer and Consumer

When writing to Kafka or reading from Kafka, you have the choice between using traditional producer and consumer clients and using the Connect API and its connectors. Use the Kafka clients when you can modify the code of the application you want to connect to Kafka and when you want to either push data into Kafka or pull data from Kafka. Use Connect to connect Kafka to datastores that you did not write and whose code you cannot or will not modify; Connect is used to pull data from the external datastore into Kafka or push data from Kafka to an external store. For datastores where a connector already exists, Connect can be used by non-developers, who will only need to configure the connectors. Connect is recommended because it provides out-of-the-box features like configuration management, offset storage, parallelization, error handling, support for different data types, and standard management REST APIs. If you need to connect Kafka to a datastore and a connector does not exist yet, you can choose between writing an app using the Kafka clients or using the Connect API. Writing a small app that connects Kafka to a datastore sounds simple, but there are many little details you will need to handle, such as data types and configuration, that make the task non-trivial. Kafka Connect handles most of this for you, allowing you to focus on transporting data to and from the external stores.
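
For the "small app using the Kafka clients" option, a bare-bones sketch with the third-party kafka-python package might look like the following; every feature Connect would otherwise provide (offset tracking, retries, schemas, REST management) is left for you to build. The file name, topic name, and broker address are assumptions:

from kafka import KafkaProducer

# Push each line of a local file into a Kafka topic: the hand-rolled
# alternative to configuring a file source connector.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
with open('application.log', 'rb') as source:
  for line in source:
    producer.send('app-logs', value=line.rstrip())
producer.flush()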

1.16 Summary

  • Kafka can be used to implement data pipelines
  • When designing the data pipelines, various factors should be considered.
  • One of the most important Kafka features is its ability to deliver all messages under all failure conditions.

What is HTTP Client in Angular?

This tutorial is adapted from Web Age course Comprehensive Angular 10 Programming Training.

1.1 The Angular HTTP Client

The Angular HTTP Client provides a simplified API for network communication. It is a wrapper over the JavaScript XMLHttpRequest API. The API is asynchronous because JavaScript is single-threaded and a blocking synchronous HTTP call would freeze the UI. It supports making HTTP requests (GET, POST, etc.), working with request and response headers, and asynchronous programming. It makes use of the Observable object from the RxJS async library.

1.2 Using The HTTP Client – Overview

The core client API is available from the HttpClient Angular service class. This is available from the HttpClientModule Angular module.

  • Import HttpClientModule into your application module.

A Data Service is created that makes network requests:

  • Inject the HttpClient service instance.
  • Use various methods of HttpClient to make network calls. They return an Observable object. Usually, this Observable is returned from the service method.

An Angular component is used to display the response data:

    • The data service is injected into the constructor
    • The component calls a method of the data service and obtains an Observable object.
    • The component then “subscribes” to the Observable to receive the response from the HTTP call.
    • The component’s template displays the data

1.3 Importing HttpClientModule

In order to use the HTTP client throughout the application, we need to import HttpClientModule in the application module. HttpClientModule is a collection of service providers from the Angular HTTP library.

import {HttpClientModule} from '@angular/common/http';

@NgModule({
    imports: [ BrowserModule, HttpClientModule ],
...})

Now the HttpClient service is injectable throughout your application.

Notes:

HttpClientModule configures HttpClient as a provider, so you do not have to register it again in your application module. As a result, HttpClient is now injectable anywhere in your application.

1.4 Service Using HttpClient

Sample Service Code (“people.service.ts”):

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';

export class Person { id: number; name: string; /* ... */ }

@Injectable(...)
export class PeopleService{
  constructor(private http: HttpClient){}

  getPerson(id:number) : Observable<Person> {
   return this.http.get<Person>(`/app/person/${id}`)
  }
  getAllPersons() : Observable<Person[]> {
   return this.http.get<Person[]>(`/app/person`)
  }
}

1.5 Making a GET Request

  • Call the get() method of HttpClient service.
      All network call methods like get(), post(), put() return an Observable.
  • There are many overloaded versions of the get() call. Some are:
let obs:Observable<Object> = http.get(url) 
//Strong typing
let obs:Observable<Person> = http.get<Person>(url) 
  • For the get() calls it is recommended you use the strongly typed version (second one from above).
  • The get() method returns an Observable immediately. One needs to subscribe to the Observable to obtain the response data asynchronously.

Notes:

As they are used with the Angular HttpClient, Observable objects can be thought of as the glue that connects network requests with data consumers. There are several aspects to this relationship that our discussion will cover one at a time. From the current section, we see that HTTP requests return Observable objects that we can assign to a variable for later use. Next, we will take a look at how Observable objects are used.

1.6 What does an Observable Object do?

Angular uses the Reactive Extensions for JavaScript (RxJS) implementation of the Observable object. Observable objects provide a convenient way for applications to consume asynchronous data/event streams. Observable objects are exposed to events which they then make available to consumers. Applications consume events by subscribing to the Observable object. The Observable object can be used to transform data before returning it to a consumer if needed.

Notes:

Reactive programming and Observable objects can be used anywhere an asynchronous programming model is required. For more information on these topics see:

http://reactivex.io/intro.html

1.7 Using the Service in a Component

  • Once the service has been created, we can create an Angular component that uses it to retrieve and display data.
  • Our component will have to import the service.
  • In order to retrieve data, the code in our component will have to work with the Observable object that was created in the service.

1.8 The PeopleService Client Component

import { Component, OnInit } from '@angular/core';
import { PeopleService, Person } from './people.service';

@Component({
  selector: 'app-people',
  template: `
    <p *ngFor='let person of list'>{{person.name}}</p>`
})
export class PeopleComponent implements OnInit {
  list: Person[]
  constructor(private peopleService: PeopleService ){}
  ngOnInit() {
    this.peopleService.getAllPersons().subscribe(
      (data: Person[]) => this.list = data
    )
  }
}

1.9 Error Handling

  • Two types of errors can happen during a network call:
    • No network connection can be made to the server.
    • The server returns an error response code in the 4XX or 5XX range.
  • A subscriber can handle these errors by supplying an error handler function to subscribe().
ngOnInit() {
  this.peopleService.getAllPersons().subscribe(
    (data: Person[]) => this.list = data,
    (error:HttpErrorResponse) => console.log(error)
    )
}

1.10 Customizing the Error Object

A service may decide to offer more meaningful errors in a custom error object instead of the default HttpErrorResponse. You can intercept an error with the catchError() operator and supply a custom error object using throwError().

The following transforms the error object from HttpErrorResponse to a string.

import {Observable, throwError} from 'rxjs'
import {catchError} from 'rxjs/operators'

export class PeopleService {
  constructor(private httpClient: HttpClient){}

  getAllPersons() : Observable<Person[]> {
   return this.httpClient
    .get<Person[]>(`/app/person`)
    .pipe(
      catchError(error => 
        throwError("There was a problem with the network"))
    )
  }
}

//The component
ngOnInit() {
  this.peopleService.getAllPersons().subscribe(
    (data: Person[]) => this.list = data,
    (errMsg:string) => alert(errMsg))
}

1.11 Making a POST Request

Supply the request body as the second argument to the post() method.

createPerson(person:Person) : Observable<Object> {
  return this.http.post("/app/person", person) 
}

If a POST request returns data in the response body, you can make a more type-safe call.

createPerson(person:Person) : Observable<AddPersonResult> {
  return this.http.post<AddPersonResult>("/app/person", 
    person) 
}
//The component
this.peopleService.createPerson(...)
  .subscribe((result: AddPersonResult) => {...})

1.12 Making a PUT Request

  • Supply the request body as the second argument to the put() method.
updatePerson(person:Person) : Observable<Object> {
  return this.http.put("/app/person", person) 
}
  • If a PUT request returns data in the response body, you can make a more type-safe call.
updatePerson(person:Person) : Observable<UpdateResult> {
  return this.http.put<UpdateResult>("/app/person", 
    person) 
}
//The component
this.peopleService.updatePerson(...)
  .subscribe((result: UpdateResult) => {...})

1.13 Making a DELETE Request

  • A DELETE request does not take a request body.

deletePerson(id:number) : Observable<Object> {
  return this.http.delete(`/app/person/${id}`)
}

  • If a DELETE request returns data in the response body, you can make a more type safe call.
deletePerson(id:number) : Observable<DeleteStatus> {
  return this.http.delete<DeleteStatus>(`/app/person/${id}`) 
}
//The component
this.peopleService.deletePerson(12)
  .subscribe((result: DeleteStatus) => {...})

1.14 Summary

In this tutorial, we covered:

  • What is the Angular HTTP Client
  • Importing HttpClientModule
  • Making GET/POST Calls
  • Working with Observables