Clustering with PySpark
This post includes code adapted from the Spark and Python for Big Data Udemy course and its accompanying notebooks.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DoubleType, FloatType
# Explicit schema for the provincial panel dataset
data_schema = [
StructField("_c0", IntegerType(), True)
,StructField("province", StringType(), True)
,StructField("specific", DoubleType(), True)
,StructField("general", DoubleType(), True)
,StructField("year", IntegerType(), True)
,StructField("gdp", FloatType(), True)
,StructField("fdi", FloatType(), True)
,StructField("rnr", DoubleType(), True)
,StructField("rr", FloatType(), True)
,StructField("i", FloatType(), True)
,StructField("fr", IntegerType(), True)
,StructField("reg", StringType(), True)
,StructField("it", IntegerType(), True)
]
final_struc = StructType(fields=data_schema)
# Load the CSV file using the explicit schema defined above
file_location = "/FileStore/tables/df_panel_fix.csv"
df = spark.read.format("csv").schema(final_struc).option("header", True).load(file_location)
#df.printSchema()
df.show()
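As a quick sanity check (not part of the original course code), basic summary statistics for a couple of the numeric columns can be inspected before any feature engineering; this is a minimal sketch using the DataFrame describe() method on columns from the schema above.
# Optional sanity check: summary statistics for selected numeric columns
df.describe('gdp', 'fdi').show()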
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Candidate feature columns; only 'gdp' is used for this run
# feat_cols = ['year', 'gdp', 'fdi', 'fr']
feat_cols = ['gdp']
# Combine the feature columns into a single 'features' vector column
vec_assembler = VectorAssembler(inputCols=feat_cols, outputCol='features')
final_df = vec_assembler.transform(df)
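To confirm what VectorAssembler produced, the new 'features' column can be displayed next to its source column; a small sketch, assuming the column names used above.
# Inspect the assembled feature vectors
final_df.select('gdp', 'features').show(5, truncate=False)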
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(final_df)
# Normalize each feature to have unit standard deviation.
cluster_final_data = scalerModel.transform(final_df)
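Before clustering, it can help to compare the raw and standardized vectors side by side; this sketch simply displays the two columns produced by the pipeline so far.
# Compare raw and standardized feature vectors
cluster_final_data.select('features', 'scaledFeatures').show(5, truncate=False)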
from pyspark.ml.clustering import KMeans
# Fit k-means models with k=3 and k=2 on the standardized features
kmeans3 = KMeans(featuresCol='scaledFeatures', k=3)
kmeans2 = KMeans(featuresCol='scaledFeatures', k=2)
model_k3 = kmeans3.fit(cluster_final_data)
model_k2 = kmeans2.fit(cluster_final_data)
# Cluster sizes for each model
model_k3.transform(cluster_final_data).groupBy('prediction').count().show()
model_k2.transform(cluster_final_data).groupBy('prediction').count().show()
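The group-by counts only show cluster sizes; to compare the two models more directly, a silhouette score can be computed with ClusteringEvaluator and the cluster centers printed. This is a sketch rather than part of the original course code, with the evaluator pointed at the scaled features used for training.
from pyspark.ml.evaluation import ClusteringEvaluator

# Silhouette score (higher is better) for each model, computed on the scaled features
evaluator = ClusteringEvaluator(featuresCol='scaledFeatures', predictionCol='prediction')
for k, model in [(2, model_k2), (3, model_k3)]:
    preds = model.transform(cluster_final_data)
    print(f"k={k} silhouette: {evaluator.evaluate(preds):.3f}")
    print(f"k={k} cluster centers: {model.clusterCenters()}")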