"A minimal example of using PySpark for Linear Regression"

  • toc: true
  • branch: master
  • badges: true
  • comments: true
  • author: David Kearney
  • categories: [pyspark, jupyter]
  • description: A minimal example of using PySpark for Linear Regression
  • title: PySpark Regression with Fiscal Data

Import the required modules

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, BooleanType, DateType, IntegerType, DoubleType
from pyspark.sql.functions import *

Load data from CSV

#collapse-hide

# Read the fiscal panel CSV: column names come from the header row and Spark
# infers each column's type by scanning the data.
file_location = "/FileStore/tables/df_panel_fix.csv"
csv_reader = (
    spark.read.format("CSV")
    .option("header", True)
    .option("inferSchema", True)
)
df = csv_reader.load(file_location)

# Preview the first five rows.
first_rows = df.take(5)
display(first_rows)

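| _c0 | province | specific | general | year | gdp | fdi | rnr | rr | i | fr | reg | it |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Anhui | 147002.0 | null | 1996 | 2093.3 | 50661 | 0.0 | 0.0 | 0.0 | 1128873 | East China | 631930 |
| 1 | Anhui | 151981.0 | null | 1997 | 2347.32 | 43443 | 0.0 | 0.0 | 0.0 | 1356287 | East China | 657860 |
| 2 | Anhui | 174930.0 | null | 1998 | 2542.96 | 27673 | 0.0 | 0.0 | 0.0 | 1518236 | East China | 889463 |
| 3 | Anhui | 285324.0 | null | 1999 | 2712.34 | 26131 | null | null | null | 1646891 | East China | 1227364 |
| 4 | Anhui | 195580.0 | 32100.0 | 2000 | 2902.09 | 31847 | 0.0 | 0.0 | 0.0 | 1601508 | East China | 1499110 |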
# Register the DataFrame so it can be queried from Spark SQL as "fiscal_stats".
df.createOrReplaceTempView("fiscal_stats")

# Per-year totals of `it` and `fr`, listed in chronological order.
yearly_totals_query = """
select year, sum(it) as total_yearly_it, sum(fr) as total_yearly_fr
from fiscal_stats
group by 1
order by year asc
"""
sums = spark.sql(yearly_totals_query)

sums.show()
+----+---------------+---------------+
|year|total_yearly_it|total_yearly_fr|
+----+---------------+---------------+
|1996|       19825341|    2.9579215E7|
|1997|       21391321|    2.9110765E7|
|1998|       25511453|    3.8154711E7|
|1999|       31922107|    4.2128627E7|
|2000|       38721293|    4.8288092E7|
|2001|       50754944|    5.8910649E7|
|2002|       62375881|    6.2071474E7|
|2003|       69316709|    7.2479293E7|
|2004|       88626786|           null|
|2005|       98263665|           null|
|2006|      119517822|    1.3349148E8|
|2007|      153467611|   2.27385701E8|
+----+---------------+---------------+

Describe the Data

# Summary statistics (count/mean/stddev/min/max) per column, transposed so
# that each column of `df` becomes a row of the displayed table.
summary_pdf = df.describe().toPandas()
summary_pdf.T
| | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| summary | count | mean | stddev | min | max |
| _c0 | 360 | 179.5 | 104.06728592598157 | 0 | 359 |
| province | 360 | None | None | Anhui | Zhejiang |
| specific | 356 | 583470.7303370787 | 654055.3290782663 | 8964.0 | 3937966.0 |
| general | 169 | 309127.53846153844 | 355423.5760674793 | 0.0 | 1737800.0 |
| year | 360 | 2001.5 | 3.4568570586927794 | 1996 | 2007 |
| gdp | 360 | 4428.653416666667 | 4484.668659976412 | 64.98 | 31777.01 |
| fdi | 360 | 196139.38333333333 | 303043.97011891654 | 2 | 1743140 |
| rnr | 294 | 0.0355944252244898 | 0.16061503029299648 | 0.0 | 1.214285714 |
| rr | 296 | 0.059688621057432424 | 0.15673351824073453 | 0.0 | 0.84 |
| i | 287 | 0.08376351662369343 | 0.1838933104683607 | 0.0 | 1.05 |
| fr | 295 | 2522449.0034013605 | 3491329.8613106664 | #REF! | 9898522 |
| reg | 360 | None | None | East China | Southwest China |
| it | 360 | 2165819.2583333333 | 1769294.2935487411 | 147897 | 10533312 |

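Cast Data Types

The `fr` column contains the spreadsheet error value `#REF!` (see its `min` in the summary above), so Spark reads the column as text; casting it to a numeric type turns such non-numeric entries into nulls (Spark's non-ANSI cast behaviour).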

# Give each numeric column an explicit Spark type.
# gdp and the ratio columns rnr, rr and i hold fractional values (gdp such as
# 2093.3; rnr up to about 1.21 in the summary above), so they are cast to
# DoubleType. Casting them to IntegerType would truncate the decimals and
# collapse the ratios to 0 or 1.
# Entries that are not numbers, such as the "#REF!" value in fr, become null
# under Spark's non-ANSI cast semantics (with spark.sql.ansi.enabled=true the
# cast raises an error instead).
df2 = (
    df.withColumn("gdp", col("gdp").cast(DoubleType()))
    .withColumn("specific", col("specific").cast(IntegerType()))
    .withColumn("general", col("general").cast(IntegerType()))
    .withColumn("year", col("year").cast(IntegerType()))
    .withColumn("fdi", col("fdi").cast(IntegerType()))
    .withColumn("rnr", col("rnr").cast(DoubleType()))
    .withColumn("rr", col("rr").cast(DoubleType()))
    .withColumn("i", col("i").cast(DoubleType()))
    .withColumn("fr", col("fr").cast(IntegerType()))
)

Print the Schema

# Show the column names and Spark types of df2 after the casts.
df2.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- specific: integer (nullable = true)
 |-- general: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- gdp: integer (nullable = true)
 |-- fdi: integer (nullable = true)
 |-- rnr: integer (nullable = true)
 |-- rr: integer (nullable = true)
 |-- i: integer (nullable = true)
 |-- fr: integer (nullable = true)
 |-- reg: string (nullable = true)
 |-- it: integer (nullable = true)
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Pack the predictor columns into the single vector column that Spark ML
# estimators read: features = [gdp, fdi].
assembler = VectorAssembler(inputCols=['gdp', 'fdi'], outputCol="features")
assembled_df = assembler.transform(df2)

# Split into training and held-out rows. Later cells fit on `train_df` and
# score `test_df`, so both names must be defined here. The fixed seed makes
# the split reproducible between runs.
train_df, test_df = assembled_df.randomSplit([0.7, 0.3], seed=42)
train_df.select("specific", "year").show()
+--------+----+
|specific|year|
+--------+----+
|  147002|1996|
|  151981|1997|
|  174930|1998|
|  285324|1999|
|  195580|2000|
|  250898|2001|
|  434149|2002|
|  619201|2003|
|  898441|2004|
|  898441|2005|
| 1457872|2006|
| 2213991|2007|
|  165957|1996|
|  165957|1997|
|  245198|1998|
|  388083|1999|
|  281769|2000|
|  441923|2001|
|  558569|2002|
|  642581|2003|
+--------+----+
only showing top 20 rows

Linear Regression in PySpark

# Fit a linear regression of `it` on the assembled feature vector
# (estimator defaults, i.e. no regularisation parameters set).
lr = LinearRegression(labelCol='it', featuresCol='features')
lr_model = lr.fit(train_df)

# Training-set fit statistics.
trainingSummary = lr_model.summary
rmse_train = trainingSummary.rootMeanSquaredError
r2_train = trainingSummary.r2
print(f"Coefficients: {lr_model.coefficients}")
print(f"RMSE: {rmse_train:f}")
print(f"R2: {r2_train:f}")
Coefficients: [495.05888709337756,-4.968141828763066]
RMSE: 1234228.673087
R2: 0.512023
from pyspark.ml.evaluation import RegressionEvaluator

# Score the training rows and preview predictions next to the true `it`.
lr_predictions = lr_model.transform(train_df)
lr_predictions.select("prediction", "it", "features").show(5)

# Evaluator that reports the coefficient of determination (R^2) for `it`.
lr_evaluator = RegressionEvaluator(
    labelCol="it",
    predictionCol="prediction",
    metricName="r2",
)
+------------------+-------+----------------+
|        prediction|     it|        features|
+------------------+-------+----------------+
|1732528.7382477913| 631930|[2093.0,50661.0]|
|1894133.7432895212| 657860|[2347.0,43443.0]|
|2069017.8229123235| 889463|[2542.0,27673.0]|
|2160838.7084181504|1227364|[2712.0,26131.0]|
|2226501.9982726825|1499110|[2902.0,31847.0]|
+------------------+-------+----------------+
only showing top 5 rows
# lr_predictions was produced from train_df (the rows the model was fitted
# on), so this is an in-sample R^2, not a held-out test score.
print("R Squared (R2) on training data = %g" % lr_evaluator.evaluate(lr_predictions))
R Squared (R2) on test data = 0.512023
# Solver diagnostics from the training summary, then the per-row residuals.
iteration_count = trainingSummary.totalIterations
loss_history = trainingSummary.objectiveHistory
print(f"numIterations: {iteration_count:d}")
print(f"objectiveHistory: {loss_history}")
trainingSummary.residuals.show()
numIterations: 1
objectiveHistory: [0.0]
+-------------------+
|          residuals|
+-------------------+
|-1100598.7382477913|
|-1236273.7432895212|
|-1179554.8229123235|
| -933474.7084181504|
| -727391.9982726825|
|-222546.39659531135|
| -94585.30175113119|
| 108072.63313654158|
| 389732.58121094666|
|  621021.2194867637|
|  1885768.997742407|
|  3938310.059555837|
|  -554084.125169754|
| -615660.3899049093|
| -352195.3468934437|
|-348450.00565795833|
| -918476.5594253046|
| -710059.9133252408|
|-1148661.0062004486|
|  -911572.322055324|
+-------------------+
only showing top 20 rows
# Score the held-out rows. Requires `test_df` (the test portion of a
# train/test split) to have been defined by an earlier cell.
predictions = lr_model.transform(test_df)
preview_columns = ["prediction", "it", "features"]
predictions.select(*preview_columns).show()
+------------------+-------+---------------+
|        prediction|     it|       features|
+------------------+-------+---------------+
| 976371.9212205639| 306114|   [64.0,679.0]|
| 990722.2032541803| 415547|   [91.0,481.0]|
|1016348.0830204486| 983251|  [139.0,106.0]|
|1036290.7062801318| 218361|  [184.0,576.0]|
|1034023.4471330958| 178668| [202.0,2826.0]|
|1060130.0768520113| 274994| [245.0,1856.0]|
|1023513.0851009073| 546541|[263.0,11020.0]|
|   1053250.6267921| 361358| [264.0,5134.0]|
|1123768.8091592425| 866691| [377.0,2200.0]|
|1128604.8330225947| 948521| [390.0,2522.0]|
| 810587.2575938476| 177748|[442.0,71715.0]|
| 1159703.254297337| 736165| [445.0,1743.0]|
| 1066975.770986663|1260633|[466.0,22500.0]|
|1288507.6625716756|1423771| [725.0,3718.0]|
| 1320055.238474972| 573905| [793.0,4144.0]|
|1188611.0570700848|2347862|[797.0,31000.0]|
| 1321857.482976733| 582711| [805.0,4977.0]|
|1033849.5995896922| 746784|[819.0,64343.0]|
| 1445051.792853667|1216605|[1029.0,2501.0]|
|1437887.1056682135|1258100|[1052.0,6235.0]|
+------------------+-------+---------------+
only showing top 20 rows
from pyspark.ml.regression import DecisionTreeRegressor

# Fit a decision-tree regressor on the same features and label as the
# linear model.
dt = DecisionTreeRegressor(featuresCol='features', labelCol='it')
dt_model = dt.fit(train_df)

# Predictions are made on train_df, the same rows the tree was fitted on,
# so the RMSE below is an in-sample (training) error, not a test error.
dt_predictions = dt_model.transform(train_df)
dt_evaluator = RegressionEvaluator(
    labelCol="it", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on training data = %g" % rmse)
Root Mean Squared Error (RMSE) on test data = 1.01114e+06
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted tree regressor (maxIter=10 boosting iterations) on the
# same features and label as the models above.
gbt = GBTRegressor(featuresCol='features', labelCol='it', maxIter=10)
gbt_model = gbt.fit(train_df)

# Predictions are made on train_df, the rows the model was fitted on, so the
# RMSE below is an in-sample (training) error rather than a test error.
gbt_predictions = gbt_model.transform(train_df)
gbt_predictions.select('prediction', 'it', 'features').show(5)


gbt_evaluator = RegressionEvaluator(
    labelCol="it", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on training data = %g" % rmse)
+------------------+-------+----------------+
|        prediction|     it|        features|
+------------------+-------+----------------+
| 1388898.308543053| 631930|[2093.0,50661.0]|
| 1388898.308543053| 657860|[2347.0,43443.0]|
|1649083.6277172007| 889463|[2542.0,27673.0]|
|1649083.6277172007|1227364|[2712.0,26131.0]|
|1649083.6277172007|1499110|[2902.0,31847.0]|
+------------------+-------+----------------+
only showing top 5 rows
Root Mean Squared Error (RMSE) on test data = 778728