
Using XGBoost on PySpark (汪喵行)


XGBoost is a widely used model in industry machine learning, but unlike random forest and some other algorithms it has no built-in implementation in Spark, so you have to wire it up yourself. It is not very hard, though.

1. Preliminaries

First, download two jar files, xgboost4j-spark-0.72.jar and xgboost4j-0.72.jar (links below). Then download sparkxgb.zip, which contains the PySpark code that calls the jars and sets up the parameters.

xgboost4j

xgboost4j-spark

XGBoost python wrapper

We use bank.csv as an example to show how to run XGBoost on Spark.

dataset: https://·/janiobachmann/bank-marketing-dataset

Import packages:

```python
import numpy as np
import pandas as pd
import os
import re
from sklearn import metrics
import matplotlib.pyplot as plt

os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.72.jar,xgboost4j-0.72.jar pyspark-shell'

import findspark
findspark.init()

import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession\
    .builder\
    .appName("PySpark XGBOOST")\
    .master("local[*]")\
    .getOrCreate()

# register the XGBoost python wrapper with the Spark context
spark.sparkContext.addPyFile("sparkxgb.zip")
from sparkxgb import XGBoostEstimator
```

Load the file. (I adapted this part from a version that read from S3 and have not tested it, so if you run it locally you may need to adjust it.)

```python
df_all = spark\
    .read\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .csv("bank.csv")
```

Note: without `inferSchema`, every column is read as a string, and the numeric/string split done below (which checks for `int` dtypes) would find no numeric features.

Because Spark does not accept column names containing dots, we clean up all the column names here to avoid errors.

```python
# strip braces/parentheses and replace dots and whitespace with underscores
tran_tab = str.maketrans({x: None for x in list('{()}')})
df_all = df_all.toDF(*(re.sub(r'[\.\s]+', '_', c).translate(tran_tab) for c in df_all.columns))

# fill na
df_all = df_all.na.fill(0)
```

2. Data processing

When training a model in PySpark, you build a Pipeline; the stages defined in the pipeline specify the order of operations.

Categorical variables need a StringIndexer transformation, optionally followed by a OneHotEncoder for a further conversion; numerical variables can go into the stage definitions directly.
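As a rough analogy (plain pandas, not part of the Spark pipeline, and with a made-up toy column): StringIndexer behaves like mapping each category to an integer code, and OneHotEncoder like expanding the column into dummy indicator columns. One difference to keep in mind is that StringIndexer orders labels by frequency, while `pd.factorize` uses order of appearance.

```python
import pandas as pd

# toy categorical column, standing in for a Spark column such as 'job'
s = pd.Series(["teacher", "nurse", "teacher", "engineer"])

# StringIndexer analogue: map each category to an integer index
codes, uniques = pd.factorize(s)
print(list(codes))            # [0, 1, 0, 2]

# OneHotEncoder analogue: expand the column into dummy columns
dummies = pd.get_dummies(s, prefix="job")
print(dummies.columns.tolist())
```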

Convert the string variables:

```python
unused_col = ['day', 'month']
df_all = df_all.select([col for col in df_all.columns if col not in unused_col])

# split columns into numeric features and string features (excluding the label)
numeric_features = [t[0] for t in df_all.dtypes if t[1] == 'int']
cols = df_all.columns
string_col = [t[0] for t in df_all.dtypes if t[1] != 'int']
string_col = [x for x in string_col if x != 'deposit']

# build a StringIndexer and a OneHotEncoderEstimator per string column
for S in string_col:
    globals()[str(S)+'Indexer'] = StringIndexer()\
        .setInputCol(S)\
        .setOutputCol(str(S)+'Index')\
        .setHandleInvalid("keep")
    globals()[str(S)+'classVecIndexer'] = OneHotEncoderEstimator(
        inputCols=[globals()[str(S)+'Indexer'].getOutputCol()],
        outputCols=[str(S)+"classVec"])

# zip everything into one 'features' column
feature_col = [s+'Index' for s in string_col]
feature_col.extend([str(s)+"classVec" for s in string_col])
feature_col.extend(numeric_features)
vectorAssembler = VectorAssembler()\
    .setInputCols(feature_col)\
    .setOutputCol("features")

# index the label column
label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')

# define xgboost
xgboost = XGBoostEstimator(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction"
)
```

3. Define the pipeline and add all the previous operations as stages

```python
feat_stage = [globals()[str(S)+'Indexer'] for S in string_col]
feat_stage.extend([globals()[str(s)+"classVecIndexer"] for s in string_col])
feat_stage.extend([vectorAssembler, label_stringIdx, xgboost])
xgb_pipeline = Pipeline().setStages(feat_stage)

# split train & test
trainDF, testDF = df_all.randomSplit([0.8, 0.2], seed=24)
```

Take a look at the pipeline and its stages.

4. Model training and testing

```python
# train model
model = xgb_pipeline.fit(trainDF)

# predict
pre = model.transform(testDF)\
    .select(col("label"), col('probabilities'), col("prediction"))

# to pandas df
cx = pre.toPandas()
cx["probabilities"] = cx["probabilities"].apply(lambda x: x.values)
cx[['prob_0', 'prob_1']] = pd.DataFrame(cx.probabilities.tolist(), index=cx.index)
cx = cx[["label", 'prob_1']].sort_values(by=['prob_1'], ascending=False)
```
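The probabilities column holds one vector per row, and the pandas trick used above splits it into two scalar columns. Here is that step in isolation as a self-contained sketch (plain pandas, with made-up probabilities standing in for real model output):

```python
import pandas as pd

# made-up predictions, standing in for model.transform(...).toPandas()
cx = pd.DataFrame({
    "label": [1, 0, 1],
    "probabilities": [[0.2, 0.8], [0.9, 0.1], [0.4, 0.6]],
})

# split the per-row probability vector into scalar columns
cx[["prob_0", "prob_1"]] = pd.DataFrame(cx.probabilities.tolist(), index=cx.index)

# keep the positive-class probability, sorted descending
cx = cx[["label", "prob_1"]].sort_values(by=["prob_1"], ascending=False)
print(cx)
```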

Check the results.

5. Evaluate the results

```python
# evaluate
metrics.roc_auc_score(cx.label, cx.prob_1)

# plot ROC curve
y_pred_proba = cx.prob_1
fpr, tpr, _ = metrics.roc_curve(cx.label, y_pred_proba)
auc = metrics.roc_auc_score(cx.label, y_pred_proba)
plt.plot(fpr, tpr, label="data 1, auc="+str(auc))
plt.legend(loc=4)
plt.show()
```
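The same evaluation calls can be tried on a tiny hand-made example (hypothetical labels and scores, purely to illustrate the metrics API without a trained model):

```python
from sklearn import metrics

# hypothetical true labels and predicted positive-class probabilities
label = [0, 0, 1, 1]
prob_1 = [0.1, 0.4, 0.35, 0.8]

auc = metrics.roc_auc_score(label, prob_1)
fpr, tpr, _ = metrics.roc_curve(label, prob_1)
print(auc)   # 0.75: 3 of the 4 (negative, positive) pairs are ranked correctly
```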

auc: 0.8788617014295159

ROC curve:

I did not do much parameter tuning, so feel free to tune further yourself. The way to do it is to open xgboost.py inside sparkxgb.zip and adjust max_depth and the like there.
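Depending on the wrapper version, it may also be possible to pass overrides directly to the estimator constructor instead of editing xgboost.py. This is a hypothetical configuration sketch: the parameter names below are assumptions and must be checked against the parameter definitions in xgboost.py inside sparkxgb.zip.

```python
# hypothetical constructor-based tuning (parameter names assumed, verify
# against the sparkxgb wrapper before use)
xgboost = XGBoostEstimator(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
    # eta=0.1,        # learning rate
    # max_depth=6,    # maximum tree depth
)
```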


