Implementing endpoints in HBase 0.98.9
Published: 2019-06-23


In my previous blog post I quoted a passage from the official HBase documentation explaining that, from version 0.96.0 onward, the endpoint implementation mechanism changed fundamentally with the introduction of the protobuf framework. This post follows up on that one and walks through the process of building a custom endpoint.

The implementation consists of the following steps:

1. Define the interface description file (this capability is provided by protobuf)

option java_package = "coprocessor.endpoints.generated";
option java_outer_classname = "RowCounterEndpointProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest)
    returns (CountResponse);
  rpc getKeyValueCount(CountRequest)
    returns (CountResponse);
}

This file is taken directly from the examples shipped with HBase. The syntax should be obvious to anyone with similar prior experience; if anything is unclear, consult the protobuf manual.

2. Generate the Java interface classes from the description file (this capability is provided by protobuf)

With the interface description file in hand, you still need to generate the Java-language interface classes. This is done with protoc, the compiler tool that ships with protobuf.

$protoc --java_out=./ Examples.proto

A quick explanation: the protoc command becomes available once you have installed protobuf. Examples.proto is the file name of the interface description file written above. The "--java_out" flag specifies where the generated Java classes are placed.

So if you do not have protobuf installed here, you will need to install it; both Windows and Linux builds are available. As an aside, if you have ever set up a build environment for 64-bit Hadoop, you should already have protobuf installed.
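For reference, a typical from-source install on Linux looks roughly like the following. This is a sketch, not a definitive procedure: the 2.5.0 version matches what the HBase 0.98.x line was built against, but the download URL and paths are assumptions that may need adjusting for your environment.

```shell
# Build and install protobuf 2.5.0 from source (URL and version assumed; adjust as needed)
wget https://github.com/google/protobuf/releases/download/v2.5.0/protobuf-2.5.0.tar.gz
tar -xzf protobuf-2.5.0.tar.gz
cd protobuf-2.5.0
./configure
make
sudo make install
sudo ldconfig        # refresh the linker cache so libprotobuf is found
protoc --version     # verify the install; should report libprotoc 2.5.0
```

Keep the protoc version in sync with the protobuf-java version on your HBase classpath, otherwise the generated classes may not load.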

3. Implement the interface

package coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;

public class RowCounterEndpointExample extends RowCountService implements
        Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    public RowCounterEndpointExample() {
    }

    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void getRowCount(RpcController controller, CountRequest request,
            RpcCallback<CountResponse> done) {
        Scan scan = new Scan();
        scan.setFilter(new FirstKeyOnlyFilter());
        CountResponse response = null;
        InternalScanner scanner = null;
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            byte[] lastRow = null;
            long count = 0;
            do {
                hasMore = scanner.next(results);
                for (Cell kv : results) {
                    byte[] currentRow = CellUtil.cloneRow(kv);
                    if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
                        lastRow = currentRow;
                        count++;
                    }
                }
                results.clear();
            } while (hasMore);
            response = CountResponse.newBuilder().setCount(count).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        done.run(response);
    }

    @Override
    public void getKeyValueCount(RpcController controller, CountRequest request,
            RpcCallback<CountResponse> done) {
        CountResponse response = null;
        InternalScanner scanner = null;
        try {
            scanner = env.getRegion().getScanner(new Scan());
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            long count = 0;
            do {
                hasMore = scanner.next(results);
                for (Cell kv : results) {
                    count++;
                }
                results.clear();
            } while (hasMore);
            response = CountResponse.newBuilder().setCount(count).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        done.run(response);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // nothing to clean up
    }
}

4. Register the interface (an HBase feature; registration is done either through the configuration file or through the table schema)

For this part, see the HBase definitive guide; that is what I followed when doing it.
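To make the two registration routes concrete, here is a sketch; the jar path, table name, and priority below are illustrative assumptions. The configuration-file route adds the class to the hbase.coprocessor.region.classes property in hbase-site.xml (with the jar on every region server's classpath), which loads the endpoint on all regions of all tables after a restart. The table-schema route attaches it to a single table from the HBase shell:

```
hbase> disable 'testtable'
hbase> alter 'testtable', METHOD => 'table_att', \
    'coprocessor' => 'hdfs:///user/hbase/coprocessor.jar|coprocessor.RowCounterEndpointExample|1001|'
hbase> enable 'testtable'
```

The coprocessor attribute value is four pipe-separated fields: jar path, class name, priority, and optional arguments. The table-schema route is usually preferable during development because it does not require restarting the cluster.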

5. Test the invocation

package coprocessor;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.ServiceException;

import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;

import util.HBaseHelper;

public class RowCounterEndpointClientExample {
    public static void main(String[] args) throws ServiceException, Throwable {
        Configuration conf = HBaseConfiguration.create();
        HBaseHelper helper = HBaseHelper.getHelper(conf);
        //helper.dropTable("testtable");
        //helper.createTable("testtable", "colfam1", "colfam2");
        System.out.println("Adding rows to table...");
        helper.fillTable("testtable", 1, 10, 10, "colfam1", "colfam2");
        HTable table = new HTable(conf, "testtable");

        final CountRequest request = CountRequest.getDefaultInstance();
        final Batch.Call<RowCountService, Long> call =
                new Batch.Call<RowCountService, Long>() {
            public Long call(RowCountService counter) throws IOException {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<CountResponse> rpcCallback =
                        new BlockingRpcCallback<CountResponse>();
                counter.getRowCount(controller, request, rpcCallback);
                CountResponse response = rpcCallback.get();
                if (controller.failedOnException()) {
                    throw controller.getFailedOn();
                }
                return (response != null && response.hasCount())
                        ? response.getCount() : 0;
            }
        };

        Map<byte[], Long> results = table.coprocessorService(
                RowCountService.class, null, null, call);
        for (byte[] b : results.keySet()) {
            System.err.println(Bytes.toString(b) + ":" + results.get(b));
        }
    }
}

Reposted from: https://my.oschina.net/psuyun/blog/363634
