elasticsearch节点的transport请求发送处理分析
作者:zziawan 时间:2022-04-05 22:38:08
transport请求的发送和处理过程
前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程。
cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状态的发布及搜索数据请求等等。为了保证信息传输,elasticsearch定义了一个19字节长度的信息头HEADER_SIZE = 2 + 4 + 8 + 1 + 4,以'E','S'开头,接着是4字节int信息长度,然后是8字节long型信息id,接着是一个字节的status,最后是4字节int型version。
所有的节点间的信息都是以这19个字节开始。同时elasticsearch对于节点间的所有action都定义 了名字,如对master的周期检测action,internal:discovery/zen/fd/master_ping,每个action对应着相应的messagehandler。接下来会进行详分析。
request的发送过程
代码在nettytransport中如下所示:
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
//参数说明:node发送的目的节点,requestId请求id,action action名称,request请求,options包括以下几种操作 RECOVERY,BULK,REG,STATE,PING;
Channel targetChannel = nodeChannel(node, options);//获取对应节点的channel,channel在连接节点时初始化完成(请参考上一篇)
if (compress) {
options.withCompress(true);
}
byte status = 0;
//设置status 包括以下几种STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;
status = TransportStatus.setRequest(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始写出流
boolean addedReleaseListener = false;
try {
bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置
StreamOutput stream = bStream;
// only compress if asked, and, the request is not bytes, since then only
// the header part is compressed, and the "body" can't be extracted as compressed
if (options.compress() && (!(request instanceof BytesTransportRequest))) {
status = TransportStatus.setCompress(status);
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
stream = new HandlesStreamOutput(stream);
// we pick the smallest of the 2, to support both backward and forward compatibility
// note, this is the only place we need to do this, since from here on, we use the serialized version
// as the version to use also when the node receiving this request will send the response with
Version version = Version.smallest(this.version, node.version());
stream.setVersion(version);
stream.writeString(transportServiceAdapter.action(action, version));
ReleasableBytesReference bytes;
ChannelBuffer buffer;
// it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
// that create paged channel buffers, but its tricky to know when to do it (where this option is
// more explicit).
if (request instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) request;
assert node.version().equals(bRequest.version());
bRequest.writeThin(stream);
stream.close();
bytes = bStream.bytes();
ChannelBuffer headerBuffer = bytes.toChannelBuffer();
ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
} else {
request.writeTo(stream);
stream.close();
bytes = bStream.bytes();
buffer = bytes.toChannelBuffer();
}
NettyHeader.writeHeader(buffer, requestId, status, version);//写信息头
ChannelFuture future = targetChannel.write(buffer);//写buffer同时获取future,发送信息发生在这里
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);//添加listener
addedReleaseListener = true;
transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
} finally {
if (!addedReleaseListener) {
Releasables.close(bStream.bytes());
}
}
}
来源:https://www.cnblogs.com/zziawanblog/p/6528616.html
标签:elasticsearch,transport,请求,发送
![](/images/zang.png)
![](/images/jiucuo.png)
猜你喜欢
详解java IO流之缓冲流的使用
2023-08-08 18:33:16
C#通过PInvoke调用c++函数的备忘录的实例详解
2023-11-25 12:53:08
SpringCloud Eureka的使用教程
2022-03-23 22:30:59
![](https://img.aspxhome.com/file/2023/8/70348_0s.png)
Spring Cloud Config RSA简介及使用RSA加密配置文件的方法
2023-11-28 22:39:26
浅谈java项目与javaweb项目导入jar包的区别
2023-11-11 11:06:19
mybatis 查询返回Map<String,Object>类型
2023-11-14 07:06:09
![](https://img.aspxhome.com/file/2023/0/58760_0s.png)
详解spring mvc中url-pattern的写法
2023-11-11 07:30:58
如何用C#实现SAGA分布式事务
2022-11-29 20:34:49
![](https://img.aspxhome.com/file/2023/2/67552_0s.png)
SpringBoot跨域Access-Control-Allow-Origin实现解析
2023-11-28 23:04:34
java 数据结构单链表的实现
2022-07-24 09:45:33
Java截取字符串的几种方法示例
2023-11-29 12:36:32
![](https://img.aspxhome.com/file/2023/5/60585_0s.png)
Spring Bean常用依赖注入方式详解
2022-06-05 21:43:33
Springmvc模式上传和下载与enctype对比
2022-11-08 09:14:17
Visual Studio 2022 安装低版本 .Net Framework的图文教程
2023-06-22 19:18:44
![](https://img.aspxhome.com/file/2023/0/78990_0s.png)
Mybatis-Plus自动填充更新操作相关字段的实现
2023-06-04 22:37:12
SpringCloud之微服务容错的实现
2023-11-29 02:02:22
![](https://img.aspxhome.com/file/2023/5/60775_0s.png)
Springboot @Validated和@Valid的区别及使用详解
2023-05-30 18:40:25
![](https://img.aspxhome.com/file/2023/6/64916_0s.png)
Java实现图片验证码功能
2021-12-07 12:58:55
![](https://img.aspxhome.com/file/2023/5/60995_0s.jpg)
Unity实现俄罗斯方块游戏
2023-05-30 21:07:22
![](https://img.aspxhome.com/file/2023/2/69782_0s.gif)
java8 利用reduce实现将列表中的多个元素的属性求和并返回操作
2021-09-29 06:53:38