Spark MLlib (3)

Spark Machine Learning Notes (3)

Classification and Regression Models

SVM (Support Vector Machine)

A support vector machine (SVM) is a classification algorithm that improves a learner's generalization ability by seeking structural risk minimization, i.e., minimizing both the empirical risk and the confidence interval, so that good statistical regularities can be obtained even when the sample size is small. Put simply, it is a binary classification model whose basic form is the maximum-margin linear classifier in feature space: the SVM learning strategy is margin maximization, which ultimately reduces to solving a convex quadratic programming problem.

int numIterations = 1000;
SVMWithSGD svmWithSGD = new SVMWithSGD();
svmWithSGD.setIntercept(true);                           // fit an intercept (bias) term
svmWithSGD.setFeatureScaling(true);                      // scale features before training
svmWithSGD.optimizer().setRegParam(0.01);                // regularization parameter
svmWithSGD.optimizer().setNumIterations(numIterations);  // number of iterations
SVMModel model = svmWithSGD.run(training.rdd());
// Clear the default threshold so predict() returns raw scores rather than
// 0/1 labels; BinaryClassificationMetrics needs raw scores to build the ROC curve.
model.clearThreshold();
JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(p -> new Tuple2<>(model.predict(p.features()), p.label()));
BinaryClassificationMetrics metrics =
        new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
double auROC = metrics.areaUnderROC();
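
The snippet above assumes that training and test already exist as JavaRDD<LabeledPoint>. A minimal setup sketch, assuming a LIBSVM-format input file (the file name, split ratio, and seed here are placeholders):

SparkConf conf = new SparkConf().setAppName("svmExample").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
// MLUtils.loadLibSVMFile parses LIBSVM-format text into an RDD<LabeledPoint>
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), "sample_libsvm_data.txt").toJavaRDD();
// hold out 40% of the data for testing; the seed makes the split reproducible
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.6, 0.4}, 11L);
JavaRDD<LabeledPoint> training = splits[0].cache();
JavaRDD<LabeledPoint> test = splits[1];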

Logistic Regression

Logistic regression is widely used to predict a binary response. It is a linear method.

LogisticRegressionWithLBFGS logisticRegressionWithLBFGS = new LogisticRegressionWithLBFGS();
logisticRegressionWithLBFGS.setIntercept(true);       // fit an intercept (bias) term
logisticRegressionWithLBFGS.setFeatureScaling(true);  // scale features before training
logisticRegressionWithLBFGS.optimizer().setNumIterations(200).setRegParam(0.01); // iterations and regularization
LogisticRegressionModel model = logisticRegressionWithLBFGS.run(training.rdd()); // train the model
JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
double accuracy = metrics.accuracy();                 // overall accuracy
System.out.println("Accuracy = " + accuracy);

Linear Regression

// Build the model
int numIterations = 100;        // number of iterations
double stepSize = 0.00000001;   // step size (learning rate)
LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize);

// Evaluate the model on the training examples and compute the training error
JavaPairRDD<Double, Double> valuesAndPreds = parsedData.mapToPair(point ->
  new Tuple2<>(model.predict(point.features()), point.label()));
double MSE = valuesAndPreds.mapToDouble(pair -> {
  double diff = pair._1() - pair._2();
  return diff * diff;
}).mean();
System.out.println("training Mean Squared Error = " + MSE);

Polynomial Feature Mapping

For some logistic regression problems (for example, a decision boundary close to a circle), first-degree terms are not expressive enough, so the features need to be mapped into a polynomial space. For instance, expanding (x, y) with degree 2 yields (x, x², y, xy, y²). Note that PolynomialExpansion works only on DataFrames (the spark.ml API).

SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.setLogLevel("ERROR");
SQLContext sqlContext = new SQLContext(jsc);
Dataset<Row> csv = sqlContext.read().csv("ex2data2.csv")
    .toDF("feature1", "feature2", "label");
// cast the string columns to Double
csv = csv.select(
    csv.col("feature1").cast("Double").as("feature1"),
    csv.col("feature2").cast("Double").as("feature2"),
    csv.col("label").cast("Double").as("label"));
// assemble the feature columns into a single vector column
csv = new VectorAssembler().setInputCols(new String[]{"feature1", "feature2"}).setOutputCol("features").transform(csv);
PolynomialExpansion polynomialExpansion = new PolynomialExpansion().setDegree(5).setInputCol("features").setOutputCol("polynomialFeatures");
Dataset<Row> data = polynomialExpansion.transform(csv);
// split into training and test sets
Dataset<Row> training = data.sample(false, 0.6, 11L);
Dataset<Row> test = data.except(training);
// model parameters
LogisticRegression lr = new LogisticRegression()
    .setMaxIter(200)
    .setRegParam(0.1)
    .setFitIntercept(true)
    .setFeaturesCol("polynomialFeatures")
    .setLabelCol("label");
// train the model
LogisticRegressionModel fit = lr.fit(training);
System.out.println("Coefficients: "
    + fit.coefficients() + " Intercept: " + fit.intercept());
test = test.select("label", "polynomialFeatures");
JavaRDD<Row> rowJavaRDD = test.javaRDD();
// predict on the test set
JavaRDD<Row> map = rowJavaRDD.map(row -> {
    Vector vector = (Vector) row.get(1);
    double predict = fit.predict(vector);
    return RowFactory.create(row.get(0), predict);
});
List<Row> collect = map.collect();
double n = collect.size();
int a = 0;
// count correct predictions
for (Row row : collect) {
    if (((Double) row.get(0)).equals((Double) row.get(1)))
        a++;
}
System.out.println("Accuracy = " + ((double) a / n));

Decision Trees

Decision trees and their ensembles are popular methods for the machine learning tasks of classification and regression.

The node impurity is a measure of the homogeneity of the labels at the node. The current implementation provides two impurity measures for classification (Gini impurity and entropy) and one impurity measure for regression (variance).

Parameter selection

  1. algo: type of algorithm, classification or regression
  2. numClasses: number of classes (classification only)
  3. categoricalFeaturesInfo: specifies which features are categorical and how many categorical values each of them can take

Stopping criteria

  1. maxDepth: maximum depth of a tree
  2. minInstancesPerNode: minimum number of training samples a node must receive
  3. minInfoGain: minimum information gain for a split to be considered

Tunable parameters

  1. maxBins: maximum number of bins used when discretizing continuous features
  2. maxMemoryInMB: maximum memory usage
  3. subsamplingRate: fraction of the training data used for learning the tree
  4. impurity: impurity measure used to choose between candidate splits

// Empty categoricalFeaturesInfo indicates all features are continuous.
int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "gini";
int maxDepth = 5;
int maxBins = 32;
// Train a DecisionTree model for classification.
DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
        categoricalFeaturesInfo, impurity, maxDepth, maxBins);
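
Here trainingData is assumed to be a JavaRDD<LabeledPoint> (split off with randomSplit as in the naive Bayes example below). A sketch of evaluating the tree on a held-out testData and printing the learned splits:

JavaPairRDD<Double, Double> predictionAndLabel =
        testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
        predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
// toDebugString() prints the full tree, useful for inspecting the splits
System.out.println("Learned classification tree model:\n" + model.toDebugString());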

Naive Bayes

Naive Bayes is a simple multiclass classification algorithm with the assumption of independence between every pair of features.

Parameter selection

Additive smoothing can be used by setting the smoothing parameter λ.

SparkConf sparkConf = new SparkConf().setAppName("naiveBayes").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), "data.txt").toJavaRDD();
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// the second argument is the smoothing parameter λ
NaiveBayesModel model = NaiveBayes.train(trainingData.rdd(), 1.0);
JavaPairRDD<Double, Double> predictionAndLabel =
       testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double accuracy =
       predictionAndLabel.filter(pl -> pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println(accuracy);
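
Like most MLlib models, the trained model can be saved and reloaded. A sketch, assuming target/myNaiveBayesModel is a writable path:

// save the trained model to disk and load it back
model.save(jsc.sc(), "target/myNaiveBayesModel");
NaiveBayesModel sameModel = NaiveBayesModel.load(jsc.sc(), "target/myNaiveBayesModel");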

Random Forests

Like decision trees, random forests handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions.

Tunable parameters

  1. numTrees: number of trees in the forest
  2. maxDepth: maximum depth of each tree

Integer numClasses = 2;
// Empty categoricalFeaturesInfo indicates all features are continuous.
// You only need to set this when a feature takes a fixed set of values (0, 1, and so on).
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
// More trees generally improve accuracy, with diminishing returns past a certain number.
Integer numTrees = 12; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "gini";
Integer maxDepth = 8;
Integer maxBins = 64;
Integer seed = 12345;

RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses,
       categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins,
       seed);

// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
       testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
       predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);

In practice, random forests prove to be a very effective algorithm for classification.

Gradient-Boosted Trees

Gradient-boosted trees are ensembles of decision trees and, like single decision trees, can handle both regression and classification problems.

Tunable parameters

  1. loss: the loss function used to measure residuals; Log Loss, Squared Error, and Absolute Error are available; the latter two are for regression, the first for classification
  2. numIterations: number of boosting iterations
  3. learningRate: learning rate
  4. algo: type of algorithm, classification or regression

// Train a GradientBoostedTrees model.
// The defaultParams for "Classification" use LogLoss by default.
BoostingStrategy boostingStrategy = BoostingStrategy.defaultParams("Classification");

// parameter settings
boostingStrategy.setNumIterations(3); // Note: Use more iterations in practice.
boostingStrategy.getTreeStrategy().setNumClasses(2);
boostingStrategy.getTreeStrategy().setMaxDepth(5);
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
boostingStrategy.getTreeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);

GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);

// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
       testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
        predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
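
Because boosting can overfit as the number of iterations grows, MLlib also provides runWithValidation, which stops training early once performance on a validation set no longer improves. A sketch, assuming a separate validationData RDD has been split off:

// train with early stopping against a held-out validation set
GradientBoostedTreesModel validatedModel =
        new GradientBoostedTrees(boostingStrategy).runWithValidation(trainingData, validationData);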