Commit 042a5d0

Primary shard failure while replica shards are initializing can cause those replicas to trigger allocation failures
fixes #2592
1 parent a7bb3c2 commit 042a5d0
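
In essence, the fix adds a second pass after primary election: when no primary copy of a shard could be elected (its primary is still unassigned), any copies of that shard that are assigned to nodes but not yet active are failed instead of being left initializing against a primary that no longer exists. The snippet below is a minimal, self-contained sketch of that pass using hypothetical simplified types (ShardCopy, failDanglingReplicas, and DanglingReplicaCleanupSketch are illustration-only names, not Elasticsearch classes); the actual change is the electPrimariesAndUnassignDanglingReplicas method in the AllocationService diff that follows.

import java.util.ArrayList;
import java.util.List;

public class DanglingReplicaCleanupSketch {

    // Hypothetical stand-in for MutableShardRouting; not an Elasticsearch class.
    static class ShardCopy {
        final int shardId;
        final boolean primary;
        final boolean assigned; // has a node assigned
        final boolean active;   // started, i.e. no longer initializing
        boolean failed;

        ShardCopy(int shardId, boolean primary, boolean assigned, boolean active) {
            this.shardId = shardId;
            this.primary = primary;
            this.assigned = assigned;
            this.active = active;
        }
    }

    // Returns true if any copy was failed, mirroring the "changed" flag in the real method.
    static boolean failDanglingReplicas(List<ShardCopy> unassigned, List<ShardCopy> assignedOnNodes) {
        boolean changed = false;
        List<ShardCopy> shardsToFail = null;
        for (ShardCopy entry : unassigned) {
            // A primary that is still unassigned means no primary could be elected for this shard.
            if (entry.primary && !entry.assigned) {
                for (ShardCopy other : assignedOnNodes) {
                    // Any assigned but not-yet-active copy of the same shard is a dangling replica.
                    if (other.shardId == entry.shardId && !other.active) {
                        changed = true;
                        if (shardsToFail == null) {
                            shardsToFail = new ArrayList<>();
                        }
                        shardsToFail.add(other);
                    }
                }
            }
        }
        if (shardsToFail != null) {
            for (ShardCopy toFail : shardsToFail) {
                toFail.failed = true; // stands in for applyFailedShard(allocation, shardToFail, false)
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<ShardCopy> unassigned = new ArrayList<>();
        unassigned.add(new ShardCopy(0, true, false, false));      // lost primary of shard 0

        List<ShardCopy> assignedOnNodes = new ArrayList<>();
        assignedOnNodes.add(new ShardCopy(0, false, true, false)); // initializing replica of shard 0 -> gets failed
        assignedOnNodes.add(new ShardCopy(1, false, true, true));  // started replica of shard 1 -> untouched

        System.out.println("changed = " + failDanglingReplicas(unassigned, assignedOnNodes));
        System.out.println("shard 0 replica failed = " + assignedOnNodes.get(0).failed);
        System.out.println("shard 1 replica failed = " + assignedOnNodes.get(1).failed);
    }
}

Note that in the diff these internal failures are applied with addToIgnoreList set to false, so the node holding the replica is not added to the allocation ignore list the way an externally reported shard failure is.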

2 files changed: 91 additions, 15 deletions

src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 43 additions & 15 deletions
@@ -20,7 +20,6 @@
 package org.elasticsearch.cluster.routing.allocation;
 
 import com.google.common.collect.Lists;
-
 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.ElasticSearchIllegalStateException;
 import org.elasticsearch.cluster.ClusterState;
@@ -103,7 +102,7 @@ public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, Shar
         // shuffle the unassigned nodes, just so we won't have things like poison failed shards
         Collections.shuffle(routingNodes.unassigned());
         FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShard);
-        boolean changed = applyFailedShard(allocation, failedShard);
+        boolean changed = applyFailedShard(allocation, failedShard, true);
         if (!changed) {
             return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
         }
@@ -165,7 +164,7 @@ public RoutingAllocation.Result rerouteWithNoReassign(ClusterState clusterState)
 
         // elect primaries *before* allocating unassigned, so backups of primaries that failed
         // will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
-        changed |= electPrimaries(allocation.routingNodes());
+        changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
 
         if (!changed) {
             return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
@@ -185,13 +184,13 @@ private boolean reroute(RoutingAllocation allocation) {
 
         // elect primaries *before* allocating unassigned, so backups of primaries that failed
         // will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
-        changed |= electPrimaries(allocation.routingNodes());
+        changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
 
         // now allocate all the unassigned to available nodes
         if (allocation.routingNodes().hasUnassigned()) {
             changed |= shardsAllocators.allocateUnassigned(allocation);
             // elect primaries again, in case this is needed with unassigned allocation
-            changed |= electPrimaries(allocation.routingNodes());
+            changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
         }
 
         // move shards that no longer can be allocated
@@ -242,8 +241,9 @@ private boolean moveShards(RoutingAllocation allocation) {
         return changed;
     }
 
-    private boolean electPrimaries(RoutingNodes routingNodes) {
+    private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) {
         boolean changed = false;
+        RoutingNodes routingNodes = allocation.routingNodes();
         for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
             if (shardEntry.primary() && !shardEntry.assignedToNode()) {
                 boolean elected = false;
@@ -283,6 +283,29 @@ private boolean electPrimaries(RoutingNodes routingNodes) {
                 }
             }
         }
+
+        // go over and remove dangling replicas that are initializing, but we couldn't elect primary ones...
+        List<ShardRouting> shardsToFail = null;
+        for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
+            if (shardEntry.primary() && !shardEntry.assignedToNode()) {
+                for (RoutingNode routingNode : routingNodes.nodesToShards().values()) {
+                    for (MutableShardRouting shardEntry2 : routingNode.shards()) {
+                        if (shardEntry.shardId().equals(shardEntry2.shardId()) && !shardEntry2.active()) {
+                            changed = true;
+                            if (shardsToFail == null) {
+                                shardsToFail = new ArrayList<ShardRouting>();
+                            }
+                            shardsToFail.add(shardEntry2);
+                        }
+                    }
+                }
+            }
+        }
+        if (shardsToFail != null) {
+            for (ShardRouting shardToFail : shardsToFail) {
+                applyFailedShard(allocation, shardToFail, false);
+            }
+        }
         return changed;
     }
 
@@ -310,8 +333,7 @@ private boolean deassociateDeadNodes(RoutingAllocation allocation) {
             changed = true;
             // now, go over all the shards routing on the node, and fail them
             for (MutableShardRouting shardRouting : new ArrayList<MutableShardRouting>(node.shards())) {
-                // we create a copy of the shard routing, since applyFailedShard assumes its a new copy
-                applyFailedShard(allocation, shardRouting);
+                applyFailedShard(allocation, shardRouting, false);
             }
             // its a dead node, remove it, note, its important to remove it *after* we apply failed shard
             // since it relies on the fact that the RoutingNode exists in the list of nodes
@@ -372,7 +394,7 @@ private boolean applyStartedShards(RoutingNodes routingNodes, Iterable<? extends
      * Applies the relevant logic to handle a failed shard. Returns <tt>true</tt> if changes happened that
      * require relocation.
      */
-    private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard) {
+    private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList) {
         // create a copy of the failed shard, since we assume we can change possible refernces to it without
         // changing the state of failed shard
         failedShard = new ImmutableShardRouting(failedShard);
@@ -397,8 +419,10 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
                 it.remove();
                 shardRouting.deassignNode();
 
-                // make sure we ignore this shard on the relevant node
-                allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                if (addToIgnoreList) {
+                    // make sure we ignore this shard on the relevant node
+                    allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                }
 
                 break;
             }
@@ -433,8 +457,10 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
                 dirty = true;
                 shardRouting.cancelRelocation();
                 it.remove();
-                // make sure we ignore this shard on the relevant node
-                allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                if (addToIgnoreList) {
+                    // make sure we ignore this shard on the relevant node
+                    allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                }
 
                 allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
                         null, failedShard.primary(), ShardRoutingState.UNASSIGNED, failedShard.version() + 1));
@@ -469,8 +495,10 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
                 MutableShardRouting shardRouting = it.next();
                 if (shardRouting.equals(failedShard)) {
                     dirty = true;
-                    // make sure we ignore this shard on the relevant node
-                    allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                    if (addToIgnoreList) {
+                        // make sure we ignore this shard on the relevant node
+                        allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                    }
 
                     it.remove();
 
src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/PrimaryElectionRoutingTests.java

Lines changed: 48 additions & 0 deletions
@@ -24,6 +24,7 @@
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
 import org.testng.annotations.Test;
@@ -98,4 +99,51 @@ public void testBackupElectionToPrimaryWhenPrimaryCanBeAllocatedToAnotherNode()
         assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node2"));
         assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node3"));
     }
+
+    @Test
+    public void testRemovingInitializingReplicasIfPrimariesFails() {
+        AllocationService allocation = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
+
+        logger.info("Building initial routing table");
+
+        MetaData metaData = newMetaDataBuilder()
+                .put(newIndexMetaDataBuilder("test").numberOfShards(2).numberOfReplicas(1))
+                .build();
+
+        RoutingTable routingTable = routingTable()
+                .addAsNew(metaData.index("test"))
+                .build();
+
+        ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
+
+        logger.info("Adding two nodes and performing rerouting");
+        clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
+        RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
+
+        logger.info("Start the primary shards");
+        RoutingNodes routingNodes = clusterState.routingNodes();
+        rerouteResult = allocation.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
+        routingNodes = clusterState.routingNodes();
+
+        assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(2));
+        assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(2));
+
+        // now, fail one node, while the replica is initializing, and it also holds a primary
+        logger.info("--> fail node with primary");
+        String nodeIdToFail = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
+        String nodeIdRemaining = nodeIdToFail.equals("node1") ? "node2" : "node1";
+        clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
+                .put(newNode(nodeIdRemaining))
+        ).build();
+        rerouteResult = allocation.reroute(clusterState);
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
+        routingNodes = clusterState.routingNodes();
+
+        assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(1));
+        assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(1));
+        assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(INITIALIZING).get(0).primary(), equalTo(true));
+
+    }
 }
