from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
sentenceDataFrame.show()
+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish Java could...|
|  2|Logistic,regressi...|
+---+--------------------+

Using Tokenizer and RegexTokenizer

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps=False

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+
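The plain Tokenizer splits only on whitespace, which is why the comma-separated sentence stays as a single token, while RegexTokenizer splits on the \W pattern. As a sketch of the alternative mentioned in the comment above (matching tokens directly rather than splitting on gaps; the variable name is just for illustration):

# Match word characters directly instead of splitting on delimiters
regexTokenizerMatch = RegexTokenizer(inputCol="sentence", outputCol="words",
                                     pattern="\\w+", gaps=False)
regexTokenizerMatch.transform(sentenceDataFrame)\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)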

Removing Stop Words

from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+
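StopWordsRemover uses the built-in English stop-word list unless told otherwise. A minimal sketch of supplying a customized list (same sentenceData; the variable names are illustrative):

# Start from the built-in English list and add a domain-specific word
custom_stopwords = StopWordsRemover.loadDefaultStopWords("english") + ["balloon"]
custom_remover = StopWordsRemover(inputCol="raw", outputCol="filtered",
                                  stopWords=custom_stopwords)
custom_remover.transform(sentenceData).show(truncate=False)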

n-grams

from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+
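The n parameter controls the window size; bigrams are shown above. A quick sketch with n=3 for trigrams over the same input (output not shown):

# Trigrams over the same tokenized input
trigram = NGram(n=3, inputCol="words", outputCol="trigrams")
trigram.transform(wordDataFrame).select("trigrams").show(truncate=False)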

TF-IDF

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

sentenceData.show()
+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|  0.0|Hi I heard about ...|
|  0.0|I wish Java could...|
|  1.0|Logistic regressi...|
+-----+--------------------+
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show()
+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|
|  0.0|I wish Java could...|[i, wish, java, c...|
|  1.0|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[6,8,13,16],[...|
|  0.0|(20,[0,2,7,13,15,...|
|  1.0|(20,[3,4,6,11,19]...|
+-----+--------------------+
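numFeatures=20 keeps the example readable but forces different words to share hash buckets; HashingTF defaults to 2^18 (262,144) features, and a larger power of two reduces collisions. A sketch with a bigger table (variable names are illustrative only):

# Larger feature space to reduce hash collisions
hashingTF_big = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=2**18)
featurizedData_big = hashingTF_big.transform(wordsData)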

CountVectorizer

from pyspark.ml.feature import CountVectorizer

# Input data: each row is a bag of words with an ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)
+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+
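Unlike HashingTF, the fitted CountVectorizerModel keeps an explicit vocabulary, so the indices in the sparse vectors can be mapped back to terms:

# The learned vocabulary, ordered by corpus-wide term frequency
print(model.vocabulary)  # e.g. ['a', 'b', 'c'] (a and b may swap, since they tie in frequency)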

Loading the SMS Spam Collection Dataset

df = spark.read.load("/FileStore/tables/SMSSpamCollection",
                     format="csv", sep="\t", inferSchema="true", header="false")
df.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
data = df.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')
data.printSchema()
root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)

Clean and Prepare the Data

from pyspark.sql.functions import length
data = data.withColumn('length',length(data['text']))
data.printSchema()
root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
# Pretty clear difference in average message length between ham and spam
data.groupby('class').mean().show()
+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham| 71.4545266210897|
| spam|138.6706827309237|
+-----+-----------------+
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

Naive Bayes

from pyspark.ml.classification import NaiveBayes
# Use defaults
nb = NaiveBayes()
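Written out with its defaults made explicit (a sketch; nb_explicit is just an illustrative name, and these values are the standard Spark defaults):

# Equivalent to NaiveBayes(): multinomial model with Laplace smoothing of 1.0
nb_explicit = NaiveBayes(featuresCol='features', labelCol='label',
                         smoothing=1.0, modelType='multinomial')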

Pipeline

from pyspark.ml import Pipeline
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])
cleaner = data_prep_pipe.fit(data)
clean_data = cleaner.transform(data)
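The classifier could also be appended as the final pipeline stage so a single fit handles both preparation and training. A sketch of that variant (splitting the raw data first; the names are illustrative), although the walkthrough below keeps the steps separate:

# Hypothetical end-to-end pipeline: feature prep followed by Naive Bayes
full_pipe = Pipeline(stages=[ham_spam_to_num, tokenizer, stopremove,
                             count_vec, idf, clean_up, nb])
raw_train, raw_test = data.randomSplit([0.7, 0.3])
full_model = full_pipe.fit(raw_train)
full_preds = full_model.transform(raw_test)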

Training and Evaluation

clean_data = clean_data.select(['label','features'])
clean_data.show()
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows
(training,testing) = clean_data.randomSplit([0.7,0.3])
spam_predictor = nb.fit(training)
test_results = spam_predictor.transform(testing)
test_results.show()
+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,13,...|[-605.26168264963...|[1.0,7.3447866033...|       0.0|
|  0.0|(13424,[0,1,2,41,...|[-1063.2170425771...|[1.0,9.8700382552...|       0.0|
|  0.0|(13424,[0,1,3,9,1...|[-569.95657733189...|[1.0,1.4498595638...|       0.0|
|  0.0|(13424,[0,1,5,15,...|[-998.87457222776...|[1.0,5.4020023412...|       0.0|
|  0.0|(13424,[0,1,7,15,...|[-658.37986687391...|[1.0,2.6912246466...|       0.0|
|  0.0|(13424,[0,1,14,31...|[-217.18809411711...|[1.0,3.3892033063...|       0.0|
|  0.0|(13424,[0,1,14,78...|[-688.50251926938...|[1.0,8.6317783323...|       0.0|
|  0.0|(13424,[0,1,17,19...|[-809.51840544334...|[1.0,1.3686507989...|       0.0|
|  0.0|(13424,[0,1,27,35...|[-1472.6804140726...|[0.99999999999983...|       0.0|
|  0.0|(13424,[0,1,31,43...|[-341.31126583915...|[1.0,3.4983325940...|       0.0|
|  0.0|(13424,[0,1,46,17...|[-1137.4942938439...|[5.99448563047616...|       1.0|
|  0.0|(13424,[0,1,72,10...|[-704.77256939631...|[1.0,1.2592610663...|       0.0|
|  0.0|(13424,[0,1,874,1...|[-96.404593207515...|[0.99999996015865...|       0.0|
|  0.0|(13424,[0,1,874,1...|[-98.086094104500...|[0.99999996999685...|       0.0|
|  0.0|(13424,[0,2,3,4,6...|[-1289.3891411076...|[1.0,1.3408017664...|       0.0|
|  0.0|(13424,[0,2,3,5,6...|[-2561.6651406471...|[1.0,2.6887776075...|       0.0|
|  0.0|(13424,[0,2,3,5,3...|[-490.88944126371...|[1.0,9.6538338828...|       0.0|
|  0.0|(13424,[0,2,4,5,1...|[-2493.1672898653...|[1.0,9.4058507096...|       0.0|
|  0.0|(13424,[0,2,4,7,2...|[-517.23267032348...|[1.0,2.8915589432...|       0.0|
|  0.0|(13424,[0,2,4,8,2...|[-1402.5570102185...|[1.0,6.7531061115...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 20 rows

Evaluating Model Accuracy

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Note: MulticlassClassificationEvaluator defaults to the f1 metric, so the
# value printed below is a weighted F1 score rather than raw accuracy
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))
Accuracy of model at predicting spam was: 0.9204435112848836
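Since the evaluator's default metric is f1, the number above is a weighted F1 score. To request accuracy explicitly, a small sketch using the same test_results:

# Explicitly request the accuracy metric instead of the default f1
acc_eval_accuracy = MulticlassClassificationEvaluator(metricName="accuracy")
print("Accuracy: {}".format(acc_eval_accuracy.evaluate(test_results)))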