from pyspark.sql import SparkSession

# On Databricks a SparkSession named `spark` (and the display() helper) is provided;
# create the session explicitly if running this outside that environment.
spark = SparkSession.builder.getOrCreate()

# Load data from a CSV
file_location = "/FileStore/tables/df_panel_fix.csv"
df = spark.read.format("csv").option("inferSchema", True).option("header", True).load(file_location)
display(df.take(5))
|_c0|province|specific|general|year|    gdp|  fdi| rnr|  rr|   i|     fr|       reg|     it|
|  0|   Anhui|147002.0|   null|1996| 2093.3|50661| 0.0| 0.0| 0.0|1128873|East China| 631930|
|  1|   Anhui|151981.0|   null|1997|2347.32|43443| 0.0| 0.0| 0.0|1356287|East China| 657860|
|  2|   Anhui|174930.0|   null|1998|2542.96|27673| 0.0| 0.0| 0.0|1518236|East China| 889463|
|  3|   Anhui|285324.0|   null|1999|2712.34|26131|null|null|null|1646891|East China|1227364|
|  4|   Anhui|195580.0|32100.0|2000|2902.09|31847| 0.0| 0.0| 0.0|1601508|East China|1499110|
df.columns
Out[85]: ['_c0', 'province', 'specific', 'general', 'year', 'gdp', 'fdi', 'rnr', 'rr', 'i', 'fr', 'reg', 'it']
df.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- specific: double (nullable = true)
 |-- general: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- gdp: double (nullable = true)
 |-- fdi: integer (nullable = true)
 |-- rnr: double (nullable = true)
 |-- rr: double (nullable = true)
 |-- i: double (nullable = true)
 |-- fr: string (nullable = true)
 |-- reg: string (nullable = true)
 |-- it: integer (nullable = true)
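
Because inferSchema guesses column types from the data, any column containing stray text is read as a string; that is why fr (which contains a literal "#REF!" value, as the summary statistics below show) comes back as string. If you prefer not to rely on inference, you can supply an explicit schema instead. A minimal sketch, assuming the column names and types shown above (the df_typed name is just illustrative):

from pyspark.sql.types import (StructType, StructField, IntegerType,
                               DoubleType, StringType)

# Explicit schema instead of inferSchema; adjust the types to your data.
schema = StructType([
    StructField("_c0", IntegerType(), True),
    StructField("province", StringType(), True),
    StructField("specific", DoubleType(), True),
    StructField("general", DoubleType(), True),
    StructField("year", IntegerType(), True),
    StructField("gdp", DoubleType(), True),
    StructField("fdi", IntegerType(), True),
    StructField("rnr", DoubleType(), True),
    StructField("rr", DoubleType(), True),
    StructField("i", DoubleType(), True),
    StructField("fr", StringType(), True),   # left as string because of the "#REF!" cells
    StructField("reg", StringType(), True),
    StructField("it", IntegerType(), True),
])

df_typed = spark.read.format("csv").option("header", True).schema(schema).load(file_location)
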
# for row in df.head(5):
#     print(row)
#     print('\n')
df.describe().show()
+-------+------------------+--------+-----------------+------------------+------------------+-----------------+------------------+-------------------+--------------------+-------------------+------------------+---------------+------------------+
|summary|               _c0|province|         specific|           general|              year|              gdp|               fdi|                rnr|                  rr|                  i|                fr|            reg|                it|
+-------+------------------+--------+-----------------+------------------+------------------+-----------------+------------------+-------------------+--------------------+-------------------+------------------+---------------+------------------+
|  count|               360|     360|              356|               169|               360|              360|               360|                294|                 296|                287|               295|            360|               360|
|   mean|             179.5|    null|583470.7303370787|309127.53846153844|            2001.5|4428.653416666667|196139.38333333333| 0.0355944252244898|0.059688621057432424|0.08376351662369343|2522449.0034013605|           null|2165819.2583333333|
| stddev|104.06728592598157|    null|654055.3290782663| 355423.5760674793|3.4568570586927794|4484.668659976412|303043.97011891654|0.16061503029299648| 0.15673351824073453| 0.1838933104683607|3491329.8613106664|           null|1769294.2935487411|
|    min|                 0|   Anhui|           8964.0|               0.0|              1996|            64.98|                 2|                0.0|                 0.0|                0.0|             #REF!|     East China|            147897|
|    max|               359|Zhejiang|        3937966.0|         1737800.0|              2007|         31777.01|           1743140|        1.214285714|                0.84|               1.05|           9898522|Southwest China|          10533312|
+-------+------------------+--------+-----------------+------------------+------------------+-----------------+------------------+-------------------+--------------------+-------------------+------------------+---------------+------------------+
df.describe().printSchema()
root
 |-- summary: string (nullable = true)
 |-- _c0: string (nullable = true)
 |-- province: string (nullable = true)
 |-- specific: string (nullable = true)
 |-- general: string (nullable = true)
 |-- year: string (nullable = true)
 |-- gdp: string (nullable = true)
 |-- fdi: string (nullable = true)
 |-- rnr: string (nullable = true)
 |-- rr: string (nullable = true)
 |-- i: string (nullable = true)
 |-- fr: string (nullable = true)
 |-- reg: string (nullable = true)
 |-- it: string (nullable = true)

Casting Data Types and Formatting Significant Digits

from pyspark.sql.functions import format_number

# describe() returns every column as a string, so each column is cast back to a
# numeric type before format_number is applied. Note that the 'int' casts below
# truncate decimals (e.g. rnr), which is why those columns appear as whole numbers.
result = df.describe()
result.select(result['province'],
              format_number(result['specific'].cast('float'), 2).alias('specific'),
              format_number(result['general'].cast('float'), 2).alias('general'),
              format_number(result['year'].cast('int'), 2).alias('year'),
              format_number(result['gdp'].cast('float'), 2).alias('gdp'),
              format_number(result['rnr'].cast('int'), 2).alias('rnr'),
              format_number(result['rr'].cast('float'), 2).alias('rr'),
              format_number(result['fdi'].cast('int'), 2).alias('fdi'),
              format_number(result['it'].cast('float'), 2).alias('it'),
              result['reg'].cast('string').alias('reg')
              ).show()
+--------+------------+------------+--------+---------+------+------+------------+-------------+---------------+
|province|    specific|     general|    year|      gdp|   rnr|    rr|         fdi|           it|            reg|
+--------+------------+------------+--------+---------+------+------+------------+-------------+---------------+
|     360|      356.00|      169.00|  360.00|   360.00|294.00|296.00|      360.00|       360.00|            360|
|    null|  583,470.75|  309,127.53|2,001.00| 4,428.65|  0.00|  0.06|  196,139.00| 2,165,819.25|           null|
|    null|  654,055.31|  355,423.56|    3.00| 4,484.67|  0.00|  0.16|  303,043.00| 1,769,294.25|           null|
|   Anhui|    8,964.00|        0.00|1,996.00|    64.98|  0.00|  0.00|        2.00|   147,897.00|     East China|
|Zhejiang|3,937,966.00|1,737,800.00|2,007.00|31,777.01|  1.00|  0.84|1,743,140.00|10,533,312.00|Southwest China|
+--------+------------+------------+--------+---------+------+------+------------+-------------+---------------+
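
The same cast pattern works on the raw DataFrame, not just on the describe() summary. A small sketch (df_cast is just an illustrative name) that stores fdi as a long and parses fr as a double; with Spark's default cast behavior, non-numeric strings such as "#REF!" simply become null:

from pyspark.sql.functions import col

df_cast = (df
           .withColumn("fdi", col("fdi").cast("long"))
           .withColumn("fr", col("fr").cast("double")))   # "#REF!" rows become null
df_cast.printSchema()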

New Columns Generated from Existing Columns Using withColumn

# Derive a new column from the existing 'specific' and 'gdp' columns
df2 = df.withColumn("specific_gdp_ratio", df["specific"] / (df["gdp"] * 100))
df2.select('specific_gdp_ratio').show()
+------------------+
|specific_gdp_ratio|
+------------------+
|0.7022500358285959|
|0.6474660463848132|
|0.6878991411583352|
|1.0519477646607727|
| 0.673928100093381|
|0.7727761333780966|
| 1.233475958314866|
|1.5783421826051272|
|1.8877587040110941|
|1.6792756118029895|
|2.3850666666666664|
|3.0077639751552794|
|0.9275486250838364|
|0.7989880072601573|
|1.0314658544998698|
| 1.448708759827088|
|0.8912058855158366|
|1.1918224576316896|
|1.2944820393974508|
| 1.283311464867661|
+------------------+
only showing top 20 rows
# _c0 of the first row when sorted by 'specific' ascending (nulls sort first by default)
df.orderBy(df["specific"].asc()).head(1)[0][0]
Out[94]: 24
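
The head(1)[0][0] idiom above returns only the first field (_c0) of the top row. A slightly more explicit sketch that keeps whole columns visible and pushes the handful of null specific values to the end of the sort:

from pyspark.sql.functions import col

df.orderBy(col("specific").asc_nulls_last()) \
  .select("_c0", "province", "year", "specific") \
  .show(1)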

Finding the Mean, Max, and Min

from pyspark.sql.functions import mean
df.select(mean("specific")).show()
+-----------------+
|    avg(specific)|
+-----------------+
|583470.7303370787|
+-----------------+
from pyspark.sql.functions import max,min
df.select(max("specific"),min("specific")).show()
+-------------+-------------+
|max(specific)|min(specific)|
+-------------+-------------+
|    3937966.0|       8964.0|
+-------------+-------------+
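
The same statistics can also be computed in a single pass with agg(), which lets you give the output columns readable names. A minimal sketch:

from pyspark.sql.functions import mean, max, min

df.agg(mean("specific").alias("avg_specific"),
       max("specific").alias("max_specific"),
       min("specific").alias("min_specific")).show()
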
df.filter("specific < 60000").count()
Out[98]: 23
df.filter(df['specific'] < 60000).count()
Out[99]: 23
from pyspark.sql.functions import count
result = df.filter(df['specific'] < 60000)
result.select(count('specific')).show()
+---------------+
|count(specific)|
+---------------+
|             23|
+---------------+
# Percentage of rows with gdp above 8000
(df.filter(df["gdp"] > 8000).count() * 1.0 / df.count()) * 100
Out[101]: 14.444444444444443
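
The same percentage can be computed inside Spark in one pass by averaging a 0/1 flag instead of running two separate counts. A small sketch of that approach (the pct_gdp_over_8000 alias is just illustrative):

from pyspark.sql.functions import avg, col

# avg of a boolean cast to int gives the fraction of rows where the condition holds
df.select((avg((col("gdp") > 8000).cast("int")) * 100).alias("pct_gdp_over_8000")).show()
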
from pyspark.sql.functions import corr
df.select(corr("gdp","fdi")).show()
+------------------+
|    corr(gdp, fdi)|
+------------------+
|0.8366328478935896|
+------------------+
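
If you only need the number rather than a one-row DataFrame, df.stat.corr returns a plain Python float:

# Pearson correlation between gdp and fdi as a float
df.stat.corr("gdp", "fdi")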

Finding the Max Value by Year

from pyspark.sql.functions import year
# year() extracts the year from a date/timestamp column; here 'year' is already
# an integer, so no extraction is needed and the line below stays commented out.
#yeardf = df.withColumn("Year",year(df["year"]))
max_df = df.groupBy('year').max()
max_df.select('year','max(gdp)').show()
+----+--------+
|year|max(gdp)|
+----+--------+
|2003|15844.64|
|2007|31777.01|
|2006|26587.76|
|1997| 7774.53|
|2004|18864.62|
|1996| 6834.97|
|1998| 8530.88|
|2001|12039.25|
|2005|22557.37|
|2000|10741.25|
|1999| 9250.68|
|2002|13502.42|
+----+--------+
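
groupBy().max() aggregates every numeric column at once. A small sketch that restricts the aggregation to gdp, gives the result a readable name, and sorts by year:

from pyspark.sql.functions import max

df.groupBy("year") \
  .agg(max("gdp").alias("max_gdp")) \
  .orderBy("year") \
  .show()
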
from pyspark.sql.functions import month
# 'avg(gdp)' is not a column of df; it only exists after an aggregation such as
# df.groupBy('year').avg('gdp'), so the line below is left commented out.
#df.select("year","avg(gdp)").orderBy('year').show()