Window functions and Pivot Tables with Pyspark
• 16 min read
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField,StringType,IntegerType,StructType, DoubleType, FloatType
from pyspark.sql.functions import *
data_schema = [
StructField("_c0", IntegerType(), True)
,StructField("province", StringType(), True)
,StructField("specific", DoubleType(), True)
,StructField("general", DoubleType(), True)
,StructField("year", IntegerType(), True)
,StructField("gdp", FloatType(), True)
,StructField("fdi", FloatType(), True)
,StructField("rnr", DoubleType(), True)
,StructField("rr", FloatType(), True)
,StructField("i", FloatType(), True)
,StructField("fr", IntegerType(), True)
,StructField("reg", StringType(), True)
,StructField("it", IntegerType(), True)
]
final_struc = StructType(fields=data_schema)
file_location = "/FileStore/tables/df_panel_fix.csv"
df = spark.read.format("CSV").schema(final_struc).option("header", True).load(file_location)
#df.printSchema()
df.show()
+---+--------+---------+--------+----+-------+--------+----+---------+---------+-------+-----------+-------+
_c0|province| specific| general|year| gdp| fdi| rnr| rr| i| fr| reg| it|
+---+--------+---------+--------+----+-------+--------+----+---------+---------+-------+-----------+-------+
0| Anhui| 147002.0| null|1996| 2093.3| 50661.0| 0.0| 0.0| 0.0|1128873| East China| 631930|
1| Anhui| 151981.0| null|1997|2347.32| 43443.0| 0.0| 0.0| 0.0|1356287| East China| 657860|
2| Anhui| 174930.0| null|1998|2542.96| 27673.0| 0.0| 0.0| 0.0|1518236| East China| 889463|
3| Anhui| 285324.0| null|1999|2712.34| 26131.0|null| null| null|1646891| East China|1227364|
4| Anhui| 195580.0| 32100.0|2000|2902.09| 31847.0| 0.0| 0.0| 0.0|1601508| East China|1499110|
5| Anhui| 250898.0| null|2001|3246.71| 33672.0| 0.0| 0.0| 0.0|1672445| East China|2165189|
6| Anhui| 434149.0| 66529.0|2002|3519.72| 38375.0| 0.0| 0.0| 0.0|1677840| East China|2404936|
7| Anhui| 619201.0| 52108.0|2003|3923.11| 36720.0| 0.0| 0.0| 0.0|1896479| East China|2815820|
8| Anhui| 898441.0|349699.0|2004| 4759.3| 54669.0| 0.0| 0.0| 0.0| null| East China|3422176|
9| Anhui| 898441.0| null|2005|5350.17| 69000.0| 0.0| 0.0|0.3243243| null| East China|3874846|
10| Anhui|1457872.0|279052.0|2006| 6112.5|139354.0| 0.0| 0.0|0.3243243|3434548| East China|5167300|
11| Anhui|2213991.0|178705.0|2007|7360.92|299892.0| 0.0| 0.0|0.3243243|4468640| East China|7040099|
12| Beijing| 165957.0| null|1996| 1789.2|155290.0|null| null| null| 634562|North China| 508135|
13| Beijing| 165957.0| null|1997|2077.09|159286.0| 0.0| 0.0| 0.6| 634562|North China| 569283|
14| Beijing| 245198.0| null|1998|2377.18|216800.0| 0.0| 0.0| 0.53| 938788|North China| 695528|
15| Beijing| 388083.0| null|1999|2678.82|197525.0| 0.0| 0.0| 0.53| null|North China| 944047|
16| Beijing| 281769.0|188633.0|2000|3161.66|168368.0| 0.0| 0.0| 0.53|1667114|North China| 757990|
17| Beijing| 441923.0| null|2001|3707.96|176818.0| 0.0| 0.0| 0.53|2093925|North China|1194728|
18| Beijing| 558569.0|280277.0|2002| 4315.0|172464.0| 0.0| 0.0| 0.53|2511249|North China|1078754|
19| Beijing| 642581.0|269596.0|2003|5007.21|219126.0| 0.0|0.7948718| 0.0|2823366|North China|1426600|
+---+--------+---------+--------+----+-------+--------+----+---------+---------+-------+-----------+-------+
only showing top 20 rows
df.limit(10).toPandas()
_c0 | province | specific | general | year | gdp | fdi | rnr | rr | i | fr | reg | it | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | Anhui | 147002.0 | NaN | 1996 | 2093.300049 | 50661.0 | 0.0 | 0.0 | 0.000000 | 1128873.0 | East China | 631930 |
1 | 1 | Anhui | 151981.0 | NaN | 1997 | 2347.320068 | 43443.0 | 0.0 | 0.0 | 0.000000 | 1356287.0 | East China | 657860 |
2 | 2 | Anhui | 174930.0 | NaN | 1998 | 2542.959961 | 27673.0 | 0.0 | 0.0 | 0.000000 | 1518236.0 | East China | 889463 |
3 | 3 | Anhui | 285324.0 | NaN | 1999 | 2712.340088 | 26131.0 | NaN | NaN | NaN | 1646891.0 | East China | 1227364 |
4 | 4 | Anhui | 195580.0 | 32100.0 | 2000 | 2902.090088 | 31847.0 | 0.0 | 0.0 | 0.000000 | 1601508.0 | East China | 1499110 |
5 | 5 | Anhui | 250898.0 | NaN | 2001 | 3246.709961 | 33672.0 | 0.0 | 0.0 | 0.000000 | 1672445.0 | East China | 2165189 |
6 | 6 | Anhui | 434149.0 | 66529.0 | 2002 | 3519.719971 | 38375.0 | 0.0 | 0.0 | 0.000000 | 1677840.0 | East China | 2404936 |
7 | 7 | Anhui | 619201.0 | 52108.0 | 2003 | 3923.110107 | 36720.0 | 0.0 | 0.0 | 0.000000 | 1896479.0 | East China | 2815820 |
8 | 8 | Anhui | 898441.0 | 349699.0 | 2004 | 4759.299805 | 54669.0 | 0.0 | 0.0 | 0.000000 | NaN | East China | 3422176 |
9 | 9 | Anhui | 898441.0 | NaN | 2005 | 5350.169922 | 69000.0 | 0.0 | 0.0 | 0.324324 | NaN | East China | 3874846 |
df = df.withColumnRenamed("reg","region")
df.limit(10).toPandas()
_c0 | province | specific | general | year | gdp | fdi | rnr | rr | i | fr | region | it | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | Anhui | 147002.0 | NaN | 1996 | 2093.300049 | 50661.0 | 0.0 | 0.0 | 0.000000 | 1128873.0 | East China | 631930 |
1 | 1 | Anhui | 151981.0 | NaN | 1997 | 2347.320068 | 43443.0 | 0.0 | 0.0 | 0.000000 | 1356287.0 | East China | 657860 |
2 | 2 | Anhui | 174930.0 | NaN | 1998 | 2542.959961 | 27673.0 | 0.0 | 0.0 | 0.000000 | 1518236.0 | East China | 889463 |
3 | 3 | Anhui | 285324.0 | NaN | 1999 | 2712.340088 | 26131.0 | NaN | NaN | NaN | 1646891.0 | East China | 1227364 |
4 | 4 | Anhui | 195580.0 | 32100.0 | 2000 | 2902.090088 | 31847.0 | 0.0 | 0.0 | 0.000000 | 1601508.0 | East China | 1499110 |
5 | 5 | Anhui | 250898.0 | NaN | 2001 | 3246.709961 | 33672.0 | 0.0 | 0.0 | 0.000000 | 1672445.0 | East China | 2165189 |
6 | 6 | Anhui | 434149.0 | 66529.0 | 2002 | 3519.719971 | 38375.0 | 0.0 | 0.0 | 0.000000 | 1677840.0 | East China | 2404936 |
7 | 7 | Anhui | 619201.0 | 52108.0 | 2003 | 3923.110107 | 36720.0 | 0.0 | 0.0 | 0.000000 | 1896479.0 | East China | 2815820 |
8 | 8 | Anhui | 898441.0 | 349699.0 | 2004 | 4759.299805 | 54669.0 | 0.0 | 0.0 | 0.000000 | NaN | East China | 3422176 |
9 | 9 | Anhui | 898441.0 | NaN | 2005 | 5350.169922 | 69000.0 | 0.0 | 0.0 | 0.324324 | NaN | East China | 3874846 |
# df = df.toDF(*['year', 'region', 'province', 'gdp', 'fdi', 'specific', 'general', 'it', 'fr', 'rnr', 'rr', 'i', '_c0', 'specific_classification', 'provinceIndex', 'regionIndex'])
df = df.select('year','region','province','gdp', 'fdi')
df.sort("gdp").show()
+----+---------------+--------+------+-------+
year| region|province| gdp| fdi|
+----+---------------+--------+------+-------+
1996|Southwest China| Tibet| 64.98| 679.0|
1997|Southwest China| Tibet| 77.24| 63.0|
1998|Southwest China| Tibet| 91.5| 481.0|
1999|Southwest China| Tibet|105.98| 196.0|
2000|Southwest China| Tibet| 117.8| 2.0|
2001|Southwest China| Tibet|139.16| 106.0|
2002|Southwest China| Tibet|162.04| 293.0|
1996|Northwest China| Qinghai|184.17| 576.0|
2003|Southwest China| Tibet|185.09| 467.0|
1997|Northwest China| Qinghai|202.79| 247.0|
1996|Northwest China| Ningxia| 202.9| 2826.0|
2004|Southwest China| Tibet|220.34| 2699.0|
1998|Northwest China| Qinghai|220.92| 1010.0|
1997|Northwest China| Ningxia|224.59| 671.0|
1999|Northwest China| Qinghai|239.38| 459.0|
1998|Northwest China| Ningxia|245.44| 1856.0|
2005|Southwest China| Tibet| 248.8| 1151.0|
2000|Northwest China| Qinghai|263.68|11020.0|
1999|Northwest China| Ningxia|264.58| 5134.0|
2006|Southwest China| Tibet|290.76| 1522.0|
+----+---------------+--------+------+-------+
only showing top 20 rows
from pyspark.sql import functions as F
df.sort(F.desc("gdp")).show()
+----+-------------------+---------+--------+---------+
year| region| province| gdp| fdi|
+----+-------------------+---------+--------+---------+
2007|South Central China|Guangdong|31777.01|1712603.0|
2006|South Central China|Guangdong|26587.76|1451065.0|
2007| East China| Shandong|25776.91|1101159.0|
2005|South Central China|Guangdong|22557.37|1236400.0|
2006| East China| Shandong|21900.19|1000069.0|
2007| East China| Jiangsu|21742.05|1743140.0|
2004|South Central China|Guangdong|18864.62|1001158.0|
2007| East China| Zhejiang|18753.73|1036576.0|
2006| East China| Jiangsu|18598.69|1318339.0|
2005| East China| Shandong|18366.87| 897000.0|
2003|South Central China|Guangdong|15844.64| 782294.0|
2006| East China| Zhejiang|15718.47| 888935.0|
2004| East China| Shandong|15021.84| 870064.0|
2007|South Central China| Henan|15012.46| 306162.0|
2005| East China| Jiangsu| 15003.6|1213800.0|
2007| North China| Hebei|13607.32| 241621.0|
2002|South Central China|Guangdong|13502.42|1133400.0|
2005| East China| Zhejiang|13417.68| 772000.0|
2007| East China| Shanghai|12494.01| 792000.0|
2004| East China| Jiangsu|12442.87|1056365.0|
+----+-------------------+---------+--------+---------+
only showing top 20 rows
from pyspark.sql.types import IntegerType, StringType, DoubleType
df = df.withColumn('gdp', F.col('gdp').cast(DoubleType()))
df = df.withColumn('province', F.col('province').cast(StringType()))
df.filter((df.gdp>10000) & (df.region=='East China')).show()
+----+----------+--------+----------------+---------+
year| region|province| gdp| fdi|
+----+----------+--------+----------------+---------+
2003|East China| Jiangsu| 10606.849609375|1018960.0|
2004|East China| Jiangsu|12442.8701171875|1056365.0|
2005|East China| Jiangsu| 15003.599609375|1213800.0|
2006|East China| Jiangsu| 18598.689453125|1318339.0|
2007|East China| Jiangsu| 21742.05078125|1743140.0|
2002|East China|Shandong| 10275.5| 473404.0|
2003|East China|Shandong| 12078.150390625| 601617.0|
2004|East China|Shandong| 15021.83984375| 870064.0|
2005|East China|Shandong| 18366.869140625| 897000.0|
2006|East China|Shandong| 21900.189453125|1000069.0|
2007|East China|Shandong| 25776.91015625|1101159.0|
2006|East China|Shanghai| 10572.240234375| 710700.0|
2007|East China|Shanghai| 12494.009765625| 792000.0|
2004|East China|Zhejiang|11648.7001953125| 668128.0|
2005|East China|Zhejiang| 13417.6796875| 772000.0|
2006|East China|Zhejiang|15718.4697265625| 888935.0|
2007|East China|Zhejiang| 18753.73046875|1036576.0|
+----+----------+--------+----------------+---------+
from pyspark.sql import functions as F
df.groupBy(["region","province"]).agg(F.sum("gdp") ,F.max("gdp")).show()
+-------------------+---------+------------------+------------------+
region| province| sum(gdp)| max(gdp)|
+-------------------+---------+------------------+------------------+
South Central China| Hunan| 57190.69970703125| 9439.599609375|
North China| Tianjin|30343.979858398438| 5252.759765625|
Northwest China| Xinjiang| 21946.75994873047| 3523.159912109375|
North China| Beijing|56081.439208984375| 9846.8095703125|
South Central China|Guangdong| 184305.376953125| 31777.009765625|
South Central China| Henan| 86507.60034179688| 15012.4599609375|
East China| Jiangsu|129142.15966796875| 21742.05078125|
Northwest China| Gansu| 16773.99005126953| 2703.97998046875|
Southwest China| Guizhou|17064.130249023438| 2884.110107421875|
Southwest China| Sichuan|64533.479736328125| 10562.3896484375|
South Central China| Hainan| 8240.570068359375|1254.1700439453125|
East China| Shandong| 147888.0283203125| 25776.91015625|
Southwest China|Chongqing|29732.549926757812| 4676.1298828125|
Northwest China| Shaanxi|31896.409790039062| 5757.2900390625|
East China| Shanghai| 77189.4501953125| 12494.009765625|
Southwest China| Tibet| 2045.120002746582|341.42999267578125|
North China| Hebei| 83241.8994140625| 13607.3203125|
Northeast China| Jilin|27298.250366210938| 4275.1201171875|
East China| Zhejiang|109657.81884765625| 18753.73046875|
North China| Shanxi| 33806.52990722656| 6024.4501953125|
+-------------------+---------+------------------+------------------+
only showing top 20 rows
df.groupBy(["region","province"]).agg(F.sum("gdp").alias("SumGDP"),F.max("gdp").alias("MaxGDP")).show()
+-------------------+---------+------------------+------------------+
region| province| GDP| MaxGDP|
+-------------------+---------+------------------+------------------+
South Central China| Hunan| 57190.69970703125| 9439.599609375|
North China| Tianjin|30343.979858398438| 5252.759765625|
Northwest China| Xinjiang| 21946.75994873047| 3523.159912109375|
North China| Beijing|56081.439208984375| 9846.8095703125|
South Central China|Guangdong| 184305.376953125| 31777.009765625|
South Central China| Henan| 86507.60034179688| 15012.4599609375|
East China| Jiangsu|129142.15966796875| 21742.05078125|
Northwest China| Gansu| 16773.99005126953| 2703.97998046875|
Southwest China| Guizhou|17064.130249023438| 2884.110107421875|
Southwest China| Sichuan|64533.479736328125| 10562.3896484375|
South Central China| Hainan| 8240.570068359375|1254.1700439453125|
East China| Shandong| 147888.0283203125| 25776.91015625|
Southwest China|Chongqing|29732.549926757812| 4676.1298828125|
Northwest China| Shaanxi|31896.409790039062| 5757.2900390625|
East China| Shanghai| 77189.4501953125| 12494.009765625|
Southwest China| Tibet| 2045.120002746582|341.42999267578125|
North China| Hebei| 83241.8994140625| 13607.3203125|
Northeast China| Jilin|27298.250366210938| 4275.1201171875|
East China| Zhejiang|109657.81884765625| 18753.73046875|
North China| Shanxi| 33806.52990722656| 6024.4501953125|
+-------------------+---------+------------------+------------------+
only showing top 20 rows
df.groupBy(["region","province"]).agg(
F.sum("gdp").alias("SumGDP"),\
F.max("gdp").alias("MaxGDP")\
).show()
+-------------------+---------+------------------+------------------+
region| province| SumGDP| MaxGDP|
+-------------------+---------+------------------+------------------+
South Central China| Hunan| 57190.69970703125| 9439.599609375|
North China| Tianjin|30343.979858398438| 5252.759765625|
Northwest China| Xinjiang| 21946.75994873047| 3523.159912109375|
North China| Beijing|56081.439208984375| 9846.8095703125|
South Central China|Guangdong| 184305.376953125| 31777.009765625|
South Central China| Henan| 86507.60034179688| 15012.4599609375|
East China| Jiangsu|129142.15966796875| 21742.05078125|
Northwest China| Gansu| 16773.99005126953| 2703.97998046875|
Southwest China| Guizhou|17064.130249023438| 2884.110107421875|
Southwest China| Sichuan|64533.479736328125| 10562.3896484375|
South Central China| Hainan| 8240.570068359375|1254.1700439453125|
East China| Shandong| 147888.0283203125| 25776.91015625|
Southwest China|Chongqing|29732.549926757812| 4676.1298828125|
Northwest China| Shaanxi|31896.409790039062| 5757.2900390625|
East China| Shanghai| 77189.4501953125| 12494.009765625|
Southwest China| Tibet| 2045.120002746582|341.42999267578125|
North China| Hebei| 83241.8994140625| 13607.3203125|
Northeast China| Jilin|27298.250366210938| 4275.1201171875|
East China| Zhejiang|109657.81884765625| 18753.73046875|
North China| Shanxi| 33806.52990722656| 6024.4501953125|
+-------------------+---------+------------------+------------------+
only showing top 20 rows
df.limit(10).toPandas()
year | region | province | gdp | fdi | |
---|---|---|---|---|---|
0 | 1996 | East China | Anhui | 2093.300049 | 50661.0 |
1 | 1997 | East China | Anhui | 2347.320068 | 43443.0 |
2 | 1998 | East China | Anhui | 2542.959961 | 27673.0 |
3 | 1999 | East China | Anhui | 2712.340088 | 26131.0 |
4 | 2000 | East China | Anhui | 2902.090088 | 31847.0 |
5 | 2001 | East China | Anhui | 3246.709961 | 33672.0 |
6 | 2002 | East China | Anhui | 3519.719971 | 38375.0 |
7 | 2003 | East China | Anhui | 3923.110107 | 36720.0 |
8 | 2004 | East China | Anhui | 4759.299805 | 54669.0 |
9 | 2005 | East China | Anhui | 5350.169922 | 69000.0 |
df = df.withColumn("Exp_GDP", F.exp("gdp"))
df.show()
+----+-----------+--------+-----------------+--------+--------+
year| region|province| gdp| fdi| Exp_GDP|
+----+-----------+--------+-----------------+--------+--------+
1996| East China| Anhui|2093.300048828125| 50661.0|Infinity|
1997| East China| Anhui|2347.320068359375| 43443.0|Infinity|
1998| East China| Anhui| 2542.9599609375| 27673.0|Infinity|
1999| East China| Anhui|2712.340087890625| 26131.0|Infinity|
2000| East China| Anhui|2902.090087890625| 31847.0|Infinity|
2001| East China| Anhui| 3246.7099609375| 33672.0|Infinity|
2002| East China| Anhui|3519.719970703125| 38375.0|Infinity|
2003| East China| Anhui|3923.110107421875| 36720.0|Infinity|
2004| East China| Anhui| 4759.2998046875| 54669.0|Infinity|
2005| East China| Anhui| 5350.169921875| 69000.0|Infinity|
2006| East China| Anhui| 6112.5|139354.0|Infinity|
2007| East China| Anhui| 7360.919921875|299892.0|Infinity|
1996|North China| Beijing|1789.199951171875|155290.0|Infinity|
1997|North China| Beijing|2077.090087890625|159286.0|Infinity|
1998|North China| Beijing|2377.179931640625|216800.0|Infinity|
1999|North China| Beijing|2678.820068359375|197525.0|Infinity|
2000|North China| Beijing|3161.659912109375|168368.0|Infinity|
2001|North China| Beijing| 3707.9599609375|176818.0|Infinity|
2002|North China| Beijing| 4315.0|172464.0|Infinity|
2003|North China| Beijing| 5007.2099609375|219126.0|Infinity|
+----+-----------+--------+-----------------+--------+--------+
only showing top 20 rows
Note: Window functions
# Window functions
from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy(F.desc('gdp'))
df.withColumn("rank",F.rank().over(windowSpec)).show()
+----+-------------------+---------+-----------------+---------+--------+----+
year| region| province| gdp| fdi| Exp_GDP|rank|
+----+-------------------+---------+-----------------+---------+--------+----+
2007|South Central China|Guangdong| 31777.009765625|1712603.0|Infinity| 1|
2006|South Central China|Guangdong| 26587.759765625|1451065.0|Infinity| 2|
2005|South Central China|Guangdong| 22557.369140625|1236400.0|Infinity| 3|
2004|South Central China|Guangdong| 18864.619140625|1001158.0|Infinity| 4|
2003|South Central China|Guangdong| 15844.6396484375| 782294.0|Infinity| 5|
2002|South Central China|Guangdong| 13502.419921875|1133400.0|Infinity| 6|
2001|South Central China|Guangdong| 12039.25|1193203.0|Infinity| 7|
2000|South Central China|Guangdong| 10741.25|1128091.0|Infinity| 8|
1999|South Central China|Guangdong| 9250.6796875|1165750.0|Infinity| 9|
1998|South Central China|Guangdong| 8530.8798828125|1201994.0|Infinity| 10|
1997|South Central China|Guangdong| 7774.52978515625|1171083.0|Infinity| 11|
1996|South Central China|Guangdong| 6834.97021484375|1162362.0|Infinity| 12|
2007|South Central China| Hunan| 9439.599609375| 327051.0|Infinity| 1|
2006|South Central China| Hunan| 7688.669921875| 259335.0|Infinity| 2|
2005|South Central China| Hunan| 6596.10009765625| 207200.0|Infinity| 3|
2004|South Central China| Hunan| 5641.93994140625| 141800.0|Infinity| 4|
2003|South Central China| Hunan| 4659.990234375| 101835.0|Infinity| 5|
2002|South Central China| Hunan| 4151.5400390625| 90022.0|Infinity| 6|
2001|South Central China| Hunan| 3831.89990234375| 81011.0|Infinity| 7|
2000|South Central China| Hunan|3551.489990234375| 67833.0|Infinity| 8|
+----+-------------------+---------+-----------------+---------+--------+----+
only showing top 20 rows
from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy('year')
dfWithLag = df.withColumn("lag_7",F.lag("gdp", 7).over(windowSpec))
df.filter(df.year>'2000').show()
+----+---------------+---------+------------------+--------+--------+
year| region| province| gdp| fdi| Exp_GDP|
+----+---------------+---------+------------------+--------+--------+
2001| East China| Anhui| 3246.7099609375| 33672.0|Infinity|
2002| East China| Anhui| 3519.719970703125| 38375.0|Infinity|
2003| East China| Anhui| 3923.110107421875| 36720.0|Infinity|
2004| East China| Anhui| 4759.2998046875| 54669.0|Infinity|
2005| East China| Anhui| 5350.169921875| 69000.0|Infinity|
2006| East China| Anhui| 6112.5|139354.0|Infinity|
2007| East China| Anhui| 7360.919921875|299892.0|Infinity|
2001| North China| Beijing| 3707.9599609375|176818.0|Infinity|
2002| North China| Beijing| 4315.0|172464.0|Infinity|
2003| North China| Beijing| 5007.2099609375|219126.0|Infinity|
2004| North China| Beijing| 6033.2099609375|308354.0|Infinity|
2005| North China| Beijing| 6969.52001953125|352638.0|Infinity|
2006| North China| Beijing| 8117.77978515625|455191.0|Infinity|
2007| North China| Beijing| 9846.8095703125|506572.0|Infinity|
2001|Southwest China|Chongqing|1976.8599853515625| 25649.0|Infinity|
2002|Southwest China|Chongqing| 2232.860107421875| 19576.0|Infinity|
2003|Southwest China|Chongqing| 2555.719970703125| 26083.0|Infinity|
2004|Southwest China|Chongqing| 3034.580078125| 40508.0|Infinity|
2005|Southwest China|Chongqing| 3467.719970703125| 51600.0|Infinity|
2006|Southwest China|Chongqing| 3907.22998046875| 69595.0|Infinity|
+----+---------------+---------+------------------+--------+--------+
only showing top 20 rows
from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy('year').rowsBetween(-6,0)
dfWithRoll = df.withColumn("roll_7_confirmed",F.mean("gdp").over(windowSpec))
dfWithRoll.filter(dfWithLag.year>'2001').show()
+----+-------------------+---------+------------------+---------+--------------------+------------------+
year| region| province| gdp| fdi| Exp_GDP| roll_7_confirmed|
+----+-------------------+---------+------------------+---------+--------------------+------------------+
2002|South Central China|Guangdong| 13502.419921875|1133400.0| Infinity| 9810.56849888393|
2003|South Central China|Guangdong| 15844.6396484375| 782294.0| Infinity|11097.664132254464|
2004|South Central China|Guangdong| 18864.619140625|1001158.0| Infinity|12681.962611607143|
2005|South Central China|Guangdong| 22557.369140625|1236400.0| Infinity|14685.746791294643|
2006|South Central China|Guangdong| 26587.759765625|1451065.0| Infinity|17162.472516741072|
2007|South Central China|Guangdong| 31777.009765625|1712603.0| Infinity| 20167.5810546875|
2002|South Central China| Hunan| 4151.5400390625| 90022.0| Infinity|3309.1999860491073|
2003|South Central China| Hunan| 4659.990234375| 101835.0| Infinity| 3612.037179129464|
2004|South Central China| Hunan| 5641.93994140625| 141800.0| Infinity|4010.9900251116073|
2005|South Central China| Hunan| 6596.10009765625| 207200.0| Infinity| 4521.07146344866|
2006|South Central China| Hunan| 7688.669921875| 259335.0| Infinity| 5160.232875279018|
2007|South Central China| Hunan| 9439.599609375| 327051.0| Infinity| 6001.391392299107|
2002| North China| Shanxi| 2324.800048828125| 21164.0| Infinity|1749.4771379743304|
2003| North China| Shanxi| 2855.22998046875| 21361.0| Infinity| 1972.779994419643|
2004| North China| Shanxi| 3571.3701171875| 62184.0| Infinity| 2272.118582589286|
2005| North China| Shanxi| 4230.52978515625| 27516.0| Infinity| 2646.325701032366|
2006| North China| Shanxi| 4878.60986328125| 47199.0| Infinity|3105.1128278459823|
2007| North China| Shanxi| 6024.4501953125| 134283.0| Infinity| 3702.074288504464|
2002| Southwest China| Tibet| 162.0399932861328| 293.0|2.360885537826244E70|108.38571493966239|
2003| Southwest China| Tibet|185.08999633789062| 467.0|2.418600091901801E80|125.54428536551339|
+----+-------------------+---------+------------------+---------+--------------------+------------------+
only showing top 20 rows
from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy('year').rowsBetween(Window.unboundedPreceding,Window.currentRow)
dfWithRoll = df.withColumn("cumulative_gdp",F.sum("gdp").over(windowSpec))
dfWithRoll.filter(dfWithLag.year>'1999').show()
+----+-------------------+---------+-----------------+---------+--------+------------------+
year| region| province| gdp| fdi| Exp_GDP| cumulative_gdp|
+----+-------------------+---------+-----------------+---------+--------+------------------+
2000|South Central China|Guangdong| 10741.25|1128091.0|Infinity| 43132.3095703125|
2001|South Central China|Guangdong| 12039.25|1193203.0|Infinity| 55171.5595703125|
2002|South Central China|Guangdong| 13502.419921875|1133400.0|Infinity| 68673.9794921875|
2003|South Central China|Guangdong| 15844.6396484375| 782294.0|Infinity| 84518.619140625|
2004|South Central China|Guangdong| 18864.619140625|1001158.0|Infinity| 103383.23828125|
2005|South Central China|Guangdong| 22557.369140625|1236400.0|Infinity| 125940.607421875|
2006|South Central China|Guangdong| 26587.759765625|1451065.0|Infinity| 152528.3671875|
2007|South Central China|Guangdong| 31777.009765625|1712603.0|Infinity| 184305.376953125|
2000|South Central China| Hunan|3551.489990234375| 67833.0|Infinity| 15180.9599609375|
2001|South Central China| Hunan| 3831.89990234375| 81011.0|Infinity| 19012.85986328125|
2002|South Central China| Hunan| 4151.5400390625| 90022.0|Infinity| 23164.39990234375|
2003|South Central China| Hunan| 4659.990234375| 101835.0|Infinity| 27824.39013671875|
2004|South Central China| Hunan| 5641.93994140625| 141800.0|Infinity| 33466.330078125|
2005|South Central China| Hunan| 6596.10009765625| 207200.0|Infinity| 40062.43017578125|
2006|South Central China| Hunan| 7688.669921875| 259335.0|Infinity| 47751.10009765625|
2007|South Central China| Hunan| 9439.599609375| 327051.0|Infinity| 57190.69970703125|
2000| North China| Shanxi|1845.719970703125| 22472.0|Infinity|7892.0098876953125|
2001| North China| Shanxi|2029.530029296875| 23393.0|Infinity| 9921.539916992188|
2002| North China| Shanxi|2324.800048828125| 21164.0|Infinity|12246.339965820312|
2003| North China| Shanxi| 2855.22998046875| 21361.0|Infinity|15101.569946289062|
+----+-------------------+---------+-----------------+---------+--------+------------------+
only showing top 20 rows
Note: Pivot Dataframes
pivoted_df = df.groupBy('year').pivot('province') \
.agg(F.sum('gdp').alias('gdp') , F.sum('fdi').alias('fdi'))
pivoted_df.limit(10).toPandas()
year | Anhui_gdp | Anhui_fdi | Beijing_gdp | Beijing_fdi | Chongqing_gdp | Chongqing_fdi | Fujian_gdp | Fujian_fdi | Gansu_gdp | Gansu_fdi | Guangdong_gdp | Guangdong_fdi | Guangxi_gdp | Guangxi_fdi | Guizhou_gdp | Guizhou_fdi | Hainan_gdp | Hainan_fdi | Hebei_gdp | Hebei_fdi | Heilongjiang_gdp | Heilongjiang_fdi | Henan_gdp | Henan_fdi | Hubei_gdp | Hubei_fdi | Hunan_gdp | Hunan_fdi | Jiangsu_gdp | Jiangsu_fdi | Jiangxi_gdp | Jiangxi_fdi | Jilin_gdp | Jilin_fdi | Liaoning_gdp | Liaoning_fdi | Ningxia_gdp | Ningxia_fdi | Qinghai_gdp | Qinghai_fdi | Shaanxi_gdp | Shaanxi_fdi | Shandong_gdp | Shandong_fdi | Shanghai_gdp | Shanghai_fdi | Shanxi_gdp | Shanxi_fdi | Sichuan_gdp | Sichuan_fdi | Tianjin_gdp | Tianjin_fdi | Tibet_gdp | Tibet_fdi | Xinjiang_gdp | Xinjiang_fdi | Yunnan_gdp | Yunnan_fdi | Zhejiang_gdp | Zhejiang_fdi | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2003 | 3923.110107 | 36720.0 | 5007.209961 | 219126.0 | 2555.719971 | 26083.0 | 4983.669922 | 259903.0 | 1399.829956 | 2342.0 | 15844.639648 | 782294.0 | 2821.110107 | 41856.0 | 1426.339966 | 4521.0 | 713.960022 | 42125.0 | 6921.290039 | 96405.0 | 4057.399902 | 32180.0 | 6867.700195 | 53903.0 | 4757.450195 | 156886.0 | 4659.990234 | 101835.0 | 10606.849609 | 1018960.0 | 2450.479980 | 108197.0 | 2348.540039 | 24468.0 | 5458.220215 | 341168.0 | 445.359985 | 1743.0 | 390.200012 | 2522.0 | 2587.719971 | 33190.0 | 12078.150391 | 601617.0 | 6694.229980 | 546849.0 | 2855.229980 | 21361.0 | 5333.089844 | 41231.0 | 2578.030029 | 153473.0 | 185.089996 | 467.0 | 1886.349976 | 1534.0 | 2556.020020 | 8384.0 | 9705.019531 | 498055.0 |
1 | 2007 | 7360.919922 | 299892.0 | 9846.809570 | 506572.0 | 4676.129883 | 108534.0 | 9248.530273 | 406058.0 | 2703.979980 | 11802.0 | 31777.009766 | 1712603.0 | 5823.410156 | 68396.0 | 2884.110107 | 12651.0 | 1254.170044 | 112001.0 | 13607.320312 | 241621.0 | 7104.000000 | 208508.0 | 15012.459961 | 306162.0 | 9333.400391 | 276622.0 | 9439.599609 | 327051.0 | 21742.050781 | 1743140.0 | 4820.529785 | 280657.0 | 4275.120117 | 76064.0 | 9304.519531 | 598554.0 | 919.109985 | 5047.0 | 797.349976 | 31000.0 | 5757.290039 | 119516.0 | 25776.910156 | 1101159.0 | 12494.009766 | 792000.0 | 6024.450195 | 134283.0 | 10562.389648 | 149322.0 | 5252.759766 | 527776.0 | 341.429993 | 2418.0 | 3523.159912 | 12484.0 | 4772.520020 | 39453.0 | 18753.730469 | 1036576.0 |
2 | 2006 | 6112.500000 | 139354.0 | 8117.779785 | 455191.0 | 3907.229980 | 69595.0 | 7583.850098 | 322047.0 | 2277.350098 | 2954.0 | 26587.759766 | 1451065.0 | 4746.160156 | 44740.0 | 2338.979980 | 9384.0 | 1065.670044 | 74878.0 | 11467.599609 | 201434.0 | 6211.799805 | 170801.0 | 12362.790039 | 184526.0 | 7617.470215 | 244853.0 | 7688.669922 | 259335.0 | 18598.689453 | 1318339.0 | 4056.760010 | 242000.0 | 3620.270020 | 66100.0 | 8047.259766 | 359000.0 | 725.900024 | 3718.0 | 648.500000 | 27500.0 | 4743.609863 | 92489.0 | 21900.189453 | 1000069.0 | 10572.240234 | 710700.0 | 4878.609863 | 47199.0 | 8690.240234 | 120819.0 | 4462.740234 | 413077.0 | 290.760010 | 1522.0 | 3045.260010 | 10366.0 | 3988.139893 | 30234.0 | 15718.469727 | 888935.0 |
3 | 1997 | 2347.320068 | 43443.0 | 2077.090088 | 159286.0 | 1509.750000 | 38675.0 | 2870.899902 | 419666.0 | 793.570007 | 4144.0 | 7774.529785 | 1171083.0 | 1817.250000 | 87986.0 | 805.789978 | 4977.0 | 411.160004 | 70554.0 | 3953.780029 | 110064.0 | 2667.500000 | 73485.0 | 4041.090088 | 69204.0 | 2856.469971 | 79019.0 | 2849.270020 | 91702.0 | 6004.209961 | 507208.0 | 1409.739990 | 30068.0 | 1346.790039 | 45155.0 | 3157.689941 | 167142.0 | 224.589996 | 671.0 | 202.789993 | 247.0 | 1363.599976 | 62816.0 | 6537.069824 | 249294.0 | 3438.790039 | 422536.0 | 1476.000000 | 26592.0 | 3241.469971 | 24846.0 | 1264.630005 | 251135.0 | 77.239998 | 63.0 | 1039.849976 | 2472.0 | 1676.170044 | 16566.0 | 4686.109863 | 150345.0 |
4 | 2004 | 4759.299805 | 54669.0 | 6033.209961 | 308354.0 | 3034.580078 | 40508.0 | 5763.350098 | 474801.0 | 1688.489990 | 3539.0 | 18864.619141 | 1001158.0 | 3433.500000 | 29579.0 | 1677.800049 | 6533.0 | 819.659973 | 64343.0 | 8477.629883 | 162341.0 | 4750.600098 | 123639.0 | 8553.790039 | 87367.0 | 5633.240234 | 207126.0 | 5641.939941 | 141800.0 | 12442.870117 | 1056365.0 | 2807.409912 | 161202.0 | 2662.080078 | 19059.0 | 6002.540039 | 282410.0 | 537.109985 | 6689.0 | 466.100006 | 22500.0 | 3175.580078 | 52664.0 | 15021.839844 | 870064.0 | 8072.830078 | 654100.0 | 3571.370117 | 62184.0 | 6379.629883 | 70129.0 | 3110.969971 | 247243.0 | 220.339996 | 2699.0 | 2209.090088 | 4586.0 | 3081.909912 | 14200.0 | 11648.700195 | 668128.0 |
5 | 1996 | 2093.300049 | 50661.0 | 1789.199951 | 155290.0 | 1315.119995 | 21878.0 | 2484.250000 | 407876.0 | 722.520020 | 9002.0 | 6834.970215 | 1162362.0 | 1697.900024 | 66618.0 | 723.179993 | 3138.0 | 389.679993 | 78960.0 | 3452.969971 | 123652.0 | 2370.500000 | 54841.0 | 3634.689941 | 52566.0 | 2499.770020 | 68878.0 | 2540.129883 | 70344.0 | 5155.250000 | 478058.0 | 1169.729980 | 28818.0 | 1137.229980 | 39876.0 | 2793.370117 | 140405.0 | 202.899994 | 2826.0 | 184.169998 | 576.0 | 1215.839966 | 33008.0 | 5883.799805 | 259041.0 | 2957.550049 | 471578.0 | 1292.109985 | 13802.0 | 2871.649902 | 22519.0 | 1121.930054 | 200587.0 | 64.980003 | 679.0 | 900.929993 | 6639.0 | 1517.689941 | 18000.0 | 4188.529785 | 152021.0 |
6 | 1998 | 2542.959961 | 27673.0 | 2377.179932 | 216800.0 | 1602.380005 | 43107.0 | 3159.909912 | 421211.0 | 887.669983 | 3864.0 | 8530.879883 | 1201994.0 | 1911.300049 | 88613.0 | 858.390015 | 4535.0 | 442.130005 | 71715.0 | 4256.009766 | 142868.0 | 2774.399902 | 52639.0 | 4308.240234 | 61654.0 | 3114.020020 | 97294.0 | 3025.530029 | 81816.0 | 6680.339844 | 543511.0 | 1605.770020 | 47768.0 | 1464.339966 | 40227.0 | 3582.459961 | 220470.0 | 245.440002 | 1856.0 | 220.919998 | 1010.0 | 1458.400024 | 30010.0 | 7021.350098 | 220274.0 | 3801.090088 | 360150.0 | 1611.079956 | 24451.0 | 3474.090088 | 37248.0 | 1374.599976 | 211361.0 | 91.500000 | 481.0 | 1106.949951 | 2167.0 | 1831.329956 | 14568.0 | 5052.620117 | 131802.0 |
7 | 2001 | 3246.709961 | 33672.0 | 3707.959961 | 176818.0 | 1976.859985 | 25649.0 | 4072.850098 | 391804.0 | 1125.369995 | 7439.0 | 12039.250000 | 1193203.0 | 2279.340088 | 38416.0 | 1133.270020 | 2829.0 | 579.169983 | 46691.0 | 5516.759766 | 66989.0 | 3390.100098 | 34114.0 | 5533.009766 | 45729.0 | 3880.530029 | 118860.0 | 3831.899902 | 81011.0 | 8553.690430 | 642550.0 | 2003.069946 | 22724.0 | 1951.510010 | 33701.0 | 4669.060059 | 204446.0 | 337.440002 | 1680.0 | 300.130005 | 3649.0 | 2010.619995 | 35174.0 | 9195.040039 | 352093.0 | 5210.120117 | 429159.0 | 2029.530029 | 23393.0 | 4293.490234 | 58188.0 | 1919.089966 | 213348.0 | 139.160004 | 106.0 | 1491.599976 | 2035.0 | 2138.310059 | 6457.0 | 6898.339844 | 221162.0 |
8 | 2005 | 5350.169922 | 69000.0 | 6969.520020 | 352638.0 | 3467.719971 | 51600.0 | 6554.689941 | 260800.0 | 1933.979980 | 2000.0 | 22557.369141 | 1236400.0 | 3984.100098 | 37866.0 | 2005.420044 | 10768.0 | 918.750000 | 68400.0 | 10012.110352 | 191000.0 | 5513.700195 | 145000.0 | 10587.419922 | 123000.0 | 6590.189941 | 218500.0 | 6596.100098 | 207200.0 | 15003.599609 | 1213800.0 | 3456.699951 | 205238.0 | 3122.010010 | 45266.0 | 6672.000000 | 540679.0 | 612.609985 | 14100.0 | 543.320007 | 26600.0 | 3933.719971 | 62800.0 | 18366.869141 | 897000.0 | 9247.660156 | 685000.0 | 4230.529785 | 27516.0 | 7385.100098 | 88686.0 | 3905.639893 | 332885.0 | 248.800003 | 1151.0 | 2604.189941 | 4700.0 | 3462.729980 | 17352.0 | 13417.679688 | 772000.0 |
9 | 2000 | 2902.090088 | 31847.0 | 3161.659912 | 168368.0 | 1791.000000 | 24436.0 | 3764.540039 | 343191.0 | 1052.880005 | 6235.0 | 10741.250000 | 1128091.0 | 2080.040039 | 52466.0 | 1029.920044 | 2501.0 | 526.820007 | 43080.0 | 5043.959961 | 67923.0 | 3151.399902 | 30086.0 | 5052.990234 | 56403.0 | 3545.389893 | 94368.0 | 3551.489990 | 67833.0 | 7697.819824 | 607756.0 | 1853.650024 | 32080.0 | 1672.959961 | 30120.0 | 4171.689941 | 106173.0 | 295.019989 | 1741.0 | 263.679993 | 11020.0 | 1804.000000 | 28842.0 | 8337.469727 | 297119.0 | 4771.169922 | 316014.0 | 1845.719971 | 22472.0 | 3928.199951 | 43694.0 | 1701.880005 | 116601.0 | 117.800003 | 2.0 | 1363.560059 | 1911.0 | 2011.189941 | 12812.0 | 6141.029785 | 161266.0 |
pivoted_df.columns
Out[55]: ['year',
'Anhui_gdp',
'Anhui_fdi',
'Beijing_gdp',
'Beijing_fdi',
'Chongqing_gdp',
'Chongqing_fdi',
'Fujian_gdp',
'Fujian_fdi',
'Gansu_gdp',
'Gansu_fdi',
'Guangdong_gdp',
'Guangdong_fdi',
'Guangxi_gdp',
'Guangxi_fdi',
'Guizhou_gdp',
'Guizhou_fdi',
'Hainan_gdp',
'Hainan_fdi',
'Hebei_gdp',
'Hebei_fdi',
'Heilongjiang_gdp',
'Heilongjiang_fdi',
'Henan_gdp',
'Henan_fdi',
'Hubei_gdp',
'Hubei_fdi',
'Hunan_gdp',
'Hunan_fdi',
'Jiangsu_gdp',
'Jiangsu_fdi',
'Jiangxi_gdp',
'Jiangxi_fdi',
'Jilin_gdp',
'Jilin_fdi',
'Liaoning_gdp',
'Liaoning_fdi',
'Ningxia_gdp',
'Ningxia_fdi',
'Qinghai_gdp',
'Qinghai_fdi',
'Shaanxi_gdp',
'Shaanxi_fdi',
'Shandong_gdp',
'Shandong_fdi',
'Shanghai_gdp',
'Shanghai_fdi',
'Shanxi_gdp',
'Shanxi_fdi',
'Sichuan_gdp',
'Sichuan_fdi',
'Tianjin_gdp',
'Tianjin_fdi',
'Tibet_gdp',
'Tibet_fdi',
'Xinjiang_gdp',
'Xinjiang_fdi',
'Yunnan_gdp',
'Yunnan_fdi',
'Zhejiang_gdp',
'Zhejiang_fdi']
newColnames = [x.replace("-","_") for x in pivoted_df.columns]
pivoted_df = pivoted_df.toDF(*newColnames)
expression = ""
cnt=0
for column in pivoted_df.columns:
if column!='year':
cnt +=1
expression += f"'{column}' , {column},"
expression = f"stack({cnt}, {expression[:-1]}) as (Type,Value)"
unpivoted_df = pivoted_df.select('year',F.expr(expression))
unpivoted_df.show()
+----+-------------+------------------+
year| Type| Value|
+----+-------------+------------------+
2003| Anhui_gdp| 3923.110107421875|
2003| Anhui_fdi| 36720.0|
2003| Beijing_gdp| 5007.2099609375|
2003| Beijing_fdi| 219126.0|
2003|Chongqing_gdp| 2555.719970703125|
2003|Chongqing_fdi| 26083.0|
2003| Fujian_gdp| 4983.669921875|
2003| Fujian_fdi| 259903.0|
2003| Gansu_gdp|1399.8299560546875|
2003| Gansu_fdi| 2342.0|
2003|Guangdong_gdp| 15844.6396484375|
2003|Guangdong_fdi| 782294.0|
2003| Guangxi_gdp| 2821.110107421875|
2003| Guangxi_fdi| 41856.0|
2003| Guizhou_gdp|1426.3399658203125|
2003| Guizhou_fdi| 4521.0|
2003| Hainan_gdp| 713.9600219726562|
2003| Hainan_fdi| 42125.0|
2003| Hebei_gdp| 6921.2900390625|
2003| Hebei_fdi| 96405.0|
+----+-------------+------------------+
only showing top 20 rows
This post includes code adapted from Spark and Python for Big Data udemy course and Spark and Python for Big Data notebooks.