NASDAG.org

A data scientist blog, by Philippe Dagher

How to Install Step by Step a Local Data Lake (2/3)

This post is the second in a series on How to Install Step by Step a Local Data Lake. Before reading further, I suggest following this tutorial, which will get the tools up and running on a hosted or virtual machine.

You should now have the following architecture ready to receive data flows, crunch them and expose them to Machine learning or Business Intelligence tools:

image

The purpose of this post is to process the incoming data according to its flow type, schema and format, and to save it in parquet tables for later use by Machine learning tools and in Hive tables for JDBC access by BI tools.

image

We will consider each flow type and use pyspark to process the data samples in order to achieve the outlined objective.

This tutorial is available on my GitHub: https://github.com/nasdag/pyspark/blob/master/data-flows.ipynb

You can download it to your Local Data Lake:

cd ~/tutorials/pyspark
git pull
ipython notebook

and select data-flows.ipynb.

Regards,

Philippe.


We need to simulate the arrival of ADD, FULL and DELTA flows into our Data Lake: an ADD flow appends new records only (daily orders), a FULL flow delivers a complete snapshot of a table each time (products), and a DELTA flow carries only the records that changed (customers). For this purpose, let's use the dump of a retail database: we will load it into MySQL and generate CSV files to illustrate the arrival of regular flows into our Transfer Hub.

#! mysql -u hiveuser -phivepassword -D retail_db -e "DROP DATABASE retail_db" > /dev/null

import pymysql

db = pymysql.connect(host="localhost", user='hiveuser', password='hivepassword')
cur = db.cursor()
cur.execute("CREATE DATABASE retail_db")
cur.execute("USE retail_db")
cur.execute(open("dump.sql").read())
cur.close()
db.close()
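
Depending on how the MySQL driver is configured, a single execute() call may refuse to run a dump that contains many statements. If that happens, a simple workaround is to run the statements one by one (a rough sketch; it assumes the dump does not contain semicolons inside string literals):

# fallback: execute the dump statement by statement
db = pymysql.connect(host="localhost", user='hiveuser', password='hivepassword', db="retail_db")
cur = db.cursor()
for statement in open("dump.sql").read().split(';'):
    if statement.strip():            # skip empty fragments
        cur.execute(statement)
db.commit()
cur.close()
db.close()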
import pandas as pd

db = pymysql.connect(host="localhost", user='hiveuser', password='hivepassword', db="retail_db")
print pd.read_sql("SHOW TABLES", db)
  Tables_in_retail_db
0          categories
1           customers
2         departments
3         order_items
4              orders
5            products
print pd.read_sql("DESCRIBE customers", db)
               Field          Type Null  Key Default           Extra
0        customer_id       int(11)   NO  PRI    None  auto_increment
1     customer_fname   varchar(45)   NO         None                
2     customer_lname   varchar(45)   NO         None                
3     customer_email   varchar(45)   NO         None                
4  customer_password   varchar(45)   NO         None                
5    customer_street  varchar(255)   NO         None                
6      customer_city   varchar(45)   NO         None                
7     customer_state   varchar(45)   NO         None                
8   customer_zipcode   varchar(45)   NO         None                
print pd.read_sql("SELECT * FROM customers LIMIT 5", db)
   customer_id customer_fname customer_lname customer_email customer_password  \
0            1        Richard      Hernandez      XXXXXXXXX         XXXXXXXXX   
1            2           Mary        Barrett      XXXXXXXXX         XXXXXXXXX   
2            3            Ann          Smith      XXXXXXXXX         XXXXXXXXX   
3            4           Mary          Jones      XXXXXXXXX         XXXXXXXXX   
4            5         Robert         Hudson      XXXXXXXXX         XXXXXXXXX   

           customer_street customer_city customer_state customer_zipcode  
0       6303 Heather Plaza   Brownsville             TX            78521  
1  9526 Noble Embers Ridge     Littleton             CO            80126  
2   3422 Blue Pioneer Bend        Caguas             PR            00725  
3       8324 Little Common    San Marcos             CA            92069  
4   10 Crystal River Mall         Caguas             PR            00725  

generating ADD Flow: daily orders

Schemas will be stored in memory for this tutorial; we will explore in a later post how to select the right repository.

pd.read_sql("SELECT * FROM orders WHERE order_date='2014-01-01'", db).to_csv('orders20140101.csv', header=False, index=False, quoting=1)
pd.read_sql("SELECT * FROM orders WHERE order_date='2014-01-02'", db).to_csv('orders20140102.csv', header=False, index=False, quoting=1)
pd.read_sql("SELECT * FROM orders WHERE order_date='2014-01-03'", db).to_csv('orders20140103.csv', header=False, index=False, quoting=1)

schemaStringADD = ','.join(pd.read_sql("DESCRIBE orders", db).Field)
print schemaStringADD
typeStringADD = 'INT,STRING,INT,STRING'
order_id,order_date,order_customer_id,order_status
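
A quick way to double-check one of the generated files before pushing it to HDFS is to read it back with pandas, reusing the schema string as column names (just a sanity check, not part of the flow):

# read the headerless CSV back, naming the columns from the schema string
check = pd.read_csv('orders20140101.csv', names=schemaStringADD.split(','))
print check.head()
print len(check), 'rows'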

generating FULL Flow: products

pd.read_sql("SELECT * FROM products", db).to_csv('products20150101.csv', header=False, index=False, quoting=1)
pd.read_sql("SELECT product_id,product_category_id,product_name,product_description,product_price*10.0,product_image  FROM products", db).to_csv('products20150102.csv', header=False, index=False, quoting=1)

schemaStringFULL = ','.join(pd.read_sql("DESCRIBE products", db).Field)
print schemaStringFULL
typeStringFULL = 'INT,INT,STRING,STRING,FLOAT,STRING'
product_id,product_category_id,product_name,product_description,product_price,product_image

generating DELTA Flow: customers

# zip code of NY based customers became 99999
pd.read_sql("SELECT customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,'99999' FROM customers WHERE customer_state='NY'", db).to_csv('customers20150101.csv', header=False, index=False, quoting=1)
# zip code of customers with an even customer_id became 00000
pd.read_sql("SELECT customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,'00000' FROM customers WHERE (customer_id%2)=0", db).to_csv('customers20150102.csv', header=False, index=False, quoting=1)

schemaStringDELTA = ','.join(pd.read_sql("DESCRIBE customers", db).Field)
print schemaStringDELTA
typeStringDELTA = 'INT,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING'
customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcode

Let’s move the generated files into HDFS

we will explore later how to move files appropriately between the local filesystem and HDFS

#! hdfs dfs -rm -r /user/datacruncher/in
#! hdfs dfs -rm -r /user/datacruncher/raw
#! hdfs dfs -rm -r /user/datacruncher/out
! hdfs dfs -mkdir -p /user/datacruncher/in
! hdfs dfs -mkdir -p /user/datacruncher/raw
! hdfs dfs -put orders*.csv /user/datacruncher/in/
! hdfs dfs -put products*.csv /user/datacruncher/in/
! hdfs dfs -put customers*.csv /user/datacruncher/in/
! rm orders*.csv products*.csv customers*.csv
15/12/21 09:11:52 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/datacruncher/in
15/12/21 09:11:55 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/datacruncher/raw
15/12/21 09:11:58 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/datacruncher/out
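
Before moving on, it is worth confirming that the seven CSV files landed in the in directory:

! hdfs dfs -ls /user/datacruncher/in/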

Create Hive Tables

let’s generate the SQL strings (column definitions) from the flow schemas

sqlStringADD = ', '.join([' '.join(x) for x in zip(schemaStringADD.split(','), typeStringADD.split(','))])
print sqlStringADD
sqlStringFULL = ', '.join([' '.join(x) for x in zip(schemaStringFULL.split(','), typeStringFULL.split(','))])
print sqlStringFULL
sqlStringDELTA = ', '.join([' '.join(x) for x in zip(schemaStringDELTA.split(','), typeStringDELTA.split(','))])
print sqlStringDELTA
order_id INT, order_date STRING, order_customer_id INT, order_status STRING
product_id INT, product_category_id INT, product_name STRING, product_description STRING, product_price FLOAT, product_image STRING
customer_id INT, customer_fname STRING, customer_lname STRING, customer_email STRING, customer_password STRING, customer_street STRING, customer_city STRING, customer_state STRING, customer_zipcode STRING

and use JDBC to connect to Hive and create the tables

import pyhs2
conn = pyhs2.connect(host='localhost', port=10000, authMechanism="PLAIN", user='nasdag', password='', database='default')
cur = conn.cursor()
#cur.execute("drop table orders")
#cur.execute("drop table products")
#cur.execute("drop table customers")
#cur.execute("drop table customers_ref")
cur.execute("CREATE EXTERNAL TABLE orders (" + sqlStringADD + """)
        COMMENT 'orders table'
        PARTITIONED BY (crunch_date STRING)
        STORED AS PARQUET
        LOCATION '/user/datacruncher/out/orders'""")
cur.execute("CREATE EXTERNAL TABLE products (" + sqlStringFULL + """)
        COMMENT 'products table'
        PARTITIONED BY (crunch_date STRING)
        STORED AS PARQUET
        LOCATION '/user/datacruncher/out/products'""")
cur.execute("CREATE EXTERNAL TABLE customers (" + sqlStringDELTA + """)
        COMMENT 'customers table'
        PARTITIONED BY (crunch_date STRING)
        STORED AS PARQUET
        LOCATION '/user/datacruncher/out/customers'""")
cur.execute("CREATE EXTERNAL TABLE customers_ref (" + sqlStringDELTA + """)
        COMMENT 'customers ref table'
        PARTITIONED BY (crunch_date STRING)
        STORED AS PARQUET
        LOCATION '/user/datacruncher/out/customers_ref'""")
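
To confirm that the four external tables were created, you can list them through the same JDBC connection (they will stay empty until partitions are added below):

cur = conn.cursor()
cur.execute("SHOW TABLES")
print pd.DataFrame(cur.fetchall())
cur.close()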

Start pyspark to process the data flows

#sc.stop()
import pyspark
sc = pyspark.SparkContext()

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
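
The CSV reader used below comes from the external spark-csv package, which is assumed to have been configured in the first post of this series. If the load calls fail with a "Failed to load class for data source" error, one common workaround (an assumption about your setup; adjust the version to your Spark/Scala build) is to point PYSPARK_SUBMIT_ARGS at the package before the SparkContext is created and restart the kernel:

import os
# must be set before pyspark.SparkContext() is created
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell'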

we also need to generate the appropriate Spark schemas from the schema strings

from pyspark.sql.types import *
st = { 'INT': IntegerType(), 'STRING': StringType() , 'FLOAT': FloatType() }

fieldsADD = [StructField(field_name, st[field_type], False) for field_name, field_type in \
          zip(schemaStringADD.split(','), typeStringADD.split(','))]
schemaADD = StructType(fieldsADD)
print schemaADD

fieldsFULL = [StructField(field_name, st[field_type], False) for field_name, field_type in \
          zip(schemaStringFULL.split(','), typeStringFULL.split(','))]
schemaFULL = StructType(fieldsFULL)
print schemaFULL

fieldsDELTA = [StructField(field_name, st[field_type], False) for field_name, field_type in \
          zip(schemaStringDELTA.split(','), typeStringDELTA.split(','))]
schemaDELTA = StructType(fieldsDELTA)
print schemaDELTA
StructType(List(StructField(order_id,IntegerType,false),StructField(order_date,StringType,false),StructField(order_customer_id,IntegerType,false),StructField(order_status,StringType,false)))
StructType(List(StructField(product_id,IntegerType,false),StructField(product_category_id,IntegerType,false),StructField(product_name,StringType,false),StructField(product_description,StringType,false),StructField(product_price,FloatType,false),StructField(product_image,StringType,false)))
StructType(List(StructField(customer_id,IntegerType,false),StructField(customer_fname,StringType,false),StructField(customer_lname,StringType,false),StructField(customer_email,StringType,false),StructField(customer_password,StringType,false),StructField(customer_street,StringType,false),StructField(customer_city,StringType,false),StructField(customer_state,StringType,false),StructField(customer_zipcode,StringType,false)))
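
The three blocks above are identical apart from their inputs, so the same logic can be written once as a small helper (a refactoring sketch, not in the original notebook):

def build_schema(schema_string, type_string):
    """Build a Spark StructType from comma-separated field and type strings."""
    return StructType([StructField(name, st[t], False)
                       for name, t in zip(schema_string.split(','), type_string.split(','))])

schemaADD = build_schema(schemaStringADD, typeStringADD)
schemaFULL = build_schema(schemaStringFULL, typeStringFULL)
schemaDELTA = build_schema(schemaStringDELTA, typeStringDELTA)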

Crunching of Flow ADD

load the csv flow according to its schema

df = sqlContext.read.format("com.databricks.spark.csv").schema(schemaADD).load('/user/datacruncher/in/orders20140101.csv')
df.first()
Row(order_id=25876, order_date=u'2014-01-01 00:00:00', order_customer_id=3414, order_status=u'PENDING_PAYMENT')

add the crunch date column

sqlContext.registerDataFrameAsTable(df, "df")
dfWithCrunchDate = sqlContext.sql("SELECT *, '2015-12-18' as crunch_date FROM df")
dfWithCrunchDate.first()
Row(order_id=25876, order_date=u'2014-01-01 00:00:00', order_customer_id=3414, order_status=u'PENDING_PAYMENT', crunch_date=u'2015-12-18')

append to the parquet table and update Hive

dfWithCrunchDate.write.format("parquet").mode('Append').partitionBy('crunch_date').parquet('/user/datacruncher/out/orders')
sqlContext.sql("MSCK REPAIR TABLE orders")
DataFrame[result: string]
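
MSCK REPAIR TABLE scans the table location and registers any partition directories that Hive does not know about yet. Since we know exactly which partition was just written, an equivalent and cheaper alternative is to add it explicitly (a sketch using standard Hive DDL through the same HiveContext):

# register just the new partition instead of rescanning the whole location
sqlContext.sql("ALTER TABLE orders ADD IF NOT EXISTS PARTITION (crunch_date='2015-12-18')")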

move the original file to a raw directory

! hdfs dfs -mv /user/datacruncher/in/orders20140101.csv /user/datacruncher/raw/

query the content of Hive through JDBC

conn = pyhs2.connect(host='localhost', port=10000, authMechanism="PLAIN", user='nasdag', password='', database='default')
cur = conn.cursor()
cur.execute("select * from orders order by order_id limit 10")
print cur.getSchema()
print pd.DataFrame(cur.fetchall())
[{'comment': '', 'columnName': 'order_id', 'type': 'INT_TYPE'}, {'comment': '', 'columnName': 'order_date', 'type': 'STRING_TYPE'}, {'comment': '', 'columnName': 'order_customer_id', 'type': 'INT_TYPE'}, {'comment': '', 'columnName': 'order_status', 'type': 'STRING_TYPE'}, {'comment': '', 'columnName': 'crunch_date', 'type': 'STRING_TYPE'}]
       0                    1      2                3           4
0  25876  2014-01-01 00:00:00   3414  PENDING_PAYMENT  2015-12-18
1  25877  2014-01-01 00:00:00   5549  PENDING_PAYMENT  2015-12-18
2  25878  2014-01-01 00:00:00   9084          PENDING  2015-12-18
3  25879  2014-01-01 00:00:00   5118          PENDING  2015-12-18
4  25880  2014-01-01 00:00:00  10146         CANCELED  2015-12-18
5  25881  2014-01-01 00:00:00   3205  PENDING_PAYMENT  2015-12-18
6  25882  2014-01-01 00:00:00   4598         COMPLETE  2015-12-18
7  25883  2014-01-01 00:00:00  11764          PENDING  2015-12-18
8  25884  2014-01-01 00:00:00   7904  PENDING_PAYMENT  2015-12-18
9  25885  2014-01-01 00:00:00   7253          PENDING  2015-12-18
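
The same four steps (load, stamp with the crunch date, append, move to raw) also apply to the remaining order files orders20140102.csv and orders20140103.csv, so they are worth wrapping in a small helper. A sketch follows; the crunch dates are illustrative, and subprocess.call replaces the notebook's ! shell escape so the move can live inside a function:

import subprocess

def crunch_add(csv_name, table, schema, crunch_date):
    """Load an ADD flow file, stamp it with the crunch date,
    append it to the parquet table and archive the file."""
    df = sqlContext.read.format("com.databricks.spark.csv").schema(schema) \
        .load('/user/datacruncher/in/' + csv_name)
    sqlContext.registerDataFrameAsTable(df, "df")
    stamped = sqlContext.sql("SELECT *, '" + crunch_date + "' as crunch_date FROM df")
    stamped.write.format("parquet").mode('Append').partitionBy('crunch_date') \
        .parquet('/user/datacruncher/out/' + table)
    sqlContext.sql("MSCK REPAIR TABLE " + table)
    subprocess.call(['hdfs', 'dfs', '-mv',
                     '/user/datacruncher/in/' + csv_name, '/user/datacruncher/raw/'])

crunch_add('orders20140102.csv', 'orders', schemaADD, '2015-12-19')
crunch_add('orders20140103.csv', 'orders', schemaADD, '2015-12-20')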

repeat the same for the FULL flow: load the incoming data and append it to the parquet table with its crunch date, but also create a view on the latest set of data

df = sqlContext.read.format("com.databricks.spark.csv").schema(schemaFULL).load('/user/datacruncher/in/products20150101.csv')
sqlContext.registerDataFrameAsTable(df, "df")
dfWithCrunchDate = sqlContext.sql("SELECT *, '2015-12-18' as crunch_date FROM df")
dfWithCrunchDate.write.format("parquet").mode('Append').partitionBy('crunch_date').parquet('/user/datacruncher/out/products')
sqlContext.sql("MSCK REPAIR TABLE products")
! hdfs dfs -mv /user/datacruncher/in/products20150101.csv /user/datacruncher/raw/

df = sqlContext.read.format("com.databricks.spark.csv").schema(schemaFULL).load('/user/datacruncher/in/products20150102.csv')
sqlContext.registerDataFrameAsTable(df, "df")
dfWithCrunchDate = sqlContext.sql("SELECT *, '2015-12-19' as crunch_date FROM df")
dfWithCrunchDate.write.format("parquet").mode('Append').partitionBy('crunch_date').parquet('/user/datacruncher/out/products')
sqlContext.sql("MSCK REPAIR TABLE products")
! hdfs dfs -mv /user/datacruncher/in/products20150102.csv /user/datacruncher/raw/

cur = conn.cursor()
cur.execute("select * from products where product_id < 5 limit 10")
print pd.DataFrame(cur.fetchall())
   0  1                                              2 3            4  \
0  1  2  Quest Q64 10 FT. x 10 FT. Slant Leg Instant U      59.980000   
1  2  2  Under Armour Men's Highlight MC Football Clea     129.990005   
2  3  2  Under Armour Men's Renegade D Mid Football Cl      89.989998   
3  4  2  Under Armour Men's Renegade D Mid Football Cl      89.989998   
4  1  2  Quest Q64 10 FT. x 10 FT. Slant Leg Instant U     599.799988   
5  2  2  Under Armour Men's Highlight MC Football Clea    1299.900024   
6  3  2  Under Armour Men's Renegade D Mid Football Cl     899.899963   
7  4  2  Under Armour Men's Renegade D Mid Football Cl     899.899963   

                                                   5           6  
0  http://images.acmesports.sports/Quest+Q64+10+F...  2015-12-18  
1  http://images.acmesports.sports/Under+Armour+M...  2015-12-18  
2  http://images.acmesports.sports/Under+Armour+M...  2015-12-18  
3  http://images.acmesports.sports/Under+Armour+M...  2015-12-18  
4  http://images.acmesports.sports/Quest+Q64+10+F...  2015-12-19  
5  http://images.acmesports.sports/Under+Armour+M...  2015-12-19  
6  http://images.acmesports.sports/Under+Armour+M...  2015-12-19  
7  http://images.acmesports.sports/Under+Armour+M...  2015-12-19  

create a view for Hive queries

cur = conn.cursor()
cur.execute("CREATE VIEW IF NOT EXISTS prod (" + schemaStringFULL + """)
            AS SELECT """ + schemaStringFULL + """ FROM products
            WHERE crunch_date = '2015-12-19' """)
cur.execute("select * from prod where product_id < 5 limit 10")
print pd.DataFrame(cur.fetchall())
   0  1                                              2 3            4  \
0  1  2  Quest Q64 10 FT. x 10 FT. Slant Leg Instant U     599.799988   
1  2  2  Under Armour Men's Highlight MC Football Clea    1299.900024   
2  3  2  Under Armour Men's Renegade D Mid Football Cl     899.899963   
3  4  2  Under Armour Men's Renegade D Mid Football Cl     899.899963   

                                                   5  
0  http://images.acmesports.sports/Quest+Q64+10+F...  
1  http://images.acmesports.sports/Under+Armour+M...  
2  http://images.acmesports.sports/Under+Armour+M...  
3  http://images.acmesports.sports/Under+Armour+M...  
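
Because the view was created with IF NOT EXISTS and a hard-coded crunch date, it will keep pointing at 2015-12-19 after the next FULL snapshot arrives. A simple way to keep it current, assuming the crunch job knows the date it just loaded, is to drop and recreate it at the end of each FULL crunch (a sketch):

crunch_date = '2015-12-19'   # the crunch date of the FULL snapshot just loaded
cur = conn.cursor()
cur.execute("DROP VIEW IF EXISTS prod")
cur.execute("CREATE VIEW prod (" + schemaStringFULL + ") AS SELECT " + schemaStringFULL +
            " FROM products WHERE crunch_date = '" + crunch_date + "'")
cur.close()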

and now for the DELTA flow: load the incoming data and append it to the parquet table with its crunch date, but also maintain a consolidated parquet table (customers_ref) that keeps, for each customer, the most recent version of the record, and update Hive so that both the historical and the reference tables are visible. Note that the merge is first written to a temporary customers_merge location, because Spark cannot safely overwrite the customers_ref parquet directory while it is still reading from it.

df = sqlContext.read.format("com.databricks.spark.csv").schema(schemaDELTA).load('/user/datacruncher/in/customers20150101.csv')
sqlContext.registerDataFrameAsTable(df, "df")
dfWithCrunchDate = sqlContext.sql("SELECT *, '2015-12-18' as crunch_date FROM df")
dfWithCrunchDate.first()
Row(customer_id=41, customer_fname=u'Victoria', customer_lname=u'Mason', customer_email=u'XXXXXXXXX', customer_password=u'XXXXXXXXX', customer_street=u'7869 Crystal View Villas', customer_city=u'Brooklyn', customer_state=u'NY', customer_zipcode=u'99999', crunch_date=u'2015-12-18')
from pyspark.sql import Window
from pyspark.sql.functions import rowNumber, desc
dfWithCrunchDate.cache()
try:
    # union the new delta with the existing reference table
    unionDf = sqlContext.read.parquet('/user/datacruncher/out/customers_ref').unionAll(dfWithCrunchDate)
except:
    # first run: the reference table does not exist yet
    unionDf = dfWithCrunchDate

window = Window.partitionBy("customer_id").orderBy(desc("crunch_date"))

mergeDf = unionDf.select("*", rowNumber().over(window).alias("rowNumber")).filter("rowNumber = 1").drop("rowNumber")
mergeDf.write.format("parquet").mode('Overwrite').partitionBy('crunch_date').parquet('/user/datacruncher/out/customers_merge')
sqlContext.read.parquet('/user/datacruncher/out/customers_merge').write.format("parquet").mode('Overwrite').partitionBy('crunch_date').parquet('/user/datacruncher/out/customers_ref')
sqlContext.sql("MSCK REPAIR TABLE customers_ref")
DataFrame[result: string]
dfWithCrunchDate.write.format("parquet").mode('Append').partitionBy('crunch_date').parquet('/user/datacruncher/out/customers')
sqlContext.sql("MSCK REPAIR TABLE customers")
! hdfs dfs -mv /user/datacruncher/in/customers20150101.csv /user/datacruncher/raw/

second pass with a new file

df = sqlContext.read.format("com.databricks.spark.csv").schema(schemaDELTA).load('/user/datacruncher/in/customers20150102.csv')
sqlContext.registerDataFrameAsTable(df, "df")
dfWithCrunchDate = sqlContext.sql("SELECT *, '2015-12-19' as crunch_date FROM df")
dfWithCrunchDate.cache()
try:
    unionDf = sqlContext.read.parquet('/user/datacruncher/out/customers_ref').unionAll(dfWithCrunchDate)
except:
    unionDf = dfWithCrunchDate

window = Window.partitionBy("customer_id").orderBy(desc("crunch_date"))

mergeDf = unionDf.select("*", rowNumber().over(window).alias("rowNumber")).filter("rowNumber = 1").drop("rowNumber")
mergeDf.write.format("parquet").mode('Overwrite').partitionBy('crunch_date').parquet('/user/datacruncher/out/customers_merge')
sqlContext.read.parquet('/user/datacruncher/out/customers_merge').write.format("parquet").mode('Overwrite').partitionBy('crunch_date').parquet('/user/datacruncher/out/customers_ref')
sqlContext.sql("MSCK REPAIR TABLE customers_ref")

dfWithCrunchDate.write.format("parquet").mode('Append').partitionBy('crunch_date').parquet('/user/datacruncher/out/customers')
sqlContext.sql("MSCK REPAIR TABLE customers")
! hdfs dfs -mv /user/datacruncher/in/customers20150102.csv /user/datacruncher/raw/

let’s check the results

cur = conn.cursor()
cur.execute("select * from customers where customer_state = 'NY' order by customer_id limit 10")
print pd.DataFrame(cur.fetchall())
     0         1          2          3          4  \
0   41  Victoria      Mason  XXXXXXXXX  XXXXXXXXX   
1   46  Jennifer      Smith  XXXXXXXXX  XXXXXXXXX   
2   46  Jennifer      Smith  XXXXXXXXX  XXXXXXXXX   
3   60      Mary  Gutierrez  XXXXXXXXX  XXXXXXXXX   
4   60      Mary  Gutierrez  XXXXXXXXX  XXXXXXXXX   
5   62      Mary      Oneal  XXXXXXXXX  XXXXXXXXX   
6   62      Mary      Oneal  XXXXXXXXX  XXXXXXXXX   
7   81      Mary      Smith  XXXXXXXXX  XXXXXXXXX   
8  108    Joshua      Smith  XXXXXXXXX  XXXXXXXXX   
9  108    Joshua      Smith  XXXXXXXXX  XXXXXXXXX   

                             5         6   7      8           9  
0     7869 Crystal View Villas  Brooklyn  NY  99999  2015-12-18  
1         5463 Rocky Autoroute  Freeport  NY  00000  2015-12-19  
2         5463 Rocky Autoroute  Freeport  NY  99999  2015-12-18  
3            8632 Bright Route   Webster  NY  00000  2015-12-19  
4            8632 Bright Route   Webster  NY  99999  2015-12-18  
5      2659 Jagged Rabbit View  Brooklyn  NY  99999  2015-12-18  
6      2659 Jagged Rabbit View  Brooklyn  NY  00000  2015-12-19  
7             8434 Honey Pines    Ithaca  NY  99999  2015-12-18  
8  4587 Noble Zephyr Promenade     Bronx  NY  00000  2015-12-19  
9  4587 Noble Zephyr Promenade     Bronx  NY  99999  2015-12-18  
cur = conn.cursor()
cur.execute("select * from customers_ref where customer_state = 'NY' order by customer_id limit 10")
print pd.DataFrame(cur.fetchall())
     0         1          2          3          4  \
0   41  Victoria      Mason  XXXXXXXXX  XXXXXXXXX   
1   46  Jennifer      Smith  XXXXXXXXX  XXXXXXXXX   
2   60      Mary  Gutierrez  XXXXXXXXX  XXXXXXXXX   
3   62      Mary      Oneal  XXXXXXXXX  XXXXXXXXX   
4   81      Mary      Smith  XXXXXXXXX  XXXXXXXXX   
5  108    Joshua      Smith  XXXXXXXXX  XXXXXXXXX   
6  120     Nancy      Smith  XXXXXXXXX  XXXXXXXXX   
7  133      Mary      Lopez  XXXXXXXXX  XXXXXXXXX   
8  134     Donna      Gomez  XXXXXXXXX  XXXXXXXXX   
9  141      Mary    Mcmahon  XXXXXXXXX  XXXXXXXXX   

                             5         6   7      8           9  
0     7869 Crystal View Villas  Brooklyn  NY  99999  2015-12-18  
1         5463 Rocky Autoroute  Freeport  NY  00000  2015-12-19  
2            8632 Bright Route   Webster  NY  00000  2015-12-19  
3      2659 Jagged Rabbit View  Brooklyn  NY  00000  2015-12-19  
4             8434 Honey Pines    Ithaca  NY  99999  2015-12-18  
5  4587 Noble Zephyr Promenade     Bronx  NY  00000  2015-12-19  
6        7840 Umber Sky Villas     Bronx  NY  00000  2015-12-19  
7       948 Thunder Gate Beach  Brooklyn  NY  99999  2015-12-18  
8       1690 Cinder Deer Chase  Brooklyn  NY  00000  2015-12-19  
9    1526 Broad Mountain Plaza  Brooklyn  NY  99999  2015-12-18
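
The notebook does not show it, but once you are done it is good practice to release the JDBC and Spark resources (a final tidy-up, not in the original notebook):

cur.close()
conn.close()
sc.stop()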