irpas技术客

Java API操作 HBase_noobiee

网络 1289

导入数据问题 使用HBase原生Client API。(Shell)使用HBase提供的TableOutputFormat,原理是通过一个Mapreduce作业将数据导入HBase。使用Bulk Load方式:原理是使用MapReduce作业以HBase的内部数据格式输出表数据,然后直接将生成的HFile加载到正在运行的HBase中。

对比:

前两种方式:需要频繁的与数据所存储的RegionServer通信,一次性导入大量数据时,可能占用大量Regionserver资源,影响存储在该Regionserver上其他表的查询。第三种方式:HBase在HDFS中是以HFile文件结构存储的,一个比较高效便捷的方法就是先生成HFile,再将生成的HFile加载到正在运行的HBase中。即使用HBase提供的HFileOutputFormat2类或者importtsv工具来完成上述操作。通常来说,在数据量很大的情况下,使用第三种方式(Bulk Load)更好。

Java API 操作 HBase

HBase 2.0.5?Jave?API 中使用HTableDescriptor与HColumnDescriptor时提示不推荐使用了,并且在3.0.0版本将删除,而是使用TableDescriptorBuilder和ColumnFamilyDescriptorBuilder.

public class HBaseOperation { public static Configuration configuration; public static Connection connection; public static Admin admin; // 建立链接 public static void init() { configuration=HBaseConfiguration.create(); configuration.set("hbase.rootdir", "hdfs://localhost:9000/hbase"); try { connection=ConnectionFactory.createConnection(configuration); admin=connection.getAdmin(); }catch(IOException e) { e.printStackTrace(); } } // 关闭连接 public static void close() { try { if(admin !=null) { admin.close(); } if(connection !=null) { connection.close(); } }catch(IOException e) { e.printStackTrace(); } } // 建立表 public static void createTable(String myTableName,String[] colFamily) throws IOException{ init(); TableName tableName=TableName.valueOf(myTableName); if (admin.tableExists(tableName)) { System.out.println("table is exist"); }else { List<ColumnFamilyDescriptor> colFamilyList=new ArrayList<>(); TableDescriptorBuilder tableDesBuilder=TableDescriptorBuilder.newBuilder(tableName); for(String str:colFamily) { ColumnFamilyDescriptor colFamilyDes=ColumnFamilyDescriptorBuilder.newBuilder(str.getBytes()).build(); colFamilyList.add(colFamilyDes); } TableDescriptor tableDes=tableDesBuilder.setColumnFamilies(colFamilyList).build(); admin.createTable(tableDes); } close(); } // 删除表 public static void deleteTable(String myTableName) throws IOException { init(); TableName tableName=TableName.valueOf(myTableName); if(admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); System.out.println("delete "+myTableName+" successful!"); } close(); } // 列出数据库中所有的表 public static void listTables() throws IOException{ init(); for(TableName table:admin.listTableNames()) { System.out.println(table); } close(); } // 向表中添加一个列族 public static void addColumnFamily(String myTableName, String colFamily) throws IOException{ init(); TableName tableName=TableName.valueOf(myTableName); if(admin.tableExists(tableName)) { TableDescriptor tableDes=TableDescriptorBuilder.newBuilder(tableName).build(); ColumnFamilyDescriptor colFamilyDes=ColumnFamilyDescriptorBuilder.newBuilder(colFamily.getBytes()).build(); admin.addColumnFamily(tableName, colFamilyDes); System.out.println("add "+colFamily+" successful!"); } close(); } // 从表中移除一个列族 public static void removeColumnFamily(String myTableName,String colFamily) throws IOException { init(); TableName tableName=TableName.valueOf(myTableName); if(admin.tableExists(tableName)) { TableDescriptor tableDes=TableDescriptorBuilder.newBuilder(tableName).build(); admin.deleteColumnFamily(tableName, colFamily.getBytes()); System.out.println("remove "+colFamily+" successful!"); } close(); } // 描述表的详细信息 public static void describeTable(String myTableName) throws IOException{ init(); TableName tableName=TableName.valueOf(myTableName); if(admin.tableExists(tableName)) { ColumnFamilyDescriptor[] colFamilies=admin.getDescriptor(tableName).getColumnFamilies(); System.out.println("==============describe "+myTableName+" ================"); for(ColumnFamilyDescriptor colFamily:colFamilies) { System.out.println(colFamily.getNameAsString()); System.out.println(colFamily.getBlocksize()); System.out.println(colFamily.getConfigurationValue(myTableName)); System.out.println(colFamily.getMaxVersions()); System.out.println(colFamily.getEncryptionType()); System.out.println(colFamily.getTimeToLive()); System.out.println(colFamily.getDFSReplication()); System.out.println(); } } close(); } } //添加数据 public void insert() { Configuration conf=HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.112.111"); //通过配置获取一个链接 Connection connection=null; Table table =null; try { //通过配置获取一个链接 connection = ConnectionFactory.createConnection(conf); //通过链接获取一个table table = connection.getTable(TableName.valueOf("mystudent")); //然后在构建一个put对象,每行数据都是一个Put对象,创建行健的时候是通过行健创建的 Put put=new Put(Bytes.toBytes("s001")); put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"), Bytes.toBytes("Tom")); put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("sex"), Bytes.toBytes("man")); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("yuwen"), Bytes.toBytes("89")); table.put(put); } catch (Exception e) { e.printStackTrace(); }finally { try { table.close(); connection.close(); } catch (Exception e2) { e2.printStackTrace(); } } } //获取数据 public void testGet() { Configuration conf=HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.112.111"); //通过配置获取一个链接 Connection connection=null; Table table =null; try { //通过配置获取一个链接 connection = ConnectionFactory.createConnection(conf); //通过链接获取一个table table = connection.getTable(TableName.valueOf("mystudent")); //构造一个get对象 Get get=new Get(Bytes.toBytes("s001")); //指定返回列,如果不指定返回列那么就是返回正行的数据 get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); get.addColumn(Bytes.toBytes("grade"), Bytes.toBytes("yuwen")); Result result = table.get(get); System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("grade"), Bytes.toBytes("yuwen")))); } catch (Exception e) { e.printStackTrace(); }finally { try { table.close(); connection.close(); } catch (Exception e2) { e2.printStackTrace(); } } }

Rowkey问题

HBase的数据分片按表进行,以行为粒度(行是region划分的最小单位),基于rowkey范围进行拆分,每个分片称为一个region。一个集群有多张表,每张表划分为多个region,每台服务器服务很多region。所以,HBase的服务器称为RegionServer,简称RS。RS与表是正交的,即一张表的region会分布到多台RS上,一台RS也会调度多张表的region。

region划分的粒度是行,region就是这个表中多个连续的行构成的集合。行的唯一标识符是rowkey,所以,可以将region理解为一段连续分布的rowkey的集合。所以,称这种方式为基于rowkey范围的划分。

一个region负责的rowkey范围是一个左闭右开区间,所以,后一个region的start key是前一个region的end key。注意,第一个region是没有start key的,最后一个region是没有end key的。这样,这个表的所有region加在一起就能覆盖任意的rowkey值域。

HBase的rowkey是一串二进制数据,在Java中就是一个byte[],是一行数据的唯一标识符。而业务的主键可能是有各种数据类型的,所以,这里要解决2个问题:

将各种实际使用的数据类型与byte[]进行相互转换保序:byte[]形式的rowkey的排序结果与原始数据的排序结果一致

rowkey的比较就是byte[]的比较,按字典序进行比较(二进制排序),简单说,就是c语言中memcmp函数。

总结:

表的每行都是按照RowKey的字典序排序存储表的数据是按照RowKey区间进行分割存储成多个region

所以HBase主要适用下面这两种常见场景:

适用于基于rowkey的单行数据快速随机读写适合基于rowkey前缀的范围扫描

二级索引

HBase里面只有rowkey作为一级索引,?如果要对库里的非rowkey字段进行数据检索和查询, 往往要通过MapReduce/Spark等分布式计算框架进行,硬件资源消耗和时间延迟都会比较高。为了使用非rowkey字段检索也能做到秒级响应,或者支持各个字段进行模糊查询和多字段组合查询等,需要构建二级索引。

基于Coprocessor方案:Apache Phoenix非Coprocessor方案:Apache Lucene的Elasticsearch (ES)

HBase Shell命令 #查看16进制的中文 两种方法 hbase(main):036:0> scan 'test',{FORMATTER => 'toString'} hbase(main):041:0> scan 'test',{COLUMNS => 'cf:name:toString'} #根据 RowKey 范围获取数据 scan '表名', {STARTROW => '起始rowKey', ENDROW => '结束rowKey'} # LIMIT返回的行数 scan '表名', {LIMIT => 行数} #列举所有表 list

踩过的坑 出现同一数据的重复性问题:Hadoop迭代器对象重用(Reduce阶段)

复用原理:Java引用类型创建和存储机制:引用其实就是一个地址,放在栈内存中,实际存储的东西会放在堆内存中。引用中的地址是实际存储的东西的首地址。 所以reduce方法中 value只在使用前时被创建一次,在内存中分配一个地址(假设为0x0001),而在此后循环时,每次都从0x0001将上次放入的值取出,再放入新的值。 假设for循环中只存在一个 list.add(value) 语句(如下代码块),就相当于每次循环都将固定0x0001地址的value放入list集合,当在for循环结束时,集合list中的所有value元素的地址指向的都是0x0001,而此时0x0001只存放着最后一次循环的值。

虽然reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。所以如果要保存key或者value的结果,只能将其中的值取出另存或者重新clone一个对象

#车祸代码 public class MyReducer extends Reducer<LongWritable, Text,LongWritable,Text> { ArrayList<Strnig> str = new ArrayList<Strnig>(); @Override protected void reduce(LongWritable key, Iterable<String> values, Context context) { for (String value : values) { str.add(value); } v = str; //集合中的value全都相等,都等于Iterable迭代器中最后一个值 context.write(k,v); } }

Hadoop|Reduce中Iterable迭代器K,V对象复用机制_desperado0726的博客-CSDN博客Hadoop reduce阶段遍历Iterable的问题及解决方案_Kefault的博客-CSDN博客_hadoop iterable

同一数值不同类型的Long和String 转Bytes后使用结果不同 byte[] byteArray = str.getBytes(); /**很简单,就是调用String类的getBytes()方法。看JDK源码可以发现该方法最终调用了String类的getBytes(Charset charset)方法。 如果调用的是不带参数的getBytes()方法,则使用默认的编码方式,在Windows操作系统下,默认编码方式为"GBK"。 **/ /**long占8个字节(byte)!**/ public static byte[] LongToBytes(long values) { byte[] buffer = new byte[8]; for (int i = 0; i < 8; i++) { int offset = 64 - (i + 1) * 8; buffer[i] = (byte) ((values >> offset) & 0xff); } return buffer; } public static long BytesToLong(byte[] buffer) { long values = 0; for (int i = 0; i < 8; i++) { values <<= 8; values|= (buffer[i] & 0xff); } return values; } Could not initialize class org.fusesource.jansi.internal.Kernel32

缺少org.fusesource.jansi.internal.Kernel32造成,下载jansi-1.18.jar包放到hbase-2.3.5\lib下

Windows10 单机 Hadoop3.1.3 Hbase 2.3.5_sleetdream的博客-CSDN博客

Ref:

HBase Java API之TableDescriptorBuilder,ColumnDescriptorBuilder创建表、列族_追梦*小生的博客-CSDN博客

JAVA连接HBase客户端及HBase写入数据和读取数据原理解析_菜鸟周星星的博客-CSDN博客

浅谈HBase的数据分布 - 知乎

Hbase API以及java如何操作Hbase的说明_Uncle_Mo的博客-CSDN博客_hbase java

HBase的JavaAPI使用--基础篇 - 云+社区 - 腾讯云

Hbase利用多线程插入和读取数据(读取.CSV文件并写入到Hbase中。)_天,取个名字咋就这么困难!的博客-CSDN博客?hbase导入csv文件_HBase应用(一):数据批量导入说明_以号拼命多次的博客-CSDN博客

hbase组合rowkey_HBase二级索引的几种方案_反斗大飞机的博客-CSDN博客


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #JAVA #api操作 #HBase