iotdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l..@apache.org
Subject [iotdb] branch cluster_scalability updated: This commit fixes the following issues: 1. Fix a bug where removing the leader of a data group did not trigger an election immediately. 2. Fix a bug where the same ByteBuffer was read concurrently when logs were sent to all data groups in parallel; it is fixed by cloning the ByteBuffer when splitting the plan. 3. Fix a bug where, when a node is removed, the data group whose header is that node could not reset its leader to null, which slowed down data migration. 4. Data group elections no longer require the meta log to be the newest as well.
Date Thu, 25 Mar 2021 06:39:36 GMT
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_scalability by this push:
     new f3df198  This commit fixes the following issues: 1. Fix a bug where removing the leader of a data group did not trigger an election immediately. 2. Fix a bug where the same ByteBuffer was read concurrently when logs were sent to all data groups in parallel; it is fixed by cloning the ByteBuffer when splitting the plan. 3. Fix a bug where, when a node is removed, the data group whose header is that node could not reset its leader to null, which slowed down data migration. 4. Data group elections no longer require the meta log  [...]
f3df198 is described below

commit f3df1985c6470b535c5fc85bd5c9c895dcb34fdf
Author: lta <litiananfa@163.com>
AuthorDate: Thu Mar 25 14:33:17 2021 +0800

    This commit fixes the following issues:
    1. Fix a bug where removing the leader of a data group did not trigger an election immediately.
    2. Fix a bug where the same ByteBuffer was read concurrently when logs were sent to all data groups in parallel; it is fixed by cloning the ByteBuffer when splitting the plan.
    3. Fix a bug where, when a node is removed, the data group whose header is that node could not reset its leader to null, which slowed down data migration.
    4. Data group elections no longer require the meta log to be the newest as well.
---
 .../iotdb/cluster/log/logtypes/AddNodeLog.java     |  2 +-
 .../iotdb/cluster/log/logtypes/RemoveNodeLog.java  |  2 +-
 .../iotdb/cluster/log/snapshot/FileSnapshot.java   |  3 --
 .../cluster/log/snapshot/PullSnapshotTask.java     |  2 -
 .../iotdb/cluster/server/DataClusterServer.java    | 53 +++++++++---------
 .../org/apache/iotdb/cluster/server/Response.java  | 13 ++---
 .../server/handlers/caller/HeartbeatHandler.java   |  4 +-
 .../server/heartbeat/DataHeartbeatThread.java      |  6 ---
 .../cluster/server/heartbeat/HeartbeatThread.java  | 17 +++---
 .../cluster/server/member/DataGroupMember.java     | 63 ++--------------------
 .../cluster/server/member/MetaGroupMember.java     |  5 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 10 ++++
 .../server/heartbeat/DataHeartbeatThreadTest.java  |  2 -
 .../cluster/server/member/DataGroupMemberTest.java | 16 ------
 .../apache/iotdb/db/qp/physical/sys/LogPlan.java   |  5 +-
 .../java/org/apache/iotdb/db/utils/IOUtils.java    |  9 ++++
 thrift/src/main/thrift/cluster.thrift              |  2 -
 17 files changed, 74 insertions(+), 140 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
index 3e5268b..5ce4aba 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
@@ -68,7 +68,7 @@ public class AddNodeLog extends Log {
   }
 
   public ByteBuffer getPartitionTable() {
-    partitionTable.clear();
+    partitionTable.rewind();
     return partitionTable;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index f5a7b23..8609ecf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -54,7 +54,7 @@ public class RemoveNodeLog extends Log {
   }
 
   public ByteBuffer getPartitionTable() {
-    partitionTable.clear();
+    partitionTable.rewind();
     return partitionTable;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index c4b3f98..6ff8bc4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -212,9 +212,6 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
 
     private void installSnapshot(Map<Integer, FileSnapshot> snapshotMap)
         throws SnapshotInstallationException {
-      // for new node, it should sync meta log
-      dataGroupMember.getMetaGroupMember().waitUtil(dataGroupMember.getMetaGroupMember().getPartitionTable().getLastMetaLogIndex() - 1);
-
      // In data migration, meta group member other than new node does not need to synchronize the leader,
      // because data migration must be carried out after meta group applied add/remove node log.
       for (Entry<Integer, FileSnapshot> integerSnapshotEntry : snapshotMap.entrySet()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 265ed32..e1ae342 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -174,7 +174,6 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
     request.setRequiredSlots(descriptor.getSlots());
     request.setRequireReadOnly(descriptor.isRequireReadOnly());
 
-    long startTime = System.currentTimeMillis();
     logger.info("{}: data migration starts.", newMember.getName());
     boolean finished = false;
     int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()) - 1;
@@ -202,7 +201,6 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
         }
       }
     }
-    logger.info("{}: data migration ends, cost {}ms", newMember.getName(), (System.currentTimeMillis() - startTime));
     removeTask();
     return null;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index d5a33a5..c0dbc26 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -541,7 +541,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
    */
   public void addNode(Node node, NodeAdditionResult result) {
     // If the node executed adding itself to the cluster, it's unnecessary to add new groups because they already exist.
+    // Just pull snapshot.
     if (node.equals(thisNode)) {
+      pullSnapshots();
       return;
     }
     Iterator<Entry<RaftNode, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
@@ -559,25 +561,38 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
       }
 
       if (logger.isDebugEnabled()) {
-        logger.debug("Data cluster server: start to handle new groups when adding new node {}", node);
+        logger
+            .debug("Data cluster server: start to handle new groups when adding new node {}", node);
       }
-      // pull snapshot has already done when the new node starts.
-      if (!node.equals(thisNode)) {
-        for (PartitionGroup newGroup : result.getNewGroupList()) {
-          if (newGroup.contains(thisNode)) {
-            logger.info("Adding this node into a new group {}", newGroup);
-            DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode);
-            addDataGroupMember(dataGroupMember);
-            dataGroupMember
-                .pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node,
-                    newGroup.getId()), node);
-          }
+      for (PartitionGroup newGroup : result.getNewGroupList()) {
+        if (newGroup.contains(thisNode)) {
+          logger.info("Adding this node into a new group {}", newGroup);
+          DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode);
+          addDataGroupMember(dataGroupMember);
+          dataGroupMember
+              .pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node,
+                  newGroup.getId()), node);
         }
       }
     }
   }
 
   /**
+   * When the node joins a cluster, it also creates a new data group and a corresponding member
+   * which has no data. This is to make that member pull data from other nodes.
+   */
+  public void pullSnapshots() {
+    for (int raftId = 0; raftId < ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
+        raftId++) {
+      RaftNode raftNode = new RaftNode(thisNode, raftId);
+      List<Integer> slots = ((SlotPartitionTable) partitionTable).getNodeSlots(raftNode);
+      DataGroupMember dataGroupMember = headerGroupMap.get(raftNode);
+      dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
+    }
+  }
+
+
+  /**
    * Make sure the group will not receive new raft logs
    * @param header
    * @param dataGroupMember
@@ -699,20 +714,6 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
   }
 
   /**
-   * When the node joins a cluster, it also creates a new data group and a corresponding member
-   * When the node joins a cluster, it also creates a new data group and a corresponding member
-   * which has no data. This is to make that member pull data from other nodes.
-   */
-  public void pullSnapshots() {
-    for (int raftId = 0; raftId < ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor(); raftId++) {
-      RaftNode raftNode = new RaftNode(thisNode, raftId);
-      List<Integer> slots = ((SlotPartitionTable) partitionTable).getNodeSlots(raftNode);
-      DataGroupMember dataGroupMember = headerGroupMap.get(raftNode);
-      dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
-    }
-  }
-
-  /**
    * @return The reports of every DataGroupMember in this node.
    */
   public List<DataMemberReport> genMemberReports() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
index 50f41ea..3573f2f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
@@ -37,22 +37,19 @@ public class Response {
   public static final long RESPONSE_IDENTIFIER_CONFLICT = -5;
   // the requested node is unreachable in the network
   public static final long RESPONSE_NO_CONNECTION = -6;
-  // the meta logs of the data group leader candidate is older than the voter, so its partition
-  // table is potentially older and such a node cannot be allowed to be the leader
-  public static final long RESPONSE_META_LOG_STALE = -7;
   // the node does not give a vote because its leader does not time out. This is to avoid a
   // node which cannot connect to the leader changing the leader in the group frequently.
-  public static final long RESPONSE_LEADER_STILL_ONLINE = -8;
+  public static final long RESPONSE_LEADER_STILL_ONLINE = -7;
   // the operation is rejected because the cluster will not be able to have enough replicas after
   // this operation
-  public static final long RESPONSE_CLUSTER_TOO_SMALL = -9;
+  public static final long RESPONSE_CLUSTER_TOO_SMALL = -8;
   // the new node, which tries to join the cluster, contains conflicted parameters with the
   // cluster, so the operation is rejected.
-  public static final long RESPONSE_NEW_NODE_PARAMETER_CONFLICT = -10;
+  public static final long RESPONSE_NEW_NODE_PARAMETER_CONFLICT = -9;
   // add/remove node operations should one by one
-  public static final long RESPONSE_CHANGE_MEMBERSHIP_CONFLICT = -11;
+  public static final long RESPONSE_CHANGE_MEMBERSHIP_CONFLICT = -10;
   // the data migration of previous add/remove node operations is not finished.
-  public static final long RESPONSE_DATA_MIGRATION_NOT_FINISH = -12;
+  public static final long RESPONSE_DATA_MIGRATION_NOT_FINISH = -11;
   // the request is not executed locally anc should be forwarded
   public static final long RESPONSE_NULL = Long.MIN_VALUE;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 508052d..c3cd3d2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -50,8 +50,10 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
 
   @Override
   public void onComplete(HeartBeatResponse resp) {
-    logger.trace("{}: Received a heartbeat response", memberName);
     long followerTerm = resp.getTerm();
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Received a heartbeat response {}", memberName, followerTerm);
+    }
     if (followerTerm == RESPONSE_AGREE) {
       // current leadership is still valid
       handleNormalHeartbeatResponse(resp);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThread.java
index 3547c17..0e62213 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThread.java
@@ -62,12 +62,6 @@ public class DataHeartbeatThread extends HeartbeatThread {
       return;
     }
     electionRequest.setHeader(dataGroupMember.getHeader());
-    electionRequest
-        .setLastLogTerm(dataGroupMember.getMetaGroupMember().getLogManager().getLastLogTerm());
-    electionRequest
-        .setLastLogIndex(dataGroupMember.getMetaGroupMember().getLogManager().getLastLogIndex());
-    electionRequest.setDataLogLastIndex(dataGroupMember.getLogManager().getLastLogIndex());
-    electionRequest.setDataLogLastTerm(dataGroupMember.getLogManager().getLastLogTerm());
 
     super.startElection();
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index c590e9b..bfc0865 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -82,7 +82,9 @@ public class HeartbeatThread implements Runnable {
           case LEADER:
             // send heartbeats to the followers
             sendHeartbeats();
-            Thread.sleep(RaftServer.getHeartBeatIntervalMs());
+            synchronized (localMember.getHeartBeatWaitObject()) {
+              localMember.getHeartBeatWaitObject().wait(RaftServer.getHeartBeatIntervalMs());
+            }
             hasHadLeader = true;
             break;
           case FOLLOWER:
@@ -97,7 +99,9 @@ public class HeartbeatThread implements Runnable {
             } else {
               logger.debug("{}: Heartbeat from leader {} is still valid", memberName,
                   localMember.getLeader());
-              Thread.sleep(RaftServer.getConnectionTimeoutInMS());
+              synchronized (localMember.getHeartBeatWaitObject()) {
+                localMember.getHeartBeatWaitObject().wait(RaftServer.getConnectionTimeoutInMS());
+              }
             }
             hasHadLeader = true;
             break;
@@ -267,6 +271,7 @@ public class HeartbeatThread implements Runnable {
   // enable timeout
   void startElection() {
     if (localMember.isSkipElection()) {
+      logger.info("{}: Skip election because this node has stopped.", memberName);
       return;
     }
     synchronized (localMember.getTerm()) {
@@ -291,12 +296,8 @@ public class HeartbeatThread implements Runnable {
 
       electionRequest.setTerm(nextTerm);
       electionRequest.setElector(localMember.getThisNode());
-      if (!electionRequest.isSetLastLogIndex()) {
-        // these field are overridden in DataGroupMember, they will be set to the term and index
-        // of the MetaGroupMember that manages the DataGroupMember so we cannot overwrite them
-        electionRequest.setLastLogTerm(localMember.getLogManager().getLastLogTerm());
-        electionRequest.setLastLogIndex(localMember.getLogManager().getLastLogIndex());
-      }
+      electionRequest.setLastLogTerm(localMember.getLogManager().getLastLogTerm());
+      electionRequest.setLastLogIndex(localMember.getLogManager().getLastLogIndex());
 
       requestVote(localMember.getAllNodes(), electionRequest, nextTerm, quorum,
           electionTerminated, electionValid, failingVoteCounter);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 5142796..9dd9e1a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -326,7 +326,8 @@ public class DataGroupMember extends RaftMember {
           // if the leader is removed, also start an election immediately
           synchronized (term) {
             setCharacter(NodeCharacter.ELECTOR);
-            setLastHeartbeatReceivedTime(Long.MIN_VALUE);
+            setLeader(null);
+            getHeartBeatWaitObject().notifyAll();
           }
         }
         return removedNode.equals(thisNode);
@@ -336,63 +337,6 @@ public class DataGroupMember extends RaftMember {
   }
 
   /**
-   * Process the election request from another node in the group. To win the vote from the local
-   * member, a node must have both meta and data logs no older than then local member, or it will be
-   * turned down.
-   *
-   * @return Response.RESPONSE_META_LOG_STALE if the meta logs of the elector fall behind
-   * Response.RESPONSE_LOG_MISMATCH if the data logs of the elector fall behind Response.SUCCESS if
-   * the vote is given to the elector the term of local member if the elector's term is no bigger
-   * than the local member
-   */
-  @Override
-  long checkElectorLogProgress(ElectionRequest electionRequest) {
-    // to be a data group leader, a node should also be qualified to be the meta group leader
-    // which guarantees the data group leader has the newest partition table.
-    long thatTerm = electionRequest.getTerm();
-    long thatMetaLastLogIndex = electionRequest.getLastLogIndex();
-    long thatMetaLastLogTerm = electionRequest.getLastLogTerm();
-    long thatDataLastLogIndex = electionRequest.getDataLogLastIndex();
-    long thatDataLastLogTerm = electionRequest.getDataLogLastTerm();
-    logger.info(
-        "{} received an dataGroup election request, term:{}, metaLastLogIndex:{}, metaLastLogTerm:{}, dataLastLogIndex:{}, dataLastLogTerm:{}",
-        name, thatTerm, thatMetaLastLogIndex, thatMetaLastLogTerm, thatDataLastLogIndex,
-        thatDataLastLogTerm);
-
-    // check meta logs
-    // term of the electors' MetaGroupMember is not verified, so 0 and 1 are used to make sure
-    // the verification does not fail
-    long metaResponse = metaGroupMember.checkLogProgress(thatMetaLastLogIndex, thatMetaLastLogTerm);
-    if (metaResponse == Response.RESPONSE_LOG_MISMATCH) {
-      return Response.RESPONSE_META_LOG_STALE;
-    }
-
-    long resp = checkLogProgress(thatDataLastLogIndex, thatDataLastLogTerm);
-    if (resp == Response.RESPONSE_AGREE) {
-      logger.info(
-          "{} accepted an dataGroup election request, term:{}/{}, dataLogIndex:{}/{}, dataLogTerm:{}/{}, metaLogIndex:{}/{},metaLogTerm:{}/{}",
-          name, thatTerm, term.get(), thatDataLastLogIndex, logManager.getLastLogIndex(),
-          thatDataLastLogTerm,
-          logManager.getLastLogTerm(), thatMetaLastLogIndex,
-          metaGroupMember.getLogManager().getLastLogIndex(), thatMetaLastLogTerm,
-          metaGroupMember.getLogManager().getLastLogTerm());
-      setCharacter(NodeCharacter.FOLLOWER);
-      lastHeartbeatReceivedTime = System.currentTimeMillis();
-      setVoteFor(electionRequest.getElector());
-      updateHardState(thatTerm, getVoteFor());
-    } else {
-      logger.info(
-          "{} rejected an dataGroup election request, term:{}/{}, dataLogIndex:{}/{}, dataLogTerm:{}/{}, metaLogIndex:{}/{},metaLogTerm:{}/{}",
-          name, thatTerm, term.get(), thatDataLastLogIndex, logManager.getLastLogIndex(),
-          thatDataLastLogTerm,
-          logManager.getLastLogTerm(), thatMetaLastLogIndex,
-          metaGroupMember.getLogManager().getLastLogIndex(), thatMetaLastLogTerm,
-          metaGroupMember.getLogManager().getLastLogTerm());
-    }
-    return resp;
-  }
-
-  /**
    * Deserialize and install a snapshot sent by the leader. The type of the snapshot must be
    * currently PartitionedSnapshot with FileSnapshot inside.
    */
@@ -799,7 +743,8 @@ public class DataGroupMember extends RaftMember {
           // if the leader is removed, also start an election immediately
           synchronized (term) {
             setCharacter(NodeCharacter.ELECTOR);
-            setLastHeartbeatReceivedTime(Long.MIN_VALUE);
+            setLeader(null);
+            getHeartBeatWaitObject().notifyAll();
           }
         }
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 4144b55..95dbd87 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -603,7 +603,6 @@ public class MetaGroupMember extends RaftMember {
       logger.info("Node {} admitted this node into the cluster", node);
       ByteBuffer partitionTableBuffer = resp.partitionTableBytes;
       acceptPartitionTable(partitionTableBuffer, true);
-      getDataClusterServer().pullSnapshots();
       return true;
     } else if (resp.getRespNum() == Response.RESPONSE_IDENTIFIER_CONFLICT) {
       logger.info("The identifier {} conflicts the existing ones, regenerate a new one",
@@ -1798,10 +1797,10 @@ public class MetaGroupMember extends RaftMember {
         }
         return status;
       } else {
-        logger.warn("Forward {} to {} timed out", plan, node);
+        logger.debug("Forward {} to {} timed out", plan, node);
       }
     }
-    logger.warn("Forward {} to {} timed out", plan, group);
+    logger.debug("Forward {} to {} timed out", plan, group);
     return StatusUtils.TIME_OUT;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 89fd12d..04e9e79 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -134,6 +134,8 @@ public abstract class RaftMember {
    */
   private final Object snapshotApplyLock = new Object();
 
+  private final Object heartBeatWaitObject = new Object();
+
   protected Node thisNode = ClusterConstant.EMPTY_NODE;
 
   /**
@@ -327,6 +329,7 @@ public abstract class RaftMember {
    * induce side effects.
    */
   public void stop() {
+    setSkipElection(true);
     closeLogManager();
     if (heartBeatService == null) {
       return;
@@ -453,6 +456,9 @@ public abstract class RaftMember {
    * whether to accept by examining the log status of the elector.
    */
   public long processElectionRequest(ElectionRequest electionRequest) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: start to handle request from elector {}", name, electionRequest.getElector());
+    }
     synchronized (term) {
       long currentTerm = term.get();
       long response = checkElectorTerm(currentTerm, electionRequest.getTerm(),
@@ -1913,6 +1919,10 @@ public abstract class RaftMember {
     this.hasSyncedLeaderBeforeRemoved = hasSyncedLeaderAfterRemoved;
   }
 
+  public Object getHeartBeatWaitObject() {
+    return heartBeatWaitObject;
+  }
+
   public boolean isSkipElection() {
     return skipElection;
   }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java
index f96845d..06d0415 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java
@@ -121,8 +121,6 @@ public class DataHeartbeatThreadTest extends HeartbeatThreadTest {
           assertEquals(11, request.getTerm());
           assertEquals(6, request.getLastLogIndex());
           assertEquals(6, request.getLastLogTerm());
-          assertEquals(13, request.getDataLogLastTerm());
-          assertEquals(13, request.getDataLogLastIndex());
           if (respondToElection) {
             resultHandler.onComplete(Response.RESPONSE_AGREE);
           }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 08f96d9..588c227 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -347,8 +347,6 @@ public class DataGroupMemberTest extends MemberTest {
     electionRequest.setTerm(1);
     electionRequest.setLastLogIndex(100);
     electionRequest.setLastLogTerm(100);
-    electionRequest.setDataLogLastTerm(100);
-    electionRequest.setDataLogLastIndex(100);
     TestHandler handler = new TestHandler();
     new DataAsyncService(dataGroupMember).startElection(electionRequest, handler);
     assertEquals(10, handler.getResponse());
@@ -376,31 +374,17 @@ public class DataGroupMemberTest extends MemberTest {
     new DataAsyncService(dataGroupMember).startElection(electionRequest, handler);
     assertEquals(Response.RESPONSE_AGREE, handler.getResponse());
 
-    // a request with larger term and stale meta log
-    // should reject election but update term
-    electionRequest.setTerm(13);
-    electionRequest.setLastLogIndex(1);
-    electionRequest.setLastLogTerm(1);
-    handler = new TestHandler();
-    new DataAsyncService(dataGroupMember).startElection(electionRequest, handler);
-    assertEquals(Response.RESPONSE_META_LOG_STALE, handler.getResponse());
-    assertEquals(13, dataGroupMember.getTerm().get());
-
     // a request with with larger term and stale data log
     // should reject election but update term
     electionRequest.setTerm(14);
     electionRequest.setLastLogIndex(100);
     electionRequest.setLastLogTerm(100);
-    electionRequest.setDataLogLastTerm(1);
-    electionRequest.setDataLogLastIndex(1);
     new DataAsyncService(dataGroupMember).startElection(electionRequest, handler);
     assertEquals(Response.RESPONSE_LOG_MISMATCH, handler.getResponse());
     assertEquals(14, dataGroupMember.getTerm().get());
 
     // a valid request with with larger term
     electionRequest.setTerm(15);
-    electionRequest.setDataLogLastTerm(100);
-    electionRequest.setDataLogLastIndex(100);
     new DataAsyncService(dataGroupMember).startElection(electionRequest, handler);
     assertEquals(Response.RESPONSE_AGREE, handler.getResponse());
     assertEquals(15, dataGroupMember.getTerm().get());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
index d0118db..4b7293b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.utils.IOUtils;
 
 /**
  * It's used by cluster to wrap log to plan
@@ -45,11 +46,11 @@ public class LogPlan extends PhysicalPlan {
 
   public LogPlan(LogPlan plan) {
     super(false);
-    this.log = plan.log;
+    this.log = IOUtils.clone(plan.log);
   }
 
   public ByteBuffer getLog() {
-    log.clear();
+    log.rewind();
     return log;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/IOUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/IOUtils.java
index c04dfb6..79e1ede 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/IOUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/IOUtils.java
@@ -177,4 +177,13 @@ public class IOUtils {
       }
     }
   }
+
+  public static ByteBuffer clone(ByteBuffer original) {
+    ByteBuffer clone = ByteBuffer.allocate(original.capacity());
+    original.rewind();
+    clone.put(original);
+    original.rewind();
+    clone.flip();
+    return clone;
+  }
 }
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 53f5ae4..0a8a7c3 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -70,8 +70,6 @@ struct ElectionRequest {
   // member should process the request or response. Only used in data group communication.
   5: optional Node header
   6: required int raftId
-  7: optional long dataLogLastIndex
-  8: optional long dataLogLastTerm
 }
 
 // leader -> follower

Mime
View raw message