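Eureka server
Eureka consists of a server and a client. This section walks through the main server-side code first.
Auto-configuration
Spring Cloud's Eureka relies on the auto-configuration mechanism provided by Spring Boot. The server module registers its configuration class in spring.factories:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

When Spring starts, it loads EurekaServerAutoConfiguration: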

    @Configuration(proxyBeanMethods = false)
    @Import(EurekaServerInitializerConfiguration.class)
    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
    @PropertySource("classpath:/eureka/server.properties")
    public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    }

The configuration is only activated when a Marker bean from EurekaServerMarkerConfiguration is present. Here is that class:

    @Configuration(proxyBeanMethods = false)
    public class EurekaServerMarkerConfiguration {

        @Bean
        public Marker eurekaServerMarkerBean() {
            return new Marker();
        }

        class Marker {
        }
    }

So the Marker bean is created by EurekaServerMarkerConfiguration, and EurekaServerMarkerConfiguration itself is imported by the @EnableEurekaServer annotation:

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(EurekaServerMarkerConfiguration.class)
    public @interface EnableEurekaServer {
    }

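Only when the @EnableEurekaServer annotation is present does the rest of the auto-configuration take effect. The annotation is the precondition for an application to become a Eureka server.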
Main configuration classes and initialization

    // Dashboard controller, enabled unless eureka.dashboard.enabled=false
    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }

    // Instance registry that is aware of peer nodes
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications();
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    // Holds the other Eureka server nodes
    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs,
            ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
                replicationClientAdditionalFilters);
    }

    // Server context
    @Bean
    @ConditionalOnMissingBean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

    // Server bootstrap
    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext);
    }

    // Registers Jersey as a servlet filter under the Eureka URL prefix
    @Bean
    public FilterRegistrationBean<?> jerseyFilterRegistration(
            javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
        return bean;
    }

Next, look at the start method of PeerEurekaNodes:

    public class PeerEurekaNodes {

        public void start() {
            taskExecutor = Executors.newSingleThreadScheduledExecutor(
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                            thread.setDaemon(true);
                            return thread;
                        }
                    }
            );
            try {
                updatePeerEurekaNodes(resolvePeerUrls());
                Runnable peersUpdateTask = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            updatePeerEurekaNodes(resolvePeerUrls());
                        } catch (Throwable e) {
                            logger.error("Cannot update the replica Nodes", e);
                        }
                    }
                };
                taskExecutor.scheduleWithFixedDelay(
                        peersUpdateTask,
                        serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                        serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                        TimeUnit.MILLISECONDS
                );
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            for (PeerEurekaNode node : peerEurekaNodes) {
                logger.info("Replica node URL:  {}", node.getServiceUrl());
            }
        }
    }

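The pattern in start (resolve the peers once synchronously, then refresh them on a daemon thread with a fixed delay) can be sketched in plain Java. This is only an illustration: PeerRefresherSketch, its resolver argument and the example URL are hypothetical names, not part of Eureka.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for PeerEurekaNodes: keeps a peer URL list fresh.
class PeerRefresherSketch {
    private volatile List<String> peerUrls = Collections.emptyList();
    private final Supplier<List<String>> resolver; // plays the role of resolvePeerUrls()
    private ScheduledExecutorService executor;

    PeerRefresherSketch(Supplier<List<String>> resolver) {
        this.resolver = resolver;
    }

    void start(long intervalMs) {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "peer-updater");
            t.setDaemon(true); // a daemon thread does not block JVM shutdown
            return t;
        });
        // Resolve once on the calling thread so peers are known immediately.
        peerUrls = resolver.get();
        executor.scheduleWithFixedDelay(() -> {
            try {
                peerUrls = resolver.get();
            } catch (Throwable e) {
                // Swallow the error: an uncaught exception would cancel future runs.
                System.err.println("Cannot update peers: " + e);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    List<String> peers() {
        return peerUrls;
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        PeerRefresherSketch sketch = new PeerRefresherSketch(
                () -> Collections.singletonList("http://peer-1.example/eureka/"));
        sketch.start(1000);
        System.out.println(sketch.peers());
        sketch.shutdown();
    }
}
```

Catching Throwable inside the task matters here because a scheduled task that throws is not run again by the executor.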
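DefaultEurekaServerContext is the default Eureka server context. Its initialize method starts PeerEurekaNodes, which is where the information about the other server nodes is obtained: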

    public class DefaultEurekaServerContext {

        @PostConstruct
        @Override
        public void initialize() {
            logger.info("Initializing ...");
            peerEurekaNodes.start();
            try {
                registry.init(peerEurekaNodes);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            logger.info("Initialized");
        }
    }

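Now focus on Eureka's initialization. During server startup, EurekaServerInitializerConfiguration implements SmartLifecycle, which lets it run logic after the beans have been initialized. In the syncUp method, the server fetches the registry information of the other Eureka servers and stores it in the local registry, whose structure is ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>: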

    public class PeerAwareInstanceRegistryImpl {

        public int syncUp() {
            int count = 0;
            for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
                if (i > 0) {
                    try {
                        Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                    } catch (InterruptedException e) {
                        logger.warn("Interrupted during registry transfer..");
                        break;
                    }
                }
                Applications apps = eurekaClient.getApplications();
                for (Application app : apps.getRegisteredApplications()) {
                    for (InstanceInfo instance : app.getInstances()) {
                        try {
                            if (isRegisterable(instance)) {
                                register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                                count++;
                            }
                        } catch (Throwable t) {
                            logger.error("During DS init copy", t);
                        }
                    }
                }
            }
            return count;
        }
    }

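The control flow of syncUp is a bounded retry loop that stops as soon as at least one instance has been copied. A minimal sketch of just that loop, with a hypothetical fetchInstances supplier standing in for eurekaClient.getApplications() and plain strings standing in for InstanceInfo:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical reduction of syncUp to its retry logic.
class SyncUpSketch {

    // Retries until something was copied or maxRetries attempts were made.
    static int syncUp(Supplier<List<String>> fetchInstances, int maxRetries, long waitMs) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(waitMs); // wait only between attempts, not before the first
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            // The real method registers each instance locally; here we only count them.
            count += fetchInstances.get().size();
        }
        return count;
    }

    public static void main(String[] args) {
        int copied = syncUp(() -> Arrays.asList("app-1:i-1", "app-1:i-2"), 5, 10L);
        System.out.println("copied " + copied + " instances");
    }
}
```

A return value of 0 means no peer had anything to offer within the allowed attempts, which is the case a freshly started first node of a cluster runs into.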
Once the sync has finished, openForTraffic updates the renewal threshold and changes the server's status to UP:

    public class PeerAwareInstanceRegistryImpl {

        @Override
        public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
            this.expectedNumberOfClientsSendingRenews = count;
            updateRenewsPerMinThreshold();
            logger.info("Got {} instances from neighboring DS node", count);
            logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
            this.startupTime = System.currentTimeMillis();
            if (count > 0) {
                this.peerInstancesTransferEmptyOnStartup = false;
            }
            DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
            boolean isAws = Name.Amazon == selfName;
            if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
                logger.info("Priming AWS connections for all replicas..");
                primeAwsReplicas(applicationInfoManager);
            }
            logger.info("Changing status to UP");
            applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
            super.postInit();
        }
    }

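The body of updateRenewsPerMinThreshold is not shown in the excerpt above. As a rough sketch, and assuming the threshold is derived as "expected clients × renewals per minute × a percentage", the arithmetic would look as follows. The method name, the 30-second interval and the 0.85 percentage are illustrative assumptions here, not values taken from the listing.

```java
// Hypothetical illustration of how a renewals-per-minute threshold could be derived.
class RenewThresholdSketch {

    // expectedClients: number of clients expected to send heartbeats
    // renewalIntervalSeconds: assumed heartbeat interval of each client
    // percentThreshold: assumed fraction of heartbeats that must arrive
    static int renewsPerMinThreshold(int expectedClients,
                                     int renewalIntervalSeconds,
                                     double percentThreshold) {
        double renewsPerClientPerMinute = 60.0 / renewalIntervalSeconds;
        return (int) (expectedClients * renewsPerClientPerMinute * percentThreshold);
    }

    public static void main(String[] args) {
        System.out.println(renewsPerMinThreshold(10, 30, 0.85));
    }
}
```

Because the count passed to openForTraffic feeds expectedNumberOfClientsSendingRenews, the number of instances copied during syncUp directly influences this threshold at startup.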
Service registration
Registration requests are first received by a Jersey resource:

    public class ApplicationResource {

        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();
        }
    }

The resource calls the register method, which stores the instance in the local cache:

    public class PeerAwareInstanceRegistryImpl {

        public void register(final InstanceInfo info, final boolean isReplication) {
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            super.register(info, leaseDuration, isReplication);
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
    }

super.register mainly contains the logic that registers the instance in the local cache:

    public class AbstractInstanceRegistry {

        public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            read.lock();
            try {
                Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
                REGISTER.increment(isReplication);
                if (gMap == null) {
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
                            new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
                Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
                if (existingLease != null) {
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                gMap.put(registrant.getId(), lease);
            } finally {
                read.unlock();
            }
        }
    }

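The two-level map (application name, then instance id, then lease) and the putIfAbsent idiom used above can be tried out in isolation. The following is a simplified sketch: RegistrySketch and its Lease class are hypothetical reductions that keep only the fields needed to show the behaviour, and the read lock and metrics are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reduction of the registry to app name -> instance id -> lease.
class RegistrySketch {

    static final class Lease {
        final String instanceId;
        final int durationSecs;
        long serviceUpTimestamp; // carried over when an instance re-registers

        Lease(String instanceId, int durationSecs) {
            this.instanceId = instanceId;
            this.durationSecs = durationSecs;
        }
    }

    private final ConcurrentHashMap<String, Map<String, Lease>> registry =
            new ConcurrentHashMap<>();

    Lease register(String appName, String instanceId, int durationSecs) {
        Map<String, Lease> leases = registry.get(appName);
        if (leases == null) {
            Map<String, Lease> fresh = new ConcurrentHashMap<>();
            // putIfAbsent returns the existing map if another thread won the race.
            leases = registry.putIfAbsent(appName, fresh);
            if (leases == null) {
                leases = fresh;
            }
        }
        Lease existing = leases.get(instanceId);
        Lease lease = new Lease(instanceId, durationSecs);
        if (existing != null) {
            lease.serviceUpTimestamp = existing.serviceUpTimestamp;
        }
        leases.put(instanceId, lease); // a new lease replaces the old one
        return lease;
    }

    int instanceCount(String appName) {
        Map<String, Lease> leases = registry.get(appName);
        return leases == null ? 0 : leases.size();
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("ORDERS", "i-1", 90);
        registry.register("ORDERS", "i-2", 90);
        System.out.println(registry.instanceCount("ORDERS"));
    }
}
```

Re-registering the same instance id replaces the lease rather than adding a second entry, which is why a client can safely register again after a missed heartbeat.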
The registration is then replicated to the other nodes:

    public class PeerAwareInstanceRegistryImpl {

        private void replicateToPeers(Action action, String appName, String id,
                                      InstanceInfo info /* optional */,
                                      InstanceStatus newStatus /* optional */,
                                      boolean isReplication) {
            Stopwatch tracer = action.getTimer().start();
            try {
                if (isReplication) {
                    numberOfReplicationsLastMin.increment();
                }
                // A request that is itself a replication is not forwarded again
                if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                    return;
                }
                for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                    // Skip this node's own URL
                    if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        continue;
                    }
                    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            } finally {
                tracer.stop();
            }
        }

        private void replicateInstanceActionsToPeers(Action action, String appName, String id,
                                                     InstanceInfo info, InstanceStatus newStatus,
                                                     PeerEurekaNode node) {
            try {
                InstanceInfo infoFromRegistry;
                CurrentRequestVersion.set(Version.V2);
                switch (action) {
                    case Cancel:
                        node.cancel(appName, id);
                        break;
                    case Heartbeat:
                        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                        break;
                    case Register:
                        node.register(info);
                        break;
                    case StatusUpdate:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                        break;
                    case DeleteStatusOverride:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.deleteStatusOverride(appName, id, infoFromRegistry);
                        break;
                }
            } catch (Throwable t) {
                logger.error("Cannot replicate information to {} for action {}",
                        node.getServiceUrl(), action.name(), t);
            } finally {
                CurrentRequestVersion.remove();
            }
        }
    }

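Two guards in replicateToPeers keep replication from looping: a request that already is a replication is never forwarded, and a node never replicates to its own URL. A minimal sketch of those guards, with a hypothetical ReplicationSketch class that records what would be sent instead of making HTTP calls (the URLs are placeholders):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of one server node deciding where to forward an action.
class ReplicationSketch {
    private final String selfUrl;
    private final List<String> peerUrls;
    final List<String> sent = new ArrayList<>(); // "peer <- action" entries

    ReplicationSketch(String selfUrl, List<String> peerUrls) {
        this.selfUrl = selfUrl;
        this.peerUrls = peerUrls;
    }

    void replicateToPeers(String action, boolean isReplication) {
        // A replicated request stops here; otherwise peers would echo it back forever.
        if (peerUrls.isEmpty() || isReplication) {
            return;
        }
        for (String peer : peerUrls) {
            if (peer.equals(selfUrl)) {
                continue; // never replicate to ourselves
            }
            sent.add(peer + " <- " + action);
        }
    }

    public static void main(String[] args) {
        ReplicationSketch node = new ReplicationSketch("http://a",
                Arrays.asList("http://a", "http://b", "http://c"));
        node.replicateToPeers("Register", false); // request from a client: forwarded
        node.replicateToPeers("Register", true);  // request from a peer: not forwarded
        System.out.println(node.sent);
    }
}
```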
Heartbeat (lease renewal) endpoint

    public class InstanceResource {

        public Response renewLease() {
            boolean isFromReplicaNode = "true".equals(isReplication);
            boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
            // (parameters and response handling are omitted in this excerpt)
        }
    }

This calls the registry.renew method:

    public class PeerAwareInstanceRegistryImpl {

        public boolean renew(final String appName, final String id, final boolean isReplication) {
            if (super.renew(appName, id, isReplication)) {
                replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
                return true;
            }
            return false;
        }
    }

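Eureka client
The Eureka client registers itself with the server and sends heartbeats to it. On startup the client needs to:
read the configuration
register with the server
start scheduled tasks that send heartbeats and actively pull the server's registry to refresh the local cache (Eureka does not push updates to clients)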
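Auto-configuration and registration
The auto-configuration initializes the configuration classes and reads the configuration: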

    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties
    @ConditionalOnClass(EurekaClientConfig.class)
    @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
    @ConditionalOnDiscoveryEnabled
    @AutoConfigureBefore({ CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
    @AutoConfigureAfter(name = {
            "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
            "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
            "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
    public class EurekaClientAutoConfiguration {

        @Bean
        @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
        public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
                ManagementMetadataProvider managementMetadataProvider) {
            String hostname = getProperty("eureka.instance.hostname");
            boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
            String ipAddress = getProperty("eureka.instance.ip-address");
            boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));
            String serverContextPath = env.getProperty("server.servlet.context-path", "/");
            int serverPort = Integer.parseInt(env.getProperty("server.port", env.getProperty("port", "8080")));
            Integer managementPort = env.getProperty("management.server.port", Integer.class);
            // (the rest of the method is omitted in this excerpt)
        }
    }

During DiscoveryClient initialization, the client fetches the registry from the server:

    public class DiscoveryClient {

        @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                        AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider,
                        EndpointRandomizer endpointRandomizer) {
            if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            }
            if (clientConfig.shouldFetchRegistry()) {
                boolean primaryFetchRegistryResult = fetchRegistry(false);
            }
        }

        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
            try {
                Applications applications = getApplications();
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    getAndStoreFullRegistry();
                } else {
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                        appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(),
                        e.getMessage(), ExceptionUtils.getStackTrace(e));
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
            onCacheRefreshed();
            updateInstanceRemoteStatus();
            return true;
        }
    }

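For a full fetch, the client retrieves the Applications through its Jersey-based HTTP client and then stores them in the local cache under an optimistic lock, implemented as a compare-and-set on the fetch generation counter: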

    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        logger.info("Getting all instance registry info from the eureka server");
        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }

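The optimistic lock here boils down to: read the generation before the slow fetch, and only store the result if the generation is still the same afterwards. A minimal sketch of that idea, using a hypothetical GenerationGuardSketch class with a string standing in for the Applications object:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical cache guarded by a generation counter instead of a lock.
class GenerationGuardSketch {
    private final AtomicLong generation = new AtomicLong();
    private final AtomicReference<String> cache = new AtomicReference<>("");

    // Call before starting a fetch and remember the returned value.
    long beginFetch() {
        return generation.get();
    }

    // Stores the result only if no other fetch completed in the meantime.
    boolean store(long observedGeneration, String fetched) {
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            cache.set(fetched);
            return true;
        }
        return false; // someone else updated the cache first; drop this result
    }

    String cached() {
        return cache.get();
    }

    public static void main(String[] args) {
        GenerationGuardSketch guard = new GenerationGuardSketch();
        long slowFetch = guard.beginFetch();
        long fastFetch = guard.beginFetch();
        System.out.println(guard.store(fastFetch, "fresh registry"));
        System.out.println(guard.store(slowFetch, "stale registry"));
        System.out.println(guard.cached());
    }
}
```

The design choice is that a losing fetch is simply discarded rather than retried, which is acceptable because the periodic refresh will run again shortly.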
Registering itself

    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

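Scheduled registry fetch and heartbeat
Back in the DiscoveryClient constructor: after fetchRegistry has been called, the constructor calls initScheduledTasks to set up the timers. The scheduled registry refresh does essentially the same work as the initial registry fetch described above: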

    void refreshRegistry() {
        boolean success = fetchRegistry(remoteRegionsModified);
    }

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(
                    instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

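The interesting branch in renew is the 404 case: if the server no longer knows the instance, the client registers again instead of treating the heartbeat as failed. The sketch below models this with a hypothetical in-memory FakeServer. The status codes mirror those in the listings (200 for a successful heartbeat, 404 for an unknown instance, 204 for a successful registration), and everything else is illustrative.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical client that re-registers when its heartbeat is answered with 404.
class HeartbeatSketch {

    // In-memory stand-in for the Eureka server's REST endpoints.
    static final class FakeServer {
        final Set<String> leases = new HashSet<>();

        int heartbeat(String instanceId) {
            return leases.contains(instanceId) ? 200 : 404;
        }

        int register(String instanceId) {
            leases.add(instanceId);
            return 204;
        }
    }

    private final FakeServer server;
    private final String instanceId;
    int reRegistrations; // counts how often the 404 branch was taken

    HeartbeatSketch(FakeServer server, String instanceId) {
        this.server = server;
        this.instanceId = instanceId;
    }

    boolean renew() {
        int status = server.heartbeat(instanceId);
        if (status == 404) {
            // The server has no lease for us (for example after eviction): register again.
            reRegistrations++;
            return server.register(instanceId) == 204;
        }
        return status == 200;
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        HeartbeatSketch client = new HeartbeatSketch(server, "i-1");
        System.out.println(client.renew() + ", re-registrations: " + client.reRegistrations);
        System.out.println(client.renew() + ", re-registrations: " + client.reRegistrations);
    }
}
```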
When a service shuts down voluntarily, it cancels its scheduled tasks and deregisters itself from the server:

    void unregister() {
        if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(
                        instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }