iotdb-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #1112: [IOTDB-354] Implement batch append entries
Date Wed, 06 May 2020 01:49:37 GMT

jt2594838 commented on a change in pull request #1112:
URL: https://github.com/apache/incubator-iotdb/pull/1112#discussion_r420503823



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
##########
@@ -398,7 +399,92 @@ public void appendEntry(AppendEntryRequest request, AsyncMethodCallback
resultHa
 
   @Override
   public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback resultHandler)
{
-    //TODO-Cluster#354: implement
+    logger.debug("{} received an AppendEntriesRequest", name);
+
+    // the term checked here is that of the leader, not that of the log
+    if (!checkRequestTerm(request, resultHandler)) {
+      return;
+    }
+
+    try {
+      long response = 0;
+      List<Log> logs = new ArrayList<>();
+      for (ByteBuffer buffer : request.getEntries()) {
+        Log log = LogParser.getINSTANCE().parse(buffer);
+        logs.add(log);
+      }
+
+      response = appendEntries(logs);
+      resultHandler.onComplete(response);
+      logger.debug("{} AppendEntriesRequest of log size {} completed", name,
+          request.getEntries().size());
+    } catch (UnknownLogTypeException e) {
+      resultHandler.onError(e);
+    }
+  }
+
+  /**
+   * Find the local previous log of "log". If such log is found, discard all local logs behind
it
+   * and append "log" to it. Otherwise report a log mismatch.
+   *
+   * @param logs
+   * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   */
+  private long appendEntries(List<Log> logs) {
+    if (logs.isEmpty()) {
+      return Response.RESPONSE_AGREE;
+    }
+
+    long resp;
+    synchronized (logManager) {
+      if (logs.get(0).getCurrLogIndex() > logManager.getLastLogIndex() + 1) {
+        // the incoming log points to an illegal position, reject it
+        resp = Response.RESPONSE_LOG_MISMATCH;
+      } else {
+        logManager.append(logs);

Review comment:
       LebronAl has now provided a safer method called `maybeAppendBatch` in `RaftLogManager`,
please switch to that and check the return value.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
##########
@@ -398,7 +399,92 @@ public void appendEntry(AppendEntryRequest request, AsyncMethodCallback
resultHa
 
   @Override
   public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback resultHandler)
{
-    //TODO-Cluster#354: implement
+    logger.debug("{} received an AppendEntriesRequest", name);
+
+    // the term checked here is that of the leader, not that of the log
+    if (!checkRequestTerm(request, resultHandler)) {
+      return;
+    }
+
+    try {
+      long response = 0;
+      List<Log> logs = new ArrayList<>();
+      for (ByteBuffer buffer : request.getEntries()) {
+        Log log = LogParser.getINSTANCE().parse(buffer);
+        logs.add(log);
+      }
+
+      response = appendEntries(logs);
+      resultHandler.onComplete(response);
+      logger.debug("{} AppendEntriesRequest of log size {} completed", name,
+          request.getEntries().size());
+    } catch (UnknownLogTypeException e) {
+      resultHandler.onError(e);
+    }
+  }
+
+  /**
+   * Find the local previous log of "log". If such log is found, discard all local logs behind
it
+   * and append "log" to it. Otherwise report a log mismatch.
+   *
+   * @param logs
+   * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   */
+  private long appendEntries(List<Log> logs) {
+    if (logs.isEmpty()) {
+      return Response.RESPONSE_AGREE;
+    }
+
+    long resp;
+    synchronized (logManager) {
+      if (logs.get(0).getCurrLogIndex() > logManager.getLastLogIndex() + 1) {
+        // the incoming log points to an illegal position, reject it
+        resp = Response.RESPONSE_LOG_MISMATCH;
+      } else {
+        logManager.append(logs);
+        if (logger.isDebugEnabled()) {
+          logger.debug("{} append new logs list {}", name, logs);
+        }
+        resp = Response.RESPONSE_AGREE;
+      }
+    }
+    return resp;
+  }
+
+  /**
+   * Check the term of the AppendEntryRequest. The term checked is the term of the leader,
not the
+   * term of the log. A new leader can still send logs of old leaders.
+   *
+   * @param request
+   * @param resultHandler if the term is illegal, the "resultHandler" will be invoked so
the caller
+   *                      does not need to invoke it again
+   * @return true if the term is legal, false otherwise
+   */
+  private boolean checkRequestTerm(AppendEntriesRequest request,
+      AsyncMethodCallback resultHandler) {
+    long leaderTerm = request.getTerm();
+    long localTerm;
+
+    synchronized (term) {
+      // if the request comes before the heartbeat arrives, the local term may be smaller
than the
+      // leader term
+      localTerm = term.get();
+      if (leaderTerm < localTerm) {
+        logger.debug("{} rejected the AppendEntryRequest for term: {}/{}", name, leaderTerm,

Review comment:
       The implementation of `checkRequestTerm` for `AppendEntryRequest` has been changed
slightly, so please see to it and make necessary modifications.
   By the way, the log message should use `AppendEntriesRequest` here instead of `AppendEntriesRequest`.





----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message