Hadoop 2.x
相关参数说明
- connections 用于保存ConnectionId到Connection的映射,位于org.apache.hadoop.ipc.Client中
1
private ConcurrentMap<Client.ConnectionId, Client.Connection> connections; - calls 当前正在处理的远程调用,位于org.apache.hadoop.ipc.Client.Connection中
1
private Hashtable<Integer, Client.Call> calls = new Hashtable(); - shouldCloseConnection 连接关闭标志
1
private AtomicBoolean shouldCloseConnection = new AtomicBoolean();
相关方法说明
getConnection
Client需要获取连接的时候,调用getConnection方法,该方法先检查connections中是否存在满足条件的IPC连接。有,则 复用 ,否则,创建新的连接
复用是指connection相等(connection里面的三个参数相等则说明两个connection相等),就使用同一个connection相关代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private Client.Connection getConnection(Client.ConnectionId remoteId, Client.Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException {
//首先,看看客户端是否还在运行
if (!this.running.get()) {
throw new IOException("The client is stopped");
} else {
while(true) {
//2. 查一下是否存在remoteId对应的连接connection
Client.Connection connection = (Client.Connection)this.connections.get(remoteId);
//connection==null,表明不存在,就需要创建一个新的IPC连接
if (connection == null) {
//创建连接
connection = new Client.Connection(remoteId, serviceClass);
Client.Connection existing = (Client.Connection)this.connections.putIfAbsent(remoteId, connection);
if (existing != null) {
connection = existing;
}
}
//将IPC调用放入IPC连接中
if (connection.addCall(call)) {
connection.setupIOstreams(fallbackToSimpleAuth);
return connection;
}
this.connections.remove(remoteId, connection);
}
}
}- addCall
作用是将一个IPC调用放入IPC连接中
如果成员变量shouldCloseConnection为true, 返回false,这样可以防止将一个IPC调用放入一个已经关闭的IPC连接中。
否则,将调用放入IPC连接calls中
再说一下,为什么会有这种情况?IPC来呢及可以在多个地方被触发,进入关闭过程,但知道Connection.close方法被调用,对应的connection才会在connections中删除。删除后的连接只有新建后才能将IPC调用传入连接中
相关代码1
2
3
4
5
6
7
8
9
10private synchronized boolean addCall(Client.Call call) {
if (this.shouldCloseConnection.get()) {
return false;
} else {
this.calls.put(call.id, call);
//notify是唤醒等待的线程,因为这个方法会有多个地方调用,但进来的只能有一个
this.notify();
return true;
}
}
- addCall
setupIOstreams
该方法是使客户端和服务器通过Socket连接起来
连接失败的话,会重传,最多maxRetries,可以设置${ipc.client.connect.max.retries}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37private synchronized void setupIOstreams(AtomicBoolean fallbackToSimpleAuth) {
if (this.socket == null && !this.shouldCloseConnection.get()) {
...
if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("Connecting to " + this.server);
}
Span span = Tracer.getCurrentSpan();
if (span != null) {
span.addTimelineAnnotation("IPC client connecting to " + this.server);
}
short numRetries = 0;
Random rand = null;
while(true) {
/**
* 建立Socket连接,具体可以查看源代码Ctrl + 鼠标左键查看
* connection使用Socket连接设置了tcpNoDelay标志,禁用Nagle算法,无需等待直接发送
* 配置项${ipc.client.tcpnodelay}
*/
this.setupConnection(ticket);
...
//与IPC服务器进行握手
this.writeConnectionContext(this.remoteId, this.authMethod);
//更改最后访问时间lastActivity,该变量也是Client成员变量
this.touch();
...
//启动接受进程
this.start();
...
this.close();
}
}
}
以上是客户端,下面为IPC连接的另一端
服务器端
服务器建立IPC连接的代码分散在Listener和Server.Connection中
Listener基于Java NIO开发的,是一个标准的NIO应用,其构造函数中打开服务器端口,创建Selector监听
注意参数backlogLength,它由ipc.server.listen.queue.size参数指定,backlogLength是调用ServerSocket.bind()时可以额外提供的一个参数,用于指定在监听端口上排队请求的最大长度,队列满了以后的客户端请求,会被拒绝
- run方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public void run() {
...
while(Server.this.running) {
SelectionKey key = null;
try {
//select调用
this.getSelector().select();
//获取相关键
for(Iterator iter = this.getSelector().selectedKeys().iterator(); iter.hasNext(); key = null) {
key = (SelectionKey)iter.next();
iter.remove();
try {
//判断是不是可接受事件
if (key.isValid() && key.isAcceptable()) {
//doAccept方法,接受客户端请求、注册Socket到选择器上,
//并且创建Reader,Reader里面的run方法处理 OP_READ
//该方法中最主要的是ReadAndProcess方法
this.doAccept(key);
}
} catch (IOException var8) {
}
...
}
}