如何开发一个自己的 RPC 框架(三)
之前的两篇文章中,已经介绍了实现一个 RPC 框架所需要的客户端和注册中心逻辑,那么这一章,将主要介绍如何实现 RPC 框架中的服务端(服务提供方) 和完善框架的其他补充逻辑。
RPC 服务端实现
服务注解(RpcProducer)
和客户端调用标识类似,服务提供方也需要设置服务标识,用来服务的注册和相关配置信息拓展
@RpcProducer 服务提供方注解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) public @interface RpcProducer {
Class<?> serviceItf() default Object.class;
String groupName() default "default";
long timeout() default 3000L;
int weight() default 1;
int workThreads() default 10; }
|
服务提供方节点注册
为了实现所有被 @RpcProducer 注解的实现类都可以被注册到注册中心,在 Spring 中,需要实现
- ApplicationListener:
onApplicationEvent 监听应用程序事件
- ApplicationContextAware:
setApplicationContext 设置当前上下文
- DisposableBean:
destroy 方法,bean 被销毁时执行
这三个接口,通过 setApplicationContext 设置当前上下文,通过 onApplicationEvent 监听服务实现类然后提交到注册中心
理清楚上面流程后,便可以创建实现类 ProducerAnnotationBean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Slf4j public class ProducerAnnotationBean implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, DisposableBean {
private ApplicationContext applicationContext;
@Override public void destroy() throws Exception { log.debug("AnnotationServicesPublisher bean destroy"); }
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { 1️⃣ this.applicationContext = applicationContext; }
@Override public void onApplicationEvent(ApplicationContextEvent event) {2️⃣} }
|
1️⃣:设置当前上下文
2️⃣:监听 Spring Bean 的动作
onApplicationEvent 的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Override public void onApplicationEvent(ApplicationContextEvent event) { 1️⃣ if (event.getApplicationContext() != applicationContext) { log.debug("Received a event from another application context {}, ignoring it", event.getApplicationContext()); }
if (event instanceof ContextRefreshedEvent) { Map<String, Object> annotation = applicationContext.getBeansWithAnnotation(RpcProducer.class); if (MapUtils.isEmpty(annotation)) { log.info("no simple rpc exist");
} else { annotation.forEach((beanName, bean) -> { log.info("simple rpc beanName: {}, bean: {}", beanName, bean); 2️⃣ if (bean.getClass().isAnnotationPresent(RpcProducer.class)) { 3️⃣ List<Producer> producerList = this.buildProviderService(bean); 4️⃣ NettyService.getInstance().startService(RpcPropertiesUtil.getServerPort()); 5️⃣ IRegisterCenterZkImpl.getInstance().register(producerList); } }); } } else if (event instanceof ContextClosedEvent) { 6️⃣ IRegisterCenterZkImpl.getInstance().destroy(null); log.info("simple rpc closed"); } }
|
1️⃣:判断上下文和当前 Spring 上下文是否一致
2️⃣:判断当前实例对象是否被 RpcProducer 注解
3️⃣:根据当前 bean 构造 Producer 对象
实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public List<Producer> buildProviderService(Object bean) { RpcProducer annotation = bean.getClass().getAnnotation(RpcProducer.class); Class<?> serviceItf = annotation.serviceItf(); if (serviceItf == Object.class) { serviceItf = bean.getClass().getInterfaces()[0]; } NettyHandlerServer.PRODUCER_BEAN_MAP.put(serviceItf.getName(), bean); String groupName = annotation.groupName(); long timeout = annotation.timeout(); int weight = annotation.weight(); int workThreads = annotation.workThreads();
Class<?> finalServiceItf = serviceItf; return Arrays.stream(bean.getClass().getDeclaredMethods()) .filter(method -> !method.getDeclaringClass().equals(Object.class)) .map(method -> { Producer producer = Producer.builder() .serviceItf(finalServiceItf) .serviceObject(bean) .ip(IpUtil.getLocalIP()) .port(RpcPropertiesUtil.getServerPort()) .timeout(timeout) .appKey(RpcPropertiesUtil.getAppKey()) .groupName(groupName) .weight(weight) .workerThreads(workThreads) .method(method) .build(); return producer; }).collect(Collectors.toList()); }
|
4️⃣:启动 Netty Server 服务,这一段比较重要,稍后讲解
5️⃣:向注册中心注册服务提供方的节点信息
6️⃣:服务关闭时,清除所有的节点注册信息
完成上述步骤后,所有被 RpcProducer 注解的 class 就都将被注册到注册中心中。
服务提供方启动 Netty 服务
上一步的步骤4️⃣中,在向服务中心注册节点时,同时根据注解的端口启动了 Netty 服务,那么这个 Netty 服务是如何启动的,然后启动的目的是什么呢?
在第一章中我们有讲到,客户端的接口在执行反射操作时,会连接服务端的 Netty 服务,然后发送 NettyRequest 请求。那么服务端接收到 NettyRequest 请求后会做那么事情呢?服务端又是如何根据 NettyRequest映射到具体的方法上,同时将方法执行的结果进行返回的呢?这一节我们一起弄清楚这一点。
sequenceDiagram
客户端->>服务提供方: 1️⃣发送 NettyRequest 请求
服务提供方->>接口实现: 2️⃣通过反射执行 method
接口实现->>服务提供方: 3️⃣返回方法执行结果
服务提供方 -> 客户端: 4️⃣返回 NettyResponse 结果
新建一个 NettyService.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| @Slf4j public class NettyService { private static final NettyService instance = new NettyService();
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
public static NettyService getInstance() { return instance; }
public void startService(int port) { synchronized (NettyService.class) { if (bossGroup != null || workGroup != null) { log.debug("netty server is already start"); return; }
bossGroup = new NioEventLoopGroup(1); workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyDecoderHandler(NettyRequest.class)); socketChannel.pipeline().addLast(new NettyEncoderHandler()); 1️⃣ socketChannel.pipeline().addLast(new NettyHandlerServer()); } });
try { serverBootstrap.bind(port).sync().channel(); log.info("NettyServer start {} start now!!!", IpUtil.getLocalIP() + UrlConstants.COLON + port);
} catch (Exception e) { log.error("NettyServer startServer error", e); } } } }
|
1️⃣:设置消息处理器
上述代码都比较常规,主要是使用 Netty 根据传入的端口进行了服务的启动,其中最主要的代码是
socketChannel.pipeline().addLast(new NettyHandlerServer()); 设置了消息的处理器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| public class NettyHandlerServer extends SimpleChannelInboundHandler<NettyRequest> {
public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();
@Override protected void channelRead0(ChannelHandlerContext ctx, NettyRequest nettyRequest) throws Exception { if (!ctx.channel().isWritable()) { log.error("channel closed!"); return; }
Producer localProducer = this.getLocalProducer(nettyRequest); if (localProducer == null) { log.error("service not found, request={}", nettyRequest); return; }
Object result = this.invockMethod(localProducer, nettyRequest); NettyResponse response = NettyResponse.builder() .uniqueKey(nettyRequest.getUniqueKey()) .result(result) .build(); ctx.writeAndFlush(response); }
private Producer getLocalProducer(NettyRequest request) { String methodName = request.getInvokeMethodName(); String name = request.getProducer().getServiceItf().getName(); List<Producer> producerList = IRegisterCenterZkImpl.getInstance().getProducersMap().get(name); return Collections2.filter(producerList, producer -> { assert producer != null; Method method = producer.getMethod(); return method != null && method.getName().equals(methodName); }).iterator().next(); }
private Object invockMethod(Producer producer, NettyRequest request) { Object serviceObject = producer.getServiceObject(); Method method = producer.getMethod(); Object result = null;
try { result = method.invoke(serviceObject, request.getArgs()); return result; } catch (Exception e) { result = e; log.error("NettyServerBizHandler invokeMethod error, provider={}, request={}", producer, request, e); }
return result; }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("NettyServerBizHandler error, now ctx closed", cause); ctx.close(); } }
|
消息处理器中,在 invockMethod 方法中,根据传入的 request 的 method 参数进行了方法的反射获取执行结果,然后进行返回。反射执行完成后,将结果组装成 NettyResponse 结果进行返回。
那么到这里就完成了整个远程服务的执行。
Spring 注解服务扫描
定义 @EnableRpc 注解
1 2 3 4 5 6
| @Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Import({RpcImportSelector.class}) public @interface EnableRpc { }
|
注解中 @Import 了 RpcImportSelector.class 对象
1 2 3 4 5 6 7 8 9
| public class RpcImportSelector implements ImportSelector {
@Override public String[] selectImports(AnnotationMetadata annotationMetadata) { return new String[]{"com.simon.spring.ProducerAnnotationBean", "com.simon.spring.ConsumerAnnotaionBean"}; }
}
|
RpcImportSelector 对象实现了 ImportSelector 接口, selectImports 方法中返回了处理客户端和服务端注解的类。
客户端启动类 ConsumerApplication.class
1 2 3 4 5 6 7
| @EnableRpc @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }
|
服务启动时,只需要加上 @EnableRpc 注解即可启动 RPC 服务
实例
客户端
ConsumerApplication.class 启动类
1 2 3 4 5 6 7
| @EnableRpc @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }
|
UserController.class 请求接口
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RestController @RequestMapping("/user") public class UserController { @RpcClient(remoteAppKey = "test") private UserService userService;
@GetMapping("/getUser/{username}") public String getUser(@PathVariable String username) { return userService.get(username); } }
|
配置文件
application.properties
rpc.properties
1 2 3 4 5 6 7
| # netty 配置 rpc_app_key=test # ZK 配置 zk_server=127.0.0.1:2181 rpc_session_timeout=3000 rpc_connection_timeout=3000 rpc_channel_connect_size=3
|
服务端
ProducerApplication.class 启动类
1 2 3 4 5 6 7
| @EnableRpc @SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } }
|
UserServiceImpl.class 接口实现类
1 2 3 4 5 6 7 8 9
| @Slf4j @Service @RpcProducer public class UserServiceImpl implements UserService { @Override public String get(String username) { return "username " + username + "服务信息: " + RpcPropertiesUtil.getServerPort(); } }
|
配置信息
application.properties
rpc.properties
1 2 3 4 5 6 7 8
| # netty 配置 rpc_app_key=test rpc_server_port=9999 # ZK 配置 zk_server=127.0.0.1:2181 rpc_session_timeout=3000 rpc_connection_timeout=3000 rpc_channel_connect_size=3
|
服务启动
分别启动服务端和客户端,然后请求 http://127.0.0.1:8082/user/getUser/simon, 可以查看结果

总结
至此,如何开发一个自己的 RPC 框架就全部完成了,框架的内容非常粗糙,距离真正商用版本还差很多距离。但是通过参考别人的源码,可以理解一个基础的 RPC 框架的大体结构和所需要用到的知识,包括 Netty、Spring 、序列化、负载均衡策略、 zookeeper、 Java 的反射等等。但不管怎么说,有进步就是一件很快乐的事情!