Spark Machine Learning Notes (4)
Collaborative Filtering
This algorithm rests on a low-rank assumption about the rating matrix: the large matrix is factored into two smaller matrices and treated as their product, and the factors are chosen so that the error between the reconstructed product and the original matrix is as small as possible.
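Concretely (the standard ALS formulation, written out here only for orientation): approximate the user-item rating matrix R as U * V^T, where U holds one k-dimensional factor vector per user and V one per item, and choose U and V to minimize

    sum over observed (u, i) of (R[u][i] - U[u] . V[i])^2 + lambda * (||U||^2 + ||V||^2)

In the code below, k is the rank parameter (10) and lambda is the last argument to ALS.train (0.01).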
SparkConf sparkConf = new SparkConf().setAppName("cf").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
System.out.println(jsc.sc().executorMemory());
// Read the MovieLens ratings file: userId,movieId,rating,timestamp
FileReader rd = new FileReader("ml-25m\\ratings.csv");
BufferedReader bufferedReader = new BufferedReader(rd);
String line = "";
List<Rating> list = new ArrayList<>();
bufferedReader.readLine(); // skip the CSV header line
while ((line = bufferedReader.readLine()) != null) {
    String[] split = line.split(",");
    // stop at user id 10000 to keep the sample small (ratings.csv is sorted by user id)
    if (split[0].equals("10000")) {
        break;
    }
    Rating rating = new Rating(
            Integer.parseInt(split[0]),   // user id
            Integer.parseInt(split[1]),   // product (movie) id
            Double.parseDouble(split[2])  // rating
    );
    list.add(rating);
}
JavaRDD<Rating> ratings = jsc.parallelize(list);
// Build the recommendation model using ALS
int rank = 10;          // number of latent factors
int numIterations = 5;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
// Predict a rating for every (user, product) pair seen in the data
JavaRDD<Tuple2<Object, Object>> userProducts =
        ratings.map(r -> new Tuple2<>(r.user(), r.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
        model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
                .map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
);
// Join the actual ratings with the predictions on (user, product) and compute the mean squared error
JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
        ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
        .join(predictions).values();
double MSE = ratesAndPreds.mapToDouble(pair -> {
    double err = pair._1() - pair._2();
    return err * err;
}).mean();
System.out.println("Mean Squared Error = " + MSE);
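Beyond reconstructing known ratings, the trained model can also produce top-N recommendations; a minimal sketch (user id 1 and the count of 10 are only illustrative):

Rating[] recommendations = model.recommendProducts(1, 10); // top 10 products for user 1 (illustrative ids)
for (Rating r : recommendations) {
    System.out.println("user " + r.user() + " -> product " + r.product()
            + " (predicted rating " + r.rating() + ")");
}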
Clustering Algorithms
K-means
SparkConf sparkConf = new SparkConf().setAppName("kmeans").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
String line = "";
// Read feature vectors from a CSV file, one comma-separated row per line
FileReader rd = new FileReader("test.csv");
BufferedReader bufferedReader = new BufferedReader(rd);
ArrayList<Vector> vectors = new ArrayList<>();
while ((line = bufferedReader.readLine()) != null) {
    String[] split = line.split(",");
    double[] list = new double[split.length];
    for (int i = 0; i < split.length; i++) {
        list[i] = Double.parseDouble(split[i]);
    }
    vectors.add(Vectors.dense(list));
}
JavaRDD<Vector> parsedData = jsc.parallelize(vectors);
// Cluster the data into six clusters using KMeans
int numClusters = 6;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
System.out.println("Cluster centers:");
for (Vector center : clusters.clusterCenters()) {
    System.out.println(" " + center);
}
// Evaluate clustering by computing the Within Set Sum of Squared Errors (the model's cost)
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
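A trained KMeansModel can also assign new points to cluster indices; a minimal sketch (the dense vector below is made up and assumes the same dimensionality as the rows of test.csv):

int clusterIndex = clusters.predict(Vectors.dense(1.0, 2.0, 3.0)); // illustrative point
System.out.println("Predicted cluster: " + clusterIndex);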
Gaussian Mixture
// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
        "target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
    System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
            gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
Power Iteration Clustering
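The snippet below assumes that similarities already exists as a JavaRDD<Tuple3<Long, Long, Double>> of (srcId, dstId, similarity) edges of an affinity graph; a minimal sketch with toy values:

// Affinity graph edges: (source vertex id, destination vertex id, similarity), toy values
JavaRDD<Tuple3<Long, Long, Double>> similarities = jsc.parallelize(Arrays.asList(
        new Tuple3<>(0L, 1L, 0.9),
        new Tuple3<>(1L, 2L, 0.9),
        new Tuple3<>(2L, 3L, 0.1),
        new Tuple3<>(3L, 4L, 0.9)));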
PowerIterationClustering pic = new PowerIterationClustering()
        .setK(2)
        .setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);
for (PowerIterationClustering.Assignment a : model.assignments().toJavaRDD().collect()) {
    System.out.println(a.id() + " -> " + a.cluster());
}
Latent Dirichlet Allocation (LDA)
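The snippet below assumes corpus is a JavaPairRDD<Long, Vector> of (document id, word-count vector) pairs; a minimal sketch that builds it from a file of space-separated word counts (the file path is illustrative):

// Each input line holds space-separated word counts for one document
JavaRDD<String> rawDocs = jsc.textFile("data/mllib/sample_lda_data.txt");
JavaRDD<Vector> parsedDocs = rawDocs.map(s -> {
    String[] parts = s.trim().split(" ");
    double[] values = new double[parts.length];
    for (int i = 0; i < parts.length; i++) {
        values[i] = Double.parseDouble(parts[i]);
    }
    return Vectors.dense(values);
});
// Index documents with unique ids, since LDA expects (docId, wordCountVector) pairs
JavaPairRDD<Long, Vector> corpus =
        JavaPairRDD.fromJavaRDD(parsedDocs.zipWithIndex().map(Tuple2::swap));
corpus.cache();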
// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);
// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
        + " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
    System.out.print("Topic " + topic + ":");
    for (int word = 0; word < ldaModel.vocabSize(); word++) {
        System.out.print(" " + topics.apply(word, topic));
    }
    System.out.println();
}
Bisecting K-means
// data: JavaRDD<Vector> of feature vectors (for example, the parsedData built above)
BisectingKMeans bkm = new BisectingKMeans()
        .setK(4);
BisectingKMeansModel model = bkm.run(data);
System.out.println("Compute Cost: " + model.computeCost(data));
Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
    Vector clusterCenter = clusterCenters[i];
    System.out.println("Cluster Center " + i + ": " + clusterCenter);
}
Dimensionality Reduction
SVD (Singular Value Decomposition)
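Both the SVD and the PCA snippets below assume data is a List<Vector> holding the rows of the matrix; a minimal sketch with toy 5-dimensional rows:

// Toy row vectors of the matrix to decompose (values are illustrative)
List<Vector> data = Arrays.asList(
        Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
        Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
        Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);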
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 5 singular values and corresponding singular vectors.
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
RowMatrix U = svd.U(); // The U factor is a RowMatrix.
Vector s = svd.s(); // The singular values are stored in a local dense vector.
Matrix V = svd.V(); // The V factor is a local dense matrix.
PCA (Principal Component Analysis)
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
Matrix pc = mat.computePrincipalComponents(4);
// Project the rows to the linear space spanned by the top 4 principal components.
RowMatrix projected = mat.multiply(pc);
Frequent Pattern Mining
FP-growth
// data: JavaRDD<String> with one space-separated transaction per line
JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));
FPGrowth fpg = new FPGrowth()
        .setMinSupport(0.2)
        .setNumPartitions(10);
FPGrowthModel<String> model = fpg.run(transactions);
// Print every frequent itemset and its frequency
for (FPGrowth.FreqItemset<String> itemset : model.freqItemsets().toJavaRDD().collect()) {
    System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
}
// Generate association rules with confidence of at least 0.8
double minConfidence = 0.8;
for (AssociationRules.Rule<String> rule
        : model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
    System.out.println(
            rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
Association Rules
JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = jsc.parallelize(Arrays.asList(
        new FreqItemset<>(new String[] {"a"}, 15L),
        new FreqItemset<>(new String[] {"b"}, 35L),
        new FreqItemset<>(new String[] {"a", "b"}, 12L)
));
AssociationRules arules = new AssociationRules()
        .setMinConfidence(0.8);
JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets);
for (AssociationRules.Rule<String> rule : results.collect()) {
    System.out.println(
            rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}