Building K-Means with Spark

Industry applications of machine learning generally require the ability to handle massive datasets. Spark provides a machine learning library, MLlib, that lets us build machine learning models efficiently and in parallel.

This post walks through a Spark ML modelling example in PySpark, K-Means, and explains the basic steps as well as the Spark APIs used when building an ML model on Spark.

For the complete code of the K-Means example, please refer to the Spark K-Means code summarization section.

K-Means with Spark

Loading data with spark.sql()

spark.sql() returns a DataFrame representing the result of the given query.

Generally, we need to prepare the features used to build our models and save them in our database. The following lines of code select the prepared data from the Hive database and fill NaN/Null values with -1.

from pyspark.sql import SparkSession

# Loading data from Hive
# This requires us to prepare the feature engineering on Hive before loading data
df = spark.sql("SELECT * FROM dev_dm_mdm.keep_matthew_lc_analysis_feature").fillna(-1)
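The snippet above assumes the spark session object already exists, as it does inside the pyspark shell. If you instead run the code as a standalone script, a minimal sketch for creating it yourself (the app name is just a placeholder) could look like this:

from pyspark.sql import SparkSession

# Hypothetical app name; enableHiveSupport() is required to query Hive tables
spark = (SparkSession.builder
         .appName("kmeans-demo")
         .enableHiveSupport()
         .getOrCreate())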
For reference, Spark also ships an older RDD-based StandardScaler in pyspark.mllib. The snippet below standardizes a small RDD of dense vectors with withMean and withStd both enabled (sc is the SparkContext provided by the pyspark shell):

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
dataset = sc.parallelize(vs)
standardizer = StandardScaler(True, True)  # withMean=True, withStd=True
model = standardizer.fit(dataset)
result = model.transform(dataset)

for r in result.collect(): r
DenseVector([-0.7071, 0.7071, -0.7071])
DenseVector([0.7071, -0.7071, 0.7071])

int(model.std[0])
4

int(model.mean[0]*10)
9

Data preprocessing

Features assembling

pyspark.ml.feature.VectorAssembler(inputCols=None, outputCol=None, handleInvalid='error') merges multiple columns into a single vector column.

from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
vecAssembler = VectorAssembler(outputCol="features")
vecAssembler.setInputCols(["a", "b", "c"])
vecAssembler.transform(df).head().features

Out[14]: DenseVector([1.0, 0.0, 3.0])

The example above is a snippet from the official Spark examples: it creates a DataFrame with three columns and shows how to merge them into a single column named features. The VectorAssembler is instantiated and the list ["a", "b", "c"] is passed to setInputCols() so that the assembler knows which columns to combine.

dfWithNullsAndNaNs = spark.createDataFrame(
    [(1.0, 2.0, None),
     (3.0, float("nan"), 4.0),
     (5.0, 6.0, 7.0)],
    ["a", "b", "c"]
)
vecAssembler2 = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features", handleInvalid="keep")
vecAssembler2.transform(dfWithNullsAndNaNs).show()
+---+---+----+-------------+
|  a|  b|   c|     features|
+---+---+----+-------------+
|1.0|2.0|null|[1.0,2.0,NaN]|
|3.0|NaN| 4.0|[3.0,NaN,4.0]|
|5.0|6.0| 7.0|[5.0,6.0,7.0]|
+---+---+----+-------------+

vecAssembler2.setParams(handleInvalid="skip").transform(dfWithNullsAndNaNs).show()
+---+---+---+-------------+
|  a|  b|  c|     features|
+---+---+---+-------------+
|5.0|6.0|7.0|[5.0,6.0,7.0]|
+---+---+---+-------------+

Standardization

pyspark.ml.feature.StandardScaler(withMean=False, withStd=True, inputCol=None, outputCol=None) standardizes features by removing the mean and scaling to unit variance, using column summary statistics computed on the samples in the training set.

  • Parameters:
    • withMean – False by default. Centers the data with mean before scaling. It will build a dense output, so take care when applying to sparse input.
    • withStd – True by default. Scales the data to unit standard deviation.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler

df = spark.createDataFrame([(Vectors.dense([0.0]),), (Vectors.dense([2.0]),), (Vectors.dense([3.0]),)], ["col_1"])
df.show()
+-----+
|col_1|
+-----+
|[0.0]|
|[2.0]|
|[3.0]|
+-----+

stdScaler = StandardScaler(inputCol="col_1", outputCol="scaled_col")
model = stdScaler.fit(df)
model.mean
Out[9]: DenseVector([1.6667])

model.std
Out[9]: DenseVector([1.5275])

df_2 = model.transform(df)
df_2.show()
+-----+--------------------+
|col_1|          scaled_col|
+-----+--------------------+
|[0.0]|               [0.0]|
|[2.0]|[1.3093073414159542]|
|[3.0]|[1.9639610121239313]|
+-----+--------------------+
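By default withMean is False, so the transform above only divides each value by the standard deviation. To fully standardize (center and scale), pass withMean=True; a minimal sketch reusing the same df:

from pyspark.ml.feature import StandardScaler

centeredScaler = StandardScaler(withMean=True, withStd=True,
                                inputCol="col_1", outputCol="scaled_col")
centered = centeredScaler.fit(df).transform(df)
centered.show()
# Each value becomes (x - mean) / std, e.g. (0.0 - 1.6667) / 1.5275 ≈ -1.09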

K-Means Modelling

class pyspark.ml.clustering.KMeans(featuresCol='features', predictionCol='prediction', k=2, initMode='k-means||', initSteps=2, tol=0.0001, maxIter=20, seed=None, distanceMeasure='euclidean', weightCol=None)

The following code snippet is a K-Means example where we try to find the best k by computing the within-set sum of squared errors (WSSSE) and the Silhouette score. We create a list of k values from 20 to 190 in steps of 10 and write a simple for loop that computes these two metrics for each k.

# Finding the best k
K = [i for i in range(20, 200, 10)]
errors = []
silhouette = []
trtime = []
for k in K:
    t0 = time.time()
    km = KMeans(featuresCol="features", k=k, seed=77, maxIter=20)
    val_df = df_std.sample(0.1)  # sample 10% of the data
    model = km.fit(val_df)
    t1 = time.time()
    trtime.append(t1 - t0)

    # compute WSSSE (computeCost is deprecated in Spark 3.x in favour of model.summary.trainingCost)
    pre = model.transform(val_df)
    wssse = model.computeCost(val_df)
    errors.append(wssse)
    # compute silhouette to evaluate the model
    evaluator = ClusteringEvaluator()
    score = evaluator.evaluate(pre)
    silhouette.append(score)

    print("==> Training time : %s \n" % (t1 - t0))
    print("==> with k = {}".format(k))
    print("==> within set sum of squared errors = {}".format(wssse))
    print("==> Silhouette with squared euclidean distance = {}".format(score))

pkmeans_stats = pd.DataFrame({'K': K, 'errors': errors, 'silhouette': silhouette, 'training_time': trtime})
pkmeans_stats.to_csv("/home/users/mdm_app/zengzhanhang/kmeans/kmeans_results_1227_01.csv", header=None, index=False)
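To pick the best k from these results, a common approach is to plot the elbow curve of the WSSSE values alongside the silhouette scores. A minimal matplotlib sketch over the collected metrics (the output file name is just a placeholder) might look like this:

import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
ax1.plot(K, errors, marker="o")       # elbow curve: look for the bend
ax1.set_xlabel("k")
ax1.set_ylabel("WSSSE")
ax2.plot(K, silhouette, marker="o")   # higher silhouette is better
ax2.set_xlabel("k")
ax2.set_ylabel("Silhouette")
plt.savefig("kmeans_k_selection.png")  # hypothetical output file name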

Spark K-Means code summarization

Run Spark on Terminal
cd /home/users/mdm_app/matthew/kmenas
export PYSPARK_DRIVER_PYTHON=/usr/local/anaconda3/bin/ipython
pyspark --name matthew --num-executors=10 --executor-cores=1 --executor-memory=16GB --driver-memory=32GB --conf spark.port.maxRetries=100
K-Means training script
import sys
import os
import time
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import rand, randn

# Loading data from Hive (spark is provided by the pyspark shell started above)
# This requires us to prepare the feature engineering on Hive before loading data
df = spark.sql("SELECT * FROM dev_dm_mdm.keep_matthew_lc_analysis_feature").fillna(-1)

# Getting insight into our data
df.printSchema()
print(df.count(), len(df.columns))
df.show(10)
df.head(5)
df.tail(5)

# Basic processing
feature_name = df.columns[1:]  # use every column except the first (id) column as features
vec_assembler = VectorAssembler(inputCols=feature_name, outputCol='vas_features')
df_vas = vec_assembler.transform(df)  # combine all feature columns of each row into a single vector
scaler = StandardScaler(withMean=True, withStd=True).setInputCol("vas_features").setOutputCol("features")  # standardization
df_std = scaler.fit(df_vas).transform(df_vas)


# Finding the best k
K = [i for i in range(20, 200, 10)]
errors = []
silhouette = []
trtime = []
for k in K:
    t0 = time.time()
    km = KMeans(featuresCol="features", k=k, seed=77, maxIter=20)
    val_df = df_std.sample(0.1)  # sample 10% of the data
    model = km.fit(val_df)
    t1 = time.time()
    trtime.append(t1 - t0)

    # compute WSSSE (computeCost is deprecated in Spark 3.x in favour of model.summary.trainingCost)
    pre = model.transform(val_df)
    wssse = model.computeCost(val_df)
    errors.append(wssse)
    # compute silhouette to evaluate the model
    evaluator = ClusteringEvaluator()
    score = evaluator.evaluate(pre)
    silhouette.append(score)

    print("==> Training time : %s \n" % (t1 - t0))
    print("==> with k = {}".format(k))
    print("==> within set sum of squared errors = {}".format(wssse))
    print("==> Silhouette with squared euclidean distance = {}".format(score))

pkmeans_stats = pd.DataFrame({'K': K, 'errors': errors, 'silhouette': silhouette, 'training_time': trtime})
pkmeans_stats.to_csv("/home/users/mdm_app/zengzhanhang/kmeans/kmeans_results_1227_01.csv", header=None, index=False)


# K-Means inference based on the best k
best_k = 50
km = KMeans(featuresCol='features', k=best_k, seed=77, maxIter=30)
val_df = df_std.sample(0.1)
model = km.fit(val_df)
save_path = "hdfs:///tmp/zengzhanhang/kmeans_{}.model".format(best_k)
model.write().overwrite().save(save_path)
transformed = model.transform(df_std).select("cust_nbr", "prediction")
transformed.write.mode("overwrite").saveAsTable("dev_dm_mdm.tmp_kmeans_pre_resutls{}".format(best_k))  # save the table in our database

# Average each feature per cluster and count cluster sizes for profiling
res = model.transform(df_std).groupby("prediction").avg(*feature_name).toDF(*(["predict"] + feature_name)).sort("prediction")
res2 = res.toPandas()
cnt = model.transform(df_std).groupby("prediction").count().sort("prediction").toPandas()
res2['cnt'] = cnt['count']
res3 = res2.copy()
res_ = res3.T
res_.to_csv("/home/users/mdm_app/zengzhanhang/kmeans/kmeans_results_1227_inference_{}_01.csv".format(best_k))
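If the clustering later needs to be re-applied without retraining, the persisted model can be loaded back from HDFS. A minimal sketch, assuming the same save_path and df_std as above:

from pyspark.ml.clustering import KMeansModel

# Reload the persisted model and reuse it for inference
loaded_model = KMeansModel.load(save_path)
loaded_model.transform(df_std).select("cust_nbr", "prediction").show(10)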
