This post is the second of a series on How to Install Step by Step a Local Data Lake. Before reading, I suggest following this tutorial, which will get the tools up and running on a hosted or virtual machine.
You should now have the following architecture, ready to receive data flows, crunch them and expose them to Machine Learning or Business Intelligence tools:
The purpose of this post is to process the data received according to its flow type, schema and format, and to save it in Parquet tables for later use by Machine Learning tools and in Hive tables for JDBC access by BI tools.
We will consider each flow type and use pyspark to process the data samples in order to achieve the outlined objective.
This tutorial is available on my GitHub: https://github.com/nasdag/pyspark/blob/master/data-flows.ipynb
You can download it to your Local Data Lake:
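A minimal sketch of that download step, assuming the notebook server runs from your home directory (the raw GitHub URL is the one behind the link above):

```python
# fetch the notebook straight from GitHub into the notebook server's directory
import urllib
urllib.urlretrieve('https://raw.githubusercontent.com/nasdag/pyspark/master/data-flows.ipynb',
                   'data-flows.ipynb')
```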
and select data-flows.ipynb.
Regards,
Philippe.
We need to simulate the arrival of ADD, FULL and DELTA flows in our Data Lake: an ADD flow appends new records (e.g. daily orders), a FULL flow delivers a complete snapshot each time, and a DELTA flow carries only the records that changed. For this purpose, let's use the dump of a retail database. We will load it into MySQL and generate CSV files to illustrate the arrival of regular flows in our Transfer Hub.
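Something along these lines, assuming a retail_db.sql dump file has already been copied to the machine and MySQL accepts a passwordless root login (both are assumptions, adjust to your setup):

```python
# create the database and load the dump into it
import subprocess
subprocess.check_call('mysql -u root -e "CREATE DATABASE IF NOT EXISTS retail_db"', shell=True)
subprocess.check_call('mysql -u root retail_db < retail_db.sql', shell=True)
```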
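The listing below was produced with pandas over a MySQLdb connection; the credentials are assumptions:

```python
import MySQLdb
import pandas as pd

conn = MySQLdb.connect(host='localhost', user='root', db='retail_db')
pd.read_sql('show tables', con=conn)
```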
Tables_in_retail_db
0 categories
1 customers
2 departments
3 order_items
4 orders
5 products
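Reusing the same connection, describe the customers table:

```python
pd.read_sql('describe customers', con=conn)
```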
Field Type Null Key Default Extra
0 customer_id int(11) NO PRI None auto_increment
1 customer_fname varchar(45) NO None
2 customer_lname varchar(45) NO None
3 customer_email varchar(45) NO None
4 customer_password varchar(45) NO None
5 customer_street varchar(255) NO None
6 customer_city varchar(45) NO None
7 customer_state varchar(45) NO None
8 customer_zipcode varchar(45) NO None
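And peek at the first rows:

```python
pd.read_sql('select * from customers limit 5', con=conn)
```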
customer_id customer_fname customer_lname customer_email customer_password \
0 1 Richard Hernandez XXXXXXXXX XXXXXXXXX
1 2 Mary Barrett XXXXXXXXX XXXXXXXXX
2 3 Ann Smith XXXXXXXXX XXXXXXXXX
3 4 Mary Jones XXXXXXXXX XXXXXXXXX
4 5 Robert Hudson XXXXXXXXX XXXXXXXXX
customer_street customer_city customer_state customer_zipcode
0 6303 Heather Plaza Brownsville TX 78521
1 9526 Noble Embers Ridge Littleton CO 80126
2 3422 Blue Pioneer Bend Caguas PR 00725
3 8324 Little Common San Marcos CA 92069
4 10 Crystal River Mall Caguas PR 00725
Generating the ADD Flow: daily orders
Schemas will be stored in memory for this tutorial; we will explore in a later post how to select the right repository.
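A sketch of the in-memory schema store and of the extraction of one day of orders; the file naming convention and the date filter are assumptions:

```python
# in-memory schema store: one list of (column, type) pairs per flow
schemas = {
    'orders':    [('order_id', 'int'), ('order_date', 'string'),
                  ('order_customer_id', 'int'), ('order_status', 'string')],
    'products':  [('product_id', 'int'), ('product_category_id', 'int'),
                  ('product_name', 'string'), ('product_description', 'string'),
                  ('product_price', 'float'), ('product_image', 'string')],
    'customers': [('customer_id', 'int'), ('customer_fname', 'string'),
                  ('customer_lname', 'string'), ('customer_email', 'string'),
                  ('customer_password', 'string'), ('customer_street', 'string'),
                  ('customer_city', 'string'), ('customer_state', 'string'),
                  ('customer_zipcode', 'string')],
}

# one day of orders, pretending to arrive on 2015-12-18
df = pd.read_sql("select * from orders where order_date like '2014-01-01%'", con=conn)
df.to_csv('/tmp/orders_2015-12-18.csv', header=False, index=False)
print ','.join(name for name, _ in schemas['orders'])
```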
order_id,order_date,order_customer_id,order_status
Generating the FULL Flow: products
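For the FULL flow, a sketch that writes two complete snapshots of a small product subset, multiplying prices by ten in the second one so the change is easy to spot (the subset size and the price tweak are assumptions):

```python
df = pd.read_sql('select * from products limit 4', con=conn)
df.to_csv('/tmp/products_2015-12-18.csv', header=False, index=False)
df['product_price'] = df['product_price'] * 10   # simulate a price update
df.to_csv('/tmp/products_2015-12-19.csv', header=False, index=False)
print ','.join(name for name, _ in schemas['products'])
```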
product_id,product_category_id,product_name,product_description,product_price,product_image
Generating the DELTA Flow: customers
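For the DELTA flow, a sketch that emits two files of changed customer records over overlapping random subsets, stamping a recognizable zipcode per day (the sampling and the zipcode trick are assumptions made to make the changes visible later):

```python
for day, zipcode in (('2015-12-18', '99999'), ('2015-12-19', '00000')):
    df = pd.read_sql('select * from customers order by rand() limit 20', con=conn)
    df['customer_zipcode'] = zipcode   # mark each day's changed records
    df.to_csv('/tmp/customers_%s.csv' % day, header=False, index=False)
print ','.join(name for name, _ in schemas['customers'])
```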
customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcode
Let's move the generated files into HDFS.
We will explore later how to move files appropriately between the local filesystem and HDFS.
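A sketch of the HDFS housekeeping (the in/raw/out layout under /user/datacruncher is the one this tutorial uses):

```python
import subprocess

# recreate the landing (in), archive (raw) and crunched (out) directories
for d in ('in', 'raw', 'out'):
    subprocess.call(['hdfs', 'dfs', '-rm', '-r', '/user/datacruncher/' + d])
    subprocess.call(['hdfs', 'dfs', '-mkdir', '-p', '/user/datacruncher/' + d])

# push the generated flow files into the landing directory
subprocess.call('hdfs dfs -put /tmp/*.csv /user/datacruncher/in/', shell=True)
```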
15/12/21 09:11:52 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/datacruncher/in
15/12/21 09:11:55 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/datacruncher/raw
15/12/21 09:11:58 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/datacruncher/out
Create Hive Tables
Let's generate the SQL column string from each flow schema:
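A sketch of that generation, reusing the schemas dictionary defined earlier:

```python
def sql_columns(schema):
    # turn [('order_id', 'int'), ...] into 'order_id INT, ...'
    return ', '.join('%s %s' % (name, col_type.upper()) for name, col_type in schema)

for flow in ('orders', 'products', 'customers'):
    print sql_columns(schemas[flow])
```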
order_id INT, order_date STRING, order_customer_id INT, order_status STRING
product_id INT, product_category_id INT, product_name STRING, product_description STRING, product_price FLOAT, product_image STRING
customer_id INT, customer_fname STRING, customer_lname STRING, customer_email STRING, customer_password STRING, customer_street STRING, customer_city STRING, customer_state STRING, customer_zipcode STRING
Then use JDBC to connect to Hive and create the tables:
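A sketch using pyhs2 as the Hive client; the connection parameters, the *_hist table names and the external locations are assumptions of this walkthrough:

```python
import pyhs2

hive = pyhs2.connect(host='localhost', port=10000, authMechanism='PLAIN',
                     user='datacruncher', password='', database='default')
cur = hive.cursor()

# one external historical table per flow, stored as Parquet and extended
# with the crunch_date column added at processing time
for flow in ('orders', 'products', 'customers'):
    cur.execute('DROP TABLE IF EXISTS %s_hist' % flow)
    cur.execute("CREATE EXTERNAL TABLE %s_hist (%s, crunch_date STRING) "
                "STORED AS PARQUET LOCATION '/user/datacruncher/out/%s_hist'"
                % (flow, sql_columns(schemas[flow]), flow))
```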
Start pyspark to process the data flows
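A sketch of the Spark 1.x setup; the spark-csv package must be available to the driver (e.g. start pyspark with --packages com.databricks:spark-csv_2.10:1.3.0):

```python
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName='data-flows')
sqlContext = HiveContext(sc)   # HiveContext so Spark shares the Hive metastore
```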
We also need to generate the appropriate Spark schemas from the schema strings:
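A sketch of the conversion; columns are declared non-nullable, which matches the StructType output below:

```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

spark_types = {'int': IntegerType(), 'string': StringType(), 'float': FloatType()}

def spark_schema(schema):
    return StructType([StructField(name, spark_types[t], False) for name, t in schema])

spark_schemas = dict((flow, spark_schema(schemas[flow])) for flow in schemas)
for flow in ('orders', 'products', 'customers'):
    print spark_schemas[flow]
```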
StructType(List(StructField(order_id,IntegerType,false),StructField(order_date,StringType,false),StructField(order_customer_id,IntegerType,false),StructField(order_status,StringType,false)))
StructType(List(StructField(product_id,IntegerType,false),StructField(product_category_id,IntegerType,false),StructField(product_name,StringType,false),StructField(product_description,StringType,false),StructField(product_price,FloatType,false),StructField(product_image,StringType,false)))
StructType(List(StructField(customer_id,IntegerType,false),StructField(customer_fname,StringType,false),StructField(customer_lname,StringType,false),StructField(customer_email,StringType,false),StructField(customer_password,StringType,false),StructField(customer_street,StringType,false),StructField(customer_city,StringType,false),StructField(customer_state,StringType,false),StructField(customer_zipcode,StringType,false)))
Crunching the ADD Flow
Load the CSV flow according to its schema:
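A sketch using the spark-csv data source; the incoming file name is an assumption:

```python
orders = (sqlContext.read.format('com.databricks.spark.csv')
          .schema(spark_schemas['orders'])
          .load('/user/datacruncher/in/orders_2015-12-18.csv'))
orders.first()
```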
Row(order_id=25876, order_date=u'2014-01-01 00:00:00', order_customer_id=3414, order_status=u'PENDING_PAYMENT')
Add the crunch date column:
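The crunch date would normally be derived from the incoming file name; it is hard-coded in this sketch:

```python
from pyspark.sql.functions import lit

orders = orders.withColumn('crunch_date', lit('2015-12-18'))
orders.first()
```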
Row(order_id=25876, order_date=u'2014-01-01 00:00:00', order_customer_id=3414, order_status=u'PENDING_PAYMENT', crunch_date=u'2015-12-18')
Append to the Parquet table and update Hive:
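A sketch; since the hypothetical orders_hist table is external over this directory, Hive sees the appended files, and the refresh statement below (which produces the DataFrame[result: string] output) is an assumption that only matters for a partitioned layout:

```python
orders.write.mode('append').parquet('/user/datacruncher/out/orders_hist')
sqlContext.sql('MSCK REPAIR TABLE orders_hist')   # assumed refresh statement
```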
DataFrame[result: string]
Move the original file to the raw directory:
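Archiving is a plain HDFS move for now:

```python
subprocess.call(['hdfs', 'dfs', '-mv',
                 '/user/datacruncher/in/orders_2015-12-18.csv',
                 '/user/datacruncher/raw/'])
```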
Query the content of Hive through JDBC:
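A sketch reusing the pyhs2 cursor; fetch() returns plain lists, which is why the DataFrame below has numeric column labels:

```python
cur.execute('select * from orders_hist limit 10')
print cur.getSchema()
pd.DataFrame(cur.fetch())
```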
[{'comment': '', 'columnName': 'order_id', 'type': 'INT_TYPE'}, {'comment': '', 'columnName': 'order_date', 'type': 'STRING_TYPE'}, {'comment': '', 'columnName': 'order_customer_id', 'type': 'INT_TYPE'}, {'comment': '', 'columnName': 'order_status', 'type': 'STRING_TYPE'}, {'comment': '', 'columnName': 'crunch_date', 'type': 'STRING_TYPE'}]
0 1 2 3 4
0 25876 2014-01-01 00:00:00 3414 PENDING_PAYMENT 2015-12-18
1 25877 2014-01-01 00:00:00 5549 PENDING_PAYMENT 2015-12-18
2 25878 2014-01-01 00:00:00 9084 PENDING 2015-12-18
3 25879 2014-01-01 00:00:00 5118 PENDING 2015-12-18
4 25880 2014-01-01 00:00:00 10146 CANCELED 2015-12-18
5 25881 2014-01-01 00:00:00 3205 PENDING_PAYMENT 2015-12-18
6 25882 2014-01-01 00:00:00 4598 COMPLETE 2015-12-18
7 25883 2014-01-01 00:00:00 11764 PENDING 2015-12-18
8 25884 2014-01-01 00:00:00 7904 PENDING_PAYMENT 2015-12-18
9 25885 2014-01-01 00:00:00 7253 PENDING 2015-12-18
Repeat the same steps for the FULL flow: load the incoming data and append it to the Parquet table with the crunch date, but also create a view on the last set of data.
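A sketch of the FULL pass over the two daily product files (file names as generated above):

```python
for f in ('products_2015-12-18.csv', 'products_2015-12-19.csv'):
    products = (sqlContext.read.format('com.databricks.spark.csv')
                .schema(spark_schemas['products'])
                .load('/user/datacruncher/in/' + f)
                .withColumn('crunch_date', lit(f[-14:-4])))   # date from file name
    products.write.mode('append').parquet('/user/datacruncher/out/products_hist')
    subprocess.call(['hdfs', 'dfs', '-mv', '/user/datacruncher/in/' + f,
                     '/user/datacruncher/raw/'])

cur.execute('select * from products_hist')
pd.DataFrame(cur.fetch())
```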
0 1 2 3 4 \
0 1 2 Quest Q64 10 FT. x 10 FT. Slant Leg Instant U 59.980000
1 2 2 Under Armour Men's Highlight MC Football Clea 129.990005
2 3 2 Under Armour Men's Renegade D Mid Football Cl 89.989998
3 4 2 Under Armour Men's Renegade D Mid Football Cl 89.989998
4 1 2 Quest Q64 10 FT. x 10 FT. Slant Leg Instant U 599.799988
5 2 2 Under Armour Men's Highlight MC Football Clea 1299.900024
6 3 2 Under Armour Men's Renegade D Mid Football Cl 899.899963
7 4 2 Under Armour Men's Renegade D Mid Football Cl 899.899963
5 6
0 http://images.acmesports.sports/Quest+Q64+10+F... 2015-12-18
1 http://images.acmesports.sports/Under+Armour+M... 2015-12-18
2 http://images.acmesports.sports/Under+Armour+M... 2015-12-18
3 http://images.acmesports.sports/Under+Armour+M... 2015-12-18
4 http://images.acmesports.sports/Quest+Q64+10+F... 2015-12-19
5 http://images.acmesports.sports/Under+Armour+M... 2015-12-19
6 http://images.acmesports.sports/Under+Armour+M... 2015-12-19
7 http://images.acmesports.sports/Under+Armour+M... 2015-12-19
Create a view for Hive queries:
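Hive does not allow a scalar subquery in a WHERE clause, so this sketch expresses the "latest snapshot" view as a join against the maximum crunch_date (view and table names are the hypothetical ones used above):

```python
cols = ', '.join('p.' + name for name, _ in schemas['products'])
cur.execute('DROP VIEW IF EXISTS products')
cur.execute('CREATE VIEW products AS SELECT %s FROM products_hist p '
            'JOIN (SELECT MAX(crunch_date) AS crunch_date FROM products_hist) m '
            'ON p.crunch_date = m.crunch_date' % cols)
cur.execute('select * from products')
pd.DataFrame(cur.fetch())
```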
0 1 2 3 4 \
0 1 2 Quest Q64 10 FT. x 10 FT. Slant Leg Instant U 599.799988
1 2 2 Under Armour Men's Highlight MC Football Clea 1299.900024
2 3 2 Under Armour Men's Renegade D Mid Football Cl 899.899963
3 4 2 Under Armour Men's Renegade D Mid Football Cl 899.899963
5
0 http://images.acmesports.sports/Quest+Q64+10+F...
1 http://images.acmesports.sports/Under+Armour+M...
2 http://images.acmesports.sports/Under+Armour+M...
3 http://images.acmesports.sports/Under+Armour+M...
And now for the DELTA flow: load the incoming data and append it to the Parquet table with the crunch date, but also create a consolidated Parquet table as the reference of all changes, and update Hive to expose both the historical and reference tables.
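Same loading pattern as before for the first delta file:

```python
customers = (sqlContext.read.format('com.databricks.spark.csv')
             .schema(spark_schemas['customers'])
             .load('/user/datacruncher/in/customers_2015-12-18.csv')
             .withColumn('crunch_date', lit('2015-12-18')))
customers.first()
```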
Row(customer_id=41, customer_fname=u'Victoria', customer_lname=u'Mason', customer_email=u'XXXXXXXXX', customer_password=u'XXXXXXXXX', customer_street=u'7869 Crystal View Villas', customer_city=u'Brooklyn', customer_state=u'NY', customer_zipcode=u'99999', crunch_date=u'2015-12-18')
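A sketch of the consolidation: append to the historical table, then keep, for each customer_id, the record carrying the most recent crunch_date, and write that as the reference table (directory names are assumptions):

```python
customers.write.mode('append').parquet('/user/datacruncher/out/customers_hist')

hist = sqlContext.read.parquet('/user/datacruncher/out/customers_hist')
latest = (hist.groupBy('customer_id').agg({'crunch_date': 'max'})
          .withColumnRenamed('max(crunch_date)', 'last_date')
          .withColumnRenamed('customer_id', 'last_id'))
ref = (hist.join(latest, (hist.customer_id == latest.last_id) &
                         (hist.crunch_date == latest.last_date))
       .select([hist[c] for c in hist.columns]))
ref.write.mode('overwrite').parquet('/user/datacruncher/out/customers_ref')
```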
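The assumed one-liner exposing the consolidated directory to Hive as the hypothetical customers table (this is what returns the DataFrame[result: string] below):

```python
sqlContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS customers (%s, crunch_date STRING) "
               "STORED AS PARQUET LOCATION '/user/datacruncher/out/customers_ref'"
               % sql_columns(schemas['customers']))
```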
DataFrame[result: string]
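And archive the processed file:

```python
subprocess.call(['hdfs', 'dfs', '-mv',
                 '/user/datacruncher/in/customers_2015-12-18.csv',
                 '/user/datacruncher/raw/'])
```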
Second pass with a new file:
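A sketch repeating every step for the 2015-12-19 delta file:

```python
customers = (sqlContext.read.format('com.databricks.spark.csv')
             .schema(spark_schemas['customers'])
             .load('/user/datacruncher/in/customers_2015-12-19.csv')
             .withColumn('crunch_date', lit('2015-12-19')))
customers.write.mode('append').parquet('/user/datacruncher/out/customers_hist')

# rebuild the consolidated reference table
hist = sqlContext.read.parquet('/user/datacruncher/out/customers_hist')
latest = (hist.groupBy('customer_id').agg({'crunch_date': 'max'})
          .withColumnRenamed('max(crunch_date)', 'last_date')
          .withColumnRenamed('customer_id', 'last_id'))
ref = (hist.join(latest, (hist.customer_id == latest.last_id) &
                         (hist.crunch_date == latest.last_date))
       .select([hist[c] for c in hist.columns]))
ref.write.mode('overwrite').parquet('/user/datacruncher/out/customers_ref')

subprocess.call(['hdfs', 'dfs', '-mv',
                 '/user/datacruncher/in/customers_2015-12-19.csv',
                 '/user/datacruncher/raw/'])
```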
Let's check the results:
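First the historical table, which keeps every received version of a record:

```python
cur.execute('select * from customers_hist order by customer_id limit 10')
pd.DataFrame(cur.fetch())
```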
0 1 2 3 4 \
0 41 Victoria Mason XXXXXXXXX XXXXXXXXX
1 46 Jennifer Smith XXXXXXXXX XXXXXXXXX
2 46 Jennifer Smith XXXXXXXXX XXXXXXXXX
3 60 Mary Gutierrez XXXXXXXXX XXXXXXXXX
4 60 Mary Gutierrez XXXXXXXXX XXXXXXXXX
5 62 Mary Oneal XXXXXXXXX XXXXXXXXX
6 62 Mary Oneal XXXXXXXXX XXXXXXXXX
7 81 Mary Smith XXXXXXXXX XXXXXXXXX
8 108 Joshua Smith XXXXXXXXX XXXXXXXXX
9 108 Joshua Smith XXXXXXXXX XXXXXXXXX
5 6 7 8 9
0 7869 Crystal View Villas Brooklyn NY 99999 2015-12-18
1 5463 Rocky Autoroute Freeport NY 00000 2015-12-19
2 5463 Rocky Autoroute Freeport NY 99999 2015-12-18
3 8632 Bright Route Webster NY 00000 2015-12-19
4 8632 Bright Route Webster NY 99999 2015-12-18
5 2659 Jagged Rabbit View Brooklyn NY 99999 2015-12-18
6 2659 Jagged Rabbit View Brooklyn NY 00000 2015-12-19
7 8434 Honey Pines Ithaca NY 99999 2015-12-18
8 4587 Noble Zephyr Promenade Bronx NY 00000 2015-12-19
9 4587 Noble Zephyr Promenade Bronx NY 99999 2015-12-18
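Then the consolidated reference table, which keeps only the latest version of each customer:

```python
cur.execute('select * from customers order by customer_id limit 10')
pd.DataFrame(cur.fetch())
```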
0 1 2 3 4 \
0 41 Victoria Mason XXXXXXXXX XXXXXXXXX
1 46 Jennifer Smith XXXXXXXXX XXXXXXXXX
2 60 Mary Gutierrez XXXXXXXXX XXXXXXXXX
3 62 Mary Oneal XXXXXXXXX XXXXXXXXX
4 81 Mary Smith XXXXXXXXX XXXXXXXXX
5 108 Joshua Smith XXXXXXXXX XXXXXXXXX
6 120 Nancy Smith XXXXXXXXX XXXXXXXXX
7 133 Mary Lopez XXXXXXXXX XXXXXXXXX
8 134 Donna Gomez XXXXXXXXX XXXXXXXXX
9 141 Mary Mcmahon XXXXXXXXX XXXXXXXXX
5 6 7 8 9
0 7869 Crystal View Villas Brooklyn NY 99999 2015-12-18
1 5463 Rocky Autoroute Freeport NY 00000 2015-12-19
2 8632 Bright Route Webster NY 00000 2015-12-19
3 2659 Jagged Rabbit View Brooklyn NY 00000 2015-12-19
4 8434 Honey Pines Ithaca NY 99999 2015-12-18
5 4587 Noble Zephyr Promenade Bronx NY 00000 2015-12-19
6 7840 Umber Sky Villas Bronx NY 00000 2015-12-19
7 948 Thunder Gate Beach Brooklyn NY 99999 2015-12-18
8 1690 Cinder Deer Chase Brooklyn NY 00000 2015-12-19
9 1526 Broad Mountain Plaza Brooklyn NY 99999 2015-12-18