from pyspark.sql import SparkSession
from pyspark.sql.types import StructField,StringType,IntegerType,StructType, DoubleType, FloatType
from pyspark.sql.functions import *


data_schema = [
StructField("_c0", IntegerType(), True)
,StructField("province", StringType(), True)
,StructField("specific", DoubleType(), True)
,StructField("general", DoubleType(), True)
,StructField("year", IntegerType(), True)
,StructField("gdp", FloatType(), True)
,StructField("fdi", FloatType(), True)
,StructField("rnr", DoubleType(), True)
,StructField("rr", FloatType(), True)
,StructField("i", FloatType(), True)
,StructField("fr", IntegerType(), True)
,StructField("reg", StringType(), True)
,StructField("it", IntegerType(), True)
]

final_struc = StructType(fields=data_schema)

file_location = "/FileStore/tables/df_panel_fix.csv"
df = spark.read.format("CSV").schema(final_struc).option("header", True).load(file_location)

#df.printSchema()

df.show()
+---+--------+---------+--------+----+-------+--------+----+---------+---------+-------+-----------+-------+ _c0|province| specific| general|year| gdp| fdi| rnr| rr| i| fr| reg| it| +---+--------+---------+--------+----+-------+--------+----+---------+---------+-------+-----------+-------+ 0| Anhui| 147002.0| null|1996| 2093.3| 50661.0| 0.0| 0.0| 0.0|1128873| East China| 631930| 1| Anhui| 151981.0| null|1997|2347.32| 43443.0| 0.0| 0.0| 0.0|1356287| East China| 657860| 2| Anhui| 174930.0| null|1998|2542.96| 27673.0| 0.0| 0.0| 0.0|1518236| East China| 889463| 3| Anhui| 285324.0| null|1999|2712.34| 26131.0|null| null| null|1646891| East China|1227364| 4| Anhui| 195580.0| 32100.0|2000|2902.09| 31847.0| 0.0| 0.0| 0.0|1601508| East China|1499110| 5| Anhui| 250898.0| null|2001|3246.71| 33672.0| 0.0| 0.0| 0.0|1672445| East China|2165189| 6| Anhui| 434149.0| 66529.0|2002|3519.72| 38375.0| 0.0| 0.0| 0.0|1677840| East China|2404936| 7| Anhui| 619201.0| 52108.0|2003|3923.11| 36720.0| 0.0| 0.0| 0.0|1896479| East China|2815820| 8| Anhui| 898441.0|349699.0|2004| 4759.3| 54669.0| 0.0| 0.0| 0.0| null| East China|3422176| 9| Anhui| 898441.0| null|2005|5350.17| 69000.0| 0.0| 0.0|0.3243243| null| East China|3874846| 10| Anhui|1457872.0|279052.0|2006| 6112.5|139354.0| 0.0| 0.0|0.3243243|3434548| East China|5167300| 11| Anhui|2213991.0|178705.0|2007|7360.92|299892.0| 0.0| 0.0|0.3243243|4468640| East China|7040099| 12| Beijing| 165957.0| null|1996| 1789.2|155290.0|null| null| null| 634562|North China| 508135| 13| Beijing| 165957.0| null|1997|2077.09|159286.0| 0.0| 0.0| 0.6| 634562|North China| 569283| 14| Beijing| 245198.0| null|1998|2377.18|216800.0| 0.0| 0.0| 0.53| 938788|North China| 695528| 15| Beijing| 388083.0| null|1999|2678.82|197525.0| 0.0| 0.0| 0.53| null|North China| 944047| 16| Beijing| 281769.0|188633.0|2000|3161.66|168368.0| 0.0| 0.0| 0.53|1667114|North China| 757990| 17| Beijing| 441923.0| null|2001|3707.96|176818.0| 0.0| 0.0| 0.53|2093925|North China|1194728| 18| Beijing| 558569.0|280277.0|2002| 4315.0|172464.0| 0.0| 0.0| 0.53|2511249|North China|1078754| 19| Beijing| 642581.0|269596.0|2003|5007.21|219126.0| 0.0|0.7948718| 0.0|2823366|North China|1426600| +---+--------+---------+--------+----+-------+--------+----+---------+---------+-------+-----------+-------+ only showing top 20 rows

Using toPandas to look at the data

df.limit(10).toPandas()
_c0 province specific general year gdp fdi rnr rr i fr reg it
0 0 Anhui 147002.0 NaN 1996 2093.300049 50661.0 0.0 0.0 0.000000 1128873.0 East China 631930
1 1 Anhui 151981.0 NaN 1997 2347.320068 43443.0 0.0 0.0 0.000000 1356287.0 East China 657860
2 2 Anhui 174930.0 NaN 1998 2542.959961 27673.0 0.0 0.0 0.000000 1518236.0 East China 889463
3 3 Anhui 285324.0 NaN 1999 2712.340088 26131.0 NaN NaN NaN 1646891.0 East China 1227364
4 4 Anhui 195580.0 32100.0 2000 2902.090088 31847.0 0.0 0.0 0.000000 1601508.0 East China 1499110
5 5 Anhui 250898.0 NaN 2001 3246.709961 33672.0 0.0 0.0 0.000000 1672445.0 East China 2165189
6 6 Anhui 434149.0 66529.0 2002 3519.719971 38375.0 0.0 0.0 0.000000 1677840.0 East China 2404936
7 7 Anhui 619201.0 52108.0 2003 3923.110107 36720.0 0.0 0.0 0.000000 1896479.0 East China 2815820
8 8 Anhui 898441.0 349699.0 2004 4759.299805 54669.0 0.0 0.0 0.000000 NaN East China 3422176
9 9 Anhui 898441.0 NaN 2005 5350.169922 69000.0 0.0 0.0 0.324324 NaN East China 3874846

Renaming Columns

df = df.withColumnRenamed("reg","region")
df.limit(10).toPandas()
_c0 province specific general year gdp fdi rnr rr i fr region it
0 0 Anhui 147002.0 NaN 1996 2093.300049 50661.0 0.0 0.0 0.000000 1128873.0 East China 631930
1 1 Anhui 151981.0 NaN 1997 2347.320068 43443.0 0.0 0.0 0.000000 1356287.0 East China 657860
2 2 Anhui 174930.0 NaN 1998 2542.959961 27673.0 0.0 0.0 0.000000 1518236.0 East China 889463
3 3 Anhui 285324.0 NaN 1999 2712.340088 26131.0 NaN NaN NaN 1646891.0 East China 1227364
4 4 Anhui 195580.0 32100.0 2000 2902.090088 31847.0 0.0 0.0 0.000000 1601508.0 East China 1499110
5 5 Anhui 250898.0 NaN 2001 3246.709961 33672.0 0.0 0.0 0.000000 1672445.0 East China 2165189
6 6 Anhui 434149.0 66529.0 2002 3519.719971 38375.0 0.0 0.0 0.000000 1677840.0 East China 2404936
7 7 Anhui 619201.0 52108.0 2003 3923.110107 36720.0 0.0 0.0 0.000000 1896479.0 East China 2815820
8 8 Anhui 898441.0 349699.0 2004 4759.299805 54669.0 0.0 0.0 0.000000 NaN East China 3422176
9 9 Anhui 898441.0 NaN 2005 5350.169922 69000.0 0.0 0.0 0.324324 NaN East China 3874846
# df = df.toDF(*['year', 'region', 'province', 'gdp', 'fdi', 'specific', 'general', 'it', 'fr', 'rnr', 'rr', 'i', '_c0', 'specific_classification', 'provinceIndex', 'regionIndex'])

Selecting Columns of Interest

df = df.select('year','region','province','gdp', 'fdi')
df.sort("gdp").show()
+----+---------------+--------+------+-------+ year| region|province| gdp| fdi| +----+---------------+--------+------+-------+ 1996|Southwest China| Tibet| 64.98| 679.0| 1997|Southwest China| Tibet| 77.24| 63.0| 1998|Southwest China| Tibet| 91.5| 481.0| 1999|Southwest China| Tibet|105.98| 196.0| 2000|Southwest China| Tibet| 117.8| 2.0| 2001|Southwest China| Tibet|139.16| 106.0| 2002|Southwest China| Tibet|162.04| 293.0| 1996|Northwest China| Qinghai|184.17| 576.0| 2003|Southwest China| Tibet|185.09| 467.0| 1997|Northwest China| Qinghai|202.79| 247.0| 1996|Northwest China| Ningxia| 202.9| 2826.0| 2004|Southwest China| Tibet|220.34| 2699.0| 1998|Northwest China| Qinghai|220.92| 1010.0| 1997|Northwest China| Ningxia|224.59| 671.0| 1999|Northwest China| Qinghai|239.38| 459.0| 1998|Northwest China| Ningxia|245.44| 1856.0| 2005|Southwest China| Tibet| 248.8| 1151.0| 2000|Northwest China| Qinghai|263.68|11020.0| 1999|Northwest China| Ningxia|264.58| 5134.0| 2006|Southwest China| Tibet|290.76| 1522.0| +----+---------------+--------+------+-------+ only showing top 20 rows

Sorting RDDs by Columns

from pyspark.sql import functions as F
df.sort(F.desc("gdp")).show()
+----+-------------------+---------+--------+---------+ year| region| province| gdp| fdi| +----+-------------------+---------+--------+---------+ 2007|South Central China|Guangdong|31777.01|1712603.0| 2006|South Central China|Guangdong|26587.76|1451065.0| 2007| East China| Shandong|25776.91|1101159.0| 2005|South Central China|Guangdong|22557.37|1236400.0| 2006| East China| Shandong|21900.19|1000069.0| 2007| East China| Jiangsu|21742.05|1743140.0| 2004|South Central China|Guangdong|18864.62|1001158.0| 2007| East China| Zhejiang|18753.73|1036576.0| 2006| East China| Jiangsu|18598.69|1318339.0| 2005| East China| Shandong|18366.87| 897000.0| 2003|South Central China|Guangdong|15844.64| 782294.0| 2006| East China| Zhejiang|15718.47| 888935.0| 2004| East China| Shandong|15021.84| 870064.0| 2007|South Central China| Henan|15012.46| 306162.0| 2005| East China| Jiangsu| 15003.6|1213800.0| 2007| North China| Hebei|13607.32| 241621.0| 2002|South Central China|Guangdong|13502.42|1133400.0| 2005| East China| Zhejiang|13417.68| 772000.0| 2007| East China| Shanghai|12494.01| 792000.0| 2004| East China| Jiangsu|12442.87|1056365.0| +----+-------------------+---------+--------+---------+ only showing top 20 rows

Casting Data Types

from pyspark.sql.types import IntegerType, StringType, DoubleType
df = df.withColumn('gdp', F.col('gdp').cast(DoubleType()))
df = df.withColumn('province', F.col('province').cast(StringType()))
df.filter((df.gdp>10000) & (df.region=='East China')).show()
+----+----------+--------+----------------+---------+ year| region|province| gdp| fdi| +----+----------+--------+----------------+---------+ 2003|East China| Jiangsu| 10606.849609375|1018960.0| 2004|East China| Jiangsu|12442.8701171875|1056365.0| 2005|East China| Jiangsu| 15003.599609375|1213800.0| 2006|East China| Jiangsu| 18598.689453125|1318339.0| 2007|East China| Jiangsu| 21742.05078125|1743140.0| 2002|East China|Shandong| 10275.5| 473404.0| 2003|East China|Shandong| 12078.150390625| 601617.0| 2004|East China|Shandong| 15021.83984375| 870064.0| 2005|East China|Shandong| 18366.869140625| 897000.0| 2006|East China|Shandong| 21900.189453125|1000069.0| 2007|East China|Shandong| 25776.91015625|1101159.0| 2006|East China|Shanghai| 10572.240234375| 710700.0| 2007|East China|Shanghai| 12494.009765625| 792000.0| 2004|East China|Zhejiang|11648.7001953125| 668128.0| 2005|East China|Zhejiang| 13417.6796875| 772000.0| 2006|East China|Zhejiang|15718.4697265625| 888935.0| 2007|East China|Zhejiang| 18753.73046875|1036576.0| +----+----------+--------+----------------+---------+

Aggregating using groupBy, .agg and sum/max

from pyspark.sql import functions as F

df.groupBy(["region","province"]).agg(F.sum("gdp") ,F.max("gdp")).show()
+-------------------+---------+------------------+------------------+ region| province| sum(gdp)| max(gdp)| +-------------------+---------+------------------+------------------+ South Central China| Hunan| 57190.69970703125| 9439.599609375| North China| Tianjin|30343.979858398438| 5252.759765625| Northwest China| Xinjiang| 21946.75994873047| 3523.159912109375| North China| Beijing|56081.439208984375| 9846.8095703125| South Central China|Guangdong| 184305.376953125| 31777.009765625| South Central China| Henan| 86507.60034179688| 15012.4599609375| East China| Jiangsu|129142.15966796875| 21742.05078125| Northwest China| Gansu| 16773.99005126953| 2703.97998046875| Southwest China| Guizhou|17064.130249023438| 2884.110107421875| Southwest China| Sichuan|64533.479736328125| 10562.3896484375| South Central China| Hainan| 8240.570068359375|1254.1700439453125| East China| Shandong| 147888.0283203125| 25776.91015625| Southwest China|Chongqing|29732.549926757812| 4676.1298828125| Northwest China| Shaanxi|31896.409790039062| 5757.2900390625| East China| Shanghai| 77189.4501953125| 12494.009765625| Southwest China| Tibet| 2045.120002746582|341.42999267578125| North China| Hebei| 83241.8994140625| 13607.3203125| Northeast China| Jilin|27298.250366210938| 4275.1201171875| East China| Zhejiang|109657.81884765625| 18753.73046875| North China| Shanxi| 33806.52990722656| 6024.4501953125| +-------------------+---------+------------------+------------------+ only showing top 20 rows
df.groupBy(["region","province"]).agg(F.sum("gdp").alias("SumGDP"),F.max("gdp").alias("MaxGDP")).show()
+-------------------+---------+------------------+------------------+ region| province| GDP| MaxGDP| +-------------------+---------+------------------+------------------+ South Central China| Hunan| 57190.69970703125| 9439.599609375| North China| Tianjin|30343.979858398438| 5252.759765625| Northwest China| Xinjiang| 21946.75994873047| 3523.159912109375| North China| Beijing|56081.439208984375| 9846.8095703125| South Central China|Guangdong| 184305.376953125| 31777.009765625| South Central China| Henan| 86507.60034179688| 15012.4599609375| East China| Jiangsu|129142.15966796875| 21742.05078125| Northwest China| Gansu| 16773.99005126953| 2703.97998046875| Southwest China| Guizhou|17064.130249023438| 2884.110107421875| Southwest China| Sichuan|64533.479736328125| 10562.3896484375| South Central China| Hainan| 8240.570068359375|1254.1700439453125| East China| Shandong| 147888.0283203125| 25776.91015625| Southwest China|Chongqing|29732.549926757812| 4676.1298828125| Northwest China| Shaanxi|31896.409790039062| 5757.2900390625| East China| Shanghai| 77189.4501953125| 12494.009765625| Southwest China| Tibet| 2045.120002746582|341.42999267578125| North China| Hebei| 83241.8994140625| 13607.3203125| Northeast China| Jilin|27298.250366210938| 4275.1201171875| East China| Zhejiang|109657.81884765625| 18753.73046875| North China| Shanxi| 33806.52990722656| 6024.4501953125| +-------------------+---------+------------------+------------------+ only showing top 20 rows
df.groupBy(["region","province"]).agg(
    F.sum("gdp").alias("SumGDP"),\
    F.max("gdp").alias("MaxGDP")\
    ).show()
+-------------------+---------+------------------+------------------+ region| province| SumGDP| MaxGDP| +-------------------+---------+------------------+------------------+ South Central China| Hunan| 57190.69970703125| 9439.599609375| North China| Tianjin|30343.979858398438| 5252.759765625| Northwest China| Xinjiang| 21946.75994873047| 3523.159912109375| North China| Beijing|56081.439208984375| 9846.8095703125| South Central China|Guangdong| 184305.376953125| 31777.009765625| South Central China| Henan| 86507.60034179688| 15012.4599609375| East China| Jiangsu|129142.15966796875| 21742.05078125| Northwest China| Gansu| 16773.99005126953| 2703.97998046875| Southwest China| Guizhou|17064.130249023438| 2884.110107421875| Southwest China| Sichuan|64533.479736328125| 10562.3896484375| South Central China| Hainan| 8240.570068359375|1254.1700439453125| East China| Shandong| 147888.0283203125| 25776.91015625| Southwest China|Chongqing|29732.549926757812| 4676.1298828125| Northwest China| Shaanxi|31896.409790039062| 5757.2900390625| East China| Shanghai| 77189.4501953125| 12494.009765625| Southwest China| Tibet| 2045.120002746582|341.42999267578125| North China| Hebei| 83241.8994140625| 13607.3203125| Northeast China| Jilin|27298.250366210938| 4275.1201171875| East China| Zhejiang|109657.81884765625| 18753.73046875| North China| Shanxi| 33806.52990722656| 6024.4501953125| +-------------------+---------+------------------+------------------+ only showing top 20 rows
df.limit(10).toPandas()
year region province gdp fdi
0 1996 East China Anhui 2093.300049 50661.0
1 1997 East China Anhui 2347.320068 43443.0
2 1998 East China Anhui 2542.959961 27673.0
3 1999 East China Anhui 2712.340088 26131.0
4 2000 East China Anhui 2902.090088 31847.0
5 2001 East China Anhui 3246.709961 33672.0
6 2002 East China Anhui 3519.719971 38375.0
7 2003 East China Anhui 3923.110107 36720.0
8 2004 East China Anhui 4759.299805 54669.0
9 2005 East China Anhui 5350.169922 69000.0

Exponentials using exp

df = df.withColumn("Exp_GDP", F.exp("gdp"))
df.show()
+----+-----------+--------+-----------------+--------+--------+ year| region|province| gdp| fdi| Exp_GDP| +----+-----------+--------+-----------------+--------+--------+ 1996| East China| Anhui|2093.300048828125| 50661.0|Infinity| 1997| East China| Anhui|2347.320068359375| 43443.0|Infinity| 1998| East China| Anhui| 2542.9599609375| 27673.0|Infinity| 1999| East China| Anhui|2712.340087890625| 26131.0|Infinity| 2000| East China| Anhui|2902.090087890625| 31847.0|Infinity| 2001| East China| Anhui| 3246.7099609375| 33672.0|Infinity| 2002| East China| Anhui|3519.719970703125| 38375.0|Infinity| 2003| East China| Anhui|3923.110107421875| 36720.0|Infinity| 2004| East China| Anhui| 4759.2998046875| 54669.0|Infinity| 2005| East China| Anhui| 5350.169921875| 69000.0|Infinity| 2006| East China| Anhui| 6112.5|139354.0|Infinity| 2007| East China| Anhui| 7360.919921875|299892.0|Infinity| 1996|North China| Beijing|1789.199951171875|155290.0|Infinity| 1997|North China| Beijing|2077.090087890625|159286.0|Infinity| 1998|North China| Beijing|2377.179931640625|216800.0|Infinity| 1999|North China| Beijing|2678.820068359375|197525.0|Infinity| 2000|North China| Beijing|3161.659912109375|168368.0|Infinity| 2001|North China| Beijing| 3707.9599609375|176818.0|Infinity| 2002|North China| Beijing| 4315.0|172464.0|Infinity| 2003|North China| Beijing| 5007.2099609375|219126.0|Infinity| +----+-----------+--------+-----------------+--------+--------+ only showing top 20 rows

Window functions

Note: Window functions

# Window functions

from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy(F.desc('gdp'))
df.withColumn("rank",F.rank().over(windowSpec)).show()
+----+-------------------+---------+-----------------+---------+--------+----+ year| region| province| gdp| fdi| Exp_GDP|rank| +----+-------------------+---------+-----------------+---------+--------+----+ 2007|South Central China|Guangdong| 31777.009765625|1712603.0|Infinity| 1| 2006|South Central China|Guangdong| 26587.759765625|1451065.0|Infinity| 2| 2005|South Central China|Guangdong| 22557.369140625|1236400.0|Infinity| 3| 2004|South Central China|Guangdong| 18864.619140625|1001158.0|Infinity| 4| 2003|South Central China|Guangdong| 15844.6396484375| 782294.0|Infinity| 5| 2002|South Central China|Guangdong| 13502.419921875|1133400.0|Infinity| 6| 2001|South Central China|Guangdong| 12039.25|1193203.0|Infinity| 7| 2000|South Central China|Guangdong| 10741.25|1128091.0|Infinity| 8| 1999|South Central China|Guangdong| 9250.6796875|1165750.0|Infinity| 9| 1998|South Central China|Guangdong| 8530.8798828125|1201994.0|Infinity| 10| 1997|South Central China|Guangdong| 7774.52978515625|1171083.0|Infinity| 11| 1996|South Central China|Guangdong| 6834.97021484375|1162362.0|Infinity| 12| 2007|South Central China| Hunan| 9439.599609375| 327051.0|Infinity| 1| 2006|South Central China| Hunan| 7688.669921875| 259335.0|Infinity| 2| 2005|South Central China| Hunan| 6596.10009765625| 207200.0|Infinity| 3| 2004|South Central China| Hunan| 5641.93994140625| 141800.0|Infinity| 4| 2003|South Central China| Hunan| 4659.990234375| 101835.0|Infinity| 5| 2002|South Central China| Hunan| 4151.5400390625| 90022.0|Infinity| 6| 2001|South Central China| Hunan| 3831.89990234375| 81011.0|Infinity| 7| 2000|South Central China| Hunan|3551.489990234375| 67833.0|Infinity| 8| +----+-------------------+---------+-----------------+---------+--------+----+ only showing top 20 rows
from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy('year')

Lagging Variables

dfWithLag = df.withColumn("lag_7",F.lag("gdp", 7).over(windowSpec))
df.filter(df.year>'2000').show()
+----+---------------+---------+------------------+--------+--------+ year| region| province| gdp| fdi| Exp_GDP| +----+---------------+---------+------------------+--------+--------+ 2001| East China| Anhui| 3246.7099609375| 33672.0|Infinity| 2002| East China| Anhui| 3519.719970703125| 38375.0|Infinity| 2003| East China| Anhui| 3923.110107421875| 36720.0|Infinity| 2004| East China| Anhui| 4759.2998046875| 54669.0|Infinity| 2005| East China| Anhui| 5350.169921875| 69000.0|Infinity| 2006| East China| Anhui| 6112.5|139354.0|Infinity| 2007| East China| Anhui| 7360.919921875|299892.0|Infinity| 2001| North China| Beijing| 3707.9599609375|176818.0|Infinity| 2002| North China| Beijing| 4315.0|172464.0|Infinity| 2003| North China| Beijing| 5007.2099609375|219126.0|Infinity| 2004| North China| Beijing| 6033.2099609375|308354.0|Infinity| 2005| North China| Beijing| 6969.52001953125|352638.0|Infinity| 2006| North China| Beijing| 8117.77978515625|455191.0|Infinity| 2007| North China| Beijing| 9846.8095703125|506572.0|Infinity| 2001|Southwest China|Chongqing|1976.8599853515625| 25649.0|Infinity| 2002|Southwest China|Chongqing| 2232.860107421875| 19576.0|Infinity| 2003|Southwest China|Chongqing| 2555.719970703125| 26083.0|Infinity| 2004|Southwest China|Chongqing| 3034.580078125| 40508.0|Infinity| 2005|Southwest China|Chongqing| 3467.719970703125| 51600.0|Infinity| 2006|Southwest China|Chongqing| 3907.22998046875| 69595.0|Infinity| +----+---------------+---------+------------------+--------+--------+ only showing top 20 rows

Looking at windows within the data

from pyspark.sql.window import Window

windowSpec = Window().partitionBy(['province']).orderBy('year').rowsBetween(-6,0)
dfWithRoll = df.withColumn("roll_7_confirmed",F.mean("gdp").over(windowSpec))
dfWithRoll.filter(dfWithLag.year>'2001').show()
+----+-------------------+---------+------------------+---------+--------------------+------------------+ year| region| province| gdp| fdi| Exp_GDP| roll_7_confirmed| +----+-------------------+---------+------------------+---------+--------------------+------------------+ 2002|South Central China|Guangdong| 13502.419921875|1133400.0| Infinity| 9810.56849888393| 2003|South Central China|Guangdong| 15844.6396484375| 782294.0| Infinity|11097.664132254464| 2004|South Central China|Guangdong| 18864.619140625|1001158.0| Infinity|12681.962611607143| 2005|South Central China|Guangdong| 22557.369140625|1236400.0| Infinity|14685.746791294643| 2006|South Central China|Guangdong| 26587.759765625|1451065.0| Infinity|17162.472516741072| 2007|South Central China|Guangdong| 31777.009765625|1712603.0| Infinity| 20167.5810546875| 2002|South Central China| Hunan| 4151.5400390625| 90022.0| Infinity|3309.1999860491073| 2003|South Central China| Hunan| 4659.990234375| 101835.0| Infinity| 3612.037179129464| 2004|South Central China| Hunan| 5641.93994140625| 141800.0| Infinity|4010.9900251116073| 2005|South Central China| Hunan| 6596.10009765625| 207200.0| Infinity| 4521.07146344866| 2006|South Central China| Hunan| 7688.669921875| 259335.0| Infinity| 5160.232875279018| 2007|South Central China| Hunan| 9439.599609375| 327051.0| Infinity| 6001.391392299107| 2002| North China| Shanxi| 2324.800048828125| 21164.0| Infinity|1749.4771379743304| 2003| North China| Shanxi| 2855.22998046875| 21361.0| Infinity| 1972.779994419643| 2004| North China| Shanxi| 3571.3701171875| 62184.0| Infinity| 2272.118582589286| 2005| North China| Shanxi| 4230.52978515625| 27516.0| Infinity| 2646.325701032366| 2006| North China| Shanxi| 4878.60986328125| 47199.0| Infinity|3105.1128278459823| 2007| North China| Shanxi| 6024.4501953125| 134283.0| Infinity| 3702.074288504464| 2002| Southwest China| Tibet| 162.0399932861328| 293.0|2.360885537826244E70|108.38571493966239| 2003| Southwest China| Tibet|185.08999633789062| 467.0|2.418600091901801E80|125.54428536551339| +----+-------------------+---------+------------------+---------+--------------------+------------------+ only showing top 20 rows
from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy('year').rowsBetween(Window.unboundedPreceding,Window.currentRow)
dfWithRoll = df.withColumn("cumulative_gdp",F.sum("gdp").over(windowSpec))
dfWithRoll.filter(dfWithLag.year>'1999').show()
+----+-------------------+---------+-----------------+---------+--------+------------------+ year| region| province| gdp| fdi| Exp_GDP| cumulative_gdp| +----+-------------------+---------+-----------------+---------+--------+------------------+ 2000|South Central China|Guangdong| 10741.25|1128091.0|Infinity| 43132.3095703125| 2001|South Central China|Guangdong| 12039.25|1193203.0|Infinity| 55171.5595703125| 2002|South Central China|Guangdong| 13502.419921875|1133400.0|Infinity| 68673.9794921875| 2003|South Central China|Guangdong| 15844.6396484375| 782294.0|Infinity| 84518.619140625| 2004|South Central China|Guangdong| 18864.619140625|1001158.0|Infinity| 103383.23828125| 2005|South Central China|Guangdong| 22557.369140625|1236400.0|Infinity| 125940.607421875| 2006|South Central China|Guangdong| 26587.759765625|1451065.0|Infinity| 152528.3671875| 2007|South Central China|Guangdong| 31777.009765625|1712603.0|Infinity| 184305.376953125| 2000|South Central China| Hunan|3551.489990234375| 67833.0|Infinity| 15180.9599609375| 2001|South Central China| Hunan| 3831.89990234375| 81011.0|Infinity| 19012.85986328125| 2002|South Central China| Hunan| 4151.5400390625| 90022.0|Infinity| 23164.39990234375| 2003|South Central China| Hunan| 4659.990234375| 101835.0|Infinity| 27824.39013671875| 2004|South Central China| Hunan| 5641.93994140625| 141800.0|Infinity| 33466.330078125| 2005|South Central China| Hunan| 6596.10009765625| 207200.0|Infinity| 40062.43017578125| 2006|South Central China| Hunan| 7688.669921875| 259335.0|Infinity| 47751.10009765625| 2007|South Central China| Hunan| 9439.599609375| 327051.0|Infinity| 57190.69970703125| 2000| North China| Shanxi|1845.719970703125| 22472.0|Infinity|7892.0098876953125| 2001| North China| Shanxi|2029.530029296875| 23393.0|Infinity| 9921.539916992188| 2002| North China| Shanxi|2324.800048828125| 21164.0|Infinity|12246.339965820312| 2003| North China| Shanxi| 2855.22998046875| 21361.0|Infinity|15101.569946289062| +----+-------------------+---------+-----------------+---------+--------+------------------+ only showing top 20 rows

Pivot Dataframes

Note: Pivot Dataframes

pivoted_df = df.groupBy('year').pivot('province') \
                      .agg(F.sum('gdp').alias('gdp') , F.sum('fdi').alias('fdi'))
pivoted_df.limit(10).toPandas()
year Anhui_gdp Anhui_fdi Beijing_gdp Beijing_fdi Chongqing_gdp Chongqing_fdi Fujian_gdp Fujian_fdi Gansu_gdp Gansu_fdi Guangdong_gdp Guangdong_fdi Guangxi_gdp Guangxi_fdi Guizhou_gdp Guizhou_fdi Hainan_gdp Hainan_fdi Hebei_gdp Hebei_fdi Heilongjiang_gdp Heilongjiang_fdi Henan_gdp Henan_fdi Hubei_gdp Hubei_fdi Hunan_gdp Hunan_fdi Jiangsu_gdp Jiangsu_fdi Jiangxi_gdp Jiangxi_fdi Jilin_gdp Jilin_fdi Liaoning_gdp Liaoning_fdi Ningxia_gdp Ningxia_fdi Qinghai_gdp Qinghai_fdi Shaanxi_gdp Shaanxi_fdi Shandong_gdp Shandong_fdi Shanghai_gdp Shanghai_fdi Shanxi_gdp Shanxi_fdi Sichuan_gdp Sichuan_fdi Tianjin_gdp Tianjin_fdi Tibet_gdp Tibet_fdi Xinjiang_gdp Xinjiang_fdi Yunnan_gdp Yunnan_fdi Zhejiang_gdp Zhejiang_fdi
0 2003 3923.110107 36720.0 5007.209961 219126.0 2555.719971 26083.0 4983.669922 259903.0 1399.829956 2342.0 15844.639648 782294.0 2821.110107 41856.0 1426.339966 4521.0 713.960022 42125.0 6921.290039 96405.0 4057.399902 32180.0 6867.700195 53903.0 4757.450195 156886.0 4659.990234 101835.0 10606.849609 1018960.0 2450.479980 108197.0 2348.540039 24468.0 5458.220215 341168.0 445.359985 1743.0 390.200012 2522.0 2587.719971 33190.0 12078.150391 601617.0 6694.229980 546849.0 2855.229980 21361.0 5333.089844 41231.0 2578.030029 153473.0 185.089996 467.0 1886.349976 1534.0 2556.020020 8384.0 9705.019531 498055.0
1 2007 7360.919922 299892.0 9846.809570 506572.0 4676.129883 108534.0 9248.530273 406058.0 2703.979980 11802.0 31777.009766 1712603.0 5823.410156 68396.0 2884.110107 12651.0 1254.170044 112001.0 13607.320312 241621.0 7104.000000 208508.0 15012.459961 306162.0 9333.400391 276622.0 9439.599609 327051.0 21742.050781 1743140.0 4820.529785 280657.0 4275.120117 76064.0 9304.519531 598554.0 919.109985 5047.0 797.349976 31000.0 5757.290039 119516.0 25776.910156 1101159.0 12494.009766 792000.0 6024.450195 134283.0 10562.389648 149322.0 5252.759766 527776.0 341.429993 2418.0 3523.159912 12484.0 4772.520020 39453.0 18753.730469 1036576.0
2 2006 6112.500000 139354.0 8117.779785 455191.0 3907.229980 69595.0 7583.850098 322047.0 2277.350098 2954.0 26587.759766 1451065.0 4746.160156 44740.0 2338.979980 9384.0 1065.670044 74878.0 11467.599609 201434.0 6211.799805 170801.0 12362.790039 184526.0 7617.470215 244853.0 7688.669922 259335.0 18598.689453 1318339.0 4056.760010 242000.0 3620.270020 66100.0 8047.259766 359000.0 725.900024 3718.0 648.500000 27500.0 4743.609863 92489.0 21900.189453 1000069.0 10572.240234 710700.0 4878.609863 47199.0 8690.240234 120819.0 4462.740234 413077.0 290.760010 1522.0 3045.260010 10366.0 3988.139893 30234.0 15718.469727 888935.0
3 1997 2347.320068 43443.0 2077.090088 159286.0 1509.750000 38675.0 2870.899902 419666.0 793.570007 4144.0 7774.529785 1171083.0 1817.250000 87986.0 805.789978 4977.0 411.160004 70554.0 3953.780029 110064.0 2667.500000 73485.0 4041.090088 69204.0 2856.469971 79019.0 2849.270020 91702.0 6004.209961 507208.0 1409.739990 30068.0 1346.790039 45155.0 3157.689941 167142.0 224.589996 671.0 202.789993 247.0 1363.599976 62816.0 6537.069824 249294.0 3438.790039 422536.0 1476.000000 26592.0 3241.469971 24846.0 1264.630005 251135.0 77.239998 63.0 1039.849976 2472.0 1676.170044 16566.0 4686.109863 150345.0
4 2004 4759.299805 54669.0 6033.209961 308354.0 3034.580078 40508.0 5763.350098 474801.0 1688.489990 3539.0 18864.619141 1001158.0 3433.500000 29579.0 1677.800049 6533.0 819.659973 64343.0 8477.629883 162341.0 4750.600098 123639.0 8553.790039 87367.0 5633.240234 207126.0 5641.939941 141800.0 12442.870117 1056365.0 2807.409912 161202.0 2662.080078 19059.0 6002.540039 282410.0 537.109985 6689.0 466.100006 22500.0 3175.580078 52664.0 15021.839844 870064.0 8072.830078 654100.0 3571.370117 62184.0 6379.629883 70129.0 3110.969971 247243.0 220.339996 2699.0 2209.090088 4586.0 3081.909912 14200.0 11648.700195 668128.0
5 1996 2093.300049 50661.0 1789.199951 155290.0 1315.119995 21878.0 2484.250000 407876.0 722.520020 9002.0 6834.970215 1162362.0 1697.900024 66618.0 723.179993 3138.0 389.679993 78960.0 3452.969971 123652.0 2370.500000 54841.0 3634.689941 52566.0 2499.770020 68878.0 2540.129883 70344.0 5155.250000 478058.0 1169.729980 28818.0 1137.229980 39876.0 2793.370117 140405.0 202.899994 2826.0 184.169998 576.0 1215.839966 33008.0 5883.799805 259041.0 2957.550049 471578.0 1292.109985 13802.0 2871.649902 22519.0 1121.930054 200587.0 64.980003 679.0 900.929993 6639.0 1517.689941 18000.0 4188.529785 152021.0
6 1998 2542.959961 27673.0 2377.179932 216800.0 1602.380005 43107.0 3159.909912 421211.0 887.669983 3864.0 8530.879883 1201994.0 1911.300049 88613.0 858.390015 4535.0 442.130005 71715.0 4256.009766 142868.0 2774.399902 52639.0 4308.240234 61654.0 3114.020020 97294.0 3025.530029 81816.0 6680.339844 543511.0 1605.770020 47768.0 1464.339966 40227.0 3582.459961 220470.0 245.440002 1856.0 220.919998 1010.0 1458.400024 30010.0 7021.350098 220274.0 3801.090088 360150.0 1611.079956 24451.0 3474.090088 37248.0 1374.599976 211361.0 91.500000 481.0 1106.949951 2167.0 1831.329956 14568.0 5052.620117 131802.0
7 2001 3246.709961 33672.0 3707.959961 176818.0 1976.859985 25649.0 4072.850098 391804.0 1125.369995 7439.0 12039.250000 1193203.0 2279.340088 38416.0 1133.270020 2829.0 579.169983 46691.0 5516.759766 66989.0 3390.100098 34114.0 5533.009766 45729.0 3880.530029 118860.0 3831.899902 81011.0 8553.690430 642550.0 2003.069946 22724.0 1951.510010 33701.0 4669.060059 204446.0 337.440002 1680.0 300.130005 3649.0 2010.619995 35174.0 9195.040039 352093.0 5210.120117 429159.0 2029.530029 23393.0 4293.490234 58188.0 1919.089966 213348.0 139.160004 106.0 1491.599976 2035.0 2138.310059 6457.0 6898.339844 221162.0
8 2005 5350.169922 69000.0 6969.520020 352638.0 3467.719971 51600.0 6554.689941 260800.0 1933.979980 2000.0 22557.369141 1236400.0 3984.100098 37866.0 2005.420044 10768.0 918.750000 68400.0 10012.110352 191000.0 5513.700195 145000.0 10587.419922 123000.0 6590.189941 218500.0 6596.100098 207200.0 15003.599609 1213800.0 3456.699951 205238.0 3122.010010 45266.0 6672.000000 540679.0 612.609985 14100.0 543.320007 26600.0 3933.719971 62800.0 18366.869141 897000.0 9247.660156 685000.0 4230.529785 27516.0 7385.100098 88686.0 3905.639893 332885.0 248.800003 1151.0 2604.189941 4700.0 3462.729980 17352.0 13417.679688 772000.0
9 2000 2902.090088 31847.0 3161.659912 168368.0 1791.000000 24436.0 3764.540039 343191.0 1052.880005 6235.0 10741.250000 1128091.0 2080.040039 52466.0 1029.920044 2501.0 526.820007 43080.0 5043.959961 67923.0 3151.399902 30086.0 5052.990234 56403.0 3545.389893 94368.0 3551.489990 67833.0 7697.819824 607756.0 1853.650024 32080.0 1672.959961 30120.0 4171.689941 106173.0 295.019989 1741.0 263.679993 11020.0 1804.000000 28842.0 8337.469727 297119.0 4771.169922 316014.0 1845.719971 22472.0 3928.199951 43694.0 1701.880005 116601.0 117.800003 2.0 1363.560059 1911.0 2011.189941 12812.0 6141.029785 161266.0
pivoted_df.columns
Out[55]: ['year', 'Anhui_gdp', 'Anhui_fdi', 'Beijing_gdp', 'Beijing_fdi', 'Chongqing_gdp', 'Chongqing_fdi', 'Fujian_gdp', 'Fujian_fdi', 'Gansu_gdp', 'Gansu_fdi', 'Guangdong_gdp', 'Guangdong_fdi', 'Guangxi_gdp', 'Guangxi_fdi', 'Guizhou_gdp', 'Guizhou_fdi', 'Hainan_gdp', 'Hainan_fdi', 'Hebei_gdp', 'Hebei_fdi', 'Heilongjiang_gdp', 'Heilongjiang_fdi', 'Henan_gdp', 'Henan_fdi', 'Hubei_gdp', 'Hubei_fdi', 'Hunan_gdp', 'Hunan_fdi', 'Jiangsu_gdp', 'Jiangsu_fdi', 'Jiangxi_gdp', 'Jiangxi_fdi', 'Jilin_gdp', 'Jilin_fdi', 'Liaoning_gdp', 'Liaoning_fdi', 'Ningxia_gdp', 'Ningxia_fdi', 'Qinghai_gdp', 'Qinghai_fdi', 'Shaanxi_gdp', 'Shaanxi_fdi', 'Shandong_gdp', 'Shandong_fdi', 'Shanghai_gdp', 'Shanghai_fdi', 'Shanxi_gdp', 'Shanxi_fdi', 'Sichuan_gdp', 'Sichuan_fdi', 'Tianjin_gdp', 'Tianjin_fdi', 'Tibet_gdp', 'Tibet_fdi', 'Xinjiang_gdp', 'Xinjiang_fdi', 'Yunnan_gdp', 'Yunnan_fdi', 'Zhejiang_gdp', 'Zhejiang_fdi']
newColnames = [x.replace("-","_") for x in pivoted_df.columns]
pivoted_df = pivoted_df.toDF(*newColnames)
expression = ""
cnt=0
for column in pivoted_df.columns:
    if column!='year':
        cnt +=1
        expression += f"'{column}' , {column},"
        
expression = f"stack({cnt}, {expression[:-1]}) as (Type,Value)"

Unpivoting RDDs

unpivoted_df = pivoted_df.select('year',F.expr(expression))
unpivoted_df.show()
+----+-------------+------------------+ year| Type| Value| +----+-------------+------------------+ 2003| Anhui_gdp| 3923.110107421875| 2003| Anhui_fdi| 36720.0| 2003| Beijing_gdp| 5007.2099609375| 2003| Beijing_fdi| 219126.0| 2003|Chongqing_gdp| 2555.719970703125| 2003|Chongqing_fdi| 26083.0| 2003| Fujian_gdp| 4983.669921875| 2003| Fujian_fdi| 259903.0| 2003| Gansu_gdp|1399.8299560546875| 2003| Gansu_fdi| 2342.0| 2003|Guangdong_gdp| 15844.6396484375| 2003|Guangdong_fdi| 782294.0| 2003| Guangxi_gdp| 2821.110107421875| 2003| Guangxi_fdi| 41856.0| 2003| Guizhou_gdp|1426.3399658203125| 2003| Guizhou_fdi| 4521.0| 2003| Hainan_gdp| 713.9600219726562| 2003| Hainan_fdi| 42125.0| 2003| Hebei_gdp| 6921.2900390625| 2003| Hebei_fdi| 96405.0| +----+-------------+------------------+ only showing top 20 rows