Java中的远程过程调用(RPC)#
目录#
1. 背景#
这学期上了《分布式系统》课程,内容主要是基于Java实现分布式计算,所以老师前几节课主要在给我们讲用Java做分布式可能会用到的一些技术。为了方便学习和记录,我将老师讲的内容结合书籍和资料做了一些整理,这一篇主要讨论远程过程调用。
2. 远程过程调用的定义#
远程过程调用是一种进程间通信技术,用于基于客户端-服务器的应用程序。它也被称为子例程调用或函数调用。
客户端有一个请求消息,RPC将其转换并发送给服务器。此请求可以是对远程服务器的过程或函数调用。当服务器接收到请求时,它将所需的响应发送回客户机。客户端在服务器处理调用时被阻塞,只有在服务器完成后才恢复执行。1
远程过程调用的顺序如下:
- 客户端存根由客户端调用。
- 客户端存根发出一个系统调用,将消息发送到服务器,并将参数放入消息中。
- 客户端操作系统将消息从客户端发送到服务器。
- 消息由服务器操作系统传递到服务器存根。
- 服务器存根从消息中删除参数。
- 然后,服务器存根调用服务器执行操作。
- 执行完成后,结果原路返回到客户端。
过程如下图所示
3. 远程过程调用的优点#
RPC的一些优点如下:
- 远程过程调用支持面向过程和面向线程的模型。
- RPC的内部消息传递机制对用户隐藏。重写和重新开发代码的工作量在远程过程调用中是最小的。
- 远程过程调用可以在分布式环境中使用,也可以在本地环境中使用。
- 为了提高性能,RPC省略了许多协议层。
4. 远程过程调用的缺点#
RPC的一些缺点如下:
- 远程过程调用是一个可以用不同方式实现的概念。它不是一个标准。
- RPC对于硬件架构没有灵活性。它只基于交互。
- 远程过程调用而增加了成本。
5. 实例#
Java 中可以通过使用反射机制和动态代理来实现一个RPC框架。
5.1. 客户端#
5.1.1. 客户端实现#
package service;
import rpc.DynamicProxyFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class clienTest {
static void test(int threadId) {
// 这里实现了一个登陆的操作
// 实现细节并不重要,所以没有给出 loginService 的代码
// 通过动态代理与服务器建立连接
loginService loginservice =
DynamicProxyFactory.getProxy(loginService.class, "localhost", 8000);
// 执行远程操作
String Result = loginservice.login("123", "123");
// 输出结果
System.out.println(String.format("Client Thread %d result= %s",threadId, Result));
}
public static void main(String[] args) throws Exception {
int threadNum = 4;
// 创建多线程
ExecutorService executor = Executors.newFixedThreadPool(4);
// 多线程调用
for (int i = 0; i < 4; i++) {
int finalI = i;
executor.submit(() -> {
test(finalI + 1);
});
}
}
}
5.1.2. 动态代理类#
package rpc;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
// 动态代理类
public class DynamicProxyFactory {
@SuppressWarnings("unchecked")
public static <T> T getProxy(final Class<T> classType, final String host, final int port) {//泛型方法,传入什么类型返回什么类型
InvocationHandler handler = (proxy, method, args) -> {
Connector connector = null;
try {
connector = new Connector(host, port);
RemoteCall call = new RemoteCall(classType.getName(), method.getName(), method.getParameterTypes(), args);
connector.send(call);
call = (RemoteCall) connector.receive();
return call.getResult();
} finally {
if (connector != null) connector.close();
}
};
System.out.println("代理开始执行");
return (T) Proxy.newProxyInstance(classType.getClassLoader(),new Class<?>[]{classType}, handler);
}
}
5.1.3. 连接器#
package rpc;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.Socket;
public class Connector {
private String host;
private int port;
private Socket skt;
private InputStream is;
private ObjectInputStream ois;
private OutputStream os;
private ObjectOutputStream oos;
public Connector(String host, int port) throws Exception {
this.host = host;
this.port = port;
connect(host, port);
}
// 发送对象方法
public void send(Object obj) throws Exception {
oos.writeObject(obj);
}
// 接收对象方法
public Object receive() throws Exception {
return ois.readObject();
}
// 建立与远程服务器的连接
public void connect() throws Exception {
connect(host, port);
}
// 建立与远程服务器的连接
public void connect(String host, int port) throws Exception {
skt = new Socket(host, port);
os = skt.getOutputStream();
oos = new ObjectOutputStream(os);
is = skt.getInputStream();
ois = new ObjectInputStream(is);
}
// 关闭连接
public void close() {
try {
ois.close();
oos.close();
skt.close();
} catch (Exception e) {
System.out.println(" Conector.close :" + e);
}
}
}
5.2. 远程对象#
package rpc;
import java.io.Serializable;
//相当于一个javaBean
public class RemoteCall implements Serializable {
private static final long serialVersionUID = 1L;
private String className;// 表示服务类名或接口名
private String methodName; // 表示功能方法名
private Class<?>[] paramTypes;//表示方法参数类型
private Object[] params;//表示方法参数值/如果方法正常执行,则resul 为方法返回值,如果方法抛出异常,则resul 为该异常
private Object result;
public RemoteCall() {
}
public RemoteCall(String className, String methodName, Class<?>[] paramTypes, Object[] params) {
this.className = className;
this.methodName = methodName;
this.paramTypes = paramTypes;
this.params = params;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class<?>[] paramTypes) {
this.paramTypes = paramTypes;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public String toString() {
return "className=" + className + " methodName=" + methodName;
}
}
5.3. 服务端#
package rpc;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.Proxy;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import service.loginServiceImp;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Server {
private final static String CLASS_PATH = "service.";
// 存放远程对象的缓存
private HashMap<String, Object> remoteObjects = new HashMap<>();
//注册服务:把一个远程对象放到缓存中
public void register(String className, Object remoteObject) {
remoteObjects.put(className, remoteObject);
}
// 8000端口负载均衡
public void transferService(ServerSocket serverSocket, int threadId) throws Exception {
//随机数生成器
final Random random = new Random();
// 循环运行
while (true) {
System.out.println(String.format("Thread:%d 负载均衡启动......", threadId));
// 创建hashMap
HashMap<String, Object> map = new HashMap<>();
// 等待服务
Socket socket = serverSocket.accept();
// 获取客户端输入流
ObjectInputStream clientOis = new ObjectInputStream(socket.getInputStream());
// 获取客户端输出流
ObjectOutputStream clientOos = new ObjectOutputStream(socket.getOutputStream());
// 将客户端传输的对象存入HashMap
map.put("clientObject", (RemoteCall) clientOis.readObject());
// 随机获取一个端口
int serverIndex = 8001 + random.nextInt(2);
// 打开端口
Socket targetSocket = new Socket("localhost", serverIndex);
// 输出
System.out.println(String.format("Thread:%d 负载均衡传输任务到端口 %d......", threadId, serverIndex));
// 获取输出对象流
ObjectOutputStream oos = new ObjectOutputStream(targetSocket.getOutputStream());
// 传输hashMap
oos.writeObject(map);
// 获取输入对象流
ObjectInputStream serverOis = new ObjectInputStream(targetSocket.getInputStream());
// 接收服务器发送的Call 对象
RemoteCall remotecallobj = (RemoteCall) serverOis.readObject();
// 向客户端发送对象
clientOos.writeObject(remotecallobj);
// 关闭资源
oos.close();
socket.close();
clientOos.close();
}
}
// 暴露服务,创建基于流的Socket,并在8001、8002 端口监听
public void exportService(ServerSocket serverSocket, int threadId) throws Exception {
// 循环运行
while (true) {
System.out.println(String.format("Thread:%d 服务器启动......", threadId));
Socket socket = serverSocket.accept();
// 获取负载服务器输入流
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
// 获取负载服务器输出流
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
// 获取负载服务器传入的hashMap
HashMap<String, Object> map = (HashMap<String, Object>) ois.readObject();
// 接收客户发送的Call 对象
RemoteCall remotecallobj = (RemoteCall) map.get("clientObject");
// 输出
System.out.println(remotecallobj);
// 调用相关对象的方法
System.out.println(String.format("Thread:%d calling......", threadId));
remotecallobj = invoke(remotecallobj);
// 向客户发送包含了执行结果的remotecallobj 对象
oos.writeObject(remotecallobj);
ois.close();
oos.close();
socket.close();
}
}
public RemoteCall invoke(RemoteCall call) {
Object result = null;
try {
String className = call.getClassName();
String methodName = call.getMethodName();
Object[] params = call.getParams();
Class<?> classType = Class.forName(className);
Class<?>[] paramTypes = call.getParamTypes();
Method method = classType.getMethod(methodName, paramTypes);
// 从hashmap缓存中取出相关的远程对象Object
Object remoteObject = remoteObjects.get(className);
if (remoteObject == null) {
throw new Exception(className + " 的远程对象不存在");
} else {
result = method.invoke(remoteObject, params);
System.out.println("远程调用结束:remotObject:" + remoteObject.toString() + ",params:" + params.toString());
}
} catch (Exception e) {
System.out.println("错误:" + e.getMessage());
}
call.setResult(result);
return call;
}
public static void main(String args[]) throws Exception {
// 线程数量
int threadNum = 6;
// 初始化
Server server = new Server();
// 创建多线程
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
//把事先创建的RemoteServceImpl 对象加人到服务器的缓存中
//在服务注册中心注册服务
server.register(CLASS_PATH + "loginService", new loginServiceImp());
// 多线程运行
Future[] future = new Future[threadNum];
// 创建端口数组
ServerSocket[] serverSocket = new ServerSocket[threadNum];
// 创建8000端口
ServerSocket port8000 = new ServerSocket(8000);
// 创建8001端口
ServerSocket port8001 = new ServerSocket(8001);
// 创建8002端口
ServerSocket port8002 = new ServerSocket(8002);
for (int i = 0; i < threadNum; i++) {
// 第一、二个线程端口为8000,负责调度资源,负载均衡
if (i <= 1) {
serverSocket[i] = port8000;
} else if (i <= 3) {
// 第三、四个线程端口为8001,模拟第一台主机
serverSocket[i] = port8001;
} else {
// 第五、六个线程端口为8002,模拟第二台主机
serverSocket[i] = port8002;
}
}
// 循环创建线程
for (int i = 0; i < threadNum; i++) {
int finalI = i;
future[i] = executor.submit(() -> {
try {
// 8000端口负载均衡
if (finalI <= 1) {
server.transferService(serverSocket[finalI], finalI + 1);
} else {
//打开网络端口,接受外部请求,执行服务功能,返回结果
server.exportService(serverSocket[finalI], finalI + 1);
}
} catch (Exception e) {
e.printStackTrace();
}
return 1;
});
}
// 循环获取线程结果,阻塞同步
for (int i = 0; i < threadNum; i++) {
future[i].get();
}
// 关闭端口
for (int i = 0; i < threadNum; i++) {
serverSocket[i].close();
}
}
}
6. 参考文献#
[1] https://www.tutorialspoint.com/remote-procedure-call-rpc
联系邮箱:curren_wong@163.com
Github:https://github.com/CurrenWong
欢迎转载/Star/Fork,有问题欢迎通过邮箱交流。