From commits-return-111544-archive-asf-public=cust-asf.ponee.io@lucene.apache.org Tue Dec 3 04:37:46 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1A9CA18064E for ; Tue, 3 Dec 2019 05:37:46 +0100 (CET) Received: (qmail 42340 invoked by uid 500); 3 Dec 2019 04:37:45 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 42331 invoked by uid 99); 3 Dec 2019 04:37:44 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Dec 2019 04:37:44 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id B72E48B690; Tue, 3 Dec 2019 04:37:44 +0000 (UTC) Date: Tue, 03 Dec 2019 04:37:44 +0000 To: "commits@lucene.apache.org" Subject: [lucene-solr] branch branch_8x updated: SOLR-13995: Move ZkShardTerms.Terms to SolrJ MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157534786447.9155.17149136556744532309@gitbox.apache.org> From: noble@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: lucene-solr X-Git-Refname: refs/heads/branch_8x X-Git-Reftype: branch X-Git-Oldrev: 17f4cc36880e24e5b1c4efa8b03f998fc20a89e3 X-Git-Newrev: 664d93591f45336d3e89df002c29124cd07f334d X-Git-Rev: 664d93591f45336d3e89df002c29124cd07f334d X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. noble pushed a commit to branch branch_8x in repository https://gitbox.apache.org/repos/asf/lucene-solr.git The following commit(s) were added to refs/heads/branch_8x by this push: new 664d935 SOLR-13995: Move ZkShardTerms.Terms to SolrJ 664d935 is described below commit 664d93591f45336d3e89df002c29124cd07f334d Author: noble AuthorDate: Tue Dec 3 15:16:34 2019 +1100 SOLR-13995: Move ZkShardTerms.Terms to SolrJ --- .../solr/cloud/RecoveringCoreTermWatcher.java | 3 +- .../java/org/apache/solr/cloud/ZkShardTerms.java | 259 +++------------------ .../org/apache/solr/cloud/ZkShardTermsTest.java | 3 +- .../apache/solr/client/solrj/cloud/ShardTerms.java | 256 ++++++++++++++++++++ 4 files changed, 286 insertions(+), 235 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java index 007d221..5d4ec17 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java @@ -20,6 +20,7 @@ package org.apache.solr.cloud; import java.lang.invoke.MethodHandles; import java.util.concurrent.atomic.AtomicLong; +import org.apache.solr.client.solrj.cloud.ShardTerms; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.SolrCore; @@ -44,7 +45,7 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher { } @Override - public boolean onTermChanged(ZkShardTerms.Terms terms) { + public boolean onTermChanged(ShardTerms terms) { if (coreContainer.isShutDown()) return false; try (SolrCore solrCore = coreContainer.getCore(coreDescriptor.getName())) { diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java index 2c97164..2c2a24a 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java @@ -18,15 +18,14 @@ package org.apache.solr.cloud; import java.lang.invoke.MethodHandles; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.solr.client.solrj.cloud.ShardTerms; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; @@ -74,14 +73,12 @@ public class ZkShardTerms implements AutoCloseable{ private final Set listeners = new HashSet<>(); private final AtomicBoolean isClosed = new AtomicBoolean(false); - private static final String RECOVERING_TERM_SUFFIX = "_recovering"; - - private Terms terms; + private ShardTerms terms; // Listener of a core for shard's term change events interface CoreTermWatcher { // return true if the listener wanna to be triggered in the next time - boolean onTermChanged(Terms terms); + boolean onTermChanged(ShardTerms terms); } public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) { @@ -103,12 +100,15 @@ public class ZkShardTerms implements AutoCloseable{ public void ensureTermsIsHigher(String leader, Set replicasNeedingRecovery) { if (replicasNeedingRecovery.isEmpty()) return; - Terms newTerms; + ShardTerms newTerms; while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) { if (forceSaveTerms(newTerms)) return; } } + public ShardTerms getShardTerms() { + return terms; + } /** * Can this replica become leader? * @param coreNodeName of the replica @@ -148,7 +148,7 @@ public class ZkShardTerms implements AutoCloseable{ // package private for testing, only used by tests Map getTerms() { synchronized (writingLock) { - return new HashMap<>(terms.values); + return terms.getTerms(); } } @@ -178,7 +178,7 @@ public class ZkShardTerms implements AutoCloseable{ // package private for testing, only used by tests // return true if this object should not be reused boolean removeTerm(String coreNodeName) { - Terms newTerms; + ShardTerms newTerms; while ( (newTerms = terms.removeTerm(coreNodeName)) != null) { try { if (saveTerms(newTerms)) return false; @@ -195,7 +195,7 @@ public class ZkShardTerms implements AutoCloseable{ * @param coreNodeName of the replica */ void registerTerm(String coreNodeName) { - Terms newTerms; + ShardTerms newTerms; while ( (newTerms = terms.registerTerm(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } @@ -207,14 +207,14 @@ public class ZkShardTerms implements AutoCloseable{ * @param coreNodeName of the replica */ public void setTermEqualsToLeader(String coreNodeName) { - Terms newTerms; + ShardTerms newTerms; while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } } public void setTermToZero(String coreNodeName) { - Terms newTerms; + ShardTerms newTerms; while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } @@ -224,7 +224,7 @@ public class ZkShardTerms implements AutoCloseable{ * Mark {@code coreNodeName} as recovering */ public void startRecovering(String coreNodeName) { - Terms newTerms; + ShardTerms newTerms; while ( (newTerms = terms.startRecovering(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } @@ -234,27 +234,22 @@ public class ZkShardTerms implements AutoCloseable{ * Mark {@code coreNodeName} as finished recovering */ public void doneRecovering(String coreNodeName) { - Terms newTerms; + ShardTerms newTerms; while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) { if (forceSaveTerms(newTerms)) break; } } public boolean isRecovering(String name) { - return terms.values.containsKey(recoveringTerm(name)); - } - - public static String recoveringTerm(String coreNodeName) { - return coreNodeName + RECOVERING_TERM_SUFFIX; + return terms.isRecovering(name); } - /** * When first updates come in, all replicas have some data now, * so we must switch from term 0 (registered) to 1 (have some data) */ public void ensureHighestTermsAreNotZero() { - Terms newTerms; + ShardTerms newTerms; while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) { if (forceSaveTerms(newTerms)) break; } @@ -282,7 +277,7 @@ public class ZkShardTerms implements AutoCloseable{ * @param newTerms to be set * @return true if terms is saved successfully to ZK, false if otherwise */ - private boolean forceSaveTerms(Terms newTerms) { + private boolean forceSaveTerms(ShardTerms newTerms) { try { return saveTerms(newTerms); } catch (KeeperException.NoNodeException e) { @@ -297,11 +292,11 @@ public class ZkShardTerms implements AutoCloseable{ * @return true if terms is saved successfully to ZK, false if otherwise * @throws KeeperException.NoNodeException correspond ZK term node is not created */ - private boolean saveTerms(Terms newTerms) throws KeeperException.NoNodeException { - byte[] znodeData = Utils.toJSON(newTerms.values); + private boolean saveTerms(ShardTerms newTerms) throws KeeperException.NoNodeException { + byte[] znodeData = Utils.toJSON(newTerms); try { - Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true); - setNewTerms(new Terms(newTerms.values, stat.getVersion())); + Stat stat = zkClient.setData(znodePath, znodeData, newTerms.getVersion(), true); + setNewTerms(new ShardTerms(newTerms, stat.getVersion())); log.info("Successful update of terms at {} to {}", znodePath, newTerms); return true; } catch (KeeperException.BadVersionException e) { @@ -344,11 +339,11 @@ public class ZkShardTerms implements AutoCloseable{ * Fetch latest terms from ZK */ public void refreshTerms() { - Terms newTerms; + ShardTerms newTerms; try { Stat stat = new Stat(); byte[] data = zkClient.getData(znodePath, null, stat, true); - newTerms = new Terms((Map) Utils.fromJSON(data), stat.getVersion()); + newTerms = new ShardTerms((Map) Utils.fromJSON(data), stat.getVersion()); } catch (KeeperException e) { Thread.interrupted(); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e); @@ -411,10 +406,10 @@ public class ZkShardTerms implements AutoCloseable{ * Atomically update {@link ZkShardTerms#terms} and call listeners * @param newTerms to be set */ - private void setNewTerms(Terms newTerms) { + private void setNewTerms(ShardTerms newTerms) { boolean isChanged = false; synchronized (writingLock) { - if (terms == null || newTerms.version > terms.version) { + if (terms == null || newTerms.getVersion() > terms.getVersion()) { terms = newTerms; isChanged = true; } @@ -422,211 +417,9 @@ public class ZkShardTerms implements AutoCloseable{ if (isChanged) onTermUpdates(newTerms); } - private void onTermUpdates(Terms newTerms) { + private void onTermUpdates(ShardTerms newTerms) { synchronized (listeners) { listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms)); } } - - /** - * Hold values of terms, this class is immutable - */ - static class Terms { - private final Map values; - private final long maxTerm; - // ZK node version - private final int version; - - public Terms () { - this(new HashMap<>(), 0); - } - - public Terms(Map values, int version) { - this.values = values; - this.version = version; - if (values.isEmpty()) this.maxTerm = 0; - else this.maxTerm = Collections.max(values.values()); - } - - /** - * Can {@code coreNodeName} become leader? - * @param coreNodeName of the replica - * @return true if {@code coreNodeName} can become leader, false if otherwise - */ - boolean canBecomeLeader(String coreNodeName) { - return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName)); - } - - /** - * Is {@code coreNodeName}'s term highest? - * @param coreNodeName of the replica - * @return true if term of {@code coreNodeName} is highest - */ - boolean haveHighestTermValue(String coreNodeName) { - if (values.isEmpty()) return true; - long maxTerm = Collections.max(values.values()); - return values.getOrDefault(coreNodeName, 0L) == maxTerm; - } - - Long getTerm(String coreNodeName) { - return values.get(coreNodeName); - } - - /** - * Return a new {@link Terms} in which term of {@code leader} is higher than {@code replicasNeedingRecovery} - * @param leader coreNodeName of leader - * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term - * @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery} - */ - Terms increaseTerms(String leader, Set replicasNeedingRecovery) { - if (!values.containsKey(leader)) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader); - } - - boolean changed = false; - boolean foundReplicasInLowerTerms = false; - - HashMap newValues = new HashMap<>(values); - long leaderTerm = newValues.get(leader); - for (Map.Entry entry : newValues.entrySet()) { - String key = entry.getKey(); - if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true; - if (Objects.equals(entry.getValue(), leaderTerm)) { - if(skipIncreaseTermOf(key, replicasNeedingRecovery)) { - changed = true; - } else { - newValues.put(key, leaderTerm+1); - } - } - } - - // We should skip the optimization if there are no replicasNeedingRecovery present in local terms, - // this may indicate that the current value is stale - if (!changed && foundReplicasInLowerTerms) return null; - return new Terms(newValues, version); - } - - private boolean skipIncreaseTermOf(String key, Set replicasNeedingRecovery) { - if (key.endsWith(RECOVERING_TERM_SUFFIX)) { - key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length()); - } - return replicasNeedingRecovery.contains(key); - } - - /** - * Return a new {@link Terms} in which highest terms are not zero - * @return null if highest terms are already larger than zero - */ - Terms ensureHighestTermsAreNotZero() { - if (maxTerm > 0) return null; - else { - HashMap newValues = new HashMap<>(values); - for (String replica : values.keySet()) { - newValues.put(replica, 1L); - } - return new Terms(newValues, version); - } - } - - /** - * Return a new {@link Terms} in which terms for the {@code coreNodeName} are removed - * @param coreNodeName of the replica - * @return null if term of {@code coreNodeName} is already not exist - */ - Terms removeTerm(String coreNodeName) { - if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) { - return null; - } - - HashMap newValues = new HashMap<>(values); - newValues.remove(coreNodeName); - newValues.remove(recoveringTerm(coreNodeName)); - - return new Terms(newValues, version); - } - - /** - * Return a new {@link Terms} in which the associate term of {@code coreNodeName} is not null - * @param coreNodeName of the replica - * @return null if term of {@code coreNodeName} is already exist - */ - Terms registerTerm(String coreNodeName) { - if (values.containsKey(coreNodeName)) return null; - - HashMap newValues = new HashMap<>(values); - newValues.put(coreNodeName, 0L); - return new Terms(newValues, version); - } - - Terms setTermToZero(String coreNodeName) { - if (values.getOrDefault(coreNodeName, -1L) == 0) { - return null; - } - HashMap newValues = new HashMap<>(values); - newValues.put(coreNodeName, 0L); - return new Terms(newValues, version); - } - - /** - * Return a new {@link Terms} in which the term of {@code coreNodeName} is max - * @param coreNodeName of the replica - * @return null if term of {@code coreNodeName} is already maximum - */ - Terms setTermEqualsToLeader(String coreNodeName) { - long maxTerm = getMaxTerm(); - if (values.get(coreNodeName) == maxTerm) return null; - - HashMap newValues = new HashMap<>(values); - newValues.put(coreNodeName, maxTerm); - newValues.remove(recoveringTerm(coreNodeName)); - return new Terms(newValues, version); - } - - long getMaxTerm() { - return maxTerm; - } - - /** - * Mark {@code coreNodeName} as recovering - * @param coreNodeName of the replica - * @return null if {@code coreNodeName} is already marked as doing recovering - */ - Terms startRecovering(String coreNodeName) { - long maxTerm = getMaxTerm(); - if (values.get(coreNodeName) == maxTerm) - return null; - - HashMap newValues = new HashMap<>(values); - if (!newValues.containsKey(recoveringTerm(coreNodeName))) { - long currentTerm = newValues.getOrDefault(coreNodeName, 0L); - // by keeping old term, we will have more information in leader election - newValues.put(recoveringTerm(coreNodeName), currentTerm); - } - newValues.put(coreNodeName, maxTerm); - return new Terms(newValues, version); - } - - /** - * Mark {@code coreNodeName} as finished recovering - * @param coreNodeName of the replica - * @return null if term of {@code coreNodeName} is already finished doing recovering - */ - Terms doneRecovering(String coreNodeName) { - if (!values.containsKey(recoveringTerm(coreNodeName))) { - return null; - } - - HashMap newValues = new HashMap<>(values); - newValues.remove(recoveringTerm(coreNodeName)); - return new Terms(newValues, version); - } - - @Override - public String toString() { - return "Terms{" + - "values=" + values + - ", version=" + version + - '}'; - } - } } diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java index a56202a..56ed8ae7 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.cloud.ShardTerms; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.common.util.TimeSource; import org.apache.solr.util.TimeOut; @@ -267,7 +268,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase { public void testEnsureTermsIsHigher() { Map map = new HashMap<>(); map.put("leader", 0L); - ZkShardTerms.Terms terms = new ZkShardTerms.Terms(map, 0); + ShardTerms terms = new ShardTerms(map, 0); terms = terms.increaseTerms("leader", Collections.singleton("replica")); assertEquals(1L, terms.getTerm("leader").longValue()); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java new file mode 100644 index 0000000..3b2f754 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.cloud; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.solr.common.MapWriter; +import org.apache.solr.common.SolrException; + +/** + * Hold values of terms, this class is immutable. Create a new instance for every mutation + */ +public class ShardTerms implements MapWriter { + private static final String RECOVERING_TERM_SUFFIX = "_recovering"; + private final Map values; + private final long maxTerm; + // ZK node version + private final int version; + + public ShardTerms () { + this(new HashMap<>(), 0); + } + + public ShardTerms(ShardTerms newTerms, int version) { + this(newTerms.values, version); + } + + @Override + public void writeMap(EntryWriter ew) throws IOException { + values.forEach(ew.getBiConsumer()); + } + + public ShardTerms(Map values, int version) { + this.values = values; + this.version = version; + if (values.isEmpty()) this.maxTerm = 0; + else this.maxTerm = Collections.max(values.values()); + } + + /** + * Can {@code coreNodeName} become leader? + * @param coreNodeName of the replica + * @return true if {@code coreNodeName} can become leader, false if otherwise + */ + public boolean canBecomeLeader(String coreNodeName) { + return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName)); + } + + /** + * Is {@code coreNodeName}'s term highest? + * @param coreNodeName of the replica + * @return true if term of {@code coreNodeName} is highest + */ + public boolean haveHighestTermValue(String coreNodeName) { + if (values.isEmpty()) return true; + long maxTerm = Collections.max(values.values()); + return values.getOrDefault(coreNodeName, 0L) == maxTerm; + } + + public Long getTerm(String coreNodeName) { + return values.get(coreNodeName); + } + + /** + * Return a new {@link ShardTerms} in which term of {@code leader} is higher than {@code replicasNeedingRecovery} + * @param leader coreNodeName of leader + * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term + * @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery} + */ + public ShardTerms increaseTerms(String leader, Set replicasNeedingRecovery) { + if (!values.containsKey(leader)) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader); + } + + boolean changed = false; + boolean foundReplicasInLowerTerms = false; + + HashMap newValues = new HashMap<>(values); + long leaderTerm = newValues.get(leader); + for (Map.Entry entry : newValues.entrySet()) { + String key = entry.getKey(); + if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true; + if (Objects.equals(entry.getValue(), leaderTerm)) { + if(skipIncreaseTermOf(key, replicasNeedingRecovery)) { + changed = true; + } else { + newValues.put(key, leaderTerm+1); + } + } + } + + // We should skip the optimization if there are no replicasNeedingRecovery present in local terms, + // this may indicate that the current value is stale + if (!changed && foundReplicasInLowerTerms) return null; + return new ShardTerms(newValues, version); + } + + private boolean skipIncreaseTermOf(String key, Set replicasNeedingRecovery) { + if (key.endsWith(RECOVERING_TERM_SUFFIX)) { + key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length()); + } + return replicasNeedingRecovery.contains(key); + } + + /** + * Return a new {@link ShardTerms} in which highest terms are not zero + * @return null if highest terms are already larger than zero + */ + public ShardTerms ensureHighestTermsAreNotZero() { + if (maxTerm > 0) return null; + else { + HashMap newValues = new HashMap<>(values); + for (String replica : values.keySet()) { + newValues.put(replica, 1L); + } + return new ShardTerms(newValues, version); + } + } + + /** + * Return a new {@link ShardTerms} in which terms for the {@code coreNodeName} are removed + * @param coreNodeName of the replica + * @return null if term of {@code coreNodeName} is already not exist + */ + public ShardTerms removeTerm(String coreNodeName) { + if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) { + return null; + } + + HashMap newValues = new HashMap<>(values); + newValues.remove(coreNodeName); + newValues.remove(recoveringTerm(coreNodeName)); + + return new ShardTerms(newValues, version); + } + + /** + * Return a new {@link ShardTerms} in which the associate term of {@code coreNodeName} is not null + * @param coreNodeName of the replica + * @return null if term of {@code coreNodeName} is already exist + */ + public ShardTerms registerTerm(String coreNodeName) { + if (values.containsKey(coreNodeName)) return null; + + HashMap newValues = new HashMap<>(values); + newValues.put(coreNodeName, 0L); + return new ShardTerms(newValues, version); + } + + public ShardTerms setTermToZero(String coreNodeName) { + if (values.getOrDefault(coreNodeName, -1L) == 0) { + return null; + } + HashMap newValues = new HashMap<>(values); + newValues.put(coreNodeName, 0L); + return new ShardTerms(newValues, version); + } + + /** + * Return a new {@link ShardTerms} in which the term of {@code coreNodeName} is max + * @param coreNodeName of the replica + * @return null if term of {@code coreNodeName} is already maximum + */ + public ShardTerms setTermEqualsToLeader(String coreNodeName) { + long maxTerm = getMaxTerm(); + if (values.get(coreNodeName) == maxTerm) return null; + + HashMap newValues = new HashMap<>(values); + newValues.put(coreNodeName, maxTerm); + newValues.remove(recoveringTerm(coreNodeName)); + return new ShardTerms(newValues, version); + } + + public long getMaxTerm() { + return maxTerm; + } + + /** + * Mark {@code coreNodeName} as recovering + * @param coreNodeName of the replica + * @return null if {@code coreNodeName} is already marked as doing recovering + */ + public ShardTerms startRecovering(String coreNodeName) { + long maxTerm = getMaxTerm(); + if (values.get(coreNodeName) == maxTerm) + return null; + + HashMap newValues = new HashMap<>(values); + if (!newValues.containsKey(recoveringTerm(coreNodeName))) { + long currentTerm = newValues.getOrDefault(coreNodeName, 0L); + // by keeping old term, we will have more information in leader election + newValues.put(recoveringTerm(coreNodeName), currentTerm); + } + newValues.put(coreNodeName, maxTerm); + return new ShardTerms(newValues, version); + } + + /** + * Mark {@code coreNodeName} as finished recovering + * @param coreNodeName of the replica + * @return null if term of {@code coreNodeName} is already finished doing recovering + */ + public ShardTerms doneRecovering(String coreNodeName) { + if (!values.containsKey(recoveringTerm(coreNodeName))) { + return null; + } + + HashMap newValues = new HashMap<>(values); + newValues.remove(recoveringTerm(coreNodeName)); + return new ShardTerms(newValues, version); + } + + public static String recoveringTerm(String coreNodeName) { + return coreNodeName + RECOVERING_TERM_SUFFIX; + } + + @Override + public String toString() { + return "Terms{" + + "values=" + values + + ", version=" + version + + '}'; + } + + public int getVersion() { + return version; + } + + public Map getTerms() { + return new HashMap<>(this.values); + } + + public boolean isRecovering(String name) { + return values.containsKey(recoveringTerm(name)); + } +}