nacos dubbo 服务发现,dubbo服务注册发现的过程

  nacos dubbo 服务发现,dubbo服务注册发现的过程

  00-1010前言简介流程源代码分析具体实现服务注册服务订阅结论

  在10-1010上,我们谈到了我们的dubbo服务从redis到nacos注册中心的迁移。迁移后发现会时不时抛出异常:error com . Alibaba . nacos . client . naming-[client-beat]发送beat 3360失败。因此,在这个分析过程中,我们最终发现异常是我们的SLB网络映射问题,与nacos无关。

  Dubbo版本:2.7.4.1nacos客户端版本:1.0.0nacos服务器版本:1.1.3

  

目录

dubbo端:通过nacos注册中心实现dubbo,向nacos注册服务,添加心跳任务,每5s发送一次服务健康心跳。同时检查nacos服务列表是否每1s更新一次。如果有更新,会触发服务实例更新通知,更新dubbo的本地服务列表。nacos端:接收到心跳后,如果此时服务实例不存在,则会创建一个新的服务实例。如果服务实例此时不健康,它将被设置为健康状态,并且该状态将被主动推送到客户端。nacos中有一个检查服务状态的任务。如果15秒内没有健康的心跳报告,服务实例将被设置为不健康。如果30秒内没有健康的心跳报告,服务实例将脱机,状态将推送到客户端。

 

  00-1010在dubbo的注册表包下,为服务注册行为定义了四个接口,所有服务注册(zookeeper、nacos、redis、etcd等。)支持这些接口的实现。

  NotifyListener:服务变更通知监控的接口定义。在实现注册中心时,不需要关心实现。只需将这个实例传递给特定的侦听器。RegistryService:服务注册、注销、定义、退订、服务搜索的接口定义是核心接口。包括注册表的核心功能:RegistryService和Node的封装,更多检查服务是否可用的方法,破坏离线服务。一般直接实现注册表接口RegistryFactory:通过注册表的URL获取注册表的接口定义,设计dubbo的spi设计。对于每个具体的实现,映射一个注册表协议头,比如nacos://对应的nacos实现新对接一个注册表,不需要直接实现注册表接口,可以直接继承FailbackRegistry抽象类,实现相关的do方法。Dubbo对服务注册的抽象和nacos服务注册非常一致,大部分接口都可以直接连接使用。只是服务订阅监听器的定义不同,可以稍微打包转换一下,所以实现起来非常简单。

  

前言

org . Apache . dubbo . registry . nacos . nacos registry :152

 

  @ Override public void do register(URL URL){ final String service name=get service name(URL);final Instance Instance=create Instance(URL);execute(naming service-naming service . register instance(service name,instance));}dubbo,所有服务都封装成URL,对应nacos中的服务实例Instance。因此,在注册服务时,只需将URL转换为实例,就可以在nacos中注册服务。我们来看看namingService中的具体注册行为。

  com . Alibaba . nacos . client . naming . nacosnamingservice :283

  @ Override public void register Instance(String service name,String groupName,Instance instance)抛出NacosException { if(Instance . isephemeral()){ BeatInfo BeatInfo=new BeatInfo();beatinfo . set service name(namingutils . getgroupedname(service name,group name));beatinfo . setip(instance . getip());beatinfo . setport(instance . get port());beatInfo.setCluster(instanc

  e.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }如上代码,除了注册实例外,还判断了instance实例是否是临时实例,如果是临时实例,则加入了beatReactor的心跳列表。这是因为,nacos将服务分成了两类,一类是临时性的服务, 像dubbo、spring cloud这种,需要通过心跳来保活,如果心跳没有及时发送,服务端会自动下线这个instance。一类是永久性服务,如数据库、缓存服务等, 客户端不会也没法发送心跳,这类服务就由服务端通过TCP端口检测等方式反向探活。下面看看临时实例的心跳是怎么发送的。

  

com.alibaba.nacos.client.naming.NacosNamingService:104

 

  

private int initClientBeatThreadCount(Properties properties) { if (properties == null) { return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT; } return NumberUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT), UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT); } //可通过配置dubbo.registries.nacos.parameters.namingClientBeatThreadCount = 10设置维护心跳的线程数

先看一段获取心跳beatReactor线程池线程数量的初始化代码,传入的Properties是配置dubbo注册中心时的参数列表,如果配置了namingClientBeatThreadCount,则取配置的值, 默认维护心跳的线程池大小为:如果是单核的,就是一个线程,多核的就CPU核心数一半的线程。继续心跳逻辑

 

  

com.alibaba.nacos.client.naming.beat.BeatReactor:78

 

  

class BeatProcessor implements Runnable { @Override public void run() { try { for (Map.Entry entry : dom2Beat.entrySet()) { BeatInfo beatInfo = entry.getValue(); if (beatInfo.isScheduled()) { continue; } beatInfo.setScheduled(true); executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS); } } catch (Exception e) { NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e); } finally { executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS); } } } class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { long result = serverProxy.sendBeat(beatInfo); beatInfo.setScheduled(false); if (result > 0) { clientBeatInterval = result; } } }

dom2Beat是一个存放需要心跳上报的临时实例的map容器,NacosNamingService.registerInstance中通过判断临时节点添加到心跳列表的逻辑, 最终添加到了这个map里。BeatReactor初始化后会触发BeatProcessor线程的调用,BeatProcessor线程是一个不断自我触发调用的线程,前一次 心跳上报逻辑执行完后,间隔5S触发下一次心跳上报。间隔时间由变量clientBeatInterval控制,受nacos服务端返回的心跳结果值的影响 心跳间隔可能会改变,nacos服务端从instance的元数据中寻找key为preserved.heart.beat.interval的值返回,如果为空则返回5S。 这个功能在dubbo2.7.4.1的版本里还不成熟,只能通过注解元素指定,如@Reference(parameters = "preserved.heart.beat.interval,10000"), 后面如果能够直接在注册中心的url参数配置就算成熟了,所以这个功能暂时不推荐使用,可以作为实验功能试试。

 

  

 

  

服务订阅

org.apache.dubbo.registry.nacos.NacosRegistry:399

 

  

private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener) throws NacosException { if (!nacosListeners.containsKey(serviceName)) { EventListener eventListener = event -> { if (event instanceof NamingEvent) { NamingEvent e = (NamingEvent) event; notifySubscriber(url, listener, e.getInstances()); } }; namingService.subscribe(serviceName, eventListener); nacosListeners.put(serviceName, eventListener); } }

nacos的服务监听是EventListener,所以dubbo的服务订阅只需要将NotifyListener的处理包装进onEvent中处理即可, 通过namingService.subscribe添加nacos的订阅。最终EventListener对象会被添加到事件调度器的监听器列表中,见如下代码:

 

  

com.alibaba.nacos.client.naming.core.EventDispatcher:

 

  

public class EventDispatcher { private ExecutorService executor = null; private BlockingQueuechangedServices = new LinkedBlockingQueue(); private ConcurrentMap observerMap = new ConcurrentHashMap(); public EventDispatcher() { executor = Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener"); thread.setDaemon(true); return thread; } }); executor.execute(new Notifier()); } public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) { NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map"); Listobservers = Collections.synchronizedList(new ArrayList()); observers.add(listener); observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers); if (observers != null) { observers.add(listener); } serviceChanged(serviceInfo); } public void removeListener(String serviceName, String clusters, EventListener listener) { NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map"); Listobservers = observerMap.get(ServiceInfo.getKey(serviceName, clusters)); if (observers != null) { Iteratoriter = observers.iterator(); while (iter.hasNext()) { EventListener oldListener = iter.next(); if (oldListener.equals(listener)) { iter.remove(); } } if (observers.isEmpty()) { observerMap.remove(ServiceInfo.getKey(serviceName, clusters)); } } } public ListgetSubscribeServices() { ListserviceInfos = new ArrayList(); for (String key : observerMap.keySet()) { serviceInfos.add(ServiceInfo.fromKey(key)); } return serviceInfos; } public void serviceChanged(ServiceInfo serviceInfo) { if (serviceInfo == null) { return; } changedServices.add(serviceInfo); } private class Notifier implements Runnable { @Override public void run() { while (true) { ServiceInfo serviceInfo = null; try { serviceInfo = changedServices.poll(5, TimeUnit.MINUTES); } catch (Exception ignore) { } if (serviceInfo == null) { continue; } try { Listlisteners = observerMap.get(serviceInfo.getKey()); if (!CollectionUtils.isEmpty(listeners)) { for (EventListener listener : listeners) { Listhosts = Collections.unmodifiableList(serviceInfo.getHosts()); listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts)); } } } catch (Exception e) { NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e); } } } } public void setExecutor(ExecutorService executor) { ExecutorService oldExecutor = this.executor; this.executor = executor; oldExecutor.shutdown(); }}

EventDispatcher中维护了一个监听器列表observerMap,同时维护了一个事件变更的阻塞队列changedServices,监听调度器初始化后,会触发一个线程消费阻塞队列的 数据,当注册服务发生变化时,将变更数据入队,就能唤醒线程更新dubbo内存中的服务列表了。上面已经聊到,nacos client会以1s的频次拉取注册的实例,当拉取到的实例和本地内存的 有出入时,就会触发入队操作,如:

 

  

com.alibaba.nacos.client.naming.core.HostReactor:296

 

  

public class UpdateTask implements Runnable { long lastRefTime = Long.MAX_VALUE; private String clusters; private String serviceName; public UpdateTask(String serviceName, String clusters) { this.serviceName = serviceName; this.clusters = clusters; } @Override public void run() { try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); if (serviceObj == null) { updateServiceNow(serviceName, clusters); executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS); return; } if (serviceObj.getLastRefTime() <= lastRefTime) { updateServiceNow(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push refreshOnly(serviceName, clusters); } executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS); lastRefTime = serviceObj.getLastRefTime(); } catch (Throwable e) { NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } } }

DEFAULT_DELAY值为1s,同时,nacos也会主动的推送数据变更事件,当遇到nacos主动推送时,serviceInfoMap中的serviceObj会被更新,那么下次 nacos client拉取的时间间隔会被设置成10S之后,具体的和本地列表比对的逻辑都在updateServiceNow方法内,这里就不展开讲述了。

 

  

 

  

结语

dubbo注册服务到nacos以及订阅服务是一个比较复杂的过程,在剖析的过程中,带着疑问去看源码会有事半功倍的效果,比如博主在看源码前, 首先是为了寻找nacos的心跳异常,然后对nacos如何实现事件监听比较好奇。然后层层剖析渐进明朗恍然大悟。当然在剖析dubbo注册服务到nacos时,也需要了解 nacos服务端的处理逻辑,nacos服务端非常核心的两个类ClientBeatCheckTask、ClientBeatProcessor,包含了心跳处理、健康检测和事件推送的逻辑, 有兴趣可以看看

 

  以上就是dubbo服务注册到nacos的过程剖析的详细内容,更多关于dubbo服务注册到nacos的资料请关注盛行IT其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: