Hadoop IPC 连接的建立


Hadoop 2.x


相关参数说明

  • connections 用于保存ConnectionId到Connection的映射,位于org.apache.hadoop.ipc.Client中
    1
    private ConcurrentMap<Client.ConnectionId, Client.Connection> connections;
  • calls 当前正在处理的远程调用,位于org.apache.hadoop.ipc.Client.Connection中
    1
    private Hashtable<Integer, Client.Call> calls = new Hashtable();
  • shouldCloseConnection 连接关闭标志
    1
    private AtomicBoolean shouldCloseConnection = new AtomicBoolean();

相关方法说明

  • getConnection

    Client需要获取连接的时候,调用getConnection方法,该方法先检查connections中是否存在满足条件的IPC连接。有,则 复用 ,否则,创建新的连接
    复用是指connection相等(connection里面的三个参数相等则说明两个connection相等),就使用同一个connection

    相关代码如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
        private Client.Connection getConnection(Client.ConnectionId remoteId, Client.Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException {
    //首先,看看客户端是否还在运行
    if (!this.running.get()) {
    throw new IOException("The client is stopped");
    } else {
    while(true) {
    //2. 查一下是否存在remoteId对应的连接connection
    Client.Connection connection = (Client.Connection)this.connections.get(remoteId);
    //connection==null,表明不存在,就需要创建一个新的IPC连接
    if (connection == null) {
    //创建连接
    connection = new Client.Connection(remoteId, serviceClass);
    Client.Connection existing = (Client.Connection)this.connections.putIfAbsent(remoteId, connection);
    if (existing != null) {
    connection = existing;
    }
    }
    //将IPC调用放入IPC连接中
    if (connection.addCall(call)) {
    connection.setupIOstreams(fallbackToSimpleAuth);
    return connection;
    }

    this.connections.remove(remoteId, connection);
    }
    }
    }
    • addCall

      作用是将一个IPC调用放入IPC连接中
       如果成员变量shouldCloseConnection为true, 返回false,这样可以防止将一个IPC调用放入一个已经关闭的IPC连接中。
       否则,将调用放入IPC连接calls中
      再说一下,为什么会有这种情况?IPC来呢及可以在多个地方被触发,进入关闭过程,但知道Connection.close方法被调用,对应的connection才会在connections中删除。删除后的连接只有新建后才能将IPC调用传入连接中
      相关代码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
       private synchronized boolean addCall(Client.Call call) {
      if (this.shouldCloseConnection.get()) {
      return false;
      } else {
      this.calls.put(call.id, call);
      //notify是唤醒等待的线程,因为这个方法会有多个地方调用,但进来的只能有一个
      this.notify();
      return true;
      }
      }
  • setupIOstreams

    该方法是使客户端和服务器通过Socket连接起来
    连接失败的话,会重传,最多maxRetries,可以设置${ipc.client.connect.max.retries}

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
     private synchronized void setupIOstreams(AtomicBoolean fallbackToSimpleAuth) {
    if (this.socket == null && !this.shouldCloseConnection.get()) {
    ...


    if (Client.LOG.isDebugEnabled()) {
    Client.LOG.debug("Connecting to " + this.server);
    }

    Span span = Tracer.getCurrentSpan();
    if (span != null) {
    span.addTimelineAnnotation("IPC client connecting to " + this.server);
    }

    short numRetries = 0;
    Random rand = null;

    while(true) {
    /**
    * 建立Socket连接,具体可以查看源代码Ctrl + 鼠标左键查看
    * connection使用Socket连接设置了tcpNoDelay标志,禁用Nagle算法,无需等待直接发送
    * 配置项${ipc.client.tcpnodelay}
    */
    this.setupConnection(ticket);
    ...
    //与IPC服务器进行握手
    this.writeConnectionContext(this.remoteId, this.authMethod);
    //更改最后访问时间lastActivity,该变量也是Client成员变量
    this.touch();
    ...
    //启动接受进程
    this.start();
    ...
    this.close();
    }
    }
    }

以上是客户端,下面为IPC连接的另一端


服务器端

服务器建立IPC连接的代码分散在Listener和Server.Connection中
Listener基于Java NIO开发的,是一个标准的NIO应用,其构造函数中打开服务器端口,创建Selector监听
注意参数backlogLength,它由ipc.server.listen.queue.size参数指定,backlogLength是调用ServerSocket.bind()时可以额外提供的一个参数,用于指定在监听端口上排队请求的最大长度,队列满了以后的客户端请求,会被拒绝

  • run方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public void run() {
    ...

    while(Server.this.running) {
    SelectionKey key = null;

    try {
    //select调用
    this.getSelector().select();

    //获取相关键
    for(Iterator iter = this.getSelector().selectedKeys().iterator(); iter.hasNext(); key = null) {
    key = (SelectionKey)iter.next();
    iter.remove();

    try {
    //判断是不是可接受事件
    if (key.isValid() && key.isAcceptable()) {

    //doAccept方法,接受客户端请求、注册Socket到选择器上,
    //并且创建Reader,Reader里面的run方法处理 OP_READ
    //该方法中最主要的是ReadAndProcess方法
    this.doAccept(key);
    }
    } catch (IOException var8) {
    }
    ...

    }
    }

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×