Linear Regression and Random Forest/GBT Classification with PySpark

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType
from pyspark.sql.functions import *

data_schema = [
StructField("_c0", IntegerType(), True)
,StructField("province", StringType(), True)
,StructField("specific", DoubleType(), True)
,StructField("general", DoubleType(), True)
,StructField("year", IntegerType(), True)
,StructField("gdp", FloatType(), True)
,StructField("fdi", FloatType(), True)
,StructField("rnr", DoubleType(), True)
,StructField("rr", FloatType(), True)
,StructField("i", FloatType(), True)
,StructField("fr", IntegerType(), True)
,StructField("reg", StringType(), True)
,StructField("it", IntegerType(), True)
]

final_struc = StructType(fields=data_schema)

file_location = "/FileStore/tables/df_panel_fix.csv"
df = spark.read.format("csv").schema(final_struc).option("header", True).load(file_location)

#df.printSchema()

df.show()
+---+--------+---------+--------+----+-------+--------+----+---------+---------+-------+-----------+-------+
|_c0|province| specific| general|year|    gdp|     fdi| rnr|       rr|        i|     fr|        reg|     it|
+---+--------+---------+--------+----+-------+--------+----+---------+---------+-------+-----------+-------+
|  0|   Anhui| 147002.0|    null|1996| 2093.3| 50661.0| 0.0|      0.0|      0.0|1128873| East China| 631930|
|  1|   Anhui| 151981.0|    null|1997|2347.32| 43443.0| 0.0|      0.0|      0.0|1356287| East China| 657860|
|  2|   Anhui| 174930.0|    null|1998|2542.96| 27673.0| 0.0|      0.0|      0.0|1518236| East China| 889463|
|  3|   Anhui| 285324.0|    null|1999|2712.34| 26131.0|null|     null|     null|1646891| East China|1227364|
|  4|   Anhui| 195580.0| 32100.0|2000|2902.09| 31847.0| 0.0|      0.0|      0.0|1601508| East China|1499110|
|  5|   Anhui| 250898.0|    null|2001|3246.71| 33672.0| 0.0|      0.0|      0.0|1672445| East China|2165189|
|  6|   Anhui| 434149.0| 66529.0|2002|3519.72| 38375.0| 0.0|      0.0|      0.0|1677840| East China|2404936|
|  7|   Anhui| 619201.0| 52108.0|2003|3923.11| 36720.0| 0.0|      0.0|      0.0|1896479| East China|2815820|
|  8|   Anhui| 898441.0|349699.0|2004| 4759.3| 54669.0| 0.0|      0.0|      0.0|   null| East China|3422176|
|  9|   Anhui| 898441.0|    null|2005|5350.17| 69000.0| 0.0|      0.0|0.3243243|   null| East China|3874846|
| 10|   Anhui|1457872.0|279052.0|2006| 6112.5|139354.0| 0.0|      0.0|0.3243243|3434548| East China|5167300|
| 11|   Anhui|2213991.0|178705.0|2007|7360.92|299892.0| 0.0|      0.0|0.3243243|4468640| East China|7040099|
| 12| Beijing| 165957.0|    null|1996| 1789.2|155290.0|null|     null|     null| 634562|North China| 508135|
| 13| Beijing| 165957.0|    null|1997|2077.09|159286.0| 0.0|      0.0|      0.6| 634562|North China| 569283|
| 14| Beijing| 245198.0|    null|1998|2377.18|216800.0| 0.0|      0.0|     0.53| 938788|North China| 695528|
| 15| Beijing| 388083.0|    null|1999|2678.82|197525.0| 0.0|      0.0|     0.53|   null|North China| 944047|
| 16| Beijing| 281769.0|188633.0|2000|3161.66|168368.0| 0.0|      0.0|     0.53|1667114|North China| 757990|
| 17| Beijing| 441923.0|    null|2001|3707.96|176818.0| 0.0|      0.0|     0.53|2093925|North China|1194728|
| 18| Beijing| 558569.0|280277.0|2002| 4315.0|172464.0| 0.0|      0.0|     0.53|2511249|North China|1078754|
| 19| Beijing| 642581.0|269596.0|2003|5007.21|219126.0| 0.0|0.7948718|      0.0|2823366|North China|1426600|
+---+--------+---------+--------+----+-------+--------+----+---------+---------+-------+-----------+-------+
only showing top 20 rows
df.groupBy('province').count().show()
+------------+-----+
|    province|count|
+------------+-----+
|   Guangdong|   12|
|       Hunan|   12|
|      Shanxi|   12|
|       Tibet|   12|
|       Hubei|   12|
|     Tianjin|   12|
|     Beijing|   12|
|Heilongjiang|   12|
|    Liaoning|   12|
|       Henan|   12|
|       Anhui|   12|
|    Xinjiang|   12|
|      Fujian|   12|
|     Jiangxi|   12|
|       Jilin|   12|
|   Chongqing|   12|
|     Shaanxi|   12|
|     Sichuan|   12|
|      Yunnan|   12|
|       Gansu|   12|
+------------+-----+
only showing top 20 rows

Imputing each column's mean to fill missing values

# Fill the nulls in each numeric column with that column's mean
for col_name in ["general", "specific", "rr", "fr", "rnr", "i"]:
    col_mean = df.select(mean(df[col_name])).collect()[0][0]
    df = df.na.fill(col_mean, [col_name])
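
The same mean imputation can be done in a single pass with pyspark.ml.feature.Imputer (available since Spark 2.2). A minimal sketch using the same column list; note that Imputer only accepts float/double inputs, so the integer column "fr" is cast first:

from pyspark.ml.feature import Imputer

# One-pass alternative to the per-column loop above; strategy defaults to "mean".
# Imputer requires float/double columns, so the integer column "fr" is cast first.
df_cast = df.withColumn("fr", df["fr"].cast("double"))
impute_cols = ["general", "specific", "rr", "fr", "rnr", "i"]
imputer = Imputer(inputCols=impute_cols, outputCols=impute_cols, strategy="mean")
df_imputed = imputer.fit(df_cast).transform(df_cast)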

Creating a binary target from the existing 'specific' column for classification

df = df.withColumn('specific_classification',when(df.specific >= 583470.7303370787,1).otherwise(0))
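
The hard-coded cutoff 583470.7303370787 looks like a precomputed statistic of the 'specific' column. A hedged sketch that derives the threshold from the data instead of hard-coding it, assuming a mean cutoff is the intent:

# Hypothetical alternative: derive the cutoff from the data instead of
# hard-coding it (assumes the intended threshold is the mean of 'specific').
threshold = df.select(mean('specific')).collect()[0][0]
df = df.withColumn('specific_classification',
                   when(df.specific >= threshold, 1).otherwise(0))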

Using StringIndexer for categorical encoding of the string-type columns

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="province", outputCol="provinceIndex")
df = indexer.fit(df).transform(df)
indexer = StringIndexer(inputCol="reg", outputCol="regionIndex")
df = indexer.fit(df).transform(df)
df.show()
+---+--------+---------+------------------+----+-------+--------+------------------+----------+----------+-------+-----------+-------+-----------------------+-------------+-----------+
|_c0|province| specific|           general|year|    gdp|     fdi|               rnr|        rr|         i|     fr|        reg|     it|specific_classification|provinceIndex|regionIndex|
+---+--------+---------+------------------+----+-------+--------+------------------+----------+----------+-------+-----------+-------+-----------------------+-------------+-----------+
|  0|   Anhui| 147002.0|309127.53846153844|1996| 2093.3| 50661.0|               0.0|       0.0|       0.0|1128873| East China| 631930|                      0|          0.0|        0.0|
|  1|   Anhui| 151981.0|309127.53846153844|1997|2347.32| 43443.0|               0.0|       0.0|       0.0|1356287| East China| 657860|                      0|          0.0|        0.0|
|  2|   Anhui| 174930.0|309127.53846153844|1998|2542.96| 27673.0|               0.0|       0.0|       0.0|1518236| East China| 889463|                      0|          0.0|        0.0|
|  3|   Anhui| 285324.0|309127.53846153844|1999|2712.34| 26131.0|0.0355944252244898|0.05968862|0.08376352|1646891| East China|1227364|                      0|          0.0|        0.0|
|  4|   Anhui| 195580.0|           32100.0|2000|2902.09| 31847.0|               0.0|       0.0|       0.0|1601508| East China|1499110|                      0|          0.0|        0.0|
|  5|   Anhui| 250898.0|309127.53846153844|2001|3246.71| 33672.0|               0.0|       0.0|       0.0|1672445| East China|2165189|                      0|          0.0|        0.0|
|  6|   Anhui| 434149.0|           66529.0|2002|3519.72| 38375.0|               0.0|       0.0|       0.0|1677840| East China|2404936|                      0|          0.0|        0.0|
|  7|   Anhui| 619201.0|           52108.0|2003|3923.11| 36720.0|               0.0|       0.0|       0.0|1896479| East China|2815820|                      1|          0.0|        0.0|
|  8|   Anhui| 898441.0|          349699.0|2004| 4759.3| 54669.0|               0.0|       0.0|       0.0|2522449| East China|3422176|                      1|          0.0|        0.0|
|  9|   Anhui| 898441.0|309127.53846153844|2005|5350.17| 69000.0|               0.0|       0.0| 0.3243243|2522449| East China|3874846|                      1|          0.0|        0.0|
| 10|   Anhui|1457872.0|          279052.0|2006| 6112.5|139354.0|               0.0|       0.0| 0.3243243|3434548| East China|5167300|                      1|          0.0|        0.0|
| 11|   Anhui|2213991.0|          178705.0|2007|7360.92|299892.0|               0.0|       0.0| 0.3243243|4468640| East China|7040099|                      1|          0.0|        0.0|
| 12| Beijing| 165957.0|309127.53846153844|1996| 1789.2|155290.0|0.0355944252244898|0.05968862|0.08376352| 634562|North China| 508135|                      0|          1.0|        4.0|
| 13| Beijing| 165957.0|309127.53846153844|1997|2077.09|159286.0|               0.0|       0.0|       0.6| 634562|North China| 569283|                      0|          1.0|        4.0|
| 14| Beijing| 245198.0|309127.53846153844|1998|2377.18|216800.0|               0.0|       0.0|      0.53| 938788|North China| 695528|                      0|          1.0|        4.0|
| 15| Beijing| 388083.0|309127.53846153844|1999|2678.82|197525.0|               0.0|       0.0|      0.53|2522449|North China| 944047|                      0|          1.0|        4.0|
| 16| Beijing| 281769.0|          188633.0|2000|3161.66|168368.0|               0.0|       0.0|      0.53|1667114|North China| 757990|                      0|          1.0|        4.0|
| 17| Beijing| 441923.0|309127.53846153844|2001|3707.96|176818.0|               0.0|       0.0|      0.53|2093925|North China|1194728|                      0|          1.0|        4.0|
| 18| Beijing| 558569.0|          280277.0|2002| 4315.0|172464.0|               0.0|       0.0|      0.53|2511249|North China|1078754|                      0|          1.0|        4.0|
| 19| Beijing| 642581.0|          269596.0|2003|5007.21|219126.0|               0.0| 0.7948718|       0.0|2823366|North China|1426600|                      1|          1.0|        4.0|
+---+--------+---------+------------------+----+-------+--------+------------------+----------+----------+-------+-----------+-------+-----------------------+-------------+-----------+
only showing top 20 rows
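
StringIndexer assigns arbitrary ordinal indices, which tree models split on happily but the linear regression below will treat as magnitudes. A hedged sketch of one-hot encoding the indices instead, assuming the Spark 3.x OneHotEncoder API; the output column names are illustrative:

from pyspark.ml.feature import OneHotEncoder

# Illustrative refinement: one-hot encode so a linear model does not read an
# ordering into provinceIndex/regionIndex. Output names are assumptions.
encoder = OneHotEncoder(inputCols=["provinceIndex", "regionIndex"],
                        outputCols=["provinceVec", "regionVec"])
df_encoded = encoder.fit(df).transform(df)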

Using VectorAssembler to prepare features for machine learning

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
df.columns
['_c0', 'province', 'specific', 'general', 'year', 'gdp', 'fdi', 'rnr', 'rr', 'i', 'fr', 'reg', 'it', 'specific_classification', 'provinceIndex', 'regionIndex']
assembler = VectorAssembler(
 inputCols=[
 'provinceIndex',
# 'specific',
 'general',
 'year',
 'gdp',
 'fdi',
 #'rnr',
 #'rr',
 #'i',
 #'fr',
 'regionIndex',
 'it'
 ],
 outputCol="features")
output = assembler.transform(df)
final_data = output.select("features", "specific")
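
The two indexers and the assembler can also be chained into a single Pipeline, so the exact same preparation can be refit and reapplied to new raw data in one step. A minimal sketch, fit on a raw dataframe that does not already contain the index columns; the stage variable names are illustrative:

from pyspark.ml import Pipeline

# Illustrative sketch: bundle indexing and assembly into one reusable unit.
province_indexer = StringIndexer(inputCol="province", outputCol="provinceIndex")
region_indexer = StringIndexer(inputCol="reg", outputCol="regionIndex")
prep_pipeline = Pipeline(stages=[province_indexer, region_indexer, assembler])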

Splitting the data into training and test sets

train_data,test_data = final_data.randomSplit([0.7,0.3])
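
randomSplit is nondeterministic between runs; passing a seed makes the split reproducible. A one-line sketch (the seed value is arbitrary):

# Reproducible 70/30 split; the seed value (42) is arbitrary.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)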

Regression with PySpark ML

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(labelCol='specific')
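
featuresCol defaults to 'features', which matches the assembler output, so only labelCol needs to be set. If the model overfits, elastic-net regularization is available on the same estimator; a hedged sketch with arbitrary, untuned values:

# Illustrative only: regParam/elasticNetParam values are arbitrary, not tuned.
lr_reg = LinearRegression(labelCol='specific', regParam=0.1, elasticNetParam=0.5)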

Fitting the linear regression model to the training data

lrModel = lr.fit(train_data)

Coefficients and Intercept of the linear regression model

print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))
Coefficients: [-4936.461707001148,0.8007702471080539,-3994.683052325085,-7.5033201950338,0.42095493334994133,50994.51222529955,0.2531915644818595] Intercept: 7695214.561654471

Evaluating the trained linear regression model on the test data

test_results = lrModel.evaluate(test_data)

Metrics of the trained linear regression model on the test data (RMSE, MSE, R2)

print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))
print("R2: {}".format(test_results.r2))
RMSE: 292695.0825058327
MSE: 85670411323.0962
R2: 0.7853651103073853
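
The same metrics can also be computed with pyspark.ml.evaluation.RegressionEvaluator on a predictions dataframe produced by transform(); a minimal sketch (variable names are illustrative):

from pyspark.ml.evaluation import RegressionEvaluator

# Illustrative alternative: score predictions produced by transform().
predictions = lrModel.transform(test_data)
rmse_evaluator = RegressionEvaluator(labelCol="specific",
                                     predictionCol="prediction",
                                     metricName="rmse")
print("RMSE: {}".format(rmse_evaluator.evaluate(predictions)))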

Looking at correlations with corr

from pyspark.sql.functions import corr
df.select(corr('specific','gdp')).show()
+-------------------+
|corr(specific, gdp)|
+-------------------+
| 0.5141876884991972|
+-------------------+

Classification with PySpark ML

from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

Instantiating DecisionTreeClassifier, RandomForestClassifier, and GBTClassifier

dtc = DecisionTreeClassifier(labelCol='specific_classification',featuresCol='features')
rfc = RandomForestClassifier(labelCol='specific_classification',featuresCol='features')
gbt = GBTClassifier(labelCol='specific_classification',featuresCol='features')
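
All three classifiers run with default hyperparameters here; tree count and depth are the usual first knobs to tune. A hedged sketch with arbitrary, untuned values:

# Illustrative only: numTrees/maxDepth/maxIter values are arbitrary, not tuned.
rfc_tuned = RandomForestClassifier(labelCol='specific_classification',
                                   featuresCol='features',
                                   numTrees=100, maxDepth=8)
gbt_tuned = GBTClassifier(labelCol='specific_classification',
                          featuresCol='features',
                          maxIter=50, maxDepth=4)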

Selecting features and binary target

final_data = output.select("features", "specific_classification")
train_data,test_data = final_data.randomSplit([0.7,0.3])

Fitting the Classifiers to the Training Data

rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)
dtc_model = dtc.fit(train_data)

Classifier predictions on test data

dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

Evaluating Classifiers using pyspark.ml.evaluation and MulticlassClassificationEvaluator

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Classifier Accuracy

acc_evaluator = MulticlassClassificationEvaluator(labelCol="specific_classification", predictionCol="prediction", metricName="accuracy")

Classifier Accuracy Metrics

dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
gbt_acc = acc_evaluator.evaluate(gbt_predictions)
print('-'*80)
print('Decision tree accuracy: {0:2.2f}%'.format(dtc_acc*100))
print('-'*80)
print('Random forest ensemble accuracy: {0:2.2f}%'.format(rfc_acc*100))
print('-'*80)
print('GBT accuracy: {0:2.2f}%'.format(gbt_acc*100))
print('-'*80)
--------------------------------------------------------------------------------
Decision tree accuracy: 81.98%
--------------------------------------------------------------------------------
Random forest ensemble accuracy: 88.29%
--------------------------------------------------------------------------------
GBT accuracy: 81.08%
--------------------------------------------------------------------------------
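
Accuracy alone can mislead on an imbalanced binary target, so area under ROC is a useful complement. A minimal sketch with BinaryClassificationEvaluator, assuming a Spark version (2.2+) in which all three classifiers emit a rawPrediction column:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Complementary metric: area under the ROC curve for each classifier.
auc_evaluator = BinaryClassificationEvaluator(
    labelCol="specific_classification",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC")
print('RFC AUC: {:.4f}'.format(auc_evaluator.evaluate(rfc_predictions)))
print('GBT AUC: {:.4f}'.format(auc_evaluator.evaluate(gbt_predictions)))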

Looking at correlations with the binary target using corr

df.select(corr('specific_classification','fdi')).show()
+----------------------------------+
|corr(specific_classification, fdi)|
+----------------------------------+
|                 0.307429849493392|
+----------------------------------+
df.select(corr('specific_classification','gdp')).show()
+----------------------------------+
|corr(specific_classification, gdp)|
+----------------------------------+
|                 0.492176921599151|
+----------------------------------+