Skip to content

Commit 82be638

Browse files
committed
eagerly reroute when a node leaves the cluster
1 parent 4539655 commit 82be638

File tree

4 files changed

+40
-3
lines changed

4 files changed

+40
-3
lines changed

‎src/main/java/org/elasticsearch/discovery/Discovery.java‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.block.ClusterBlock;
2424
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2525
import org.elasticsearch.cluster.node.DiscoveryNode;
26+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
2627
import org.elasticsearch.common.component.LifecycleComponent;
2728
import org.elasticsearch.common.inject.internal.Nullable;
2829
import org.elasticsearch.node.service.NodeService;
@@ -50,6 +51,12 @@ public interface Discovery extends LifecycleComponent<Discovery> {
5051
*/
5152
void setNodeService(@Nullable NodeService nodeService);
5253

54+
/**
55+
* Another hack to solve dep injection problem..., note, this will be called before
56+
* any start is called.
57+
*/
58+
void setAllocationService(AllocationService allocationService);
59+
5360
/**
5461
* Publish all the changes to the cluster from the master (can be called just by the master). The publish
5562
* process should not publish this state to the master as well! (the master is sending it...).

‎src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java‎

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.elasticsearch.cluster.node.DiscoveryNode;
2727
import org.elasticsearch.cluster.node.DiscoveryNodeService;
2828
import org.elasticsearch.cluster.node.DiscoveryNodes;
29+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
30+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
2931
import org.elasticsearch.common.component.AbstractLifecycleComponent;
3032
import org.elasticsearch.common.inject.Inject;
3133
import org.elasticsearch.common.inject.internal.Nullable;
@@ -58,6 +60,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
5860

5961
private final DiscoveryNodeService discoveryNodeService;
6062

63+
private AllocationService allocationService;
64+
6165
private final ClusterName clusterName;
6266

6367
private DiscoveryNode localNode;
@@ -88,6 +92,11 @@ public void setNodeService(@Nullable NodeService nodeService) {
8892
// nothing to do here
8993
}
9094

95+
@Override
96+
public void setAllocationService(AllocationService allocationService) {
97+
this.allocationService = allocationService;
98+
}
99+
91100
@Override
92101
protected void doStart() throws ElasticSearchException {
93102
synchronized (clusterGroups) {
@@ -209,7 +218,10 @@ public ClusterState execute(ClusterState currentState) {
209218
if (delta.added()) {
210219
logger.warn("No new nodes should be created when a new discovery view is accepted");
211220
}
212-
return newClusterStateBuilder().state(currentState).nodes(newNodes).build();
221+
// reroute here, so we eagerly remove dead nodes from the routing
222+
ClusterState updatedState = newClusterStateBuilder().state(currentState).nodes(newNodes).build();
223+
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).build());
224+
return newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
213225
}
214226
});
215227
}

‎src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java‎

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.elasticsearch.cluster.node.DiscoveryNodeService;
3232
import org.elasticsearch.cluster.node.DiscoveryNodes;
3333
import org.elasticsearch.cluster.routing.RoutingTable;
34+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
35+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
3436
import org.elasticsearch.common.UUID;
3537
import org.elasticsearch.common.component.AbstractLifecycleComponent;
3638
import org.elasticsearch.common.component.Lifecycle;
@@ -78,6 +80,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
7880

7981
private final ClusterService clusterService;
8082

83+
private AllocationService allocationService;
84+
8185
private final ClusterName clusterName;
8286

8387
private final DiscoveryNodeService discoveryNodeService;
@@ -161,6 +165,11 @@ public void setNodeService(@Nullable NodeService nodeService) {
161165
this.nodeService = nodeService;
162166
}
163167

168+
@Override
169+
public void setAllocationService(AllocationService allocationService) {
170+
this.allocationService = allocationService;
171+
}
172+
164173
@Override
165174
protected void doStart() throws ElasticSearchException {
166175
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
@@ -370,7 +379,9 @@ public ClusterState execute(ClusterState currentState) {
370379
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
371380
return rejoin(currentState, "not enough master nodes");
372381
}
373-
return currentState;
382+
// eagerly run reroute to remove dead nodes from routing table
383+
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
384+
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
374385
}
375386
});
376387
} else {
@@ -399,7 +410,9 @@ public ClusterState execute(ClusterState currentState) {
399410
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
400411
return rejoin(currentState, "not enough master nodes");
401412
}
402-
return currentState;
413+
// eagerly run reroute to remove dead nodes from routing table
414+
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
415+
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
403416
}
404417

405418
@Override

‎src/main/java/org/elasticsearch/node/internal/InternalNode.java‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.cluster.ClusterNameModule;
3333
import org.elasticsearch.cluster.ClusterService;
3434
import org.elasticsearch.cluster.routing.RoutingService;
35+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3536
import org.elasticsearch.common.CacheRecycler;
3637
import org.elasticsearch.common.StopWatch;
3738
import org.elasticsearch.common.collect.Tuple;
@@ -50,6 +51,7 @@
5051
import org.elasticsearch.common.settings.Settings;
5152
import org.elasticsearch.common.settings.SettingsModule;
5253
import org.elasticsearch.common.util.concurrent.ThreadLocals;
54+
import org.elasticsearch.discovery.Discovery;
5355
import org.elasticsearch.discovery.DiscoveryModule;
5456
import org.elasticsearch.discovery.DiscoveryService;
5557
import org.elasticsearch.env.Environment;
@@ -187,6 +189,9 @@ public Node start() {
187189
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
188190
logger.info("{{}}[{}]: starting ...", Version.CURRENT, JvmInfo.jvmInfo().pid());
189191

192+
// hack around dependency injection problem (for now...)
193+
injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
194+
190195
for (Class<? extends LifecycleComponent> plugin : pluginsService.services()) {
191196
injector.getInstance(plugin).start();
192197
}

0 commit comments

Comments
 (0)