irpas技术客

Eureka 服务实现细节_echoes2

大大的周 7162

文章目录 一、Eureka sever 实现细节一、服务注册Register二、服务续约 Renew三、服务下线Cancel四、获取注册表 Fetch Registries五、服务剔除Eviction六、服务发现七、新增Eureka Server节点 二、Eureka Client (Service Provider) 实现细节一、Register二、Renew三、Cancel四、Discovered 三、Eureka Client (Service Consumer) 实现细节一、获取注册表二、更新注册表 一、Eureka sever 实现细节 来源:《深度剖析服务发现组件Netflix Eureka》https://zhuanlan.zhihu.com/p/24829766

一、Eureka sever 实现细节

注册的服务列表保存在一个嵌套的hash map中:

第一层hash map的key是app name,也就是应用名字

第二层hash map的key是instance name,也就是实例名字

Hash map定义如下:

以此为例:

app name为Consumer-client-dev,instance name 为172.20.10.2:consumer-client-dev:8202

一、服务注册Register ApplicationResource类接收Http服务请求,调用PeerAwareInstanceRegistryImpl的register方法PeerAwareInstanceRegistryImpl完成服务注册后,调用replicateToPeers向其它Eureka Server节点(Peer)做状态同步(异步操作)

a、ApplicationResource.java

//注册实例 @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { ....... registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }

b、PeerAwareInstanceRegistryImpl.java

/** 向eureka server 注册实例 isReplication:若来自其他副本eureka server节点复制注册,则为true,否则false **/ @Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } //调用父类方法,进行注册 super.register(info, leaseDuration, isReplication); //把当前eureka操作,复制到其他eureka server副本节点 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); } ......... private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { //当前活动计时开始 Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // 判断是否已复制 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. //排除自己eureka server,防止向自己复制 if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } 二、服务续约 Renew

Renew(服务续约)操作由Service Provider定期调用,类似于heartbeat。主要是用来告诉Eureka Server Service Provider还活着,避免服务被剔除掉。首先更新自身状态,再同步到其它Peer。

当客户端进行服务注册之后,会有一个定时任务,默认是每30s 向Eureka Server端进行服务续约,也就是告诉Eureka Server 我这个客户端当前还活着,别给我从注册表中剔除了,服务续约其实就是心跳,如果是一段时间没有向Eureka Server 发送续约请求,就会被认为服务发生故障,故而从注册表中剔除。

a、InstanceResource.java

/** 接收eureka client 客户端发过来的,服务续约请求 **/ @PUT public Response renewLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); //调用服务续约方法 boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); // Not found in the registry, immediately ask for a register //若未在注册表中找到,请立即要求注册 if (!isSuccess) { logger.warn("Not Found (Renew): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } .... }

b、PeerAwareInstanceRegistryImpl.java

/* * (non-Javadoc) * * @see com.netflix.eureka.registry.InstanceRegistry#renew(java.lang.String, * java.lang.String, long, boolean) */ public boolean renew(final String appName, final String id, final boolean isReplication) { //调用父类方法,进行续约,如果 if (super.renew(appName, id, isReplication)) { //把当前eureka操作,复制到其他eureka server副本节点 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; }

c、AbstractInstanceRegistry.java

/** * Marks the given instance of the given app name as renewed, and also marks whether it originated from * replication. */ public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); //获取微服务名下的所有实例租赁列表 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null; if (gMap != null) { //根据instance name 获取指定续租实例(id=“172.20.10.2:provider-client-dev:8102”) leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { ..... } else { InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); //获取覆盖状态 InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); ....... //若续租实例的状态与获得的获取覆盖状态不一致,则将实例状态设为覆盖状态 if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { ...... instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } // renew计数,自我保护机制会用到 renewsLastMin.increment(); leaseToRenew.renew(); /** 其实就是更新lastUpdateTimestamp 这个字段。 将lastUpdateTimestamp 设置成 当前时间+ 过期间隔, 如果说一旦lastUpdateTimestamp 小于了当前时间,就说明这个实例 租约过期了,这个实例可能会被Eureka Server 后台线程从注册表中剔除。 public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; } **/ return true; } } 三、服务下线Cancel

Cancel(服务下线)一般在Service Provider shut down的时候调用,用来把自身的服务从Eureka Server中删除,以防客户端调用不存在的服务。

a、InstanceResource.java#cancelLease()

//接收服务下线请求并处理 @DELETE public Response cancelLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { try { boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication)); ........ } catch (Throwable e) { return Response.serverError().build(); } }

b、PeerAwareInstanceRegistryImpl.java

public boolean cancel(final String appName, final String id, final boolean isReplication) { //调用父类cancel(),处理服务下线 if (super.cancel(appName, id, isReplication)) { //把当前eureka操作,复制到其他eureka server副本节点 replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); return true; } return false; }

c、AbstractInstanceRegistry.java

@Override public boolean cancel(String appName, String id, boolean isReplication) { return internalCancel(appName, id, isReplication); } protected boolean internalCancel(String appName, String id, boolean isReplication) { //获取读写锁的读锁 read.lock(); try { CANCEL.increment(isReplication); // 获取appName 对应的所有实例集合 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToCancel = null; if (gMap != null) { // 根据实例id 移除对应的实例租约信息 leaseToCancel = gMap.remove(id); } // 将变更信息 扔到最近删除队列中 recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")")); //从实例状态map中移除该实例的状态信息。 InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id); ...... if (leaseToCancel == null) { CANCEL_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); return false; } else { // 变更实例剔除时间 leaseToCancel.cancel(); InstanceInfo instanceInfo = leaseToCancel.getHolder(); String vip = null; String svip = null; if (instanceInfo != null) { instanceInfo.setActionType(ActionType.DELETED); // 放入最近变更队列中 recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel)); //更新实例剔除时间 instanceInfo.setLastUpdatedTimestamp(); vip = instanceInfo.getVIPAddress(); svip = instanceInfo.getSecureVipAddress(); } // 删除实例对应的缓存数据 invalidateCache(appName, vip, svip); logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); } } finally { read.unlock(); } // 更新 需要发送心跳的客户端数量,只要为了更新自我保护机制触发阈值,这块涉及到Eureka Server 自我保护机制实现原理 synchronized (lock) { //这个是与自我保护机制有关的 if (this.expectedNumberOfClientsSendingRenews > 0) { // 每有一个的客户端下线,就会-1,表示未来要发送心跳的客户端-1 // Since the client wants to cancel it, reduce the number of clients to send renews. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; // 更新下 自己能容忍的最少心跳数量 updateRenewsPerMinThreshold(); } } return true; } 四、获取注册表 Fetch Registries

Fetch Registries由客户端调用,用来获取Eureka Server上注册的服务。为了提高性能,服务列表在Eureka Server会缓存一份,同时每30秒更新一次。

全量更新 ApplicationsResource#getContainers 增量更新 ApplicationsResource#getContainerDifferential

a、ApplicationsResource.java

public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { ...... // 构建缓存键 Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); Response response; //这里从responseCache当中获取了Applications的序列号结果直接返回了 // 返回不同的编码类型的数据,去缓存中取数据的方法基本一致 if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { response = Response.ok(responseCache.get(cacheKey)) .build(); } CurrentRequestVersion.remove(); return response; }

responseCache来源是,在ApplicationsResource构造的时候从Registry中获取的

@Inject ApplicationsResource(EurekaServerContext eurekaServer) { this.serverConfig = eurekaServer.getServerConfig(); this.registry = eurekaServer.getRegistry(); this.responseCache = registry.getResponseCache(); }

b、ResponseCacheImpl.java

//从缓存中读取数据。 public byte[] getGZIP(Key key) { Value payload = getValue(key, shouldUseReadOnlyResponseCache); if (payload == null) { return null; } return payload.getGzipped(); } @VisibleForTesting Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; try { if (useReadOnlyCache) { final Value currentPayload = readOnlyCacheMap.get(key); if (currentPayload != null) { payload = currentPayload; } else { payload = readWriteCacheMap.get(key); readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } } catch (Throwable t) { logger.error("Cannot get value for key : {}", key, t); } return payload; }

c、Key.java

这个对象中包含了缓存键,这个hashKey最后的结果就是类似于这样的:ApplicationALL_APPSJSONV2full

public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) { this.regions = regions; this.entityType = entityType; this.entityName = entityName; this.requestType = type; this.requestVersion = v; this.eurekaAccept = eurekaAccept; hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "") + requestType.name() + requestVersion.name() + this.eurekaAccept.name(); } 五、服务剔除Eviction

Eviction(失效服务剔除)用来定期(默认为每60秒)在Eureka Server检测失效的服务,检测标准就是超过一定时间没有Renew的服务。默认失效时间为90秒,也就是如果有服务超过90秒没有向Eureka Server发起Renew请求的话,就会被当做失效服务剔除掉。

失效时间可以通过eureka.instance.leaseExpirationDurationInSeconds进行配置,定期扫描时间可以通过eureka.server.evictionIntervalTimerInMs进行配置。

//注意: //?Eureka的服务剔除会因为Eureka的自我保护机制而受到影响,导致不会剔除掉已经认为下线的服务

a、EurekaBootStrap.java

Eureka Server端启用的时执行的EurekaBootStrap类中initEurekaServerContext方法找到了服务剔除任务的初始化。

protected void initEurekaServerContext() throws Exception { .......... // Copy registry from neighboring eureka node int registryCount = registry.syncUp(); //初始化我们的服务剔除任务 registry.openForTraffic(applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); }

b.PeerAwareInstanceRegistryImpl.java

@Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); //调用父类清理 super.postInit(); }

c.AbstractInstanceRegistry.java

protected void postInit() { renewsLastMin.start(); // 初始化 清理租约过期任务 if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } //添加清理租约过期任务 evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), //配置 eureka.evictionIntervalTimerInMs ,清理租约过期任务执行频率,单位:毫秒。默认,60000 毫秒。 serverConfig.getEvictionIntervalTimerInMs()); } //过期逻辑 public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); //是否启用租约到期 if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } //获得所有过期租约集 List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); //遍历所有应用组 for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { //遍历应用下面的实例组 for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // 为了补偿GC暂停或本地时间漂移,我们需要使用当前注册表大小作为触发自我保护的基础。没有它,我们就会把整个注册表都抹掉。 int registrySize = (int) getLocalRegistrySize(); int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); //调用删除服务方法 internalCancel(appName, id, false); } } }

d.内部类EvictionTask.java

/* visible for testing */ //清理租约过期任务 class EvictionTask extends TimerTask { private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l); @Override public void run() { try { //调用 #compensationTimeMs() 方法,获得补偿时间毫秒数。计算公式 = 当前时间 - 最后任务执行时间 - 任务执行频率。 long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); // 清理过期租约逻辑 evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } } ....... }

e.lease.java

/** *检查租约是否已过期 * @param additionalLeaseMs 获得补偿时间毫秒数。计算公式 = 当前时间 - 最后任务执行时间 - 任务执行频率。 Eureka是通过lastUpdateTimestamp这个上次更新时间来判断我们的服务是否可用,lastUpdateTimestamp每次服务续约时会更新 */ public boolean isExpired(long additionalLeaseMs) { return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)); } public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; } 六、服务发现

Eureka Server在启动后会调用EurekaClientConfig.getEurekaServerServiceUrls来获取所有的Peer节点,并且会定期更新。定期更新频率可以通过eureka.server.peerEurekaNodesUpdateIntervalMs配置。

a.PeerEurekaNodes.java

PeerEurekaNodes创建一个定时任务, 定时执行updateP eerEurekaNodes()方法,更新peerEurekaNodes

public void start() { //创建定时任务线程 taskExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); return thread; } } ); try { // 初始化 集群节点信息 updatePeerEurekaNodes(resolvePeerUrls()); //创建节点信息更新任务 Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { //更新对等Eureka节点 updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; //将节点信息更新任务,放入定时任务中,默认10分钟执行一次 taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exception e) { throw new IllegalStateException(e); } for (PeerEurekaNode node : peerEurekaNodes) { logger.info("Replica node URL: {}", node.getServiceUrl()); } } ..... //解析对等节点urls 获取Server集群的所有serviceUrl,不包括自身 protected List<String> resolvePeerUrls() { // 获得 Eureka-Server 集群服务地址数组 InstanceInfo myInfo = applicationInfoManager.getInfo(); String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo); // 获取相同Region下的所有serviceUrl List<String> replicaUrls = EndpointUtils .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo)); int idx = 0; while (idx < replicaUrls.size()) { if (isThisMyUrl(replicaUrls.get(idx))) { replicaUrls.remove(idx); } else { idx++; } } return replicaUrls; } ....

b.EndpointUtils.java

/** * 获取所有eureka服务URL的列表 */ public static List<String> getDiscoveryServiceUrls(EurekaClientConfig clientConfig, String zone, ServiceUrlRandomizer randomizer) { ..... return getServiceUrlsFromConfig(clientConfig, zone, clientConfig.shouldPreferSameZoneEureka()); } /** 描述一下整个过程: 获取Region,若没有配置或找不到对应的Region,则使用默认值,一个微服务只能找到一个Region 通过获取的Region获取可用的Zone数组,一Region可对应多个Zone,若获取Zone失败,则使用默认值 在可用数组中查找当前配置的Zone实例,若找到则返回第一个匹配的下标,若没有找到则返回0表示默认值 将与Zone匹配的已经配置好的可用的serviceUrls加入到orderedUrls中 **/ public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) { List<String> orderedUrls = new ArrayList(); //从配置寻找Region String region = getRegion(clientConfig); //根据Region寻找可用的Zone String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); if (availZones == null || availZones.length == 0) { //若是Zone为空则使用默认值 availZones = new String[]{"default"}; } logger.debug("The availability zone for the given region {} are {}", region, availZones); //根据配置的Zone来匹配获取的可用Zone数组,有则返回对应下标,无则返回0 int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones); //根据客户端配置且已匹配了的Zone来查找Eureka服务端已经存在的defaultZone List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]); if (serviceUrls != null) { orderedUrls.addAll(serviceUrls); } //循环变量,这样是为了循环错开循环量myZoneOffset int currentOffset = myZoneOffset == availZones.length - 1 ? 0 : myZoneOffset + 1; //循环,因为可能客户端配置了多个Zone,同样的步骤 while(currentOffset != myZoneOffset) { serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]); if (serviceUrls != null) { orderedUrls.addAll(serviceUrls); } if (currentOffset == availZones.length - 1) { currentOffset = 0; } else { ++currentOffset; } } if (orderedUrls.size() < 1) { throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!"); } else { //返回最中可用的同Zone的服务端serviceUrls,即获取对应服务端的eureka.client.serviceUrls.defaultZone return orderedUrls; } }

c.EurekaClientConfigBean.java

@Override public List<String> getEurekaServerServiceUrls(String myZone) { //根据zoneName,获取 String serviceUrls = this.serviceUrl.get(myZone); if (serviceUrls == null || serviceUrls.isEmpty()) { serviceUrls = this.serviceUrl.get(DEFAULT_ZONE); } if (!StringUtils.isEmpty(serviceUrls)) { //多个注册的serviceURL,使用逗号分割为数组 final String[] serviceUrlsSplit = StringUtils .commaDelimitedListToStringArray(serviceUrls); List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length); for (String eurekaServiceUrl : serviceUrlsSplit) { if (!endsWithSlash(eurekaServiceUrl)) { eurekaServiceUrl += "/"; } eurekaServiceUrls.add(eurekaServiceUrl.trim()); } return eurekaServiceUrls; } return new ArrayList<>(); }

a.PeerEurekaNodes.java

//更新对等Eureka节点 protected void updatePeerEurekaNodes(List<String> newPeerUrls) { if (newPeerUrls.isEmpty()) { logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry"); return; } //Eureka-Server 计算 删除的集群节点地址 Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls); toShutdown.removeAll(newPeerUrls); // 计算 新增的集群节点地址 Set<String> toAdd = new HashSet<>(newPeerUrls); toAdd.removeAll(peerEurekaNodeUrls); if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change return; } //关闭删除的集群节点 // Remove peers no long available List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes); if (!toShutdown.isEmpty()) { logger.info("Removing no longer available peer nodes {}", toShutdown); int i = 0; while (i < newNodeList.size()) { PeerEurekaNode eurekaNode = newNodeList.get(i); if (toShutdown.contains(eurekaNode.getServiceUrl())) { newNodeList.remove(i); eurekaNode.shutDown(); } else { i++; } } } //新增创建集群节点 // Add new peers if (!toAdd.isEmpty()) { logger.info("Adding new peer nodes {}", toAdd); for (String peerUrl : toAdd) { newNodeList.add(createPeerEurekaNode(peerUrl)); } } this.peerEurekaNodes = newNodeList; this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls); } 七、新增Eureka Server节点

简而言之就是启动时把自己当做是Service Consumer从其它Peer Eureka获取所有服务的注册信息。然后对每个服务,在自己这里执行Register,isReplication=true,从而完成初始化。

a .PeerAwareInstanceRegistryImpl.java

public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; ....... //获取其它Peer Eureka获取所有服务的注册信息 Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; } 二、Eureka Client (Service Provider) 实现细节

主要Service Provider的实现细节,主要就是Register、Renew、Cancel这3个操作。

一、Register

只需要在启动时和实例状态变化时调用Eureka Server的接口注册即可。需要注意的是,需要确保配置eureka.client.registerWithEureka=true。

InstanceInfoReplicator.java

public void run() { try { //刷新当前本地instanceInfo discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { //Register with the eureka service by making the appropriate REST call. //通过rest 向eureka server 注册 discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }

DiscoveryClient.java

/** * Register with the eureka service by making the appropriate REST call. (eurekaTransport.registrationClient.register(instanceInfo);) */ boolean register() throws Throwable { EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { ....... } ..... return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }

EurekaHttpClientDecorator.java

@Override public EurekaHttpResponse<Void> register(final InstanceInfo info) { return execute(new RequestExecutor<Void>() { @Override public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) { return delegate.register(info); } .... }); }

AbstractJerseyEurekaHttpClient.java

//向eureka server服务端发送注册请求 public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { ...... } } 二、Renew

Renew操作会在Service Provider端定期发起,用来通知Eureka Server自己还活着。 这里有两个比较重要的配置需要注意一下:

instance.leaseRenewalIntervalInSeconds

Renew频率。默认是30秒,也就是每30秒会向Eureka Server发起Renew操作。

instance.leaseExpirationDurationInSeconds

服务失效时间。默认是90秒,也就是如果Eureka Server在90秒内没有接收到来自Service Provider的Renew操作,就会把Service Provider剔除。

三、Cancel

在Service Provider服务shut down的时候,需要及时通知Eureka Server把自己剔除,从而避免客户端调用已经下线的服务。

逻辑本身比较简单,通过对方法标记@PreDestroy,从而在服务shut down的时候会被触发。

四、Discovered

这里大家疑问又来了,Service Provider是怎么知道Eureka Server的地址呢?

其实这部分的主体逻辑和3.3.7 How Peer Nodes are Discovered几乎是一样的。

也是默认从配置文件读取,如果需要更灵活的控制,可以通过override getEurekaServerServiceUrls方法来提供自己的实现。定期更新频率可以通过eureka.client.eurekaServiceUrlPollIntervalSeconds配置。

三、Eureka Client (Service Consumer) 实现细节

Service Consumer这块的实现相对就简单一些,因为它只涉及到从Eureka Server获取服务列表和更新服务列表。

一、获取注册表

Service Consumer在启动时会从Eureka Server获取所有服务列表,并在本地缓存。需要注意的是,需要确保配置eureka.client.shouldFetchRegistry=true。

二、更新注册表

由于在本地有一份缓存,所以需要定期更新,定期更新频率可以通过eureka.client.registryFetchIntervalSeconds配置。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Eureka #服务实现细节 #一Eureka #sever #实现细节