Industry applications of machine learning generally require the ability to deal with massive datasets. Spark provides a machine learning library, MLlib, that lets us build machine learning models efficiently and in parallel.
This post starts with a Spark ML modelling example in PySpark, K-Means clustering, and explains the basic steps as well as the Spark APIs used when building an ML model on Spark.
spark.sql() returns a DataFrame representing the result of the given query.
Generally, we need to prepare the features used to build our models and save them in our database. The following code selects the prepared data from the Hive database and fills NaN/null values with -1.
from pyspark.sql import SparkSession

# Create a Hive-enabled session (in the pyspark shell, `spark` already exists)
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Loading data from Hive
# This requires us to prepare the feature engineering on Hive before loading data
df = spark.sql("SELECT * FROM dev_dm_mdm.keep_matthew_lc_analysis_feature").fillna(-1)
This snippet uses the RDD-based pyspark.mllib.feature.StandardScaler; the DataFrame-based pyspark.ml version is described below.

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

# sc is an existing SparkContext
vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
dataset = sc.parallelize(vs)
standardizer = StandardScaler(True, True)  # withMean=True, withStd=True
model = standardizer.fit(dataset)
result = model.transform(dataset)
for r in result.collect():
    print(r)
# DenseVector([-0.7071, 0.7071, -0.7071])
# DenseVector([0.7071, -0.7071, 0.7071])
int(model.std[0])        # 4
int(model.mean[0] * 10)  # 9
Data preprocessing
Feature assembling
pyspark.ml.feature.VectorAssembler(inputCols=None, outputCol=None, handleInvalid='error') merges multiple columns into a single vector column, as shown in the snippet below.
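A minimal usage sketch (toy columns a, b and c, matching the description that follows):

from pyspark.ml.feature import VectorAssembler

# Toy DataFrame with three numeric columns
df_toy = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])

# Combine the three columns into a single vector column named "features"
vec_assembler = VectorAssembler(outputCol="features")
vec_assembler.setInputCols(["a", "b", "c"])
vec_assembler.transform(df_toy).head().features
# DenseVector([1.0, 0.0, 3.0])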
The example above follows the official Spark documentation snippet: it creates a DataFrame containing three columns and shows how to merge them into a single column named features. The VectorAssembler is passed the list ["a", "b", "c"] via setInputCols() so that the assembler knows which columns to combine.
pyspark.ml.feature.StandardScaler(withMean=False, withStd=True, inputCol=None, outputCol=None) standardizes features by removing the mean and scaling to unit variance, using column summary statistics computed on the samples in the training set (see the sketch after the parameter list).
Parameters:
withMean – False by default. Centers the data with mean before scaling. It will build a dense output, so take care when applying to sparse input.
withStd – True by default. Scales the data to unit standard deviation.
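The K-Means code below expects a standardized vector column on a DataFrame called df_std. A minimal sketch of that preprocessing step, assuming every column of the loaded Hive table is a numeric feature (the column names and the choice to also center with the mean are assumptions, not from the original feature table):

# Hypothetical preprocessing: assemble all columns into one vector, then standardize it.
# Adjust feature_cols if the table contains key or non-numeric columns.
feature_cols = df.columns
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
df_vec = assembler.transform(df)

scaler = StandardScaler(inputCol="features_raw", outputCol="features",
                        withMean=True, withStd=True)
df_std = scaler.fit(df_vec).transform(df_vec)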
The following code snippet is a K-Means example where we try to find the best k by computing the within-set sum of squared errors (WSSSE) and the silhouette score. We build a list of k values from 20 to 190 in steps of 10 and write a simple for loop that computes these two metrics for each k.
# Finding the best k
K = [i for i in range(20, 200, 10)]
errors = []
silhouette = []
trtime = []
for k in K:
    t0 = time.time()
    km = KMeans(featuresCol="features", k=k, seed=77, maxIter=20)
    val_df = df_std.sample(0.1)  # Sampling
    model = km.fit(val_df)
    t1 = time.time()
    trtime.append(t1 - t0)

    # Within-set sum of squared errors (Spark 2.4+) and silhouette score for this k
    wssse = model.summary.trainingCost
    errors.append(wssse)
    predictions = model.transform(val_df)
    score = ClusteringEvaluator().evaluate(predictions)
    silhouette.append(score)

    print("==> Training time : %s \n" % (t1 - t0))
    print("==> with k = {}".format(k))
    print("==> within set sum of squared errors = {}".format(wssse))
    print("==> Silhouette with squared euclidean distance = {}".format(score))
Putting everything together:

import sys
import os
import time
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import rand, randn
# Loading data from Hive
# This requires us to prepare the feature engineering on Hive before loading data
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.sql("SELECT * FROM dev_dm_mdm.keep_matthew_lc_analysis_feature").fillna(-1)
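As above, the step that produces df_std (assembling and standardizing the features) is sketched here with assumed column names, since the loop below expects a standardized "features" column:

# Hypothetical preprocessing: assemble all columns into one vector, then standardize it.
feature_cols = df.columns
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
df_vec = assembler.transform(df)
scaler = StandardScaler(inputCol="features_raw", outputCol="features",
                        withMean=True, withStd=True)
df_std = scaler.fit(df_vec).transform(df_vec)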
# Finding the best k
K = [i for i in range(20, 200, 10)]
errors = []
silhouette = []
trtime = []
for k in K:
    t0 = time.time()
    km = KMeans(featuresCol="features", k=k, seed=77, maxIter=20)
    val_df = df_std.sample(0.1)  # Sampling
    model = km.fit(val_df)
    t1 = time.time()
    trtime.append(t1 - t0)

    # Within-set sum of squared errors (Spark 2.4+) and silhouette score for this k
    wssse = model.summary.trainingCost
    errors.append(wssse)
    predictions = model.transform(val_df)
    score = ClusteringEvaluator().evaluate(predictions)
    silhouette.append(score)

    print("==> Training time : %s \n" % (t1 - t0))
    print("==> with k = {}".format(k))
    print("==> within set sum of squared errors = {}".format(wssse))
    print("==> Silhouette with squared euclidean distance = {}".format(score))