Microservice Components (Alibaba Microservice Components)


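  Core features

  【1】Service registration: The Nacos Client registers its service with the Nacos Server by sending a REST request that carries its own metadata, such as IP address and port. On receiving the registration request, the Nacos Server stores this metadata in a two-level in-memory Map.

  【2】Service heartbeat: After registering, the Nacos Client keeps a scheduled heartbeat running to tell the Nacos Server that the service is still available, so that the instance is not evicted. By default a heartbeat is sent every 5s.

  【3】Service synchronization: The nodes of a Nacos Server cluster synchronize service instances with one another to keep service information consistent.

  【4】Service discovery: When a service consumer (Nacos Client) calls a service provider, it sends a REST request to the Nacos Server to fetch the list of registered services and caches it locally. The Nacos Client also starts a local scheduled task that periodically pulls the latest registry data from the server and updates the local cache.

  【5】Service health check: The Nacos Server runs a scheduled task that checks the health of registered instances. If no client heartbeat has been received from an instance for more than 15s, its healthy attribute is set to false (clients will no longer discover it). If no heartbeat has been received for more than 30s, the instance is removed outright (a removed instance is registered again if it resumes sending heartbeats).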
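The two thresholds in 【5】 can be pictured with a small sketch. This is not Nacos code: the class name, the enum and the classify method are hypothetical, and only the 15s and 30s values come from the text above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the two-stage check described above; not the Nacos implementation.
class HeartbeatHealthSketch {
    // Thresholds taken from the article: 15s marks unhealthy, 30s removes the instance.
    static final long UNHEALTHY_AFTER_MS = TimeUnit.SECONDS.toMillis(15);
    static final long EVICT_AFTER_MS = TimeUnit.SECONDS.toMillis(30);

    enum State { HEALTHY, UNHEALTHY, EVICTED }

    // Classifies an instance by how long it has been silent.
    static State classify(long lastBeatMillis, long nowMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > EVICT_AFTER_MS) {
            return State.EVICTED;
        }
        if (silence > UNHEALTHY_AFTER_MS) {
            return State.UNHEALTHY;
        }
        return State.HEALTHY;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(classify(now - 5_000, now));
        System.out.println(classify(now - 20_000, now));
        System.out.println(classify(now - 40_000, now));
    }
}
```

A server-side scheduled task would run a check like this over every ephemeral instance on each tick.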

  

  Key takeaways from the source code

  【1】Structure of the registry (recorded here for reference only):

  // Map<namespaceId, Map<service_name, Service>>; the inner map is a ConcurrentSkipListMap
  private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

  // Inside Service: Map<clusterName, Cluster>
  private Map<String, Cluster> clusterMap = new HashMap<>();

  // Inside Cluster
  private Set<Instance> persistentInstances = new HashSet<>();
  private Set<Instance> ephemeralInstances = new HashSet<>();
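To make the nesting concrete, here is a minimal sketch of a registry with the same layering (namespace, then service, then cluster, then instances). The class and method names are hypothetical, instances are reduced to address strings, and the two inner levels use concurrent collections to keep the sketch thread-safe on its own, whereas the listing above uses HashMap and HashSet there.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical, simplified model of the layered registry; not the Nacos classes.
class RegistrySketch {
    // namespaceId -> (serviceName -> (clusterName -> instance addresses))
    private final Map<String, Map<String, Map<String, Set<String>>>> serviceMap = new ConcurrentHashMap<>();

    // Walks down the levels, creating each one on first use, then adds the instance.
    void register(String namespaceId, String serviceName, String clusterName, String address) {
        serviceMap.computeIfAbsent(namespaceId, k -> new ConcurrentSkipListMap<>())
                .computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet())
                .add(address);
    }

    // Returns the instances of one cluster, or an empty set if any level is missing.
    Set<String> lookup(String namespaceId, String serviceName, String clusterName) {
        return serviceMap.getOrDefault(namespaceId, Collections.emptyMap())
                .getOrDefault(serviceName, Collections.emptyMap())
                .getOrDefault(clusterName, Collections.emptySet());
    }
}
```

Because each write only touches the set at the bottom of one path, updates for different services or clusters never contend with each other.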

 

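  【2】Why the registry is designed this way

  1. The registry is a ConcurrentHashMap at the first level, a ConcurrentSkipListMap at the second level and a HashMap at the third level, which leads to the target Cluster.

  2. One reason for this design is finer granularity. The source code shows that Nacos updates the registry in small scopes, for example only the ephemeral list ephemeralInstances or the persistent list persistentInstances of one Cluster (both are Sets, which rules out duplicate entries). Because the scope is small, updates are faster.

  3. It also uses the copy-on-write idea, so reads are not slowed down: a write creates a new copy, merges the old and new data into it, and then points the reference at the new data.

  4. It is built for extensibility: partitioning by namespace isolates development environments, partitioning by service isolates services, and partitioning by Cluster supports multi-datacenter deployment and faster access.

  5. Concurrent reads and writes are handled by the thread-safe ConcurrentHashMap and ConcurrentSkipListMap at the outer levels (on JDK 8 and later these rely on CAS, with per-bin locking in ConcurrentHashMap, rather than segment locks), plus copy-on-write inside the Cluster. The Cluster itself takes no lock, because it is modified by a single thread and so no conflict can occur.

  6. This gives up some real-time accuracy, but it greatly improves concurrent performance.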
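The copy-on-write idea from point 3 can be sketched as follows. This is an illustration under assumptions, not the Nacos Cluster class: the names are hypothetical, instances are plain strings, and the single-writer rule from point 5 is taken as given rather than enforced.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical copy-on-write holder; assumes exactly one writer thread, as the article describes.
class CopyOnWriteClusterSketch {
    // Readers always see a complete, immutable snapshot through this reference.
    private volatile Set<String> ephemeralInstances = Collections.emptySet();

    // Lock-free read: just return the current snapshot.
    Set<String> read() {
        return ephemeralInstances;
    }

    // Single-writer update: copy, modify the copy, then swap the reference.
    void update(Collection<String> toAdd, Collection<String> toRemove) {
        Set<String> copy = new HashSet<>(ephemeralInstances);
        copy.addAll(toAdd);
        copy.removeAll(toRemove);
        ephemeralInstances = Collections.unmodifiableSet(copy);
    }
}
```

A reader that obtained a snapshot before an update keeps seeing the old data until it reads again, which is the small loss of real-time accuracy mentioned in point 6.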

 

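  【3】Why the AP mode performs well

  1. Registration is implemented with asynchronous tasks and a queue, so the response returns quickly while the task itself is processed later.

  2. The Notifier is initialized in the DistroConsistencyServiceImpl class. It runs on a single thread by default, and its queue is an ArrayBlockingQueue with a capacity of 1024 * 1024.

  3. The scope of each data change is kept small, and the single thread avoids thread-safety problems (no locking is needed).

  4. This approach clearly has a weakness: a request can be answered without the instance actually being registered. The client's heartbeat mechanism covers this case: if the server cannot find the instance when a heartbeat arrives, the client registers again.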
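The queue-plus-single-consumer pattern from points 1 to 3 might look roughly like this. It is a sketch under assumptions, not the Notifier itself: the class and method names are hypothetical, a task is just an instance address, and only the ArrayBlockingQueue capacity is taken from the text.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "acknowledge first, apply later"; not the Nacos Notifier.
class AsyncRegistrySketch {
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024 * 1024);
    // Written only by the worker thread; a concurrent set so other threads can read it safely.
    private final Set<String> registry = ConcurrentHashMap.newKeySet();

    AsyncRegistrySketch() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    // Blocks until a task arrives, then applies it to the registry.
                    registry.add(tasks.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "notifier-sketch");
        worker.setDaemon(true);
        worker.start();
    }

    // Returns as soon as the task is queued; the registry is updated later.
    boolean register(String instance) {
        return tasks.offer(instance);
    }

    // Polls until the worker has applied the registration or the timeout expires.
    boolean awaitRegistered(String instance, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!registry.contains(instance)) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            Thread.yield();
        }
        return true;
    }
}
```

The gap between register returning and the worker applying the task is exactly the window described in point 4.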

 

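  【4】Why Nacos notices changes quickly

  The client pulls on a timer, and in addition the server asynchronously pushes updated data to the client over UDP.

  UDP notifications can be lost, but the periodic pull (scheduled with a 1s default delay) still keeps the data eventually consistent.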

 

  

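  Verifying the server side

  【1】The server is normally started through a shell script, so look at the startup.sh script first.

   As the script below shows, it simply runs the java command to launch a Java application (-jar ${BASE_DIR}/target/${SERVER}.jar, which after substituting the variable becomes -jar ${BASE_DIR}/target/nacos-server.jar).

   Tracing the startup entry point from there shows that the server is in fact a web service built with Spring Boot.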

  

  cygwin=false
  darwin=false
  os400=false
  case "`uname`" in
  CYGWIN*) cygwin=true;;
  Darwin*) darwin=true;;
  OS400*) os400=true;;
  esac
  error_exit ()
  {
      echo "ERROR: $1 !!"
      exit 1
  }
  [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
  [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
  [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/opt/taobao/java
  [ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME

  if [ -z "$JAVA_HOME" ]; then
    if $darwin; then
      if [ -x /usr/libexec/java_home ] ; then
        export JAVA_HOME=`/usr/libexec/java_home`
      elif [ -d "/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home" ]; then
        export JAVA_HOME="/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home"
      fi
    else
      JAVA_PATH=`dirname $(readlink -f $(which javac))`
      if [ "x$JAVA_PATH" != "x" ]; then
        export JAVA_HOME=`dirname $JAVA_PATH 2>/dev/null`
      fi
    fi
    if [ -z "$JAVA_HOME" ]; then
      error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)! jdk8 or later is better!"
    fi
  fi

  export SERVER="nacos-server"
  export MODE="cluster"
  export FUNCTION_MODE="all"
  export MEMBER_LIST=""
  export EMBEDDED_STORAGE=""
  while getopts ":m:f:s:c:p:" opt
  do
      case $opt in
          m)
              MODE=$OPTARG;;
          f)
              FUNCTION_MODE=$OPTARG;;
          s)
              SERVER=$OPTARG;;
          c)
              MEMBER_LIST=$OPTARG;;
          p)
              EMBEDDED_STORAGE=$OPTARG;;
          ?)
              echo "Unknown parameter"
              exit 1;;
      esac
  done

  export JAVA_HOME
  export JAVA="$JAVA_HOME/bin/java"
  export BASE_DIR=`cd $(dirname $0)/..; pwd`
  export CUSTOM_SEARCH_LOCATIONS=file:${BASE_DIR}/conf/

  #===========================================================================================
  # JVM Configuration
  #===========================================================================================
  if [[ "${MODE}" == "standalone" ]]; then
      JAVA_OPT="${JAVA_OPT} -Xms512m -Xmx512m -Xmn256m"
      JAVA_OPT="${JAVA_OPT} -Dnacos.standalone=true"
  else
      if [[ "${EMBEDDED_STORAGE}" == "embedded" ]]; then
          JAVA_OPT="${JAVA_OPT} -DembeddedStorage=true"
      fi
      JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${BASE_DIR}/logs/java_heapdump.hprof"
      JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
  fi

  if [[ "${FUNCTION_MODE}" == "config" ]]; then
      JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=config"
  elif [[ "${FUNCTION_MODE}" == "naming" ]]; then
      JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=naming"
  fi

  JAVA_OPT="${JAVA_OPT} -Dnacos.member.list=${MEMBER_LIST}"

  JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
  if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
      JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${BASE_DIR}/logs/nacos_gc.log:time,tags:filecount=10,filesize=102400"
  else
      JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
      JAVA_OPT="${JAVA_OPT} -Xloggc:${BASE_DIR}/logs/nacos_gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
  fi

  JAVA_OPT="${JAVA_OPT} -Dloader.path=${BASE_DIR}/plugins/health,${BASE_DIR}/plugins/cmdb"
  JAVA_OPT="${JAVA_OPT} -Dnacos.home=${BASE_DIR}"
  JAVA_OPT="${JAVA_OPT} -jar ${BASE_DIR}/target/${SERVER}.jar"
  JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
  JAVA_OPT="${JAVA_OPT} --spring.config.additional-location=${CUSTOM_SEARCH_LOCATIONS}"
  JAVA_OPT="${JAVA_OPT} --logging.config=${BASE_DIR}/conf/nacos-logback.xml"
  JAVA_OPT="${JAVA_OPT} --server.max-http-header-size=524288"

  if [ ! -d "${BASE_DIR}/logs" ]; then
      mkdir ${BASE_DIR}/logs
  fi

  echo "$JAVA ${JAVA_OPT}"

  if [[ "${MODE}" == "standalone" ]]; then
      echo "nacos is starting with standalone"
  else
      echo "nacos is starting with cluster"
  fi

  # check the start.out log output file
  if [ ! -f "${BASE_DIR}/logs/start.out" ]; then
      touch "${BASE_DIR}/logs/start.out"
  fi
  # start
  echo "$JAVA ${JAVA_OPT}" > ${BASE_DIR}/logs/start.out 2>&1 &
  nohup $JAVA ${JAVA_OPT} nacos.nacos >> ${BASE_DIR}/logs/start.out 2>&1 &
  echo "nacos is starting,you can check the ${BASE_DIR}/logs/start.out"

 

  

  Analysis starting from the client

  【1】Follow the auto-configuration mechanism (look for the spring.factories file)

  org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
    com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
    com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
    com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
    com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
  org.springframework.cloud.bootstrap.BootstrapConfiguration=\
    com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

 

  【2】What the NacosDiscoveryAutoConfiguration class auto-configures

  @Configuration
  @EnableConfigurationProperties
  @ConditionalOnNacosDiscoveryEnabled
  @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
  @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class })
  public class NacosDiscoveryAutoConfiguration {

      @Bean
      public NacosServiceRegistry nacosServiceRegistry(
              NacosDiscoveryProperties nacosDiscoveryProperties) {
          return new NacosServiceRegistry(nacosDiscoveryProperties);
      }

      @Bean
      @ConditionalOnBean(AutoServiceRegistrationProperties.class)
      public NacosRegistration nacosRegistration(
              NacosDiscoveryProperties nacosDiscoveryProperties,
              ApplicationContext context) {
          return new NacosRegistration(nacosDiscoveryProperties, context);
      }

      // The two beans above are passed into this bean as arguments
      @Bean
      @ConditionalOnBean(AutoServiceRegistrationProperties.class)
      public NacosAutoServiceRegistration nacosAutoServiceRegistration(
              NacosServiceRegistry registry,
              AutoServiceRegistrationProperties autoServiceRegistrationProperties,
              NacosRegistration registration) {
          return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
      }
  }

 

  

  【3】Why the NacosAutoServiceRegistration class matters

   It registers the service through the listener mechanism, by listening for the WebServer initialization event.

  // class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration>
  // abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent>

  // Because it implements ApplicationListener, it must have a listener method
  public void onApplicationEvent(WebServerInitializedEvent event) {
      bind(event);
  }

  @Deprecated
  public void bind(WebServerInitializedEvent event) {
      ApplicationContext context = event.getApplicationContext();
      if (context instanceof ConfigurableWebServerApplicationContext) {
          if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
              return;
          }
      }
      this.port.compareAndSet(0, event.getWebServer().getPort());
      this.start();
  }

  public void start() {
      if (!isEnabled()) {
          return;
      }
      // only initialize if nonSecurePort is greater than 0 and it isn't already running
      // because of containerPortInitializer below
      if (!this.running.get()) {
          this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
          register();
          if (shouldRegisterManagement()) {
              registerManagement();
          }
          this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
          this.running.compareAndSet(false, true);
      }
  }

  protected void register() {
      this.serviceRegistry.register(getRegistration());
  }

  // NacosServiceRegistry#register
  @Override
  public void register(Registration registration) {
      if (StringUtils.isEmpty(registration.getServiceId())) {
          return;
      }
      NamingService namingService = namingService();
      String serviceId = registration.getServiceId();
      String group = nacosDiscoveryProperties.getGroup();
      Instance instance = getNacosInstanceFromRegistration(registration);
      try {
          namingService.registerInstance(serviceId, group, instance);
      }
      catch (Exception e) {
          // rethrow a RuntimeException if the registration is failed.
          // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
          rethrowRuntimeException(e);
      }
  }

 

  

  【4】How registration works【service registration】

  // NacosNamingService#registerInstance
  @Override
  public void registerInstance(String serviceName, Instance instance) throws NacosException {
      registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
  }

  // NacosNamingService#registerInstance (overload that takes the group name)
  @Override
  public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
      NamingUtils.checkInstanceIsLegal(instance);
      String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
      if (instance.isEphemeral()) {
          BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
          // Add a delayed heartbeat task
          beatReactor.addBeatInfo(groupedServiceName, beatInfo);
      }
      // Register the service
      serverProxy.registerService(groupedServiceName, groupName, instance);
  }

  // NamingProxy#registerService
  public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
      // Build the registration parameters
      final Map<String, String> params = new HashMap<String, String>(16);
      params.put(CommonParams.NAMESPACE_ID, namespaceId);
      params.put(CommonParams.SERVICE_NAME, serviceName);
      params.put(CommonParams.GROUP_NAME, groupName);
      params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
      params.put("ip", instance.getIp());
      params.put("port", String.valueOf(instance.getPort()));
      params.put("weight", String.valueOf(instance.getWeight()));
      params.put("enable", String.valueOf(instance.isEnabled()));
      params.put("healthy", String.valueOf(instance.isHealthy()));
      params.put("ephemeral", String.valueOf(instance.isEphemeral()));
      params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
      // Send the request to the server
      // UtilAndComs.nacosUrlInstance=/nacos/v1/ns/instance, the registration endpoint shown in the official documentation
      reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
  }

  public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
      return reqApi(api, params, Collections.EMPTY_MAP, method);
  }

  public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {
      return reqApi(api, params, body, getServerList(), method);
  }

  public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
      params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
      if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
          throw new NacosException(...);
      }
      NacosException exception = new NacosException();
      if (StringUtils.isNotBlank(nacosDomain)) {
          // A single domain is configured: retry against it up to maxRetry times
          for (int i = 0; i < maxRetry; i++) {
              try {
                  return callServer(api, params, body, nacosDomain, method);
              } catch (NacosException e) {
                  exception = e;
              }
          }
      } else {
          // Start from a random server and try each server in turn until one succeeds
          Random random = new Random(System.currentTimeMillis());
          int index = random.nextInt(servers.size());
          for (int i = 0; i < servers.size(); i++) {
              String server = servers.get(index);
              try {
                  return callServer(api, params, body, server, method);
              } catch (NacosException e) {
                  exception = e;
              }
              index = (index + 1) % servers.size();
          }
      }
      throw new NacosException(...);
  }

  public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {
      long start = System.currentTimeMillis();
      long end = 0;
      injectSecurityInfo(params);
      Header header = builderHeader();
      String url;
      if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
          url = curServer + api;
      } else {
          if (!IPUtil.containsPort(curServer)) {
              curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
          }
          url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
      }
      try {
          // The actual remote call
          HttpRestResult<String> restResult = nacosRestTemplate
                  .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
          end = System.currentTimeMillis();
          MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end - start);
          if (restResult.ok()) {
              return restResult.getData();
          }
          if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
              return StringUtils.EMPTY;
          }
          throw new NacosException(restResult.getCode(), restResult.getMessage());
      } catch (Exception e) {
          throw new NacosException(NacosException.SERVER_ERROR, e);
      }
  }

  // NacosRestTemplate#exchangeForm
  public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception {
      RequestHttpEntity requestHttpEntity = new RequestHttpEntity(header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
      return execute(url, httpMethod, requestHttpEntity, responseType);
  }

  private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception {
      URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
      ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
      HttpClientResponse response = null;
      try {
          // Use JdkHttpClientRequest to send the request
          response = this.requestClient().execute(uri, httpMethod, requestEntity);
          return responseHandler.handle(response);
      } finally {
          if (response != null) {
              response.close();
          }
      }
  }

  // JdkHttpClientRequest#execute
  @Override
  public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity) throws Exception {
      final Object body = requestHttpEntity.getBody();
      final Header headers = requestHttpEntity.getHeaders();
      replaceDefaultConfig(requestHttpEntity.getHttpClientConfig());
      HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
      Map<String, String> headerMap = headers.getHeader();
      if (headerMap != null && headerMap.size() > 0) {
          for (Map.Entry<String, String> entry : headerMap.entrySet()) {
              conn.setRequestProperty(entry.getKey(), entry.getValue());
          }
      }
      conn.setConnectTimeout(this.httpClientConfig.getConTimeOutMillis());
      conn.setReadTimeout(this.httpClientConfig.getReadTimeOutMillis());
      conn.setRequestMethod(httpMethod);
      if (body != null && !"".equals(body)) {
          String contentType = headers.getValue(HttpHeaderConsts.CONTENT_TYPE);
          String bodyStr = JacksonUtils.toJson(body);
          if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) {
              Map<String, String> map = JacksonUtils.toObj(bodyStr, HashMap.class);
              bodyStr = HttpUtils.encodingParams(map, headers.getCharset());
          }
          if (bodyStr != null) {
              conn.setDoOutput(true);
              byte[] b = bodyStr.getBytes();
              conn.setRequestProperty("Content-Length", String.valueOf(b.length));
              OutputStream outputStream = conn.getOutputStream();
              outputStream.write(b, 0, b.length);
              outputStream.flush();
              IoUtils.closeQuietly(outputStream);
          }
      }
      conn.connect();
      return new JdkHttpClientResponse(conn);
  }
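The server-selection loop inside reqApi is easy to lose among the HTTP details, so here it is in isolation. This is a simplified sketch, not the NamingProxy code: the class and method names are hypothetical, the start index is passed in rather than drawn from Random, and failures are modelled as RuntimeException instead of NacosException.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of "start somewhere, then try each server once"; not the Nacos API.
class ServerFailoverSketch {
    // Calls servers in round-robin order from startIndex until one call succeeds.
    static <T> T callWithFailover(List<String> servers, int startIndex, Function<String, T> call) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("no server available");
        }
        RuntimeException last = null;
        int index = startIndex % servers.size();
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next server.
                last = e;
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("all servers failed", last);
    }
}
```

Picking a random start index, as the original code does, spreads requests across the cluster while still guaranteeing that every server is tried once before giving up.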

 

  

  【5】The heartbeat task flow of beatReactor.addBeatInfo【service heartbeat】

  // BeatReactor constructor
  public BeatReactor(NamingProxy serverProxy, int threadCount) {
      this.serverProxy = serverProxy;
      // Create the scheduled thread pool
      this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
              Thread thread = new Thread(r);
              thread.setDaemon(true);
              thread.setName("com.alibaba.nacos.naming.beat.sender");
              return thread;
          }
      });
  }

  // Adds a heartbeat task
  public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
      NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
      String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
      BeatInfo existBeat = null;
      //fix #1733
      if ((existBeat = dom2Beat.remove(key)) != null) {
          existBeat.setStopped(true);
      }
      dom2Beat.put(key, beatInfo);
      // In effect this just submits a task to the scheduled thread pool
      executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
      MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
  }

  // The heartbeat task class; the run method is what matters
  // Eureka schedules its tasks in the same way
  class BeatTask implements Runnable {

      BeatInfo beatInfo;

      public BeatTask(BeatInfo beatInfo) {
          this.beatInfo = beatInfo;
      }

      @Override
      public void run() {
          if (beatInfo.isStopped()) {
              return;
          }
          long nextTime = beatInfo.getPeriod();
          try {
              // Call the heartbeat endpoint through the server proxy
              JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
              long interval = result.get("clientBeatInterval").asLong();
              boolean lightBeatEnabled = false;
              if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                  lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
              }
              BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
              if (interval > 0) {
                  nextTime = interval;
              }
              int code = NamingResponseCode.OK;
              if (result.has(CommonParams.CODE)) {
                  code = result.get(CommonParams.CODE).asInt();
              }
              // The server reports that the instance does not exist, so register again
              if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                  Instance instance = new Instance();
                  instance.setPort(beatInfo.getPort());
                  instance.setIp(beatInfo.getIp());
                  instance.setWeight(beatInfo.getWeight());
                  instance.setMetadata(beatInfo.getMetadata());
                  instance.setClusterName(beatInfo.getCluster());
                  instance.setServiceName(beatInfo.getServiceName());
                  instance.setInstanceId(instance.getInstanceId());
                  instance.setEphemeral(true);
                  try {
                      // Another call to the registration method
                      serverProxy.registerService(beatInfo.getServiceName(),
                              NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                  } catch (Exception ignore) {
                  }
              }
          } catch (NacosException ex) {...}
          // The task submits itself again here, which forms the loop
          executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
      }
  }

  // NamingProxy#sendBeat
  public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
      Map<String, String> params = new HashMap<String, String>(8);
      Map<String, String> bodyMap = new HashMap<String, String>(2);
      if (!lightBeatEnabled) {
          bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
      }
      params.put(CommonParams.NAMESPACE_ID, namespaceId);
      params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
      params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
      params.put("ip", beatInfo.getIp());
      params.put("port", String.valueOf(beatInfo.getPort()));
      // The endpoint is /nacos/v1/ns/instance/beat
      String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
      return JacksonUtils.toObj(result);
  }
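The notable pattern in BeatTask is that it uses a one-shot schedule call and re-submits itself at the end of run, instead of scheduleAtFixedRate. A stripped-down sketch of that pattern follows; the class and method names are hypothetical, and the heartbeat is reduced to incrementing a counter.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a self-rescheduling task; not the Nacos BeatReactor.
class SelfReschedulingBeatSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "beat-sketch");
        t.setDaemon(true);
        return t;
    });
    private final AtomicInteger beats = new AtomicInteger();
    private final long periodMillis;
    private volatile boolean stopped;

    SelfReschedulingBeatSketch(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    void start() {
        executor.schedule(this::beatOnce, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    private void beatOnce() {
        if (stopped) {
            return;
        }
        beats.incrementAndGet(); // stands in for sending the heartbeat request
        try {
            // Schedule the next run only after this one has finished.
            executor.schedule(this::beatOnce, periodMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException ignored) {
            // The executor was shut down between the stopped check and this call.
        }
    }

    // Polls until at least n beats were sent or the timeout expires.
    boolean awaitBeats(int n, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (beats.get() < n) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            Thread.yield();
        }
        return true;
    }
}
```

Rescheduling from inside the task lets each run choose its own next delay, which is how the original applies the clientBeatInterval value returned by the server.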

 

  

  【6】How services are looked up【service discovery】

  // NacosNamingService#getAllInstances
  @Override
  public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
      ServiceInfo serviceInfo;
      // Whether to use subscribe mode; the default is true
      if (subscribe) {
          // Read the service information from the client cache first
          serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
      } else {
          // Not subscribing: query the server directly instead of using the local cache
          serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
      }
      List<Instance> list;
      // Extract the instance list from the service information
      if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
          return new ArrayList<Instance>();
      }
      return list;
  }

 

  

  【6.1】The hostReactor.getServiceInfo method, which reads from the cache first

  // Gets the service information
  public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
      NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
      String key = ServiceInfo.getKey(serviceName, clusters);
      if (failoverReactor.isFailoverSwitch()) {
          return failoverReactor.getService(key);
      }
      // Look up the service information in the local cache
      ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
      // The first time the client asks for this service, the cache is empty
      if (null == serviceObj) {
          serviceObj = new ServiceInfo(serviceName, clusters);
          serviceInfoMap.put(serviceObj.getKey(), serviceObj);
          updatingMap.put(serviceName, new Object());
          // Pull the registry data from the registry center
          updateServiceNow(serviceName, clusters);
          updatingMap.remove(serviceName);
      }
      // The cache already has an entry, but another thread is still updating it
      else if (updatingMap.containsKey(serviceName)) {
          if (UPDATE_HOLD_INTERVAL > 0) {
              // hold a moment waiting for update finish
              synchronized (serviceObj) {
                  try {
                      serviceObj.wait(UPDATE_HOLD_INTERVAL);
                  } catch (InterruptedException e) {...}
              }
          }
      }
      // The client starts a scheduled task that pulls all instance data from the registry center every few seconds
      scheduleUpdateIfAbsent(serviceName, clusters);
      return serviceInfoMap.get(serviceObj.getKey());
  }

  // HostReactor's Map<String, ServiceInfo> serviceInfoMap field is where the client caches instance data
  // The lookup reads from serviceInfoMap first
  private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
      String key = ServiceInfo.getKey(serviceName, clusters);
      return serviceInfoMap.get(key);
  }
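Reduced to its core, getServiceInfo is a cache lookup that falls back to a remote query on the first miss. The sketch below shows only that part and leaves out failover, waiting on concurrent updates and the scheduled refresh. All names are hypothetical, the remote query is an injected function, and the key format is an assumption made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical cache-first lookup; not the Nacos HostReactor.
class ServiceCacheSketch {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final Function<String, List<String>> remoteQuery;
    private final AtomicInteger remoteCalls = new AtomicInteger();

    ServiceCacheSketch(Function<String, List<String>> remoteQuery) {
        this.remoteQuery = remoteQuery;
    }

    // Returns cached instances, querying the remote side only on the first miss.
    List<String> getInstances(String serviceName, String clusters) {
        String key = serviceName + "@@" + clusters; // assumed key format, for illustration only
        return serviceInfoMap.computeIfAbsent(key, k -> {
            remoteCalls.incrementAndGet();
            return remoteQuery.apply(serviceName);
        });
    }

    int remoteCallCount() {
        return remoteCalls.get();
    }
}
```

In the original, the cached entry is then kept fresh by the scheduled pull and by server pushes rather than being fetched again on each call.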

 

  

  【6.1.1】The remote pull flow in the updateServiceNow method

  private void updateServiceNow(String serviceName, String clusters) {
      try {
          updateService(serviceName, clusters);
      } catch (NacosException e) {...}
  }

  public void updateService(String serviceName, String clusters) throws NacosException {
      ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
      try {
          // Remote call
          String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
          if (StringUtils.isNotEmpty(result)) {
              // Parse the result and store it in serviceInfoMap; this also publishes an InstancesChangeEvent
              processServiceJson(result);
          }
      } finally {
          if (oldService != null) {
              synchronized (oldService) {
                  oldService.notifyAll();
              }
          }
      }
  }

  // NamingProxy#queryList
  public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
      final Map<String, String> params = new HashMap<String, String>(8);
      params.put(CommonParams.NAMESPACE_ID, namespaceId);
      params.put(CommonParams.SERVICE_NAME, serviceName);
      params.put("clusters", clusters);
      params.put("udpPort", String.valueOf(udpPort));
      params.put("clientIP", NetUtils.localIP());
      params.put("healthyOnly", String.valueOf(healthyOnly));
      // Call the server API to fetch all instances of the service from the registry center
      return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
  }

 

  

  【6.1.1.1】What the scheduled task created by scheduleUpdateIfAbsent does

  public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
      if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
          return;
      }
      synchronized (futureMap) {
          // Check again inside the lock so that only one task is created per key
          if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
              return;
          }
          ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
          futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
      }
  }

  // DEFAULT_DELAY = 1000L, that is, 1s
  public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
      return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
  }

  // The run method of the UpdateTask class
  @Override
  public void run() {
      long delayTime = DEFAULT_DELAY;
      try {
          // Look up the current service information by serviceName, including the server address list
          ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
          // If it is missing, pull the latest service list again
          if (serviceObj == null) {
              updateService(serviceName, clusters);
              return;
          }
          // If the timestamp is <= the last update time, perform an update
          if (serviceObj.getLastRefTime() <= lastRefTime) {
              updateService(serviceName, clusters);
              serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
          } else {
              // If the refTime of serviceObj is later,
              // the service has already been updated by a server push, so only refresh here
              refreshOnly(serviceName, clusters);
          }
          // Record the update time of the service
          lastRefTime = serviceObj.getLastRefTime();
          // If the subscription has been cancelled, stop the update task
          if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
              return;
          }
          // If there are no instances available to call, increase the failure count by 1
          if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
              incFailCount();
              return;
          }
          // Delay the next query by the cache time
          delayTime = serviceObj.getCacheMillis();
          // Reset the failure count to 0
          resetFailCount();
      } catch (Throwable e) {
          incFailCount();
      } finally {
          // Schedule the next query; the default delay is 1s and it doubles with each consecutive failure
          ...
      }
  }
