irpas技术客

HBase中Java相关操作_胜利的曙光_hbase java

irpas 8045

HBase Java操作

需求说明:

某自来水公司,需要存储大量的缴费明细数据。以下截取了缴费明细的一部分内容

用户ID姓名用户地址性别缴费时间表示数(本次)表示数(上次)用量(立方)合计金额查表日期最迟缴费日期1234567张三北京市海淀区中关村大街社区6单元216室男2021-05-10308.1283.1251502021-04-252021-06-09

准备工作:

创建IDEA Maven 项目: groupId:com.netsty artifactId:hbse_demo 导入相关pom依赖 <repositories><!-- 代码库 --> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> <updatePolicy>never</updatePolicy> </snapshots> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <version>6.14.3</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <target>1.8</target> <source>1.8</source> </configuration> </plugin> </plugins> </build> 创建包结构和类 在test目录创建 com.netsty.hbase.admin.api_test 包结构 创建TableAmdinTest类 使用java代码创建表

创建一个名为WATER_BILL的表,包含一个列族C1。

// 1) 如何创建表: WATER_BILL 列族为C1 和 C2 @Test public void test01() throws Exception { //1. 创建 Java 连接 hbase的连接对象 Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181"); Connection hBaseConn = ConnectionFactory.createConnection(conf); //2. 根据连接对象获取相关的管理对象: admin | Table Admin admin = hBaseConn.getAdmin(); //3. 执行相关的操作: //3.1: 判断目标表是否存在, 如果不存在 才创建表 : 如果存在返回true 如果不存在 false TableName tableName = TableName.valueOf("WATER_BILL"); boolean flag = admin.tableExists(tableName); if (!flag) { // 说明表不存在: //3.3: 创建表的构建器 TableDescriptorBuilder descriptorBuilder = TableDescriptorBuilder.newBuilder(tableName); List<ColumnFamilyDescriptor> families = new ArrayList<>(); families.add(ColumnFamilyDescriptorBuilder.newBuilder("C1".getBytes()).build()); //families.add(ColumnFamilyDescriptorBuilder.newBuilder("C2".getBytes()).build()); //3.4: 在表构建器中添加 表的列族信息 descriptorBuilder.setColumnFamilies(families); //3.5: 通过表构建器, 构建表信息对象 TableDescriptor desc = descriptorBuilder.build(); //3.2: 执行创建表的操作 admin.createTable(desc); } //4. 释放资源 admin.close(); hBaseConn.close(); } 往表中插入一条数据 @Test public void test02() throws Exception { //1. 根据连接工厂构建hbase的连接对象 Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181"); Connection hBaseConn = ConnectionFactory.createConnection(conf); //2. 根据连接对象, 获取相关的管理对象: admin table Table table = hBaseConn.getTable(TableName.valueOf("WATER_BILL")); //3. 执行相关的操作 Put put = new Put("1234567".getBytes()); put.addColumn("C1".getBytes(), "name".getBytes(), "张三".getBytes()); put.addColumn("C1".getBytes(), "address".getBytes(), "北京市海淀区中关村大街社区6单元216室".getBytes()); put.addColumn("C2".getBytes(), "sex".getBytes(), "男".getBytes()); table.put(put); //4. 释放资源 table.close(); hBaseConn.close(); }

优化操作: 抽取公共方法

private Connection hBaseConn; private Admin admin; private Table table; private String tableName = "WATER_BILL" ; @Before public void before() throws Exception { //1. 根据连接工厂构建hbase的连接对象 Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181"); hBaseConn = ConnectionFactory.createConnection(conf); //2. 根据连接对象, 获取相关的管理对象: admin table admin = hBaseConn.getAdmin(); table = hBaseConn.getTable(TableName.valueOf(tableName)); } @After public void after() throws Exception { //4. 释放资源 table.close(); admin.close(); hBaseConn.close(); } 查看一条数据 //6. 查询 rowkey 为 9996666 的数据 @Test public void test06() throws Exception{ //3. 执行相关的操作 //3.1: 执行查询 Get get = new Get("9996666".getBytes()); Result result = table.get(get); //3.2: 处理结果集 List<Cell> cellList = result.listCells(); // 获取 这一行所有的单元格 for (Cell cell : cellList) { // 单元格: rowkey + 列族 + 列名 + 列值 byte[] rowkeyBytes = CellUtil.cloneRow(cell); byte[] familyBytes = CellUtil.cloneFamily(cell); byte[] qualifierBytes = CellUtil.cloneQualifier(cell); byte[] valueBytes = CellUtil.cloneValue(cell); String rowkey = Bytes.toString(rowkeyBytes); String family = Bytes.toString(familyBytes); String qualifier = Bytes.toString(qualifierBytes); if(qualifier.equals("NUM_CURRENT") || qualifier.equals("NUM_PREVIOUS") ||qualifier.equals("TOTAL_MONEY") || qualifier.equals("NUM_USAGE") ){ Double value2 = Bytes.toDouble(valueBytes); System.out.println("rowkey为:"+rowkey +"; family:"+family +"; qualifier:"+qualifier+"; value:"+value2); }else{ String value1 = Bytes.toString(valueBytes); System.out.println("rowkey为:"+rowkey +"; family:"+family +"; qualifier:"+qualifier+"; value:"+value1); } } } 删除一条数据 //3. 删除数据: 4944191 @Test public void test03() throws Exception { //3. 执行相关操作: Delete delete = new Delete("4944191".getBytes()); delete.addFamily("C2".getBytes()); delete.addColumn("C1".getBytes(),"name".getBytes()); table.delete(delete); } 删除表 //4. 删除表: @Test public void test04() throws Exception { //3. 执行相关操作 if(admin.isTableEnabled(TableName.valueOf(tableName))){ admin.disableTable(TableName.valueOf(tableName)); } admin.deleteTable(TableName.valueOf(tableName)); } 导入数据

说明:

在HBase中,有一个Import的MapReduce作业,可以专门用来将数据文件导入到HBase中。

用法:

hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径

开始导入:

将文件上传到hdfs中

hadoop fs -mkdir -p /water_bill/water_data hadoop fs -put part-m-00000 /water_bill/water_data

执行命令:

hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /water_bill/water_data

注意: 一定要启动yarn集群

导出数据 hbase org.apache.hadoop.hbase.mapreduce.Export WATER_BILL /water_bill/water_data_export 查询数据

查询2020年6月份所有用户的用水量 : C1:RECORD_DATE

@Test public void test05() throws Exception{ //3. 执行相关的操作: scan 扫描 Scan scan = new Scan(); //3.1: 封装过滤条件 SingleColumnValueFilter singleColumnValueFilter1 = new SingleColumnValueFilter("C1".getBytes(),"RECORD_DATE".getBytes(), CompareOperator.GREATER_OR_EQUAL,new BinaryComparator("2021-06-01".getBytes())); SingleColumnValueFilter singleColumnValueFilter2 = new SingleColumnValueFilter("C1".getBytes(),"RECORD_DATE".getBytes(), CompareOperator.LESS,new BinaryComparator("2021-07-01".getBytes())); FilterList filterList = new FilterList(); filterList.addFilter(singleColumnValueFilter1); filterList.addFilter(singleColumnValueFilter2); scan.setFilter(filterList); //3.2: 执行扫描操作 返回多行结果数据 ResultScanner results = table.getScanner(scan); //3.3: 处理结果集 for (Result result : results) { List<Cell> cellList = result.listCells(); // 获取 这一行所有的单元格 for (Cell cell : cellList) { // 单元格: rowkey + 列族 + 列名 + 列值 byte[] rowkeyBytes = CellUtil.cloneRow(cell); byte[] familyBytes = CellUtil.cloneFamily(cell); byte[] qualifierBytes = CellUtil.cloneQualifier(cell); byte[] valueBytes = CellUtil.cloneValue(cell); String rowkey = Bytes.toString(rowkeyBytes); String family = Bytes.toString(familyBytes); String qualifier = Bytes.toString(qualifierBytes); if(qualifier.equals("NUM_CURRENT") || qualifier.equals("NUM_PREVIOUS") ||qualifier.equals("TOTAL_MONEY") || qualifier.equals("NUM_USAGE") ){ Double value = Bytes.toDouble(valueBytes); System.out.println("rowkey为:"+rowkey +"; family:"+family +"; qualifier:"+qualifier+"; value:"+value); }else { String value = Bytes.toString(valueBytes); System.out.println("rowkey为:"+rowkey +"; family:"+family +"; qualifier:"+qualifier+"; value:"+value); } } System.out.println("---------------------------------"); } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #HBase #JAVA