irpas技术客

Spark 读写Hbase_金星000_spark读取hbase数据

未知 5822

启动

zookeeper----》hadoop----》hbase

创建hbase表student

create 'student' ,'info'

添加数据

put 'student' ,'1' ,'info:name','James'

put 'student' ,'1' ,'info:age','23'

put 'student' ,'1' ,'info:gender','F'

put 'student' ,'2' ,'info:name','Smith'

put 'student' ,'2' ,'info:age','24'

put 'student' ,'2' ,'info:gender','M'

根据rowkey查询一条记录

get 'student','1'

读取hbase数据,在mycode目录下创建SparkOperateHBase.py文件,添加如下代码

#!/usr/bin/env python3 from pyspark import SparkConf,SparkContext conf = SparkConf().setMaster('local').setAppName("ReadHBase") sc = SparkContext(conf = conf) host = 'localhost' table = 'student' conf = {"hbase.zookeeper.quorum": host,"hbase.mapreduce.inputtable": table} keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" valueConv="org.apache.spark.examples.pythonconverters,HBaseResultToStringConverter" hbase_rdd=sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf) count=hbase_rdd.count() hbase_rdd.cache() output=hbase_rdd.collect() for(k,v) in output: print(k,v)

运行程序

./spark-submit /usr/local/software/spark/mycode/SparkOperateHBase.py

把spark读取hbase的支持jar包导入spark的jars目录下?

cp /usr/local/software/hbase/hbase-2.4.9/lib/hbase*.jar? ?/usr/local/software/spark/spark-3.0.3-bin-hadoop2.7/tars

再次运行

缺少把HBase数据转换成python可读数据的jar包?

在spark jars目录下新建hbase目录

mkdir hbase

下载转换包到spark? ?的jars/hbase目录下

wget https://repo.typesafe.com/typesafe/maven-releases/org/apache/spark/spark-examples_2.11/1.6.0-typesafe-001/spark-examples_2.11-1.6.0-typesafe-001.jar

配置spark-env.sh

export SPARK_DIST_CLASSPATH=$(/usr/local/software/hadoop/hadoop-3.3.0/bin/hadoop classpath):$(/usr/local/software/hbase/hbase-2.4.9/bin/hbase classpath):/usr/local/software/spark/spark-3.0.3-bin-hadoop2.7/jars/hbase/*

再次运行

启动HBase

再次运行

?

?读取成功!

写入

在spark/mycode目录下新建文件SparkWriteHBase.py,代码如下:

#!/usr/bin/env python3 from pyspark import SparkConf,SparkContext conf = SparkConf().setMaster('local').setAppName("WriteHBase") sc = SparkContext(conf = conf) host = 'localhost' table = 'student' keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" valueConv="org.apache.spark.examples.pythonconverters.StringListToPutConverter" conf = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class":"org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class":"org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class":"org.apache.hadoop.io.Writable"} rawData=['3,info,name,Rongcheng','3,info,gender,M','3,info,age,26','4,info,name,Guanhua','4,info,gender,M','4,info,age,27'] sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)

执行程序,报如下错误:

这里是因为上面导入的spark-examples_2.11-1.6.0-typesafe-001.jar与当前的hbase版本兼容,因为hbase2.x之后的版本Put类中的add()方法被调整成了addColumn(),导致调用时找不到对应的方法。

这个需要自行下载Spark的1.6版本的源码,修改后重新编译打包

?替换原来的spark-examples_2.11-1.6.0-typesafe-001.jar即可

继续执行程序,

然后进入hbase shell界面,

[hadoop@master bin]$ ./hbase shell

scan 'student' 会看到数据3,4会被写入。

hbase:002:0> scan 'student' ROW COLUMN+CELL 1 column=info:age, timestamp=2022-03-24T21:57:49.420, value=23 1 column=info:gender, timestamp=2022-03-24T21:57:39.878, value=F 1 column=info:name, timestamp=2022-03-24T21:57:20.047, value=Xueqian 2 column=info:age, timestamp=2022-03-24T21:58:23.296, value=24 2 column=info:gender, timestamp=2022-03-24T21:58:40.337, value=M 2 column=info:name, timestamp=2022-03-24T21:58:10.985, value=Weiliang 3 column=info:age, timestamp=2022-03-26T21:39:28.041, value=26 3 column=info:gender, timestamp=2022-03-26T21:39:28.041, value=M 3 column=info:name, timestamp=2022-03-26T21:39:28.041, value=Rongcheng 4 column=info:age, timestamp=2022-03-26T21:39:28.041, value=27 4 column=info:gender, timestamp=2022-03-26T21:39:28.041, value=M 4 column=info:name, timestamp=2022-03-26T21:39:28.041, value=Guanhua 4 row(s)


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #spark读取hbase数据 #Student #info添加数据put #1 #infoname #Jamesput