Hadoop通信机制和内部协议之RPC

Hadoop RPC

RPC简介

简要地说,RPC就是允许程序调用位于其他机器上的过程(也可以是同一台机器的不同进程)。
RPC调用过程是透明的

传统过程调用:传统的过程调用中,主程序将参数压人栈内并调用过程,这时候主程序停止执行并开始执行相应的过程。被调用的过程从栈中获取参数,然后执行过程函数;执行完毕后,将返回参数入栈(或者保存在寄存器里),并将控制权交还给调用方。调用方获取返回参数,并继续执行。

而RPC调用是进程间的过程调用

RPC模型

  • 通行模块: 请求-响应
  • Stub程序: 用于保证RPC的透明性。在客户端,不在本地调用,而是将请求信息通过网络模块发送给法服务器端,服务器接收后进行解码。服务器中,Stub程序依次进行 解码(请求的参数)、调用相应的服务过程、编码返回结果等处理
  • 调度程序: 调度来自通行模块的请求信息,根据其中标识选一个Stub程序运行
  • 客户程序: 请求发出者
  • 服务过程: 请求接收者 一个RPC的旅游:
  1. 客户端以本地调用方式产生本地Stub程序
  2. 该Stub程序将函数调用信息按照网络通信模块的要求封装成消息包,并交给通信模块发送到远程服务器端。
  3. 远程服务器端接收此消息后,将此消息发送给相应的Stub程序
  4. Stub程序拆封消息,形成被调过程要求的形式,并调用对应函数
  5. 服务端执行被调用函数,并将结果返回给Stub程序
  6. Stub程序将此结果封装成消息,通过网络通信模块逐级地传送给客户程序。

RPC特性

  • 透明性 调用过程就像本地调用,察觉不到它的经历
  • 高性能 :Hadoop各个系统(如HDFS、MapReduce、YARN等)均采用了Master/Slave结构,其中,Master实际上是一个RPC server,它负责响应集群中所有Slave发送的服务请求。RPC Server性能要求高,为的是能够让多个客户端并发方位
  • 易用性/可控性 Hadoop系统不采用Java内嵌的RPC(RMI,Remote Method Invocation)框架的主要原因是RPC是Hadoop底层核心模块之一,需要满足易用性、高性能、轻量级等特性

RPC例子

执行过程:

  1. CalculateClient对象本地调用产生Stub程序
  2. 经通信模块上传至服务器CalculateServer对象,在创建Server时设置了协议和业务逻辑(服务过程),处理过后根据上述RPC过程返回
  3. 客户端接收后打印到日志中

先定义一些常量

这里不需要太多的在意,直接使用在代码里面也行,在大的项目中为了使程序易于修改而这样设置

1
2
3
4
5
6
7
8
9
10
11
/**
* 静态变量声明类
*/
public interface Constants {
public interface VersionID {
public static final long RPC_VERSION = 7788L;
}

public static final String RPC_HOST = "127.0.0.1";
public static final int RPC_PORT = 8888;
}

定义一个Service接口,协议类

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolInfo;

@ProtocolInfo(protocolName = "", protocolVersion = Constants.VersionID.RPC_VERSION)
public interface CalculateService {

//真实业务逻辑,加减法,
public IntWritable add(IntWritable a, IntWritable b);
public IntWritable sub(IntWritable a, IntWritable b);
public Text echo(Text mt);

}

@ProtocolInfo(protocolName = “”, protocolVersion = Constants.VersionID.RPC_VERSION) 没有这句就不能将该类设置为协议,不过也可以通过继承VersionProtocol接口

Service接口的实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.io.IOException;

public class CalculateServiceImpl implements CalculateService {
/**
* 该方法没有也行
*
*/
public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException{
return this.getProtocolSignature(arg0, arg1, arg2);
}
/**
* 校验hadoop RFC版本号
* @param arg0
* @param arg1
* @return
*/
public long getProtocolVersion(String arg0, long arg1) throws IOException {
return Constants.VersionID.RPC_VERSION;
}

@Override
public IntWritable add(IntWritable a, IntWritable b) {
return new IntWritable(a.get() + b.get());
}

@Override
public IntWritable sub(IntWritable a, IntWritable b) {
return new IntWritable(a.get() - b.get());
}

@Override
public Text echo(Text mt) {
return mt;
}
}

Server和Client类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.hadoop.ipc.RPC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class CalculateServer {

private static final Logger LOG = LoggerFactory.getLogger(CalculateServer.class);

public static void main(String[] args) {

try {
//构造Server,并设置协议接口,主机、端口,真实业务逻辑
RPC.Server server = new RPC.Builder(new Configuration())
.setProtocol(CalculateService.class)
.setBindAddress(Constants.RPC_HOST)
.setPort(Constants.RPC_PORT)
.setInstance(new CalculateServiceImpl())
.build();

//启动Server
server.start();
LOG.info("Server has Started!");
} catch (IOException e) {
LOG.error("Server has Error");
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;

public class CalculateClient {


private static final Logger LOG = LoggerFactory.getLogger(CalculateServer.class);

public static void main(String[] args) {
//格式化IP和端口
InetSocketAddress addr = new InetSocketAddress(Constants.RPC_HOST, Constants.RPC_PORT);

//校验Hadoop RPC版本号
long protocolVersion = RPC.getProtocolVersion(CalculateService.class);

try {
//获取Server连接
CalculateService proxy = RPC.getProxy(CalculateService.class, protocolVersion, addr, new Configuration());

IntWritable add = proxy.add(new IntWritable(1), new IntWritable(2));
IntWritable sub = proxy.add(new IntWritable(3), new IntWritable(2));

LOG.info("1+2 = " + add);
LOG.info("3-2 = " + sub);

} catch (IOException e) {
LOG.error("Client has error!");
}
}
}

注意: 查看本程序运行结果需要一个日志文件,如果不想加,把LOG的相关语句换为打印输出就行
在resource文件夹下创建 log4j.properties

1
2
3
4
5
6
7
8
log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

客户端运行结果

1
2
3
4
2019-10-31 18:59:28,499 WARN [org.apache.hadoop.util.Shell] - Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
2019-10-31 18:59:28,619 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-10-31 18:59:29,734 INFO [hadooprfc.calculate.CalculateServer] - 1+2 = 3
2019-10-31 18:59:29,734 INFO [hadooprfc.calculate.CalculateServer] - 3-2 = 5

其他开源RPC架构

  • Java RMI
  • Apache Thrift
  • Google Protocol Buffer

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×