如何开发一个自己的 RPC 框架 (二)
上篇文章中,从客户端的调用流程,详细的了解了各个环节的数据是如何串联起来的。那么本章内容,将主要从注册中心的角度出发,看看注册中心是如何维护客户端和服务端各自上报的信息。
能够作为注册中心的中间件非常多,常见的比如 ZooKeeper、Consul、Nacos 等,本项目中采用的是 Zookeeper 作为注册中心中间件
功能梳理
同样,开始之前我们先来理一下注册中心主要实现那些功能
flowchart TD
注册中心 --> 维护服务提供方节点信息
注册中心 --> 维护服务消费方节点信息
注册中心 --> 服务消费方服务信息订阅
接口定义
定义一个名为 IRegisterCenter 的接口,其中接口主要实现如下功能:
- 服务提供方的节点上报
- 服务提供方节点信息更新
- 服务提供方的本地数据缓存和节点信息订阅
- 服务消费方根据服务名获取节点地址
- 服务消费方订阅服务节点信息
- 服务消费方信息注册
有了上面 6 大需求,就可以知道接口接口中主要包含的方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public interface IRegisterCenter {
void register(List<Producer> producerList);
Map<String, List<Producer>> getProducersMap();
void destroy(String name);
void initProviderMap(String remoteAppKey, String groupName);
List<Producer> getServiceNode();
List<Producer> getServiceProducer(String serviceName, String methodName);
void registerConsumer(Consumer consumer);
}
|
配置文件读取
注册中心的接口定义完成之后,就需要使用 zookeeper 进行接口实现。但是在接口实现之前,需要实现一个帮助方法读取 RPC 框架的相关配置项。
根据前面的设计,可以大概罗列一下需要用到的配置参数
- appkey: 服务的唯一标识
- groupName: 服务的分组名称
- zookServer: zk 中间件服务地址
- nettyIp: 服务提供方调用地址
- nettyPort: 服务提供方调用端口
当然还包含一些其他参数,比如服务调用超时时间、服务会话超时时间等
完成上面需求整理之后,就可以开始 RpcPropertiesUtil 方法的编写了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Slf4j public class RpcPropertiesUtil { private static final Map<String, Object> PRODUCER_MAP;
private static final String PROPERTIES_FILE_NAME = "rpc.properties";
static { PRODUCER_MAP = Maps.newConcurrentMap(); try { Properties properties = PropertiesLoaderUtils.loadAllProperties(PROPERTIES_FILE_NAME); for (Map.Entry<Object, Object> entry : properties.entrySet()) { PRODUCER_MAP.put(entry.getKey().toString(), entry.getValue()); }
} catch (IOException e) { log.error("读取配置文件异常", e); throw new RuntimeException("Producer Load Properties Exception"); } }
private static String getString(Map<String, Object> map, String key, String defaultValue) { String envKey = System.getenv().get(key.toUpperCase()); String result = envKey != null ? envKey : MapUtils.getString(map, key); return StringUtils.isEmpty(result) ? defaultValue : result; }
private static Integer getInteger(Map<String, Object> map, String key, Integer defaultValue) { String envKey = System.getenv().get(key.toUpperCase()); Integer result = envKey != null ? Integer.parseInt(envKey) : MapUtils.getInteger(map, key); return result == null ? defaultValue : result; } }
|
- 定义一个
PRODUCER_MAP 用来做配置文件缓存
- static 静态方法中,读取
[rpc.properties](http://rpc.properties) 配置文件,转换成 Map
- 定义了两个
getString 和 getInteger 的方法,表示先从环境变量中读取配置,不存在时从配置文件中读取
完成上述基础编写之后,实例一个 rpc_app_key 的读取
1 2 3
| public static String getAppKey() { return getString(PRODUCER_MAP, "rpc_app_key", "test"); }
|
zookeeper 接口实现
由于接口的方法都已经定义完成,只需要根据接口的方法含义,进行对应的实现即可
定义一个类 IRegisterCenterZkImpl 实现 IRegisterCenter 接口
成员变量介绍
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private static final IRegisterCenterZkImpl instance = new IRegisterCenterZkImpl();
private volatile ZkClient zkClient = null;
private static final String ZK_ADDRESS = RpcPropertiesUtil.getZkServers(); private static final int ZK_SESSION_TIMEOUT = RpcPropertiesUtil.getSessionTimeout(); private static final int ZK_CONNECTION_TIMEOUT = RpcPropertiesUtil.getConnectionTimeout(); private static final String LOCAL_IP = RpcPropertiesUtil.getServerIp();
private static final Map<String, List<Producer>> PROVIDER_MAP = new ConcurrentHashMap<>(); private static final Map<String, List<Producer>> SERVICE_METADATA = Maps.newConcurrentMap();
private static final String ROOT_PATH = "/config_register"; public static final String PROVIDER_TYPE = "/provider"; public static final String CONSUMER_TYPE = "/consumer";
public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();
|
register 方法实现
register 应该是整个注册中心最重要的方法了,他主要目的是接收服务提供方的节点信息上报
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @Override public void register(List<Producer> producerList) { if (CollectionUtils.isEmpty(producerList)) { log.debug("RegisterCenterImpl registerProvider providers is empty, ignore it, providers={}", producerList); return; }
synchronized (IRegisterCenterZkImpl.class) { 1️⃣ this.initZkClient();
2️⃣ this.setLocalCache(producerList); 3️⃣ for (Map.Entry<String, List<Producer>> entry : PROVIDER_MAP.entrySet()) { String serviceName = entry.getKey(); List<Producer> producers = entry.getValue();
Producer firstProducer = producers.get(0); String appKey = firstProducer.getAppKey(); String rootNode = getRootPath(appKey); 4️⃣ this.createRootNode(rootNode);
String groupName = firstProducer.getGroupName(); 5️⃣ String servicePath = getProducerServicePath(appKey, groupName, serviceName);
for (Producer producer : producers) { this.createServiceNode(servicePath); String producerMathodPath = producerToPath(servicePath, producer); 6️⃣ this.createCurrentServiceIpNode(producerMathodPath); log.debug("create current service node success, node path = {} ,method path = {}", servicePath, producerMathodPath); }
7️⃣ subscribeChildChanges(serviceName, servicePath, PROVIDER_MAP); } } }
|
1️⃣:本地初始化 zk 的请求客户端
1 2 3 4 5 6
| private void initZkClient() { if (zkClient == null) { zkClient = new ZkClient(ZK_ADDRESS, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT); } }
|
对象定义成员变量 zkClient
2️⃣:设置本地节点缓存
1 2 3 4 5 6 7 8 9 10 11 12 13
| private void setLocalCache(List<Producer> producerList) { for (Producer producer : producerList) { String name = producer.getServiceItf().getName(); List<Producer> producerListCache = PROVIDER_MAP.get(name); if (producerListCache == null) { producerListCache = Lists.newArrayList(); }
producerListCache.add(producer); PROVIDER_MAP.put(name, producerListCache); } }
|
存在本地成员变量 PROVIDER_MAP,其中 Map 的 Key 为服务接口的类名
3️⃣:根据类名遍历节点信息
4️⃣:创建根节点信息,根节点地址为 /config_register/{appKey}
5️⃣:根据服务名称创建服务节点信息,服务节点地址为: /config_register/{appKey}/{groupName}/{serviceName} 其中 serviceName 就是服务接口的类名
6️⃣:根据接口类名创建具体的方法节点信息,其中 producerToPath 方法,主要目的是将 Producer 对象转换成 Zk 的路径信息,其中主要实现如下:
1 2 3 4 5 6 7 8
| private String producerToPath(String servicePath, Producer producer) { return servicePath + "/" + producer.getIp() + "|" + producer.getPort() + "|" + producer.getWeight() + "|" + producer.getWorkerThreads() + "|" + producer.getMethod().getName() + "|" + producer.getGroupName(); }
|
返回的地址信息示例如下: /config_register/{appKey}/{groupName}/{serviceName}/127.0.0.1|9999|1|10|get|defaut
127.0.0.1|9999|1|10|get|defaut 这段参数的含义就是:服务地址 + 服务端口 + 权重 + 工作线程数量 + 方法名称 + 服务组名称
createCurrentServiceIpNode 方法实现如下:
1 2 3 4 5 6
| private void createCurrentServiceIpNode(String currentServiceIpNode) { if (!zkClient.exists(currentServiceIpNode)) { zkClient.createEphemeral(currentServiceIpNode); } }
|
7️⃣:服务地址监听
subscribeChildChanges 方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| private void subscribeChildChanges(String serviceName, String servicePath, Map<String, List<Producer>> dataMap) { zkClient.subscribeChildChanges(servicePath, (parentPath, currentChilds) -> { if (currentChilds == null) { currentChilds = new ArrayList<>(); }
List<Producer> producers = currentChilds.stream().map(currentChild -> pathToProducer(serviceName, currentChild)).collect(Collectors.toList()); dataMap.put(serviceName, producers); }); }
|
initProviderMap 服务消费方节点获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Override public void initProviderMap(String remoteAppKey, String groupName) { if (MapUtils.isEmpty(SERVICE_METADATA)) { SERVICE_METADATA.putAll(this.fetchOrUpdateServiceMetaData(remoteAppKey, groupName)); } }
private Map<String, List<Producer>> fetchOrUpdateServiceMetaData(String remoteAppKey, String groupName) { final Map<String, List<Producer>> providerServiceMap = Maps.newHashMap(); this.initZkClient();
String providerNode = getRootPath(remoteAppKey) + UrlConstants.SLASH + groupName; 1️⃣ List<String> producerServices = zkClient.getChildren(providerNode);
for (String serviceName : producerServices) { String servicePath = getProducerServicePath(remoteAppKey, groupName, serviceName); 2️⃣ List<String> producerPaths = zkClient.getChildren(servicePath);
for (String producerPath : producerPaths) { 3️⃣ Producer pathToProducer = pathToProducer(serviceName, producerPath); List<Producer> providerList = providerServiceMap.get(serviceName); if (providerList == null) { providerList = new ArrayList<>(); }
providerList.add(pathToProducer); 4️⃣ providerServiceMap.put(serviceName, providerList); }
5️⃣ subscribeChildChanges(serviceName, servicePath, SERVICE_METADATA); }
return providerServiceMap; }
|
1️⃣:根据 appKey 和 groupName 从 zk 中查询注册的所有服务节点
2️⃣:根据 serviceName 获取该服务下的所有节点信息
3️⃣:使用 pathToProducer 将 zk 的 path 信息,转换成 Producer 对象
4️⃣:将节点信息缓存到本地
5️⃣:和服务提供方注册类似,订阅服务下方节点变化,然后刷新本地 Map
getServiceProducer 服务消费方获取服务节点
1 2 3 4 5 6 7 8
| @Override public List<Producer> getServiceProducer(String serviceName, String methodName) { List<Producer> producers = SERVICE_METADATA.get(serviceName); return producers == null ? null : producers.stream().filter(producer -> producer.getMethod() != null && producer.getMethod().getName().equals(methodName)).collect(Collectors.toList()); }
|
这里的实现代码就非常简单了,只需要遍历本地缓存的 SERVICE_METADATA 对象即可
由于其他方法的操作都是对本地缓存 Map 的操作,所以这里就不再详细展开了
到这里,已经完成了注册中心最重要方法编写,处理服务提供方的服务注册,和服务消费放的服务节点订阅。
下一章我们将完成服务提供方节点信息的注册和 Netty 消息的处理。