NASDAG.org

A data scientist blog, by Philippe Dagher

Predicting Heart Disease With Hadoop, Spark and Python

This post is the last one in the series “How to install step by step a Data Lake”. Our purpose here is to use the tools and logic that we set up in the first and second posts to process and analyze data received from several hospitals, in order to predict heart disease.

I worked on this use case last year with scikit-learn during the Metis Data Science Bootcamp, so our objective here is simply to test Spark ML, evaluate concepts such as Pipelines, and experiment with some of the built-in packages for classification, tuning and evaluation, as well as feature engineering with Spark SQL and DataFrames.

The following notebook is self-explanatory. Just pull it from my GitHub to your Data Lake and run the different scripts: https://github.com/nasdag/pyspark/blob/master/heart-disease-predictions.ipynb

Cheers,

Philippe


Heart Disease Prediction System

Part 1: Data Crunching

Patient diagnosis data is available from several hospitals

! wget https://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/processed.cleveland.data
! wget https://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/processed.hungarian.data
! wget https://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/processed.switzerland.data
! wget https://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/processed.va.data
! ls processed.*.data
processed.cleveland.data  processed.switzerland.data
processed.hungarian.data  processed.va.data

14 attributes are sent for each patient. The “goal” field (the last column, num) indicates the presence of heart disease in the patient, from 0 (no presence) to 4.

! head -5 processed.cleveland.data
63.0,1.0,1.0,145.0,233.0,1.0,2.0,150.0,0.0,2.3,3.0,0.0,6.0,0
67.0,1.0,4.0,160.0,286.0,0.0,2.0,108.0,1.0,1.5,2.0,3.0,3.0,2
67.0,1.0,4.0,120.0,229.0,0.0,2.0,129.0,1.0,2.6,2.0,2.0,7.0,1
37.0,1.0,3.0,130.0,250.0,0.0,0.0,187.0,0.0,3.5,3.0,0.0,3.0,0
41.0,0.0,2.0,130.0,204.0,0.0,2.0,172.0,0.0,1.4,1.0,0.0,3.0,0

Here is a description of the attributes:

-- 1. age           age in years
-- 2. sex           sex (1 = male; 0 = female)
-- 3. cp            chest pain type
                      - Value 1: typical angina
                      - Value 2: atypical angina
                      - Value 3: non-anginal pain
                      - Value 4: asymptomatic
-- 4. trestbps      resting blood pressure (in mm Hg on admission to the hospital)
-- 5. chol          serum cholesterol in mg/dl
-- 6. fbs           fasting blood sugar > 120 mg/dl (1 = true; 0 = false)
-- 7. restecg       resting electrocardiographic results
                      - Value 0: normal
                      - Value 1: having ST-T wave abnormality (T wave inversions and/or
                                 ST elevation or depression of > 0.05 mV)
                      - Value 2: showing probable or definite left ventricular hypertrophy by Estes' criteria
-- 8. thalach       maximum heart rate achieved
-- 9. exang         exercise induced angina (1 = yes; 0 = no)
-- 10. oldpeak      ST depression induced by exercise relative to rest
-- 11. slope        the slope of the peak exercise ST segment
                      - Value 1: upsloping
                      - Value 2: flat
                      - Value 3: downsloping
-- 12. ca           number of major vessels (0-3) colored by fluoroscopy
-- 13. thal         3 = normal; 6 = fixed defect; 7 = reversable defect
-- 14. num          diagnosis of heart disease (angiographic disease status)
                      - Value 0: < 50% diameter narrowing
                      - Value 1: > 50% diameter narrowing (in any major vessel)
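
Note that missing values are encoded as a question mark (‘?’) in these files. A quick local sanity check before loading them into the Data Lake can show how many values are affected (a minimal sketch run on the downloaded files; the counts are only illustrative):

import glob

# count the '?' placeholders (the files' missing-value marker) per hospital file
for path in sorted(glob.glob('processed.*.data')):
    with open(path) as f:
        rows = [line.strip().split(',') for line in f if line.strip()]
    missing = sum(row.count('?') for row in rows)
    print path, '-', len(rows), 'rows,', missing, 'missing values'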

As we did in the previous tutorial, we will hold the schema in in-memory variables for the time being

schemaStringADD = 'age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,diagnosis'
typeStringADD = 'INT,STRING,STRING,INT,INT,BOOLEAN,STRING,INT,BOOLEAN,FLOAT,STRING,INT,STRING,INT'

and move the incoming data to the “in” directory of HDFS

#! hdfs dfs -rm -r /user/datacruncher/in/processed.*.data
#! hdfs dfs -rm -r /user/datacruncher/raw/processed.*.data
#! hdfs dfs -rm -r /user/datacruncher/out/heart_disease*
! hdfs dfs -put processed.*.data /user/datacruncher/in/
! rm processed.*.data
! hdfs dfs -ls /user/datacruncher/in/
Found 6 items
-rw-r--r--   1 nasdag supergroup       5523 2015-12-21 09:12 /user/datacruncher/in/orders20140102.csv
-rw-r--r--   1 nasdag supergroup      12505 2015-12-21 09:12 /user/datacruncher/in/orders20140103.csv
-rw-r--r--   1 nasdag supergroup      18461 2016-01-01 09:42 /user/datacruncher/in/processed.cleveland.data
-rw-r--r--   1 nasdag supergroup      10263 2016-01-01 09:42 /user/datacruncher/in/processed.hungarian.data
-rw-r--r--   1 nasdag supergroup       4109 2016-01-01 09:42 /user/datacruncher/in/processed.switzerland.data
-rw-r--r--   1 nasdag supergroup       6737 2016-01-01 09:42 /user/datacruncher/in/processed.va.data

Let’s make sure that JDBC access to the diagnosis data is available, even though it will not be used later in this tutorial

import pyhs2

sqlStringADD = ', '.join([' '.join(x) for x in zip(schemaStringADD.split(','), typeStringADD.split(','))])
print sqlStringADD

conn = pyhs2.connect(host='localhost', port=10000, authMechanism="PLAIN", user='nasdag', password='', database='default')
cur = conn.cursor()
#cur.execute("drop table heart_disease")
cur.execute("CREATE EXTERNAL TABLE heart_disease (" + sqlStringADD + """)
        COMMENT 'heart disease table'
        PARTITIONED BY (crunch_date STRING)
        STORED AS PARQUET
        LOCATION '/user/datacruncher/out/heart_disease'""")
age INT, sex STRING, cp STRING, trestbps INT, chol INT, fbs BOOLEAN, restecg STRING, thalach INT, exang BOOLEAN, oldpeak FLOAT, slope STRING, ca INT, thal STRING, diagnosis INT
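
To double check that the table is actually reachable through HiveServer2, we can run a couple of quick statements with the same pyhs2 cursor (a minimal sketch; the table is still empty at this point):

# list the tables visible through the JDBC connection and describe the new one
cur.execute("SHOW TABLES")
print cur.fetch()

cur.execute("DESCRIBE heart_disease")
print cur.fetch()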

We need to load the raw data, clean it,

#sc.stop()
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)
st = { 'INT': IntegerType(), 'STRING': StringType() , 'FLOAT': FloatType() , 'BOOLEAN': BooleanType() }

fieldsRAW = [StructField(field_name, StringType(), True) for field_name in schemaStringADD.split(',')]
schemaRAW = StructType(fieldsRAW)
parts = sc.textFile('/user/datacruncher/in/processed.*.data').map(lambda l: l.split(','))
# keep the values as strings to match the all-StringType schema; only turn the '?' markers into nulls
df = parts.map(lambda p: [None if x == '?' else x for x in p]).toDF(schemaRAW)
df.take(2)
[Row(age=u'63.0', sex=u'1.0', cp=u'1.0', trestbps=u'145.0', chol=u'233.0', fbs=u'1.0', restecg=u'2.0', thalach=u'150.0', exang=u'0.0', oldpeak=u'2.3', slope=u'3.0', ca=u'0.0', thal=u'6.0', diagnosis=u'0.0'),
 Row(age=u'67.0', sex=u'1.0', cp=u'4.0', trestbps=u'160.0', chol=u'286.0', fbs=u'0.0', restecg=u'2.0', thalach=u'108.0', exang=u'1.0', oldpeak=u'1.5', slope=u'2.0', ca=u'3.0', thal=u'3.0', diagnosis=u'2.0')]

process it,

sqlContext.udf.register("t_sex", lambda x: 'male' if x=='1.0' else 'female' if x=='0.0' else None)
sqlContext.udf.register("t_cp", lambda x: 'typ_angina' if x=='1.0' else 'asympt' if x=='4.0' else 'non_angina' if x=='3.0' else 'atyp_angina' if x=='2.0' else None)
sqlContext.udf.register("t_restecg", lambda x: 'left_vent_hyper' if x=='2.0' else 'normal' if x=='0.0' else 'st_t_wave_abnormality' if x=='1.0' else None)
sqlContext.udf.register("t_slope", lambda x: 'up' if x=='1.0' else 'flat' if x=='2.0' else 'down' if x=='3.0' else None)
sqlContext.udf.register("t_thal", lambda x: 'fixed_defect' if x=='6.0' else 'normal' if x=='3.0' else 'reversable_defect' if x=='7.0' else None)
sqlStringADD = ', '.join([x1 if x2=='STRING' else 'cast('+x1+' as '+x2+') as '+x1 \
                          for x1, x2 in zip(schemaStringADD.split(','), typeStringADD.split(','))])

sqlStringADD = sqlStringADD.replace(' sex,', ' t_sex(sex) as sex,')
sqlStringADD = sqlStringADD.replace(' cp,', ' t_cp(cp) as cp,')
sqlStringADD = sqlStringADD.replace(' restecg,', ' t_restecg(restecg) as restecg,')
sqlStringADD = sqlStringADD.replace(' slope,', ' t_slope(slope) as slope,')
sqlStringADD = sqlStringADD.replace(' thal,', ' t_thal(thal) as thal,')
sqlStringADD = sqlStringADD.replace(' cast(fbs as BOOLEAN) as fbs,', ' cast(cast(fbs as INT) as BOOLEAN) as fbs,')
sqlStringADD = sqlStringADD.replace(' cast(exang as BOOLEAN) as exang,', 
                                    ' cast(cast(exang as INT) as BOOLEAN) as exang,')
print sqlStringADD
cast(age as INT) as age, t_sex(sex) as sex, t_cp(cp) as cp, cast(trestbps as INT) as trestbps, cast(chol as INT) as chol, cast(cast(fbs as INT) as BOOLEAN) as fbs, t_restecg(restecg) as restecg, cast(thalach as INT) as thalach, cast(cast(exang as INT) as BOOLEAN) as exang, cast(oldpeak as FLOAT) as oldpeak, t_slope(slope) as slope, cast(ca as INT) as ca, t_thal(thal) as thal, cast(diagnosis as INT) as diagnosis
df.registerTempTable("df")
dfWithCrunchDate = sqlContext.sql("SELECT "+sqlStringADD+", '2015-12-18' as crunch_date FROM df")
dfWithCrunchDate.first()
Row(age=63, sex=u'male', cp=u'typ_angina', trestbps=145, chol=233, fbs=True, restecg=u'left_vent_hyper', thalach=150, exang=False, oldpeak=2.299999952316284, slope=u'down', ca=0, thal=u'fixed_defect', diagnosis=0, crunch_date=u'2015-12-18')
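
A quick spot check that the UDF recoding produced the expected categories (a minimal sketch; the exact counts depend on which hospital files were crunched):

# distribution of the recoded chest pain categories
dfWithCrunchDate.groupBy("cp").count().show()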

append the result to the parquet table and update Hive

dfWithCrunchDate.write.format("parquet").mode('Append').partitionBy('crunch_date').parquet('/user/datacruncher/out/heart_disease')
cur.execute("MSCK REPAIR TABLE heart_disease")

and move the original files to the raw directory

! hdfs dfs -mv /user/datacruncher/in/processed.*.data /user/datacruncher/raw/

Part 2: Data Analysis

from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, GBTClassifier, NaiveBayes, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import RegressionMetrics, BinaryClassificationMetrics
from pyspark.mllib.util import MLUtils

Load the data from the parquet table

df = sqlContext.read.load("/user/datacruncher/out/heart_disease").repartition(6)
df.registerTempTable("training_data")
df.first()
Row(age=38, sex=u'female', cp=u'asympt', trestbps=110, chol=0, fbs=False, restecg=u'normal', thalach=156, exang=False, oldpeak=0.0, slope=u'flat', ca=None, thal=u'normal', diagnosis=1, crunch_date=u'2015-12-18')
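
Before modelling, it is worth looking at how the diagnosis classes are distributed (a minimal sketch; the counts depend on the files loaded in Part 1):

# class balance of the raw 0-4 diagnosis values
df.groupBy("diagnosis").count().orderBy("diagnosis").show()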

We will first try a logistic regression, keeping only 2 classes: sick (1) or healthy (0). So let’s select the features:

#cast(cast(diagnosis as boolean) as double) ##only 2 classes
#cast(diagnosis as double)
data = sqlContext.sql("""SELECT age, sex, cp, trestbps, chol, fbs, restecg, thalach, exang, oldpeak, 
                         slope, ca, thal, cast(cast(diagnosis as boolean) as double) as label 
                         FROM training_data""").na.drop()
data.take(3)
[Row(age=67, sex=u'male', cp=u'asympt', trestbps=160, chol=286, fbs=False, restecg=u'left_vent_hyper', thalach=108, exang=True, oldpeak=1.5, slope=u'flat', ca=3, thal=u'normal', label=1.0),
 Row(age=57, sex=u'female', cp=u'asympt', trestbps=120, chol=354, fbs=False, restecg=u'normal', thalach=163, exang=True, oldpeak=0.6000000238418579, slope=u'up', ca=0, thal=u'normal', label=0.0),
 Row(age=44, sex=u'male', cp=u'atyp_angina', trestbps=120, chol=263, fbs=False, restecg=u'normal', thalach=173, exang=False, oldpeak=0.0, slope=u'up', ca=0, thal=u'reversable_defect', label=0.0)]

and convert the string-based categorical features to numerical category indices, then expand those indices into one-hot vectors (one column per category), and finally assemble the individual columns and vectors into a single feature vector.

#age,sex,   cp,    trestbps,chol,fbs,    restecg,thalach,exang,  oldpeak,slope, ca, thal,  diagnosis
#INT,STRING,STRING,INT,     INT, BOOLEAN,STRING, INT,    BOOLEAN,FLOAT,  STRING,INT,STRING,INT

sexIndexer = StringIndexer(inputCol="sex", outputCol="sexIndex")
sexEncoder = OneHotEncoder(inputCol="sexIndex", outputCol="sexVec")
cpIndexer = StringIndexer(inputCol="cp", outputCol="cpIndex")
cpEncoder = OneHotEncoder(inputCol="cpIndex", outputCol="cpVec")
fbsIndexer = StringIndexer(inputCol="fbs", outputCol="fbsIndex")
fbsEncoder = OneHotEncoder(inputCol="fbsIndex", outputCol="fbsVec")
restecgIndexer = StringIndexer(inputCol="restecg", outputCol="restecgIndex")
restecgEncoder = OneHotEncoder(inputCol="restecgIndex", outputCol="restecgVec")
exangIndexer = StringIndexer(inputCol="exang", outputCol="exangIndex")
exangEncoder = OneHotEncoder(inputCol="exangIndex", outputCol="exangVec")
slopeIndexer = StringIndexer(inputCol="slope", outputCol="slopeIndex")
slopeEncoder = OneHotEncoder(inputCol="slopeIndex", outputCol="slopeVec")
caIndexer = StringIndexer(inputCol="ca", outputCol="caIndex")
caEncoder = OneHotEncoder(inputCol="caIndex", outputCol="caVec")
thalIndexer = StringIndexer(inputCol="thal", outputCol="thalIndex")
thalEncoder = OneHotEncoder(inputCol="thalIndex", outputCol="thalVec")

assembler = VectorAssembler(inputCols=["age", "sexVec", "cpVec", "trestbps", "chol", "fbsVec", 
                                       "restecgVec", "thalach", "exangVec", "oldpeak", "slopeVec", 
                                       "caVec", "thalVec"], outputCol="features")
lr = LogisticRegression()
#paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0]).build()
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1]).addGrid(lr.elasticNetParam, [0.5]).build()
pipeline = Pipeline().setStages([sexIndexer, sexEncoder, cpIndexer, cpEncoder, fbsIndexer, fbsEncoder, restecgIndexer, restecgEncoder, exangIndexer, exangEncoder, slopeIndexer, slopeEncoder, caIndexer, caEncoder, thalIndexer, thalEncoder, assembler, lr])
evaluator = BinaryClassificationEvaluator()
print evaluator.explainParams()
print evaluator.getRawPredictionCol()
labelCol: label column name (default: label)
metricName: metric name in evaluation (areaUnderROC|areaUnderPR) (default: areaUnderROC)
rawPredictionCol: raw prediction (a.k.a. confidence) column name (default: rawPrediction)
rawPrediction
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(4)
training, test = data.randomSplit([0.75, 0.25], seed = 12345)
model = cv.fit(training)
model.transform(test).first()
Row(age=57, sex=u'female', cp=u'asympt', trestbps=120, chol=354, fbs=False, restecg=u'normal', thalach=163, exang=True, oldpeak=0.6000000238418579, slope=u'up', ca=0, thal=u'normal', label=0.0, sexIndex=1.0, sexVec=SparseVector(1, {}), cpIndex=0.0, cpVec=SparseVector(3, {0: 1.0}), fbsIndex=0.0, fbsVec=SparseVector(1, {0: 1.0}), restecgIndex=0.0, restecgVec=SparseVector(2, {0: 1.0}), exangIndex=1.0, exangVec=SparseVector(1, {}), slopeIndex=1.0, slopeVec=SparseVector(2, {1: 1.0}), caIndex=0.0, caVec=SparseVector(3, {0: 1.0}), thalIndex=0.0, thalVec=SparseVector(2, {0: 1.0}), features=SparseVector(20, {0: 57.0, 2: 1.0, 5: 120.0, 6: 354.0, 7: 1.0, 8: 1.0, 10: 163.0, 12: 0.6, 14: 1.0, 15: 1.0, 18: 1.0}), rawPrediction=DenseVector([0.7801, -0.7801]), probability=DenseVector([0.6857, 0.3143]), prediction=0.0)
print evaluator.evaluate(model.transform(test), {evaluator.metricName: "areaUnderROC"})
print evaluator.evaluate(model.transform(test), {evaluator.metricName: "areaUnderPR"})
0.911764705882
0.916386456872
holdout = model.transform(test).select("prediction","label")
cm = BinaryClassificationMetrics(holdout.map(lambda x : (x[0], x[1])))
print cm.areaUnderROC
print cm.areaUnderPR
0.800735294118
0.85486731075
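
The lower figures here are expected: BinaryClassificationMetrics is being fed hard 0/1 predictions, whereas the evaluator above ranks the continuous raw scores. Feeding it the positive-class probability instead should bring the two sets of numbers closer together (a minimal sketch, assuming the default “probability” output column):

# use the class-1 probability as the score instead of the hard prediction
scores = model.transform(test).select("probability", "label") \
              .map(lambda r: (float(r[0][1]), float(r[1])))
cm = BinaryClassificationMetrics(scores)
print cm.areaUnderROC
print cm.areaUnderPR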

Now let’s try a Random Forest. Load and parse the data, converting it to a DataFrame:

data = sqlContext.sql("""SELECT age, sex, cp, trestbps, chol, fbs, restecg, thalach, exang, oldpeak, 
                      slope, ca, thal, cast(diagnosis as boolean) as label 
                      FROM training_data""").na.drop()

Index labels, adding metadata to the label column. Fit on the whole dataset to include all labels in the index.

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

Split the data into training and test sets (25% held out for testing)

training, test = data.randomSplit([0.75, 0.25], seed = 12345)

Train a RandomForest model. Chain the indexers, encoders, assembler and forest in a Pipeline.

dt = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")
 
pipeline = Pipeline().setStages([labelIndexer, sexIndexer, sexEncoder, cpIndexer, cpEncoder, fbsIndexer, fbsEncoder, restecgIndexer, restecgEncoder, exangIndexer, exangEncoder, slopeIndexer, slopeEncoder, caIndexer, caEncoder, thalIndexer, thalEncoder, assembler, dt])

model = pipeline.fit(training)
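
The forest can be tuned the same way as the logistic regression above; here is a minimal sketch (the grid values are illustrative and not part of the original notebook):

# cross-validate a small grid over the number of trees and their depth
rfGrid = ParamGridBuilder() \
    .addGrid(dt.numTrees, [20, 50]) \
    .addGrid(dt.maxDepth, [4, 6]) \
    .build()
rfEvaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                                predictionCol="prediction",
                                                metricName="precision")
rfCV = CrossValidator().setEstimator(pipeline).setEvaluator(rfEvaluator) \
                       .setEstimatorParamMaps(rfGrid).setNumFolds(4)
rfModel = rfCV.fit(training)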

Make predictions and display a few example rows. Then select (prediction, true label) pairs and compute the test error.

predictions = model.transform(test)
predictions.select("prediction", "indexedLabel").show(15)

evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="precision")
accuracy = evaluator.evaluate(predictions)
print "Test Error = %g" % (1.0 - accuracy)

treeModel = model.stages[-1]
print treeModel
+----------+------------+
|prediction|indexedLabel|
+----------+------------+
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       1.0|         1.0|
|       1.0|         1.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       0.0|         0.0|
|       1.0|         1.0|
|       0.0|         0.0|
|       1.0|         1.0|
|       0.0|         1.0|
|       1.0|         1.0|
+----------+------------+
only showing top 15 rows

Test Error = 0.202703
RandomForestClassificationModel with 20 trees
evaluator.explainParams()
'labelCol: label column name (default: label, current: indexedLabel)\nmetricName: metric name in evaluation (f1|precision|recall|weightedPrecision|weightedRecall) (default: f1, current: precision)\npredictionCol: prediction column name (default: prediction, current: prediction)'
evaluator = BinaryClassificationEvaluator()
print evaluator.evaluate(model.transform(test), {evaluator.metricName: "areaUnderROC", evaluator.labelCol: "indexedLabel"})
print evaluator.evaluate(model.transform(test), {evaluator.metricName: "areaUnderPR", evaluator.labelCol: "indexedLabel"})
0.894852941176
0.89484945751
cm = BinaryClassificationMetrics(model.transform(test).select("prediction","indexedLabel").map(lambda x : (x[0], x[1])))
print cm.areaUnderROC
print cm.areaUnderPR
0.788235294118
0.838485544368