添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
备案 控制台
学习
实践
活动
专区
工具
TVP
写文章
专栏首页 Coggle数据科学 深入理解XGBoost:分布式实现
3 0

海报分享

深入理解XGBoost:分布式实现

文章来源:公众号【Coggle数据科学】

写在前面

本文将重点介绍XGBoost基于Spark平台Scala版本的实现,带领大家逐步完成特征提取、变换和选择、XGBoost模型训练、Pipelines、模型选择。

XGBoost简单回顾

XGBoost(Extreme Gradient Boosting)由华盛顿大学的陈天奇博士提出,最开始作为分布式(深度)机器学习研究社区(DMLC)小组的研究项目之一。后因在希格斯(Higgs)机器学习挑战赛中大放异彩,被业界所熟知,在数据科学应用中广泛应用。目前,一些主流的互联网公司如腾讯、阿里巴巴等都已将XGBoost应用到其业务中,在各种数据科学竞赛中XGBoost也成为竞赛者们夺冠的利器。XGBoost在推荐、搜索排序、用户行为预测、点击率预测、产品分类等问题上取得了良好的效果。虽然这些年神经网络(尤其是深度神经网络)变得越来越流行,但XGBoost仍旧在训练样本有限、训练时间短、、调参知识缺乏的场景下具有独特的优势。相比深度神经网络,XGBoost能够更好地处理表格数据,并具有更强的可解释性,另外具有易于调参、输入数据不变性等优势。

XGBoost是Gradient Boosting的实现,相比其他实现方法,XGBoost做了很多优化,在模型训练速度和精度上都有明显提升,其优良特性如下。

1)将正则项加入目标函数中,控制模型的复杂度,防止过拟合。

2)对目标函数进行二阶泰勒展开,同时用到了一阶导数和二阶导数。

3)实现了可并行的近似直方图算法。

4)实现了缩减和列采样(借鉴了GBDT和随机森林)。

5)实现了快速直方图算法,引入了基于loss-guide的树构建方法(借鉴了LightGBM)。

6)实现了求解带权值的分位数近似算法(weighted quantile sketch)。

7)可根据样本自动学习缺失值的分裂方向,进行缺失值处理。

8)数据预先排序,并以块(block)的形式保存,有利于并行计算。

9)采用缓存感知访问、外存块计算等方式提高数据访问和计算效率。

10)基于Rabit实现分布式计算,并集成于主流大数据平台中。

11)除CART作为基分类器外,XGBoost还支持线性分类器及LambdaMART排序模型等算法。

12)实现了DART,引入Dropout技术。

目前已经有越来越多的开发人员为XGBoost开源社区做出了贡献。XGBoost实现了多种语言的包,如Python、Scala、Java等。Python用户可将XGBoost与scikit-learn集成,实现更为高效的机器学习应用。另外,XGBoost集成到了Spark、Flink等主流大数据平台中。

分布式XGBoost

也许在竞赛中我们很少或者从不使用分布式XGBoost版本,可是在工业界数据的爆炸式增长的数据规模,单机模式是很难满足用户需求,XGBoost也相应推出了分布式版本,这也是XGBoost如此流行的重要原因。本文将重点介绍XGBoost基于Spark平台的实现,带领大家逐步完成Spark版本的特征提取、变换和选择,以及XGBoost模型训练、Pipelines、模型选择。

1. 基于Spark平台的实现

Spark是一个通用且高效的大数据处理引擎,它是基于内存的大数据并行计算框架。因为Spark计算基于内存,因此能够保证大数据计算的实时性,相比传统的Hadoop MapReduce效率提升很多。Spark拥有一个丰富的生态环境,以Spark为核心,涵盖支持:结构化数据查询与分析的Spark SQL、分布式机器学习库MLlib、并行图计算框架GraphX、可容错流计算框架Spark Streaming等。由于Spark在工业界广泛应用,用户群体庞大,因此XGBoost推出了XGBoost4J-Spark以支持Spark平台。

1.1 Spark架构

如图1所示,Spark主要由如下组件构成。

  • Client:提交Spark job的客户端。
  • Driver:接受Spark job请求,启动SparkContext。
  • SparkContext:整个应用的上下文,可以控制应用的生命周期。
  • ClusterManager:集群管理器,为Application分配资源,包括多种类型,如Spark自带的Standalone、Meso或者YARN等。
  • Worker:集群中任意可执行Application代码的节点,运行一个或者多个Executor。
  • Executor:在Worker节点中提交Application的进程,启动并运行任务,负责将数据存于内存或者硬盘中。每个Application均有各自的Executor执行任务。

由图1可知,Spark作业提交流程如下:首先Client提交应用,Driver接收到请求后,启动SparkContext。SparkContext连接ClusterManager,ClusterManager负责为应用分配资源。Spark将在集群节点中获取到执行任务的Executor,这些Executor负责执行计算和存储数据。Spark将应用程序的代码发送给Executor,最后SparkContext将任务分配给Executor去执行。

图1 Spark架构

在Spark应用中,整个执行流程在逻辑上会转化为RDD(Resilient Distributed Dataset,弹性分布式数据集)的DAG(Directed Acyclic Graph,有向无环图)。RDD是Spark的基本运算单元,后续会详细介绍。Spark将任务转化为DAG形式的工作流进行调度,并进行分布式分发。图2通过示例展示了Spark执行DAG的整个流程。

图2 Spark执行DAG的整个流程

在图2中,Transformations是RDD的一类操作,包括map、flatMap、filter等,该类操作是延迟执行的,即从一个RDD转化为另一个RDD不立即执行,而只是将操作记录下来,直到遇到Actions类的操作才会真正启动计算过程进行计算。Actions类操作会返回结果或将RDD数据写入存储系统,是触发Spark启动计算的动因。Action算子触发后,将所有记录的算子生成一个RDD,Spark根据RDD之间的依赖关系将任务切分为不同的阶段(stage),然后由调度器调度RDD中的任务进行计算。图2中的A~E分别代表不同的RDD,RDD中的方块代表不同的分区。Spark首先通过HDFS将数据读入内存,形成RDD A和RDD C。RDD A转化为RDD B,RDD C执行map操作转化为RDD D,RDD B和RDD E执行join操作转化为RDD F。RDD B和RDD E连接转化为RDD F的过程中会执行Shuffle操作,最后RDD F通过函数saveAsSequenceFile输出并保存到HDFS上。

1.2 RDD

Spark引入了RDD概念,RDD是分布式内存数据的抽象,是一个容错的、并行的数据结构,是Spark中基本的数据结构,所有计算均基于该结构进行,Spark通过RDD和RDD操作设计上层算法。

RDD作为数据结构,本质上是一个只读的分区记录的集合,逻辑上可以把它想象成一个分布式数组,数组中的元素可以为任意的数据结构。一个RDD可以包含多个分区,每个分区都是数据集的一个子集。RDD可以相互依赖,通过依赖关系形成Spark的调度顺序,通过RDD的操作形成整个Spark程序。

RDD有两种操作算子:转换(transformation)与行动(actions)。

1. 转换

转换操作是延迟执行的,即从一个RDD转化为另一个RDD,且不立即执行,而只是将操作记录下来,直到遇到Actions类的操作才会真正启动计算过程。转换操作包括map、flatMap、mapPartitions等多种操作,下面对常用的转换操作进行介绍。

  • map:对原始RDD中的每个元素执行一个用户自定义函数生成一个新的RDD。任何原始RDD中的元素在新的RDD中有且只有一个元素与之对应。
  • flatMap:与map类似,原始RDD中的元素通过函数生成新的元素,并将生成的RDD的每个集合中的元素合并为一个集合。
  • mapPartitions:获取每个分区的迭代器,在函数中对整个迭代器的元素(即整个分区的元素)进行操作。
  • union:将两个RDD合并,合并后不进行去重操作,保留所有元素。使用该操作的前提是需要保证RDD元素的数据类型相同。
  • filter:对元素进行过滤,对每个元素应用函数,返回值为True的元素被保留。
  • sample:对RDD中的元素进行采样,获取所有元素的子集。
  • cache:将RDD元素从磁盘缓存到内存,相当于persist(MEMORY_ONLY)。
  • persist:对RDD数据进行缓存,由参数StorageLevel决定数据缓存到哪里,如DISK_ONLY表示仅磁盘缓存、MEMORY_AND_DISK表示内存和磁盘均缓存等。
  • groupBy:将RDD中元素通过函数生成相应的key,然后通过key对元素进行分组。
  • reduceByKey:将数据中每个key对应的多个value进行用户自定义的规约操作。
  • join:相当于SQL中的内连接,返回两个RDD以key作为连接条件的内连接。

2. 行动

行动操作会返回结果或将RDD数据写入存储系统,是触发Spark启动计算的动因。行动操作包括foreach、collect等。下面对常用的行动操作进行介绍。

  • foreach:对RDD中每个元素都调用用户自定义函数操作,返回Unit。
  • collect:对于分布式RDD,返回一个scala中的Array数组。
  • count:返回RDD中元素的个数。
  • saveAsTextFile:将数据以文本的形式存储到HDFS的指定目录。

DataSet是分布式的数据集合,它是在Spark 1.6之后新增的一个接口,其不但具有RDD的优点,而且同时具有Spark SQL优化执行引擎的优势。DataFrame是一个具有列名的分布式数据集,可以近似看作关系数据库中的表,但DataFrame可以从多种数据源进行构建,如结构化数据文件、Hive中的表、RDD等。DataFrame API可以在Scala、Java、Python和R中使用。下面只介绍几个常用的API(更多API可以参考相关资料[插图])。

  • select(cols:Column*):选取满足表达式的列,返回一个新的DataFrame。其中,cols为列名或表达式的列表。
  • filter(condition:Column):通过给定条件过滤行。
  • count():返回DataFrame行数。
  • describe(cols:String*):计算数值型列的统计信息,包括数量、均值、标准差、最小值、最大值。
  • groupBy(cols:Column*):通过指定列进行分组,分组后可通过聚合函数对数据进行聚合。
  • join(right:Dataset[_]):和另一个DataFrame进行join操作。
  • withColumn(colName:String,col:Column):添加列或者替换具有相同名字的列,返回新的DataFrame。

1.3 XGBoost4J-Spark

随着Spark在工业界的广泛应用,积累了大量的用户,越来越多的企业以Spark为核心构建自己的数据平台来支持挖掘分析类计算、交互式实时查询计算,于是XGBoost4J-Spark应运而生。本节将介绍如何通过Spark实现机器学习,如何将XGBoost4J-Spark很好地应用于Spark机器学习处理的流水线中。

XGBoost4J-Spark在jvm-package中实现,因此在工程中调用XGBoost4J时,只需在pom.xml文件中加入如下依赖即可:

<dependency>
  <groupId>ml.dmlc</groupId>
  <artifactId>xgboost4j-spark</artifactId>
  <version>0.7</version>
</dependency>

图3展示了如何将XGBoost4J-Spark应用于Spark机器学习处理的流水线框架中。首先通过Spark将数据加载为RDD、DataFrame或DataSet。如果加载类型为DataFrame/DataSet,则可通过Spark SQL对其进行进一步处理,如去掉某些指定的列等。由Spark MLlib库完成特征工程,其提供了多种特征工程的方法供用户选择,此步骤是机器学习过程中非常重要的一步,因为好的特征可以决定机器学习的上限。特征工程完成后,便可将生成的训练数据送入XGBoost4J-Spark中进行训练,在此过程中可通过Spark MLlib进行参数调优,得到最优模型。得到训练模型后对预测集进行预测,最终得到预测结果。为了避免每次重复的训练模型,可将训练好的模型保存下来,在使用时直接加载即可。另外,训练完成后,XGBoost4J-Spark可对特征重要程度进行排名。最后,形成数据产品应用于相关业务。

图3 XGBoost4J-Spark模型训练流程图

0.70版本及以上版本的XGBoost4J-Spark支持用户在Spark中使用低级和高级内存抽象,即RDD和DataFrame/DataSet,而低版本(0.60版本)的仅支持RDD方式。DataFrame/DataSet可以近似看作数据库的一张表,不但包含数据,而且包含表结构,是结构化的数据。用户可以方便地利用Spark提供的DataFrame/DataSet API对其操作,也可以通过用户自定义函数(UDF)进行处理,例如,通过select函数可以很方便地选取需要的特征形成一个新的DataFrame/DataSet。以下示例将结构化数据保存在JSON文件中,并通过Spark的API解析为DataFrame,并以两行Scala代码来训练XGBoost模型。

1.val df = spark.read.json("data.json")  
2.//调用 XGBoost API 训练DataFrame类型的训练集
3.val xgboostModel = XGBoost.trainWithDataFrame(  
4.      df, paramMap, numRound, nWorkers, useExternalMemory) 

上述代码是XGBoost4J-Spark 0.7x版本的实现代码,XGBoost4J-Spark 0.8x及以上版本中的部分API有所改动。训练代码如下:

1.val xgbClassifier = new XGBoostClassifier(paramMap).  
2.                    setFeaturesCol("features").  
3.                    setLabelCol("label")  
4.val xgbClassificationModel = xgbClassifier.fit(df)  

下面通过示例简单介绍XGBoost4J-Spark中的一些常用API,其他可参考官方文档。 首先,加载数据集,可通过Spark进行读取,例如外部文件加载、Spark SQL等。 然后,设置模型参数,可根据具体问题及数据分布调整模型参数:

1.val paramMap = Map(  
2.    "eta" -> 0.1f,   
3.    "num_class" -> 3,   
4.    "max_depth" -> 3,   
5.    "objective" -> "multi:softmax")  

模型训练调用方式这里不再赘述,下面介绍训练函数中各参数的含义。

  • trainingData:训练集RDD。
  • params:模型训练参数。
  • round:模型迭代轮数。
  • nWorkers:XGBoost训练节点个数,如果设为0,则XGBoost会将训练集RDD的分区数作为nWorkers的数量。
  • obj:用户定义的目标函数,默认为Null。
  • eval:用户定义的评价函数,默认为Null。
  • useExternalMemory:是否利用外存缓存,如果设置为True,则可以节省运行XGBoost的RAM成本。
  • missing:数据集中指定为缺省值的值(注意,此处为XGBoost会将 missing值作为缺省值,在训练之前会将missing值置为空)。

模型训练完成之后,可将模型文件进行保存以供预测时使用。模型被保存为Hadoop文件,存储于HDFS上。0.7版本通过saveModelAsHadoopFile可实现该功能,调用示例如下:

xgboostModel.saveModelAsHadoopFile("/tmp/bst.model")  

0.8及以上版本直接可通过save函数实现,如下:

xgboostModel.write.overwrite().save("/tmp/bst.model")  

XGBoost可以将之前训练好的模型文件直接加载,以供使用,0.7x版本代码如下:

val model = XGBoost.loadModelFromHadoopFile("/tmp/bst.model")

0.8及以上版本,如下:

val model = XGBoostClassificationModel.load("/tmp/bst.model")

此处为分类模型,若为回归模型则为:

val model = XGBoostRegressionModel.load("/tmp/bst.model")

将预测集传入训练好的模型即可进行预测,0.7x版本对RDD类型数据预测代码,如下:

val predicts = model.predict(test)  

0.8及以上版本则直接对DataSet类型数据进行预测,如下:

val predicts = model.transform(test)  

Spark训练好的模型也可以下载到本地,通过本地的XGBoost(Python、Java或Scala)加载并进行预测。这样既可以实现模型通过分布式训练海量样本,提高模型的准确度,又可以通过单机调用分布式训练的模型进行预测,提高模型预测速度。

用户不仅可以通过DataFrame/DataSet API对数据集进行操作,而且可以通过Spark提供的MLlib机器学习包对特征进行处理。MLlib是构建于Spark之上的机器学习库,由通用的学习算法和工具类组成。通过MLlib可以方便地对特征进行提取和转化。MLlib还提供了非常丰富的算法,包括分类、回归、聚类、协同过滤、降维等,用户可以根据应用场景将这些算法和XGBoost结合使用。另外,MLlib还提供了模型选择工具,用户可以通过API定义的自动参数搜索过程来选择最佳模型。

特征提取、变换和选择

在将训练集送入XGBoost4J-Spark训练之前,可以首先通过MLlib对特征进行处理,包括特征提取、变换和选择。这是在进行模型训练前十分重要的一步,但不是必需的,用户可以根据应用场景进行选择。

在MLlib中,特征提取方法主要有如下3种。

  • TF-IDF:词频率-逆文档频率,是常见的文本预处理步骤。字词的重要性随着它在文件中出现的次数呈正比增加,但也会随着它在语料库中出现的频率呈反比下降。
  • Word2Vec:其将文档中的每个单词都映射为一个唯一且固定长度的向量。
  • CountVectorizer:用向量表示文档中每个词出现的次数。 特征变换在Spark机器学习流水线中占有重要地位,广泛应用在各种机器学习场景中。MLlib提供了多种特征变换的方法,此处只选择常用的方法进行介绍。

(1)StringIndexer

StringIndexer将标签的字符串列编码为标签索引列。索引取值为[0,numLabels],按标签频率排序。如表1所示,category列为原数据列,categoryIndex列为通过StringIndexer编码后的列。a出现最频繁(编码为0.0),依次为c(编码为1.0)、b(编码为2.0)。

表1 StringIndexer编码

调用代码非常简单,只需如下两行即可实现:

1.val indexer = new StringIndexer()  
2.              .setInputCol("category")  
3.              .setOutputCol("categoryIndex")  
5.val indexed = indexer.fit(df).transform(df)  

(2)OneHotEncoder

OneHotEncoder将一列标签索引映射到一列二进制向量,最多只有一个单值,可以将前面StringIndexer生成的索引列转化为向量。OneHotEncoder主要应用于类别特征上,如性别、国籍等。类别特征不能直接应用于机器学习模型中,因为即使通过StringIndexer将字符串转为数值型特征后,模型往往默认数据是连续的,并且是有序的;但是,类别特征数字并不是有序的,只是每个数字代表一个类别。

OneHotEncoder可以结合StringIndexer使用,代码如下:

1.val indexer = new StringIndexer()  
2.              .setInputCol("category")  
3.              .setOutputCol("categoryIndex")  
4.             .fit(df)  
5.val indexed = indexer.transform(df)  
7.val encoder = new OneHotEncoder()  
8.             .setInputCol("categoryIndex")  
9.             .setOutputCol("categoryVec")  
11.val encoded = encoder.transform(indexed)  

(3)Normalizer

Normalizer可以将多行向量输入转化为统一的形式。参数p(默认为2)用来指定正则化操作中使用的p-norm。正则化操作可以使输入数据标准化并提高后期模型的效果。

1.val normalizer = new Normalizer()  
2.                .setInputCol("features")  
3.                .setOutputCol("normFeatures")  
4.                .setP(1.0)  
6.val l1NormData = normalizer.transform(dataFrame)

(4)StandardScaler

StandardScaler处理Vector数据,标准化每个特征使得其有统一的标准差及(或者)均值为零。它有如下参数:

1)withStd:默认值为真,使用统一标准差方式。

2)withMean:默认为假。这种方法将产生一个稠密输出,所以不适用于稀疏输入。

1.val scaler = new StandardScaler()  
2.            .setInputCol("features")  
3.            .setOutputCol("scaledFeatures")  
4.            .setWithStd(true)  
5.            .setWithMean(false)  
7.// 通过拟合StandardScaler计算汇总统计信息
8.val scalerModel = scaler.fit(dataFrame)  
10.// 标准化特征 
11.val scaledData = scalerModel.transform(dataFrame)  

(5)MinMaxScaler

MinMaxScaler通过重新调节大小将Vector形式的列转换到指定的范围内,通常为[0,1]。它的参数有以下2个。

1)min:默认为0.0,为转换后所有特征的上边界。

2)max:默认为1.0,为转换后所有特征的下边界。

1.val scaler = new MinMaxScaler()  
2.            .setInputCol("features")  
3.            .setOutputCol("scaledFeatures")  
5.// 计算统计信息,生成MinMaxScalerModel
6.val scalerModel = scaler.fit(dataFrame)  
8.// 重新缩放每个特征至[min, max]范围
9.val scaledData = scalerModel.transform(dataFrame)  

(6)SQLTransformer

SQLTransformer实现了基于SQL语句定义的特征转换,如“SELECT...FROM__THIS__...”,其中“__THIS__”表示输入数据集的基础表。

1.val df = spark.createDataFrame(  
2.  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")  
4.val sqlTrans = new SQLTransformer().setStatement(  
5.  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")  
7.sqlTrans.transform(df)  

(7)VectorAssembler

VectorAssembler将给定的列列表组合到单个向量列中。它可以将原始特征和一系列通过其他转换器得到的特征合并为单一的特征向量,以训练如逻辑回归和决策树等机器学习算法。

1.val assembler = new VectorAssembler()  
2.               .setInputCols(Array("hour", "mobile", "userFeatures"))  
3.               .setOutputCol("features")  
5.val output = assembler.transform(dataset) 

除了以上介绍的几种方法之外,MLlib还提供了其他特征变换方法,如用于特征分桶的Bucketizer、用于降维的PCA等,此处不再一一介绍,读者如感兴趣可查阅相关资料[插图],基于应用场景合理选择相应的特征转变换方法。

特征选择是指通过剔除不相关或冗余的特征,从而达到减少特征个数、提高模型精确度、减少运行时间的目的。MLlib提供了如下几种特征选择的方法。

  • VectorSlicer:从特征向量中输出一个新特征向量,该新特征向量为原特征向量的子集,在向量列中提取特征时很有用。
  • RFormula:选择由R模型公式指定的列。
  • ChiSqSelector:Chi-Squared特征选择,应用于类别特征数据。

XGBoost模型训练

在进行XGBoost模型训练前,通过MLlib对数据集进行特征提取、变换、选择,能够使数据集的特征更具有代表性,减少模型受到的噪声干扰,提高模型精度。另外,选取出真正相关的特征简化模型,协助理解数据产生的过程。下面通过示例介绍如何将MLlib的特征提取、变换、选择与XGBoost结合起来,此处采用iris数据集。下面给出来0.8x版本的具体实现:

1.import ml.dmlc.xgboost4j.scala.spark.{TrackerConf, XGBoostClassificationModel, 
   XGBoostClassifier, XGBoostRegressionModel, XGBoostRegressor}  
2.import org.apache.spark.ml.feature.StringIndexer  
3.import org.apache.spark.ml.feature.VectorAssembler  
4.import org.apache.spark.sql.types.{DoubleType, StringType, StructField, 
   StructType}  
6.// 读取数据集,生成DataFrame  
7.val schema = new StructType(Array(  
8.  StructField("sepal length", DoubleType, true),  
9.  StructField("sepal width", DoubleType, true),  
10.  StructField("petal length", DoubleType, true),  
11.  StructField("petal width", DoubleType, true),  
12.  StructField("class", StringType, true)))  
13.val df = spark.read.schema(schema).csv("{HDFS PATH}/iris.txt")  
15.// 定义StringIndexer,将字符串类型列class转为数值型列label  
16.val indexer = new StringIndexer()  
17.  .setInputCol("class")  
18.  .setOutputCol("label")  
20.// 对前述定义的列进行转换,并去掉原来的classz字段  
21.val labelTransformed = indexer.fit(df).transform(df).drop("class")  
23.// 对特征进行vectorAssembler,生成features列  
24.val vectorAssembler = new VectorAssembler().  
25.  setInputCols(Array("sepal length", "sepal width", "petal length", 
     "petal width")).  
26.  setOutputCol("features")  
27.val xgbInput = vectorAssembler.transform(labelTransformed).select
   ("features", "label")  
29.// 定义训练参数  
30.val paramMap = Map(  
31.    "eta" -> 0.1f,   
32.    "num_class" -> 3,   
33.    "max_depth" -> 3,   
34.    "objective" -> "multi:softmax",  
35.    "num_round" -> 10,  
36.    "num_workers" -> 1)  
38.// 训练模型  
39.val xgbClassifier = new XGBoostClassifier(paramMap).setFeaturesCol("features").
   setLabelCol("label")  
40.val xgbClassificationModel = xgbClassifier.fit(xgbInput)  

Pipelines

MLlib中的Pipeline主要受scikit-learn项目的启发,旨在更容易地将多个算法组合成单个管道或工作流,向用户提供基于DataFrame的更高层次的API库,以更方便地构建复杂的机器学习工作流式应用。一个Pipeline可以集成多个任务,如特征变换、模型训练、参数设置等。下面介绍几个重要的概念。

  • DataFrame:相比于RDD,DataFrame还包含schema信息,可以将其近似看作数据库中的表。
  • Transformer:Transformer可以看作将一个DataFrame转换成另一个DataFrame的算法。例如,模型即可看作一个Transformer,它将预测集的DataFrame转换成了预测结果的DataFrame。
  • Estimator:一种可以适应DataFrame来生成Transformer的算法,操作于DataFrame数据并生成一个Transformer。
  • Pipeline:可以连接多个Transformer和Estimator形成机器学习的工作流。
  • Parameter:设置Transformer和Estimator的参数。

Pipeline是多个阶段形成的一个序列,每个阶段都是一个Transformer或者Estimator。这些阶段按顺序执行,当数据通过DataFrame输入Pipeline中时,数据在每个阶段按相应规则进行转换。在Transformer阶段,对DataFrame调用transform()方法。在Estimator阶段,对DataFrame调用fit()方法产生一个Transformer,然后调用该Transformer的transform()。

MLlib允许用户将特征提取/变换/选择、模型训练、数据预测等构成一个完整的Pipeline。XGBoost也可以作为Pipeline集成到Spark的机器学习工作流中。下面通过示例介绍如何将特征处理的Transformer和XGBoost结合起来构成Spark的Pipeline。0.8.x版本的实现代码如下:

1.import ml.dmlc.xgboost4j.scala.spark.{TrackerConf, XGBoostClassificationModel, 
   XGBoostClassifier, XGBoostRegressionModel, XGBoostRegressor}   
2.import ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator  
3.import org.apache.spark.ml.feature.StringIndexer  
4.import org.apache.spark.ml.feature.VectorAssembler  
5.import org.apache.spark.sql.types.{DoubleType, StringType, StructField, 
   StructType}  
6.import org.apache.spark.ml.Pipeline  
8.// 读取数据集,生成DataFrame  
9.val schema = new StructType(Array(  
10.  StructField("sepal length", DoubleType, true),  
11.  StructField("sepal width", DoubleType, true),  
12.  StructField("petal length", DoubleType, true),  
13.  StructField("petal width", DoubleType, true),  
14.  StructField("class", StringType, true)))  
15.val df = spark.read.schema(schema).csv("{HDFS PATH}/iris.txt")  
17.// 定义StringIndexer,将字符串类型列class转为数值型列label  
18.val indexer = new StringIndexer().  
19.   setInputCol("class").  
20.   setOutputCol("label")  
22.// 对特征进行vectorAssembler,生成features列  
23.val vectorAssembler = new VectorAssembler().  
24.  setInputCols(Array("sepal length", "sepal width", "petal length", 
     "petal width")).  
25.  setOutputCol("features")  
27.// 定义训练参数  
28.val paramMap = Map(  
29.    "eta" -> 0.1f,   
30.    "num_class" -> 3,   
31.    "max_depth" -> 3,   
32.    "objective" -> "multi:softmax",  
33.    "num_round" -> 10,  
34.    "num_workers" -> 1)  
36.// 定义模型  
37.val xgbClassifier = new XGBoostClassifier(paramMap).
      setFeaturesCol("features").setLabelCol("label")  
39.// 构建pipeline           
40.val pipeline = new Pipeline().setStages(Array(indexer, vectorAssembler, 
   xgbClassifier))  
41.val model = pipeline.fit(df)  
43.// 预测  
44.val predict = model.transform(df)  

模型选择

模型选择是机器学习中非常重要的任务,即通过数据找到具体问题的最佳模型和参数,也称超参数调整。模型选择可以在单独的Estimator(如逻辑回归)中完成,也可以在包含多个算法或者其他步骤的Pipeline中完成。用户可以一次调整整个Pipeline中的参数,而不是单独调整Pipeline中的每一个元素。MLlib支持CrossValidator和TrainValidationSplit两个模型选择工具。

(1)CrossValidator

即交叉验证,将数据集划分为若干份子集分别进行训练和测试。例如,设置k值为3,CrossValidator将产生3组数据,每组数据中的2/3作为训练集进行训练,1/3作为测试集进行测试。CrossValidator计算3组数据训练模型的评估准则的平均值。确定了最佳参数之后,CrossValidator使用最佳参数重新对整个数据集进行拟合得到最终模型。

(2)Train-Validation Split

除了CrossValidator之外,MLlib还提供了Train-Validation Split用以超参数调整。和CrossValidator不同的是,Train-Validation Split只验证1次,而非k次。Train-Validation Split的计算代价相较于CrossValidator更低,但是当训练数据集不够大时,结果可靠性不高。Train-Validation Split通过trainRatio参数将数据集分成两个部分。例如,设置trainRatio=0.75,TrainValidation Split则将75%的数据用于训练,25%的数据用于测试。

模型选择确定最佳参数是最大限度提高XGBoost模型的关键步骤之一。通过手工调整参数是一项费时又乏味的过程。最新版本的XGBoost4J-Spark可以通过MLlib的模型选择工具进行参数调优,极大地提高了机器学习过程中参数调优的效率。下面通过一个示例来说明如何利用MLlib模型选择工具对XGBoost进行参数调优。0.8x版本的实现代码如下:

1.import org.apache.spark.ml.tuning.ParamGridBuilder  
2.import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator  
3.import org.apache.spark.ml.tuning.TrainValidationSplit  
5.// 创建xgbClassifier      
6.val xgbClassifier = new XGBoostClassifier(paramMap).
      setFeaturesCol("features").setLabelCol("label")   
8.// 设定参数调优时参数的范围    
9.val paramGrid = new ParamGridBuilder().    
10.       addGrid(xgbEstimator.maxDepth, Array(5, 6)).    
11.       addGrid(xgbEstimator.eta, Array(0.1, 0.4)).   
12.       build()    
14.// 构建TrainValidationSplit,设置trainRatio=0.8,即80%的数据用于训练,20%的数据用于测试    
15.val tv = new TrainValidationSplit().    
16.       setEstimator(xgbEstimator).    
17.       setEvaluator(new MulticlassClassificationEvaluator().
                       setLabelCol("label")).    
18.       setEstimatorParamMaps(paramGrid).