irpas技术客

【Spark Core实战】电商网站用户行为数据分析 (1)_CCH21_spark电商数据分析

irpas 2169

本文实战案例数据来自尚硅谷大数据技术之Spark教程,资源下载链接见评论区。

目录 环境版本信息数据集说明需求说明实现方式1实现方式2运行结果

环境版本信息

操作系统:Windows 10家庭中文版 (21H1) 编程语言:Scala 集成开发环境:IntelliJ IDEA 2021.1.1 (Ultimate Edition) JDK版本:13.0.1 Maven版本:3.8.1 Scala版本:2.12.11 Spark版本:3.0.0

数据集说明

数据集是尚硅谷的电商网站用户行为数据,主要包含用户的4种行为:搜索,点击,下单,支付。数据规则如下:

数据文件中每行数据采用下划线分隔。每一行数据表示用户的一次行为。如果搜索关键字为null,表示数据不是搜索数据。如果点击的品类ID和产品ID为-1,表示数据不是点击数据。针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,ID之间采用逗号分隔;如果本次不是下单行为,则数据采用null表示。支付行为和下单行为类似。

详细字段说明如下:

编号字段名称字段类型字段含义1dateString用户点击行为的日期2user_idLong用户ID3session_idStringSession ID4page_idLong页面ID5action_timeString动作的时间点6search_keywordString用户搜索关键字7click_category_idLong商品品类ID8click_product_idLong商品ID9order_category_idsString一次订单中所有品类的ID集合10order_product_idsString一次订单中所有商品的ID集合11pay_category_idsString一次支付中所有品类的ID集合12pay_product_idsString一次支付中所有商品的ID集合13city_idLong城市ID
需求说明

统计热门品类的Top 10。这里对热门品类的定义是:按照每个品类的点击数量、下单数量、支付数量从高到低排列,即先按照点击数量排名,若相同则比较下单数量,若下单数量相同则比较支付数量。

实现方式1

读取文件中的数据,转换数据结构,其中点击行为转换为(商品品类ID, (1, 0, 0)),下单行为转换为(商品品类ID, (0, 1, 0)),支付行为转换为(商品品类ID, (0, 0, 1))。将商品品类ID相同的数据进行分组聚合,对其降序排序并取Top 10即可满足需求。 在IDEA中新建Maven工程,导入Scala插件支持。

创建data目录,用于存放数据集。项目结构如下:

在pom.xml文件中导入Spark相关依赖:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://·piler.source>13</maven.compiler.source> <maven.compiler.target>13</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.0.0</version> </dependency> </dependencies> </project>

在Maven项目的target/classes/目录下新建log4j.properties文件,输入以下内容,过滤掉除ERROR以外的信息:

log4j.rootCategory=ERROR, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Set the default spark-shell log level to ERROR. When running the spark-shell, the # log level for this class is used to overwrite the root logger's log level, so that # the user can have different defaults for the shell and regular Spark apps. log4j.logger.org.apache.spark.repl.Main=ERROR # Settings to quiet third party logs that are too verbose log4j.logger.org.spark_project.jetty=ERROR log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

Scala代码如下:

package com.cch.bigdata.spark.core.req import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 需求1:Top10热门品类 */ object Req1_HotCategoryTop10Analysis { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis") val sc = new SparkContext(sparkConf) val actionRDD: RDD[String] = sc.textFile("data/user_visit_action.txt").cache() // 转换数据结构 val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap( action => { val data: Array[String] = action.split("_") if (data(6) != "-1") { // 点击 List((data(6), (1, 0, 0))) } else if (data(8) != "null") { // 下单 val ids: Array[String] = data(8).split(",") ids.map(id => (id, (0, 1, 0))) } else if (data(10) != "null") { // 支付 val ids: Array[String] = data(10).split(",") ids.map(id => (id, (0, 0, 1))) } else { Nil } } ) // 相同品类ID数据分组聚合 val analysisRDD: RDD[(String, (Int, Int, Int))] = flatRDD.reduceByKey( (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3) ) // 降序排序并取Top10 val result: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, ascending = false).take(10) result.foreach(println) sc.stop() } } 实现方式2

在使用reduceByKey()方法时会引入Shuffle操作,可以使用累加器对数据进行聚合,提高程序性能。 Scala代码如下:

package com.cch.bigdata.spark.core.req import org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable /** * 需求1:Top10热门品类 */ object Req1_HotCategoryTop10Analysis_Acc { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10AnalysisAcc")) val actionRDD: RDD[String] = sc.textFile("data/user_visit_action.txt").cache() val acc = new HotCategoryAccumulator sc.register(acc, "hotCategory") actionRDD.foreach( action => { val data: Array[String] = action.split("_") if (data(6) != "-1") { // 点击 acc.add((data(6), "click")) } else if (data(8) != "null") { // 下单 val ids: Array[String] = data(8).split(",") ids.foreach(id => acc.add((id, "order"))) } else if (data(10) != "null") { // 支付 val ids: Array[String] = data(10).split(",") ids.foreach(id => acc.add((id, "pay"))) } } ) val accVal: mutable.Map[String, HotCategory] = acc.value val categories: Iterable[HotCategory] = accVal.values val result: List[(String, (Int, Int, Int))] = categories.toList.sortWith( (left, right) => { if (left.clickCnt > right.clickCnt) { true } else if (left.clickCnt == right.clickCnt) { if (left.orderCnt > right.orderCnt) { true } else if (left.orderCnt == right.orderCnt) { left.payCnt > right.payCnt } else { false } } else { false } } ).take(10).map(hc => (hc.cid, (hc.clickCnt, hc.orderCnt, hc.payCnt))) result.foreach(println) sc.stop() } case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int) /** * 自定义累加器 */ class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] { private val hcMap = mutable.Map[String, HotCategory]() override def isZero: Boolean = hcMap.isEmpty override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = new HotCategoryAccumulator override def reset(): Unit = hcMap.clear() override def add(v: (String, String)): Unit = { val cid = v._1 val actionType = v._2 val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0)) if (actionType == "click") { category.clickCnt += 1 } else if (actionType == "order") { category.orderCnt += 1 } else if (actionType == "pay") { category.payCnt += 1 } hcMap.update(cid, category) } override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = { val map1: mutable.Map[String, HotCategory] = this.hcMap val map2: mutable.Map[String, HotCategory] = other.value map2.foreach { case (cid, hc) => val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0)) category.clickCnt += hc.clickCnt category.orderCnt += hc.orderCnt category.payCnt += hc.payCnt map1.update(cid, category) } } override def value: mutable.Map[String, HotCategory] = hcMap } } 运行结果 (15,(6120,1672,1259)) (2,(6119,1767,1196)) (20,(6098,1776,1244)) (12,(6095,1740,1218)) (11,(6093,1781,1202)) (17,(6079,1752,1231)) (7,(6074,1796,1252)) (9,(6045,1736,1230)) (19,(6044,1722,1158)) (13,(6036,1781,1161))


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #spark电商数据分析 #10家庭中文版 #Idea #202111 #ultimate