Many readers who have worked through the Eureka source code start with the client startup class, ExampleEurekaClient, follow the code, and run into a puzzle. This is a client, so it has to register with the registry. Why is the registration call nowhere to be seen? If you have had the same question, believe me, you are not alone.
The startup process flow of the ExampleEurekaClient class is roughly as shown in the figure:
This is the general startup process, and nothing in it seems related to service registration. How can that be? How does the instance data get into the Eureka registry? In fact, the client does register itself with the registry during this process. Eureka's call hierarchy is deep, and I think the naming is poorly chosen in some places, so this code is easy to overlook when we read the source in our usual way.
Let me state the conclusion first: the Eureka client registers itself with the registry while it initializes its scheduled tasks, in the initScheduledTasks method of the DiscoveryClient class.
Source code analysis
As the saying goes, the source code holds no secrets. With the conclusion and the questions in mind, let's see how the Eureka client registers itself with the registry, starting with the code that initializes the scheduled tasks.
initScheduledTasks method
private void initScheduledTasks() {
    // Scheduling task of regularly fetching registry
    ......
    // Scheduling tasks that regularly send heartbeats to Eureka server
    ......
    // InstanceInfo replicator
    // Service instance registration
    instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize
    // A pile of other codes
    ......
    // Register
    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}
The code contains an interesting class, InstanceInfoReplicator. When I first read the source, I assumed it replicated service instances and did not look at it closely. After tracing the whole client startup flow, I still could not find where the client registers with the server, so I had to go back through Eureka's code. It turns out that service registration happens right here.
I have to complain a little: this performs a service registration, so wouldn't a name like instanceRegister have been nicer? With such a fancy name, who could guess? I also assumed this method would be simple. Later I learned how naive that was.
InstanceInfoReplicator constructor
Let's step into the InstanceInfoReplicator constructor to see what it does.
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
    this.discoveryClient = discoveryClient;
    this.instanceInfo = instanceInfo;
    this.scheduler = Executors.newScheduledThreadPool(1,
            new ThreadFactoryBuilder()
                    .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                    .setDaemon(true)
                    .build());
    this.scheduledPeriodicRef = new AtomicReference<Future>();
    this.started = new AtomicBoolean(false);
    this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
    this.replicationIntervalSeconds = replicationIntervalSeconds;
    this.burstSize = burstSize;
    this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
    logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
}
Overall, the constructor does nothing more than simple assignments and initialization. The two fields that matter most for what follows are scheduler and started.
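One line in the constructor is worth a quick calculation: allowedRatePerMinute. The following is only a small sketch of that formula. The class name ReplicatorRateSketch is made up for illustration, and the 30-second replication interval is an assumed value, not something the code above tells us. Only burstSize = 2 comes from the initScheduledTasks call.

```java
// Hypothetical helper class; only the formula is taken from the constructor.
final class ReplicatorRateSketch {

    /** Same integer arithmetic as: 60 * burstSize / replicationIntervalSeconds. */
    static int allowedRatePerMinute(int burstSize, int replicationIntervalSeconds) {
        return 60 * burstSize / replicationIntervalSeconds;
    }

    public static void main(String[] args) {
        // burstSize = 2 is the literal passed in initScheduledTasks;
        // the 30-second interval is an assumption made for this example.
        System.out.println(allowedRatePerMinute(2, 30)); // prints 4
    }
}
```

Because this is integer division, an interval that does not divide 60 * burstSize evenly rounds the allowed rate down.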
When analyzing the initScheduledTasks method, we saw that the instanceInfoReplicator object calls a start method: instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
Now that we have seen the InstanceInfoReplicator constructor, let's take a look at the start method.
start method
Let's start with its parameter, clientConfig.getInitialInstanceInfoReplicationIntervalSeconds(). The Javadoc of this method is as follows:
/**
 * Indicates how long initially (in seconds) to replicate instance info
 * to the eureka server
 */
int getInitialInstanceInfoReplicationIntervalSeconds();
Roughly, this is the initial delay, in seconds, before the instance info is first registered with the Eureka server. How long is it? Unless you configure it yourself, the default is 40 seconds. The following code returns the configured value if there is one, and 40 otherwise:
@Override
public int getInitialInstanceInfoReplicationIntervalSeconds() {
    return configInstance.getIntProperty(
            namespace + INITIAL_REGISTRATION_REPLICATION_DELAY_KEY, 40).get();
}
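The "use the configured value, otherwise fall back to 40" idea can be sketched with plain java.util.Properties. This is not how Eureka reads its configuration internally. The class name and the property key below are hypothetical and exist only for this illustration.

```java
import java.util.Properties;

// Hypothetical stand-in for a config lookup with a default value.
final class InitialDelayConfigSketch {

    // Made-up key for this example; it is not Eureka's real property name.
    static final String KEY = "example.initialReplicationDelaySeconds";

    /** Returns the configured delay in seconds, or 40 when nothing is set. */
    static int initialDelaySeconds(Properties props) {
        return Integer.parseInt(props.getProperty(KEY, "40"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(initialDelaySeconds(props)); // prints 40

        props.setProperty(KEY, "5");
        System.out.println(initialDelaySeconds(props)); // prints 5
    }
}
```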
After finishing the parameters, let's take a look at the code of start.
public void start(int initialDelayMs) {
    // started is always false on the first call, because the constructor
    // initializes it with this.started = new AtomicBoolean(false);
    if (started.compareAndSet(false, true)) {
        instanceInfo.setIsDirty();  // for initial register
        // Run this task after the initial delay, 40 by default. Despite the
        // "Ms" suffix in the name, the value is scheduled with TimeUnit.SECONDS.
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
We see two familiar fields, scheduler and started. Following the code in order:
- It first checks whether started is false. If so, it sets it to true.
- Then it calls the setIsDirty method, which we will look at later.
- Then it submits itself (this) to the thread pool, which runs it after a delay of 40 seconds.
- Finally, the returned Future is stored in the private final AtomicReference scheduledPeriodicRef.
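The steps above boil down to a "start only once, then schedule yourself" pattern. Here is a minimal sketch of it using only the JDK. StartOnceSketch, its runs counter, and the awaitFirstRun and shutdown helpers are all invented for this example, and the delay is in milliseconds so the demo finishes quickly.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class illustrating the compareAndSet guard used by start().
final class StartOnceSketch implements Runnable {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicReference<Future<?>> scheduledRef = new AtomicReference<>();
    final AtomicInteger runs = new AtomicInteger();

    /** Returns true only for the one call that actually scheduled the task. */
    boolean start(long initialDelayMillis) {
        if (started.compareAndSet(false, true)) {
            Future<?> next = scheduler.schedule(this, initialDelayMillis, TimeUnit.MILLISECONDS);
            scheduledRef.set(next);
            return true;
        }
        return false; // a second caller finds started == true and does nothing
    }

    @Override
    public void run() {
        runs.incrementAndGet(); // stand-in for the real work
    }

    /** Blocks until the scheduled run has completed. */
    void awaitFirstRun() {
        try {
            scheduledRef.get().get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        StartOnceSketch sketch = new StartOnceSketch();
        System.out.println(sketch.start(10)); // true: task scheduled
        System.out.println(sketch.start(10)); // false: already started
        sketch.awaitFirstRun();
        sketch.shutdown();
    }
}
```

The compareAndSet call makes the check and the update a single atomic step, so even if two threads call start at the same moment, only one of them schedules the task.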
I was quite confused at this point, and several questions came to mind.
- What is the setIsDirty method used for?
- Why can this be submitted to the thread pool?
- What does the thread pool do once the task runs?
Let's read on with these questions in mind.
setIsDirty method
public synchronized void setIsDirty() {
    isInstanceInfoDirty = true;
    lastDirtyTimestamp = System.currentTimeMillis();
}
This method contains only two assignments, and it is not yet clear what they are for. Let's keep this question in mind and continue.
run method
Just now, we asked why this can be submitted to the thread pool. Let's scroll up to the class declaration:
class InstanceInfoReplicator implements Runnable
This class implements the Runnable interface, so it must implement the run method. When we submit it to the thread pool, the pool eventually executes the run method implemented by this class. Let's go on to see the run method.
@Override
public void run() {
    try {
        // Refresh the information of the service instance
        discoveryClient.refreshInstanceInfo();
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        // On the first execution this cannot be null, because start()
        // called setIsDirty() just before scheduling this task
        if (dirtyTimestamp != null) {
            // register
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
At last, the word register appears. It was not easy to find. Notice the line Long dirtyTimestamp = instanceInfo.isDirtyWithTime();. The code then checks whether the result is null. Given our execution order, it cannot be null on the first run, because start() has just marked the instance as dirty through the setIsDirty method. So that is what setIsDirty is for: it flags the instance info as needing to be sent to the server. Next, we move on to the register method.
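Before moving on, here is a rough sketch of how the dirty flag and the self-rescheduling run method fit together. It uses only the JDK. DirtyReplicatorSketch, the registerCalls counter, and the cycle limit are invented for this example. The body of unsetIsDirty is my assumption about how such a method would reasonably behave, not a copy of Eureka's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of the replicator: register only when dirty, then reschedule.
final class DirtyReplicatorSketch implements Runnable {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final long intervalMillis;
    private final CountDownLatch cycles; // lets the demo stop after a few runs
    final AtomicInteger registerCalls = new AtomicInteger();

    private boolean dirty;
    private long lastDirtyTimestamp;

    DirtyReplicatorSketch(long intervalMillis, int cyclesToRun) {
        this.intervalMillis = intervalMillis;
        this.cycles = new CountDownLatch(cyclesToRun);
    }

    synchronized void setIsDirty() {
        dirty = true;
        lastDirtyTimestamp = System.currentTimeMillis();
    }

    /** Returns the time of the last change if dirty, otherwise null. */
    synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Assumed behavior: clear the flag unless a newer change arrived meanwhile. */
    synchronized void unsetIsDirty(long handledTimestamp) {
        if (lastDirtyTimestamp <= handledTimestamp) {
            dirty = false;
        }
    }

    void start() {
        setIsDirty(); // guarantees that the first run registers
        scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        try {
            Long dirtyTimestamp = isDirtyWithTime();
            if (dirtyTimestamp != null) {
                registerCalls.incrementAndGet(); // stand-in for discoveryClient.register()
                unsetIsDirty(dirtyTimestamp);
            }
        } finally {
            cycles.countDown();
            if (cycles.getCount() > 0) {
                // The task schedules its own next run, as in the finally block above
                scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    /** Waits for all cycles, then stops the pool; returns false on timeout. */
    boolean awaitAndShutdown() {
        try {
            return cycles.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        DirtyReplicatorSketch sketch = new DirtyReplicatorSketch(5, 3);
        sketch.start();
        sketch.awaitAndShutdown();
        // Three cycles ran, but only the first one found the dirty flag set
        System.out.println(sketch.registerCalls.get()); // prints 1
    }
}
```

Rescheduling from the finally block means a failed cycle does not stop the loop, which matches the structure of the run method shown earlier.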
register method
/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}
The code makes it clear that the request is sent by the eurekaTransport.registrationClient.register method. We can safely guess that the payload is the service instance.
Let's step into it.
// How to actually perform registration
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    Response response = null;
    try {
        // http://localhost:8080/v2/apps/ServiceA
        // The sent request is a post request. The object of the service instance is made into a json, which contains its own host, ip, port number, etc
        Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
        addExtraProperties(resourceBuilder);
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .accept(MediaType.APPLICATION_JSON)
                .acceptEncoding("gzip")
                .post(Entity.json(info));
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
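To make the shape of that request concrete, here is a sketch that builds (but does not send) an equivalent POST with the JDK's java.net.http API instead of Jersey. The class RegisterRequestSketch and the JSON payload are hypothetical. The real InstanceInfo JSON carries many more fields, and the extra headers and gzip handling from the code above are left out.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical illustration of the request shape; it does not use Eureka's classes.
final class RegisterRequestSketch {

    /** Builds a POST to {serviceUrl}apps/{appName} with a JSON body. */
    static HttpRequest buildRegisterRequest(String serviceUrl, String appName, String instanceJson) {
        URI uri = URI.create(serviceUrl + "apps/" + appName);
        return HttpRequest.newBuilder(uri)
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    /** Same check as in register(): only HTTP 204 counts as success. */
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }

    public static void main(String[] args) {
        // Made-up, heavily trimmed payload for illustration only
        String json = "{\"instance\":{\"app\":\"ServiceA\",\"hostName\":\"localhost\"}}";
        HttpRequest request = buildRegisterRequest("http://localhost:8080/v2/", "ServiceA", json);
        System.out.println(request.method() + " " + request.uri());
        // prints POST http://localhost:8080/v2/apps/ServiceA
    }
}
```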
At this point, the puzzle is finally solved. In this method the client converts its service instance to JSON and registers itself with the registry through a POST request. You may now have another question: when is eurekaTransport.registrationClient initialized?
It is actually when the network component EurekaTransport is initialized.
The code is as follows
// Initialize the network communication component
EurekaTransport eurekaTransport = new EurekaTransport();
// In this method, the member variable registrationClient of eurekaTransport is assigned a value
scheduleServerEndpointTask(eurekaTransport, args);
Code to initialize registrationClient
if (clientConfig.shouldRegisterWithEureka()) {
    EurekaHttpClientFactory newRegistrationClientFactory = null;
    EurekaHttpClient newRegistrationClient = null;
    try {
        newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                eurekaTransport.bootstrapResolver,
                eurekaTransport.transportClientFactory,
                transportConfig
        );
        newRegistrationClient = newRegistrationClientFactory.newClient();
    } catch (Exception e) {
        logger.warn("Transport initialization failure", e);
    }
    eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
    eurekaTransport.registrationClient = newRegistrationClient;
}