其他分享
首页 > 其他分享> > pyspark学习之——逻辑回归、模型选择与调参

pyspark学习之——逻辑回归、模型选择与调参

作者:互联网

       记录pyspark的MLlib库学习篇,学习资料来自spark官方文档,主要记录pyspark相关内容,要么直接翻译过来,要么加上自己的理解。spark2.4.8官方文档如下:https://spark.apache.org/docs/2.4.8/ml-classification-regression.html#logistic-regression

目录

一、参数

       如果对逻辑回归原理不清楚,请查看逻辑回归推导。spark的逻辑回归即可以用来处理二分类任务,也能用来预测多分类。spark中逻辑回归的损失函数是:

J ( w ) = L ( w ) + γ [ α ∣ ∣ w ∣ ∣ + ( 1 − α ) 1 2 ∣ ∣ w ∣ ∣ 2 ] J(w)=L(w)+γ[α||w||+(1-α)\frac{1}{2}||w||^2] J(w)=L(w)+γ[α∣∣w∣∣+(1−α)21​∣∣w∣∣2]       

       其中, γ γ γ称为规范化系数, α α α称为Elastic net, a ∈ [ 0 , 1 ] a∈ [0,1] a∈[0,1],可以看到当 a = 1 a=1 a=1时,正则化采用的是L1范数; a = 0 a=0 a=0时正则化采用L2范数。
       逻辑回归常用参数:

参数含义
regParam正则化参数,默认值为0
elasticNetParam α ∈ [ 0 , 1 ] α∈ [0,1] α∈[0,1],可以看到当 a = 1 a=1 a=1时,正则化采用的是L1范数; a = 0 a=0 a=0时正则化采用L2范数。
family定义分分类,有三个可选择值:auto、binomial、multinomial,默认为auto。                                 binomial:二分类,                                                                                                                                multinomial:多分类,(二分类也能用)                                                                                                auto:自动选择分类方式,当类别只有1个或2个是,设置为binomial,多分类时设置为multinomial
featuresCol设置特征列名,默认为列名为’features’
fitIntercept是否匹配一个截距项,默认为True
labelCol设置标签名,默认为“label”
maxIter最大迭代次数,默认为100
predictionCol设置预测的列名,默认为"prediction"
probabilityCol设置预测的条件概率列名,默认为“probabi”
threshold二元分类阈值,默认为0.5
standardization模型拟合前是否将输入特征做标准化处理,默认为True
weightCol设置权重列名
tol迭代算法收敛阈值,默认为1e-6

二、二分类

2.1 二分类训练

# 1.导包
from pyspark.ml.classification import LogisticRegression

# 2.生成数据,这里用自己的路径
data = spark.read.format("libsvm").load("file:///opt/module/spark/data/mllib/sample_libsvm_data.txt")

# 3.设置模型参数,elasticNetParam是α, regParam是规范化系数γ
lr = LogisticRegression(maxIter=10,regParam=0.3,elasticNetParam=0.8)

# 4.训练模型
lrModel = lr.fit(data)

# 5.查看模型参数  查看系数和截距
print(f"当前逻辑回归的各个系数是:{lrModel.coefficients}")
print(f"当前逻辑回归的截距是:{lrModel.intercept}")

# 结果为:
当前逻辑回归的各个系数是:(692,[244,263,272,300,301,328,350,351,378,379,405,406,407,428,433,434,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.353983524188197e-05,-9.102738505589466e-05,-0.00019467430546904298,-0.00020300642473486668,-3.1476183314863995e-05,-6.842977602660743e-05,1.5883626898239883e-05,1.4023497091372047e-05,0.00035432047524968605,0.00011443272898171087,0.00010016712383666666,0.0006014109303795481,0.0002840248179122762,-0.00011541084736508837,0.000385996886312906,0.000635019557424107,-0.00011506412384575676,-0.00015271865864986808,0.0002804933808994214,0.0006070117471191634,-0.0002008459663247437,-0.0001421075579290126,0.0002739010341160883,0.00027730456244968115,-9.838027027269332e-05,-0.0003808522443517704,-0.00025315198008555033,0.00027747714770754307,-0.0002443619763919199,-0.0015394744687597765,-0.00023073328411331293])
当前逻辑回归的截距是:0.22456315961250325
# 可以看到系数是一个稀疏向量,692表示有692个系数,第一个列表表示非零的位置,第二个列表表示非零的值。


# 6. 将family设置为multinomial也能用来进行二分类
mlr = LogisticRegression(maxIter=10,regParam=0.3,elasticNetParam=0.8,family="multinomial")
mlrModel = mlr.fit(data)
print(f"当前逻辑回归的各个系数是:{mlrModel.coefficientMatrix}")
print(f"当前逻辑回归的截距是:{mlrModel.interceptVector}")

# 结果是:
当前逻辑回归的各个系数是:2 X 692 CSRMatrix
(0,244) 0.0
(0,263) 0.0001
(0,272) 0.0001
(0,300) 0.0001
(0,350) -0.0
(0,351) -0.0
(0,378) -0.0
(0,379) -0.0
(0,405) -0.0
(0,406) -0.0006
(0,407) -0.0001
(0,428) 0.0001
(0,433) -0.0
(0,434) -0.0007
(0,455) 0.0001
(0,456) 0.0001
..
..
当前逻辑回归的截距是:[-0.12065879445860686,0.12065879445860686]


2.2 LogisticRegressionTrainingSummary

       LogisticRegressionTrainingSummary记录了LogisticRegressionModel(Model是算法调用fit()方法后生成的)的摘要信息。在二元分类的情况下,可以获取某些附加指标,例如ROC曲线。
       

# 1.导包
from pyspark.ml.classification import LogisticRegression

# 2.获取摘要信息。   摘要来自于已经得到的LogisticRegressionModel模型
trainingSummay = lrModel.summary

# 3.获取每次迭代的目标
objectiveHistory = trainingSummay.objectiveHistory
print("objcectiveHistory")
for objective in objectiveHistory:
	print(objective)
	
# 4.获取ROC的取值及其面积
trainingSummary.roc.show()
print(f"ROC面积:{trainingSummary.areaUnderROC}")

# 结果为:
+---+--------------------+
|FPR|                 TPR|
+---+--------------------+
|0.0|                 0.0|
|0.0|0.017543859649122806|
|0.0| 0.03508771929824561|
|0.0| 0.05263157894736842|
|0.0| 0.07017543859649122|
|0.0| 0.08771929824561403|
|0.0| 0.10526315789473684|
|0.0| 0.12280701754385964|
|0.0| 0.14035087719298245|
|0.0| 0.15789473684210525|
|0.0| 0.17543859649122806|
|0.0| 0.19298245614035087|
|0.0| 0.21052631578947367|
|0.0| 0.22807017543859648|
|0.0| 0.24561403508771928|
|0.0|  0.2631578947368421|
|0.0|  0.2807017543859649|
|0.0|  0.2982456140350877|
|0.0|  0.3157894736842105|
|0.0|  0.3333333333333333|
+---+--------------------+
only showing top 20 rows
ROC面积:1.0
# 只显示了前20个数据,ROC面积为1,说明模型的训练结果十分好

三、多分类

     

# 1.导包
from pyspark.ml.classification import LogisticRegression

# 2.生成数据。 这里用自己的路径
data = spark.read.format("libsvm").load("file:///opt/module/spark/data/mllib/sample_multiclass_classification_data.txt")

# 3.构建模型
lr = LogisticRegression(maxIter=10,regParam=0.3,elasticNetParam=0.8)
lrModel = lr.fit(data)
print(f"当前逻辑回归的各个系数是:{mlrModel.coefficientMatrix}")
print(f"当前逻辑回归的截距是:{mlrModel.interceptVector}")

# 结果:
当前逻辑回归的各个系数是:2 X 692 CSRMatrix
(0,244) 0.0
(0,263) 0.0001
(0,272) 0.0001
(0,300) 0.0001
(0,350) -0.0
(0,351) -0.0
(0,378) -0.0
(0,379) -0.0
(0,405) -0.0
(0,406) -0.0006
(0,407) -0.0001
(0,428) 0.0001
(0,433) -0.0
(0,434) -0.0007
(0,455) 0.0001
(0,456) 0.0001
..
..
当前逻辑回归的截距是:[-0.12065879445860686,0.12065879445860686]

# 4.查看摘要信息
trainingSummary = lrModel.summary
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
 print(objective)

# 5.对于多类,我们可以基于每个标签检查度量
# 假正例率
print("False positive rate by label:")
for i,rate in enumerate(traingingSummary.falsePositiveRateByLabel):
 print("label %d: %s" % (i, rate))
# 结果:
label 0: 0.22
label 1: 0.05
label 2: 0.0

# 真正例率
print("True positive rate by label:")
for i,rate in enumerate(trainingSummary.truePositiveRateByLabel):
 print("label %d: %s" % (i, rate))
#结果:label 0: 1.0
label 1: 1.0
label 2: 0.46

# 查准率
print("Precision by label:")
for i,rate in enumerate(trainingSummary.precisionByLabel):
 print("label %d: %s" % (i, rate))
# 结果:
label 0: 0.6944444444444444
label 1: 0.9090909090909091
label 2: 1.0

# 查全率(召回率)
print("Recall by label:")
for i,rate in enumerate(trainingSummary.recallByLabel):
 print("label %d: %s" % (i, rate))
# 结果:
label 0: 1.0
label 1: 1.0
label 2: 0.46

# F得分,这里需要添加一堆括号,注意
print("F-measure by label:")
for i,rate in enumerate(trainingSummary.fMeasureByLabel()):
 print("label %d: %s" % (i, rate))
# 结果:
label 0: 0.819672131147541
label 1: 0.9523809523809523
label 2: 0.6301369863013699

四、模型选择(又称超参数调整)

     前面三步简单介绍了下逻辑回归二分类和多分类的大体做法,接下来介绍逻辑回归的模型选择,并将前两天学到的管道运用起来,避免遗忘。
     机器学习中的一项重要任务是模型选择,利用数据找到最佳模型或参数,也称为调参。Spark MLlib利用 CrossValidator(交叉验证) 或 TrainValidationSplit(划分训练集验证集)来进行模型的选择。这俩方法的输入要求如下:
      1)Estimator:需要调参的算法或管道Pipeline;
      2)参数列表ParamMap:可供选择的参数,也叫“参数网格”的搜索空间;
      3)Evaluator: 用来评估模型拟合效果的指标或方法。

     更具体来说,模型选择的工作原理:
      1)将数据集划分为训练集和测试集;
      2)对于每一个(训练集,测试集)对,遍历一遍参数列表ParamMap,那么每一个参数都对应一个拟合模型,于是就会得到很多个模型,然后用Evaluator来评估这些模型;
      3)选择性能最优的模型对应的各个参数作为模型的最终参数。

     Evaluator(评估器)可以是用于回归任务的回归评估器 RegressionEvaluator,也可以是用于二分类的二分类评估器 BinaryClassificationEvaluator,当然也可以是用于多分类的多分类评估器 MulticlassClassificationEvaluator。
     我们可以使用ParamGridBuilder来构造参数网格,默认情况下,参数网格中的参数集以串行方式计算。在使用 CrossValidator 或 TrainValidationSplit 运行模型选择之前,通过将并行度设置为 一个≥2的数(值为 1 将是串行的)来并行完成参数评估。这个平行度并不是越大越好,对于一个集群来说,这个值一般不要超过10。

4.1 交叉验证CrossValidator调参

     暂时不在这里介绍交叉验证法,未来会专门写一个专题来介绍验证法。所以此处直接使用该方法

# 1.导包:逻辑回归、管道流、二分类评估器、特诊选择、交叉验证
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF,Tokenizer
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

# 2.生成数据  spark.ml处理的数据格式是DataFrame
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
],['id','text','label'])

# 3.建立管道,本例包含三个阶段:tokenizer,hashingTF,lr  
# 3.1 句子拆分成单词
tokenizer = Tokenizer(inputCol='text',outputCol='words')
# 3.2 单词转换为特征向量
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),outputCol='features')
# hashingTF = HashingTF(inputCol='words',outputCol='features') 也一样
# 3.3 建立逻辑回归模型
lr = LogisticRegression(maxIter=10)
# 3.4 建立管道流
pipeline = Pipeline(stages=[tokenizer,hashingTF,lr])


# 4.通过第3步建立了管道流,这是一个Estimator(调用fit方法后才会一个Model,也就成为了Transform),
#    将它整体放入交叉验证CrossValidator实例中。 CrossValidator的参数有三个:Estimator,ParamMaps, Evaluator
# 利用ParamGridBuilder来构建一个参数搜索空间。利用addGrid来添加参数,利用build来构建设定的参数网格。在此处设定hashingTF有三个参数需要调参,lr两个参数需要调参
# 于是就会产生3*2=6个参数组合,然后由CrossValidator来做选择,交叉验证的折数是2
paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures,[10,100,1000])\
							  .addGrid(lr.regParam,[0.1,0.01])\
							  .build()
# 交叉验证需要输入:待调参的模型、搜索网格、evaluator,交叉验证的折数
crossval = CrossValidator(estimator=pipeline,
						  estimatorParamMaps=paramGrid,
						  evaluator=BinaryClassificationEvaluator(),
						  numFolds=2)
						  
# 5.运行交叉验证,选择最优模型(此时已经自动选择了最优模型)
cvModel = crossval.fit(training)

# 6.准备测试集(没有标签)
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# 7.做预测
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)
# 输出结果:
Row(id=4, text='spark i j k', probability=DenseVector([0.6287, 0.3713]), prediction=0.0)
Row(id=5, text='l m n', probability=DenseVector([0.3454, 0.6546]), prediction=1.0)
Row(id=6, text='mapreduce spark', probability=DenseVector([0.3373, 0.6627]), prediction=1.0)
Row(id=7, text='apache hadoop', probability=DenseVector([0.2749, 0.7251]), prediction=1.0)

# 8.查看最优模型信息:bestModel属性获取最优模型
bestModel = cvModel.bestModel
# 在构建管道流时,lr是管道流的第三个阶段(阶段从0开始),所以要获取指定lr模型,指定阶段即可
lrBestModel = bestModel.stages[2]
print(f"最优模型的系数:{lrBestModel.coefficients}")
print(f"最优模型的截距:{lrBestModel.interceptVector}")

      输出结果见下图
在这里插入图片描述

4.2 训练集验证集划分TrainValidationSplit调参

     TrainValidationSplit将一个数据集划分为训练集和验证集,划分比例参数为trainRatio,假设trainRatio=0.75,那么表示将整个数据集的75%划分为训练集,剩余的25%划分为验证集。下面以线性回归来演示:

# 1.导包
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

# 2.生成数据
data = spark.read.format("libsvm").load("file:///opt/module/spark/data/mllib/sample_linear_regression_data.txt")

# 3.划分数据集,10%的数据作为测试集,90%的数据作为训练集(后面又会将该部分划分为训练集和验证集)
train,test = data.randomSplit([0.9,0.1],seed=12345)

# 4. 创建模型
lr = LinearRegression(maxIter=10)

# 5.创建搜索空间
paramGrid = ParamGridBuilder()\
				.addGrid(lr.regParam,[0.1,0.01])\
				.addGrid(lr.fitIntercept,[False,True])\
				.addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
				.build()

# 6.设置TrainValidationSplit,训练集中80%的数据用于训练,20%的数据用于验证集(选择模型或称为调参)
tvs = TrainValidationSplit(estimator=lr,
						   estimatorParamMaps=paramGrid,
						   evaluator=RegressionEvaluator(),
						   trainRatio=0.8)

# 7. 拟合数据,得到最优模型
model = tvs.fit(train)

# 8.预测(经过拟合后已经得到最优模型)
model.transform(test)\
    .select("features", "label", "prediction")\
    .show()

     输出的结果如下:
在这里插入图片描述

标签:逻辑,pyspark,0.0,模型,label,参数,调参,print,spark
来源: https://blog.csdn.net/weixin_42535480/article/details/121952942