Hadoop RPC RPC简介
简要地说,RPC就是允许程序调用位于其他机器上的过程(也可以是同一台机器的不同进程)。 RPC调用过程是透明的
传统过程调用:传统的过程调用中,主程序将参数压人栈内并调用过程,这时候主程序停止执行并开始执行相应的过程。被调用的过程从栈中获取参数,然后执行过程函数;执行完毕后,将返回参数入栈(或者保存在寄存器里),并将控制权交还给调用方。调用方获取返回参数,并继续执行。
而RPC调用是进程间的过程调用
RPC模型
通行模块: 请求-响应
Stub程序: 用于保证RPC的透明性。在客户端,不在本地调用,而是将请求信息通过网络模块发送给法服务器端,服务器接收后进行解码。服务器中,Stub程序依次进行 解码(请求的参数)、调用相应的服务过程、编码返回结果等处理
调度程序: 调度来自通行模块的请求信息,根据其中标识选一个Stub程序运行
客户程序: 请求发出者
服务过程: 请求接收者
一个RPC的旅游:
客户端以本地调用方式产生本地Stub程序
该Stub程序将函数调用信息按照网络通信模块的要求封装成消息包,并交给通信模块发送到远程服务器端。
远程服务器端接收此消息后,将此消息发送给相应的Stub程序
Stub程序拆封消息,形成被调过程要求的形式,并调用对应函数
服务端执行被调用函数,并将结果返回给Stub程序
Stub程序将此结果封装成消息,通过网络通信模块逐级地传送给客户程序。
RPC特性
透明性 调用过程就像本地调用,察觉不到它的经历
高性能 :Hadoop各个系统(如HDFS、MapReduce、YARN等)均采用了Master/Slave结构,其中,Master实际上是一个RPC server,它负责响应集群中所有Slave发送的服务请求。RPC Server性能要求高,为的是能够让多个客户端并发方位
易用性/可控性 Hadoop系统不采用Java内嵌的RPC(RMI,Remote Method Invocation)框架的主要原因是RPC是Hadoop底层核心模块之一,需要满足易用性、高性能、轻量级等特性
RPC例子 执行过程:
CalculateClient对象本地调用产生Stub程序
经通信模块上传至服务器CalculateServer对象,在创建Server时设置了协议和业务逻辑(服务过程),处理过后根据上述RPC过程返回
客户端接收后打印到日志中
先定义一些常量
这里不需要太多的在意,直接使用在代码里面也行,在大的项目中为了使程序易于修改而这样设置
1 2 3 4 5 6 7 8 9 10 11 /** * 静态变量声明类 */ public interface Constants { public interface VersionID { public static final long RPC_VERSION = 7788L; } public static final String RPC_HOST = "127.0.0.1"; public static final int RPC_PORT = 8888; }
定义一个Service接口,协议类 1 2 3 4 5 6 7 8 9 10 11 12 13 import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtocolInfo; @ProtocolInfo(protocolName = "", protocolVersion = Constants.VersionID.RPC_VERSION) public interface CalculateService { //真实业务逻辑,加减法, public IntWritable add(IntWritable a, IntWritable b); public IntWritable sub(IntWritable a, IntWritable b); public Text echo(Text mt); }
@ProtocolInfo(protocolName = “”, protocolVersion = Constants.VersionID.RPC_VERSION) 没有这句就不能将该类设置为协议,不过也可以通过继承VersionProtocol接口
Service接口的实现类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import java.io.IOException; public class CalculateServiceImpl implements CalculateService { /** * 该方法没有也行 * */ public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException{ return this.getProtocolSignature(arg0, arg1, arg2); } /** * 校验hadoop RFC版本号 * @param arg0 * @param arg1 * @return */ public long getProtocolVersion(String arg0, long arg1) throws IOException { return Constants.VersionID.RPC_VERSION; } @Override public IntWritable add(IntWritable a, IntWritable b) { return new IntWritable(a.get() + b.get()); } @Override public IntWritable sub(IntWritable a, IntWritable b) { return new IntWritable(a.get() - b.get()); } @Override public Text echo(Text mt) { return mt; } }
Server和Client类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import org.apache.hadoop.ipc.RPC; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; public class CalculateServer { private static final Logger LOG = LoggerFactory.getLogger(CalculateServer.class); public static void main(String[] args) { try { //构造Server,并设置协议接口,主机、端口,真实业务逻辑 RPC.Server server = new RPC.Builder(new Configuration()) .setProtocol(CalculateService.class) .setBindAddress(Constants.RPC_HOST) .setPort(Constants.RPC_PORT) .setInstance(new CalculateServiceImpl()) .build(); //启动Server server.start(); LOG.info("Server has Started!"); } catch (IOException e) { LOG.error("Server has Error"); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.ipc.RPC; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; public class CalculateClient { private static final Logger LOG = LoggerFactory.getLogger(CalculateServer.class); public static void main(String[] args) { //格式化IP和端口 InetSocketAddress addr = new InetSocketAddress(Constants.RPC_HOST, Constants.RPC_PORT); //校验Hadoop RPC版本号 long protocolVersion = RPC.getProtocolVersion(CalculateService.class); try { //获取Server连接 CalculateService proxy = RPC.getProxy(CalculateService.class, protocolVersion, addr, new Configuration()); IntWritable add = proxy.add(new IntWritable(1), new IntWritable(2)); IntWritable sub = proxy.add(new IntWritable(3), new IntWritable(2)); LOG.info("1+2 = " + add); LOG.info("3-2 = " + sub); } catch (IOException e) { LOG.error("Client has error!"); } } }
注意: 查看本程序运行结果需要一个日志文件,如果不想加,把LOG的相关语句换为打印输出就行 在resource文件夹下创建 log4j.properties
1 2 3 4 5 6 7 8 log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
客户端运行结果 1 2 3 4 2019-10-31 18:59:28,499 WARN [org.apache.hadoop.util.Shell] - Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems 2019-10-31 18:59:28,619 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2019-10-31 18:59:29,734 INFO [hadooprfc.calculate.CalculateServer] - 1+2 = 3 2019-10-31 18:59:29,734 INFO [hadooprfc.calculate.CalculateServer] - 3-2 = 5
其他开源RPC架构
Java RMI
Apache Thrift
Google Protocol Buffer