irpas技术客

java api 操作 hbase,插入数据(单行,多行,批量插入)_SXF2410_hbase批量写入

网络投稿 8236

java api 操作 hbase (主要批量存放数据)

操作步骤如下:

#创建配置对象,创建连接对象
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum","single01:2181");
Connection hbaseCon=ConnectionFactory.createConnection(config);

//操作对象
Admin admin=hbaseCon.getAdmin();
admin.xxx(tableName)

//创建表名对象
final String HTable="kb16nb:student";
TableName tableName = TableName.valueOf(HTable);

//操作数据
Table table=hbaseCon.getTable(tableName);

//单行添加数据
Put row = new Put(byte[] rowkey);
row.addColumn(byte[] columnfamily,byte[] column,byte[] value)
...
table.put(row);

//多行(少量)
List<Put> rows =new ArrayList<>();
rows.add(row);
...
table.put(rows);

//批处理
//lambda 创建hbase 批量插入数据异常侦听对象
//java中函数式接口,可以用lambda表达式写
BufferedMutator.ExceptionListener listener=(e,mutator)->{
    //异常信息(原因)
    String msg = e.getMessage();
    //出异常的行数
    int numExceptions = e.getNumExceptions();
    //记录出异常的行的行键,以便事后检查并再处理
    //用Log4j记录
    logger.error("HBASE MUTATE EXCEPTION : "+msg+","+numExceptions);
    if(numExceptions>0){
        StringBuilder builder=new StringBuilder();
        builder.append(Bytes.toString(e.getRow(0).getRow()));
        final String SEP=",";
        //第0行已在上面追加,这里从1开始,避免重复记录
        for (int i = 1; i <numExceptions ; i++) {
            builder.append(SEP);
            builder.append(Bytes.toString(e.getRow(i).getRow()));
        }
        logger.error(builder.toString());
    }
};
final int BUFFER_SIZE=8*1024;
BufferedMutatorParams bmp=new BufferedMutatorParams(tableName)
        .listener(listener).writeBufferSize(BUFFER_SIZE);
BufferedMutator mutator=hbaseCon.getBufferedMutator(bmp);

//创建List
List<Put> list=new ArrayList<>(BUFFER_SIZE);
...
//放入数据
mutator.mutate(list);
//写完后必须关闭(close 会先执行 flush),否则缓冲区中尚未提交的数据可能丢失
mutator.close();

log4j日志

#1 导入jar包

#2 做配置信息 resources/log4j.properties
log4j.rootLogger=INFO, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=log/hd.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

#3 创建对象
private static final Logger logger=Logger.getLogger(App.class);

代码如下:

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; /** * java api 操作 hbase,插入数据 * */ public class App { private static final Logger logger=Logger.getLogger(App.class); static void close(AutoCloseable...closes){ for (AutoCloseable close : closes) { if(null!=close){ try { close.close(); } catch (Exception e) { e.printStackTrace(); } } } } public static void main( String[] args ) { Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum","single01:2181 "); //base连接对象 Connection hbaseCon=null; //admin操作对象(命名空间,数据表...) Admin admin =null; //table操作表数据 Table table =null; try { hbaseCon = ConnectionFactory.createConnection(config); admin=hbaseCon.getAdmin(); final String HTable="kb16nb:student"; TableName tableName = TableName.valueOf(HTable); if (admin.tableExists(tableName)) { table=hbaseCon.getTable(tableName); //lambda 创建hbase 批量插入数据的异常侦听对象 //java中一个接口只有一个方法,可以用lambda表达式写 BufferedMutator.ExceptionListener listener=(e,mutator)->{ //异常信息(原因) String msg = e.getMessage(); //出异常的行数 int numExceptions = e.getNumExceptions(); //用Log4j记录,记录出异常的行的行键,以便事后检查并再处理 logger.error("HBASE MUTATE EXCEPTION : "+msg+","+numExceptions); if(numExceptions>0){ StringBuilder builder=new StringBuilder(); builder.append(Bytes.toString(e.getRow(0).getRow())); final String SEP=","; for (int i = 1; i <numExceptions ; i++) { builder.append(SEP); builder.append(Bytes.toString(e.getRow(i).getRow())); } logger.error(builder.toString()); } }; final int BUFFER_SIZE=8*1024; BufferedMutatorParams bmp=new BufferedMutatorParams(tableName) .listener(listener).writeBufferSize(BUFFER_SIZE); BufferedMutator mutator=hbaseCon.getBufferedMutator(bmp); //创建List,放入数据 List<Put> list=new 
ArrayList<>(BUFFER_SIZE); Random rand=new Random(); int sum = 0; for (int i = 0; i < 100000; i++) { Put put = new Put(Bytes.toBytes(1 + i + "")); put.addColumn(Bytes.toBytes("base"),Bytes.toBytes("name"),Bytes.toBytes("henry"+i)); put.addColumn(Bytes.toBytes("base"),Bytes.toBytes("age"),Bytes.toBytes(18+rand.nextInt(1))); int product =rand.nextInt(2); String columnFamily=product ==0?"bigdata":"cloud"; String subject01=product ==0 ? "hive":"net"; String subject02=product ==0 ? "hbase":"shell"; int socre01=55+rand.nextInt(45),socre02=55+rand.nextInt(45); put.addColumn(Bytes.toBytes("base"),Bytes.toBytes("product"),Bytes.toBytes(columnFamily)); put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(subject01),Bytes.toBytes(socre01)); put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(subject02),Bytes.toBytes(socre02)); list.add(put); if(list.size()>=BUFFER_SIZE){ mutator.mutate(list); sum+=list.size(); System.out.println("CURRENT"+list.size()+",SUM"+sum); list.clear(); } } if (!list.isEmpty()){ mutator.mutate(list); sum+=list.size(); System.out.println("CURRENT"+list.size()+",SUM"+sum); list.clear(); } //单行添加数据 // 通过rowkey创建 Put(ROW行) 对象 //Bytes.toBytes("Object") 把任意对象转变为字节数组 /*Put put = new Put(Bytes.toBytes("1")); put.addColumn(Bytes.toBytes("base"),Bytes.toBytes("name"),Bytes.toBytes("lisi")); put.addColumn(Bytes.toBytes("base"),Bytes.toBytes("age"),Bytes.toBytes(18)); put.addColumn(Bytes.toBytes("base"),Bytes.toBytes("product"),Bytes.toBytes("bigdata")); put.addColumn(Bytes.toBytes("bigdata"),Bytes.toBytes("hive"),Bytes.toBytes(66)); put.addColumn(Bytes.toBytes("bigdata"),Bytes.toBytes("hbasee"),Bytes.toBytes(73)); table.put(put); System.out.println("SUCCEED");*/ }else{ System.out.println(HTable+" NOT EXISTE"); } } catch (IOException e) { e.printStackTrace(); }finally{ close(table,admin,hbaseCon); } } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #hbase批量写入 #JAVA #API #操作 #HBase #config