Hbase学习笔记3之集群操作

共 2192字,需浏览 5分钟

 ·

2022-03-06 05:32

数据融合

当我们手动刷新数据到磁盘的时候,会创建一个新的文件。然后这些大小不一的小文件,最后会进行自动合并。如下所示,我们先进行几次刷新。

7bb674f57f1ce5f6c2619e584b375057.webp

每次刷新都会在列族目录下多出一个对应的数据文件来。

62f3177428ebe9aaeff60b6f60dcfecc.webp

第二个文件:

21e33bec0914c98f1093fc7db89da801.webp

第三个文件的时候发现文件进行了合并。

f428477d12b2fc8d408a522d602a808a.webp

查询结果为前几次put进去的值。

00d887c13e910f31a3d486b68ae78d32.webp

Java API

接下来我们使用Java来操作Hbase数据库

初始化

@Before
public void init() throws IOException {
//创建配置文件对象
conf = HBaseConfiguration.create();
//加载zookeeper的配置
conf.set("hbase.zookeeper.quorum","node2,node3,node4");
//获取连接
conn = ConnectionFactory.createConnection(conf);
//获取对象
admin = conn.getAdmin();
//获取数据操作对象
table = conn.getTable(tableName);
}

创建表

 @Test
public void createTable() throws IOException {
//定义表描述对象
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
//定义列族描述对象
ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder("cf".getBytes());
//添加列族信息给表
tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
if(admin.tableExists(tableName)){
//禁用表
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
//创建表
admin.createTable(tableDescriptorBuilder.build());
}
2d200916a665dea3637fa3f993708318.webp

新增数据

@Test
public void insert() throws IOException {
Put put = new Put(Bytes.toBytes("2222"));
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes("lisi"));
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes("341"));
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("sex"),Bytes.toBytes("women"));
table.put(put);
}
9e05654c2b404659f0091440af9b2afd.webp

查询数据

 @Test
public void scan() throws IOException {
Scan scan = new Scan();
ResultScanner rss = table.getScanner(scan);
for (Result rs: rss) {
Cell cell1 = rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("name"));
Cell cell2 = rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("age"));
Cell cell3 = rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("sex"));
String name = Bytes.toString(CellUtil.cloneValue(cell1));
String age = Bytes.toString(CellUtil.cloneValue(cell2));
String sex = Bytes.toString(CellUtil.cloneValue(cell3));
System.out.println(name);
System.out.println(age);
System.out.println(sex);
}
}
823d5806ad148e56afd0a5d95a732cb7.webp

插入十万数据

/**
* 假设有10个用户,每个用户一年产生10000条记录
*/
@Test
public void insertMangData() throws Exception {
List<Put> puts = new ArrayList<>();
for(int i = 0;i<10;i++){
String phoneNumber = getNumber("158");
for(int j = 0 ;j<10000;j++){
String dnum = getNumber("177");
String length = String.valueOf(random.nextInt(100));
String date = getDate("2019");
String type = String.valueOf(random.nextInt(2));
//rowkey
String rowkey = phoneNumber+"_"+(Long.MAX_VALUE-sdf.parse(date).getTime());
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("dnum"),Bytes.toBytes(dnum));
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("length"),Bytes.toBytes(length));
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("date"),Bytes.toBytes(date));
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("type"),Bytes.toBytes(type));
puts.add(put);
}
}
table.put(puts);
}

SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

private String getDate(String s) {
return s+String.format("%02d%02d%02d%02d%02d",random.nextInt(12)+1,random.nextInt(31),random.nextInt(24),random.nextInt(60),random.nextInt(60));
}

Random random = new Random();
public String getNumber(String str){
return str+String.format("%08d",random.nextInt(99999999));
}
db9919fb6098aa1e94bd8d0ef1fe1521.webp

查询指定条件

/**
* 查询某一个用户3月份的通话记录
*/
@Test
public void scanByCondition() throws Exception {
Scan scan = new Scan();
String startRow = "15883348450_"+(Long.MAX_VALUE-sdf.parse("20190331000000").getTime());
String stopRow = "15883348450_"+(Long.MAX_VALUE-sdf.parse("20190301000000").getTime());
scan.withStartRow(Bytes.toBytes(startRow));
scan.withStopRow(Bytes.toBytes(stopRow));
ResultScanner rss = table.getScanner(scan);
for (Result rs:rss) {
System.out.print(Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("dnum")))));
System.out.print("--"+Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("type")))));
System.out.print("--"+Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("length")))));
System.out.println("--"+Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("date")))));
}
}


/**
* 查询某个用户所有的主叫电话(type=1)
* 某个用户
* type=1
*
*/
@Test
public void getType() throws IOException {
Scan scan = new Scan();
//创建过滤器集合
FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
//创建过滤器
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("cf"),Bytes.toBytes("type"),CompareOperator.EQUAL,Bytes.toBytes("1"));
filters.addFilter(filter1);
//前缀过滤器
PrefixFilter filter2 = new PrefixFilter(Bytes.toBytes("15883348450"));
filters.addFilter(filter2);
scan.setFilter(filters);
ResultScanner rss = table.getScanner(scan);
for (Result rs:rss) {
System.out.print(Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("dnum")))));
System.out.print("--"+Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("type")))));
System.out.print("--"+Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("length")))));
System.out.println("--"+Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"),Bytes.toBytes("date")))));
}
}


浏览 28
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报