Spark MLlib (4)


Spark Machine Learning Notes (4)

Collaborative Filtering

The algorithm rests on a low-rank assumption about the rating matrix: the large matrix is treated as the product of two much smaller factor matrices, and those factors are chosen so that the error between their product and the original matrix is as small as possible.
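
Formally (a standard statement of the ALS objective, not from the original notes): with rank-k user factors $u_i$ and item factors $v_j$, training minimizes the regularized squared reconstruction error over the observed ratings $r_{ij}$:

\min_{U,V} \sum_{(i,j)\in\Omega} \left( r_{ij} - u_i^{\top} v_j \right)^2 + \lambda \left( \sum_i \lVert u_i \rVert^2 + \sum_j \lVert v_j \rVert^2 \right)

Here $\Omega$ is the set of observed (user, item) pairs and $\lambda$ is the regularization weight, which corresponds to the last argument (0.01) passed to ALS.train below.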

SparkConf sparkConf = new SparkConf().setAppName("cf").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
System.out.println(jsc.sc().executorMemory()); // sanity check of the local executor memory
// Read the MovieLens ratings file (userId,movieId,rating,timestamp) into Rating objects
FileReader rd = new FileReader("ml-25m\\ratings.csv");
BufferedReader bufferedReader = new BufferedReader(rd);
String line = "";
List<Rating> list = new ArrayList<>();
bufferedReader.readLine(); // skip the CSV header
while ((line = bufferedReader.readLine()) != null) {
    String[] split = line.split(",");
    if (split[0].equals("10000")) // stop once userId 10000 appears, keeping the local sample small
        break;
    Rating rating = new Rating(
            Integer.parseInt(split[0]),
            Integer.parseInt(split[1]),
            Double.parseDouble(split[2])
    );
    list.add(rating);
}
bufferedReader.close();
JavaRDD<Rating> ratings = jsc.parallelize(list);
int rank = 10;
int numIterations = 5;
// Train ALS with rank 10, 5 iterations, and regularization parameter lambda = 0.01
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);

JavaRDD<Tuple2<Object, Object>> userProducts =
        ratings.map(r -> new Tuple2<>(r.user(), r.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
        model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
                .map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
);
JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
        ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
        .join(predictions).values();
double MSE = ratesAndPreds.mapToDouble(pair -> {
    double err = pair._1() - pair._2();
    return err * err;
}).mean();
System.out.println("Mean Squared Error = " + MSE);

Clustering Algorithms

K-means

SparkConf sparkConf = new SparkConf().setAppName("kmeans").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
String line = "";
// Read test.csv, where each line is a comma-separated numeric feature vector
FileReader rd = new FileReader("test.csv");
BufferedReader bufferedReader = new BufferedReader(rd);
ArrayList<Vector> vectors = new ArrayList<>();
while ((line = bufferedReader.readLine()) != null) {
    String[] split = line.split(",");
    double[] list = new double[split.length];
    for (int i = 0; i < split.length; i++)
        list[i] = Double.parseDouble(split[i]);
    vectors.add(Vectors.dense(list));
}
bufferedReader.close();
JavaRDD<Vector> parsedData = jsc.parallelize(vectors);

// Cluster the data into six classes using KMeans
int numClusters = 6;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

System.out.println("Cluster centers:");
for (Vector center : clusters.clusterCenters()) {
  System.out.println(" " + center);
}

// Evaluate the clustering by computing the Within Set Sum of Squared Errors (WSSSE)
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);

Gaussian Mixture

// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());

// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
  "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");

// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
  System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
    gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
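
Each point can also be assigned to its most likely Gaussian component, either as a hard label or as posterior probabilities. A minimal sketch; the example point is illustrative and must match the training dimensionality:

// Hard assignment: index of the most likely component
Vector point = Vectors.dense(1.0, 2.0, 3.0);
int component = gmm.predict(point);
// Soft assignment: posterior membership probability of each component
double[] membership = gmm.predictSoft(point);
System.out.println("component=" + component + ", membership=" + Arrays.toString(membership));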

Power Iteration Clustering
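
PIC clusters the vertices of an affinity graph. The snippet below assumes a `similarities` RDD of (srcId, dstId, similarity) triples; a minimal sketch of building one from hard-coded edges (the ids and weights are illustrative only):

// Each Tuple3 is (source vertex id, destination vertex id, non-negative similarity)
JavaRDD<Tuple3<Long, Long, Double>> similarities = jsc.parallelize(Arrays.asList(
  new Tuple3<>(0L, 1L, 0.9),
  new Tuple3<>(1L, 2L, 0.9),
  new Tuple3<>(2L, 3L, 0.1),
  new Tuple3<>(3L, 4L, 0.9)
));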

PowerIterationClustering pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);

for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
  System.out.println(a.id() + " -> " + a.cluster());
}

Latent Dirichlet Allocation (LDA)
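
LDA expects a corpus of (document id, term-count vector) pairs. A minimal sketch of building it, assuming `parsedData` is a JavaRDD<Vector> of per-document word counts (as in the official Spark example):

// Index the documents, then swap (vector, index) into (index, vector) pairs
JavaPairRDD<Long, Vector> corpus =
  JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();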

// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);

// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
  + " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
  System.out.print("Topic " + topic + ":");
  for (int word = 0; word < ldaModel.vocabSize(); word++) {
    System.out.print(" " + topics.apply(word, topic));
  }
  System.out.println();
}

Bisecting K-means
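
The example assumes `data` is a JavaRDD<Vector> of feature vectors; the parsedData RDD built for K-means above would do, or a small hard-coded set like this sketch (values are illustrative only):

// A tiny illustrative dataset; any JavaRDD<Vector> works here
JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
  Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
  Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
  Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3)
));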

BisectingKMeans bkm = new BisectingKMeans()
  .setK(4);
BisectingKMeansModel model = bkm.run(data);

System.out.println("Compute Cost: " + model.computeCost(data));

Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
  Vector clusterCenter = clusterCenters[i];
  System.out.println("Cluster Center " + i + ": " + clusterCenter);
}

Dimensionality Reduction

SVD (Singular Value Decomposition)
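
Both this example and the PCA one below assume `data` is a local List<Vector> holding the rows of the matrix; a minimal sketch with dense rows (the values are illustrative only):

// Rows of the matrix, as dense vectors
List<Vector> data = Arrays.asList(
  Vectors.dense(1.0, 0.0, 7.0, 0.0, 0.0),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);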

JavaRDD<Vector> rows = jsc.parallelize(data);

// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());

// Compute the top 5 singular values and corresponding singular vectors.
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
RowMatrix U = svd.U();  // The U factor is a RowMatrix.
Vector s = svd.s();     // The singular values are stored in a local dense vector.
Matrix V = svd.V();     // The V factor is a local dense matrix.

PCA (Principal Component Analysis)

JavaRDD<Vector> rows = jsc.parallelize(data);

// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());

// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
Matrix pc = mat.computePrincipalComponents(4);

// Project the rows to the linear space spanned by the top 4 principal components.
RowMatrix projected = mat.multiply(pc);

Frequent Pattern Mining

FP-growth
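
This example assumes `data` is a JavaRDD<String> in which each line is one transaction of space-separated items, for instance loaded from a text file (the file name is illustrative):

// Each line is one transaction, items separated by spaces
JavaRDD<String> data = jsc.textFile("sample_fpgrowth.txt");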

JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));

FPGrowth fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10);
FPGrowthModel<String> model = fpg.run(transactions);

for (FPGrowth.FreqItemset<String> itemset: model.freqItemsets().toJavaRDD().collect()) {
  System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
}

double minConfidence = 0.8;
for (AssociationRules.Rule<String> rule
  : model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
  System.out.println(
    rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}

Association Rules

JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = jsc.parallelize(Arrays.asList(
  new FreqItemset<>(new String[] {"a"}, 15L),
  new FreqItemset<>(new String[] {"b"}, 35L),
  new FreqItemset<>(new String[] {"a", "b"}, 12L)
));

AssociationRules arules = new AssociationRules()
  .setMinConfidence(0.8);
JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets);

for (AssociationRules.Rule<String> rule : results.collect()) {
  System.out.println(
    rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}