ratis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [incubator-ratis] branch master updated: RATIS-470. De-couple the LogService "index" from the RaftLog "index"
Date Wed, 30 Jan 2019 18:37:25 GMT
This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 447ccfb  RATIS-470. De-couple the LogService "index" from the RaftLog "index"
447ccfb is described below

commit 447ccfb1f152a605c10e06f940df17bc5de6ea37
Author: Josh Elser <elserj@apache.org>
AuthorDate: Wed Jan 9 11:22:13 2019 -0500

    RATIS-470. De-couple the LogService "index" from the RaftLog "index"
    
    We've been using the Raft log index as the "record offset" that we expose
    to users via the LogService API. However, because one message that we push
    to the Raft log may contain many LogService records, this generates
    incorrect results.
    
    We have to "hide" the Raft log index, and identify just the LogService records
    so that we're exposing only their Log's information (e.g. none of the internal
    Raft quorum configuration entries).
    
    Closes #7
---
 .../org/apache/ratis/logservice/api/LogWriter.java |   7 +-
 .../ratis/logservice/impl/LogWriterImpl.java       |  38 ++++---
 .../ratis/logservice/server/LogStateMachine.java   | 114 ++++++++++++++++++---
 .../ratis/logservice/util/LogServiceProtoUtil.java |   7 +-
 .../ratis/logservice/LogServiceReadWriteBase.java  |  63 +++++++++++-
 .../ratis/logservice/server/TestMetaServer.java    |   5 +-
 6 files changed, 185 insertions(+), 49 deletions(-)

diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java
index 5a51a3c..52b487a 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java
@@ -42,12 +42,7 @@ public interface LogWriter extends AutoCloseable {
    * @param records Records to append
    * @return The largest recordId assigned to the records written
    */
-  default long write(List<ByteBuffer> records) throws IOException {
-    for (ByteBuffer record : records) {
-      write(record);
-    }
-    return records.size();
-  }
+  long write(List<ByteBuffer> records) throws IOException;
 
   /**
    * Guarantees that all previous data appended by {@link #write(ByteBuffer)} are persisted
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
index e7a7d4a..e408d1e 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
@@ -62,27 +62,25 @@ public class LogWriterImpl implements LogWriter {
     List<ByteBuffer> list = new ArrayList<ByteBuffer>();
     list.add(data);
      return write(list);
-   }
-
-   @Override
-   public long write(List<ByteBuffer> list) throws IOException {
+  }
 
-     try {
-       RaftClientReply reply = raftClient.send(Message.valueOf(
-         LogServiceProtoUtil.toAppendBBEntryLogRequestProto(parent.getName(), list).toByteString()));
-       AppendLogEntryReplyProto proto =
-           AppendLogEntryReplyProto.parseFrom(reply.getMessage().getContent());
-       if (proto.hasException()) {
-         LogServiceException e = proto.getException();
-         throw new IOException(e.getErrorMsg());
-       }
-       List<Long> ids = proto.getRecordIdList();
-       // The above call Always returns one id (regardless of a batch size)
-       return ids.get(0);
-     } catch (Exception e) {
-       throw new IOException(e);
-   }
- }
+  @Override
+  public long write(List<ByteBuffer> list) throws IOException {
+    try {
+      RaftClientReply reply = raftClient.send(
+          Message.valueOf(LogServiceProtoUtil.toAppendBBEntryLogRequestProto(parent.getName(), list).toByteString()));
+      AppendLogEntryReplyProto proto = AppendLogEntryReplyProto.parseFrom(reply.getMessage().getContent());
+      if (proto.hasException()) {
+        LogServiceException e = proto.getException();
+        throw new IOException(e.getErrorMsg());
+      }
+      List<Long> ids = proto.getRecordIdList();
+      // The above call Always returns one id (regardless of a batch size)
+      return ids.get(0);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 
  @Override
  public long sync() throws IOException {
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index 900aac1..a8aecd8 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -31,7 +31,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.ratis.logservice.api.LogName;
-import org.apache.ratis.logservice.proto.LogServiceProtos.*;
+import org.apache.ratis.logservice.proto.LogServiceProtos.AppendLogEntryRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogReplyProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLengthRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogSizeRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.GetStateRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto.RequestCase;
+import org.apache.ratis.logservice.proto.LogServiceProtos.ReadLogRequestProto;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -50,6 +58,8 @@ import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,8 +74,13 @@ public class LogStateMachine extends BaseStateMachine {
   /*
    *  State is a log's length, size, and state (closed/open);
    */
-  private long size;
   private long length;
+
+  /**
+   * The size (number of bytes) of the log records. Does not include Ratis storage overhead
+   */
+  private long dataRecordsSize;
+
   private State state = State.OPEN;
 
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
@@ -91,7 +106,7 @@ public class LogStateMachine extends BaseStateMachine {
    */
   void reset() {
     this.length = 0;
-    this.size = 0;
+    this.dataRecordsSize = 0;
     setLastAppliedTermIndex(null);
   }
 
@@ -132,7 +147,7 @@ public class LogStateMachine extends BaseStateMachine {
         final ObjectOutputStream out = new ObjectOutputStream(
         new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
       out.writeLong(length);
-      out.writeLong(size);
+      out.writeLong(dataRecordsSize);
       out.writeObject(state);
     } catch(IOException ioe) {
       LOG.warn("Failed to write snapshot file \"" + snapshotFile
@@ -166,7 +181,7 @@ public class LogStateMachine extends BaseStateMachine {
       }
       setLastAppliedTermIndex(last);
       this.length = in.readLong();
-      this.size = in.readLong();
+      this.dataRecordsSize = in.readLong();
       this.state = (State) in.readObject();
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
@@ -187,6 +202,9 @@ public class LogStateMachine extends BaseStateMachine {
       checkInitialization();
       LogServiceRequestProto logServiceRequestProto =
           LogServiceRequestProto.parseFrom(request.getContent());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Processing LogService query: {}", TextFormat.shortDebugString(logServiceRequestProto));
+      }
 
       switch (logServiceRequestProto.getRequestCase()) {
 
@@ -253,9 +271,9 @@ public class LogStateMachine extends BaseStateMachine {
   private CompletableFuture<Message> processGetSizeRequest(LogServiceRequestProto proto) {
     GetLogSizeRequestProto msgProto = proto.getSizeRequest();
     Throwable t = verifyState(State.OPEN);
-    LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.size);
+    LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.dataRecordsSize);
     return CompletableFuture.completedFuture(Message
-      .valueOf(LogServiceProtoUtil.toGetLogSizeReplyProto(this.size, t).toByteString()));
+      .valueOf(LogServiceProtoUtil.toGetLogSizeReplyProto(this.dataRecordsSize, t).toByteString()));
   }
 
   private CompletableFuture<Message> processGetLengthRequest(LogServiceRequestProto proto) {
@@ -274,26 +292,88 @@ public class LogStateMachine extends BaseStateMachine {
 
     ReadLogRequestProto msgProto = proto.getReadNextQuery();
     long startRecordId = msgProto.getStartRecordId();
-    int num = msgProto.getNumRecords();
+    int numRecordsToRead = msgProto.getNumRecords();
     Throwable t = verifyState(State.OPEN);
     List<byte[]> list = new ArrayList<byte[]>();
     LOG.info("Start Index: {}", startRecordId);
-    LOG.info("Total to read: {}", num);
+    LOG.info("Total to read: {}", numRecordsToRead);
+    long raftLogIndex = log.getStartIndex();
+    if (t == null) {
+      // Seek to first entry
+      long logServiceIndex = 0;
+      while (logServiceIndex < startRecordId) {
+        try {
+          LogEntryProto entry = log.get(raftLogIndex);
+          // Skip "meta" entries
+          if (entry == null || entry.hasConfigurationEntry()) {
+            raftLogIndex++;
+            continue;
+          }
+
+          LogServiceRequestProto logServiceProto =
+              LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
+          // TODO is it possible to get LogService messages that aren't appends?
+          if (RequestCase.APPENDREQUEST != logServiceProto.getRequestCase()) {
+            raftLogIndex++;
+            continue;
+          }
+
+          AppendLogEntryRequestProto append = logServiceProto.getAppendRequest();
+          int numRecordsInAppend = append.getDataCount();
+          if (logServiceIndex + numRecordsInAppend > startRecordId) {
+            // The starting record is within this raft log entry.
+            break;
+          }
+          // We didn't find the log record, increment the logService record counter
+          logServiceIndex += numRecordsInAppend;
+          // And increment the raft log index
+          raftLogIndex++;
+        } catch (RaftLogIOException e) {
+          t = e;
+          list = null;
+          break;
+        } catch (InvalidProtocolBufferException e) {
+          LOG.error("Failed to read LogService protobuf from Raft log", e);
+          t = e;
+          list = null;
+          break;
+        }
+      }
+    }
+    LOG.debug("Starting to read {} logservice records starting at raft log index {}", numRecordsToRead, raftLogIndex);
     if (t == null) {
-      for (long index = startRecordId; index < startRecordId + num; index++) {
+      // Make sure we don't read off the end of the Raft log
+      for (long index = raftLogIndex; index < log.getLastCommittedIndex(); index++) {
         try {
           LogEntryProto entry = log.get(index);
-          LOG.info("Index: {} Entry: {}", index, entry);
+          LOG.trace("Index: {} Entry: {}", index, entry);
           if (entry == null || entry.hasConfigurationEntry()) {
             continue;
           }
-          //TODO: how to distinguish log records from
-          // DML commands logged by the service?
-          list.add(entry.getStateMachineLogEntry().getLogData().toByteArray());
+
+          LogServiceRequestProto logServiceProto =
+              LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
+          // TODO is it possible to get LogService messages that aren't appends?
+          if (RequestCase.APPENDREQUEST != logServiceProto.getRequestCase()) {
+            continue;
+          }
+
+          AppendLogEntryRequestProto append = logServiceProto.getAppendRequest();
+          for (int i = 0; i < append.getDataCount() && list.size() < numRecordsToRead; i++) {
+            list.add(append.getData(i).toByteArray());
+          }
+          if (list.size() == numRecordsToRead) {
+            break;
+          }
         } catch (RaftLogIOException e) {
           t = e;
           list = null;
           break;
+        } catch (InvalidProtocolBufferException e) {
+          LOG.error("Failed to read LogService protobuf from Raft log", e);
+          t = e;
+          list = null;
+          break;
         }
       }
     }
@@ -330,7 +410,7 @@ public class LogStateMachine extends BaseStateMachine {
           for (byte[] bb : entries) {
             newSize += bb.length;
           }
-          this.size += newSize;
+          this.dataRecordsSize += newSize;
           this.length += entries.size();
           // TODO do we need this for other write request (close, sync)
           updateLastAppliedTermIndex(entry.getTerm(), index);
@@ -342,9 +422,9 @@ public class LogStateMachine extends BaseStateMachine {
         CompletableFuture.completedFuture(
           Message.valueOf(LogServiceProtoUtil.toAppendLogReplyProto(ids, t).toByteString()));
     final RaftProtos.RaftPeerRole role = trx.getServerRole();
-    LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, length);
+    LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, dataRecordsSize);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{}-{}: variables={}", getId(), index, length);
+      LOG.trace("{}-{}: variables={}", getId(), index, dataRecordsSize);
     }
     return f;
   }
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
index 00b6e2a..2cc4c58 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
@@ -145,7 +145,12 @@ public class LogServiceProtoUtil {
     AppendLogEntryRequestProto.Builder builder = AppendLogEntryRequestProto.newBuilder();
     builder.setLogName(logNameProto);
     for (int i=0; i < entries.size(); i++) {
-      builder.addData(ByteString.copyFrom(entries.get(i)));
+      ByteBuffer currentBuf = entries.get(i);
+      // Save the current position
+      int pos = currentBuf.position();
+      builder.addData(ByteString.copyFrom(currentBuf));
+      // Reset it after we're done reading the bytes
+      currentBuf.position(pos);
     }
     return LogServiceRequestProto.newBuilder().setAppendRequest(builder.build()).build();
   }
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
index 8c08f44..047a9ee 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
@@ -18,9 +18,12 @@
 package org.apache.ratis.logservice;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.ratis.BaseTest;
@@ -101,10 +104,58 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
       long startId = logStream.getStartRecordId();
       LOG.info("start id {}", startId);
 
-      reader.seek(lastId + 1);
+      reader.seek(startId);
       // Read records back
-      List<ByteBuffer> data = reader.readBulk(1);
-      assertEquals(1, data.size());
+      List<ByteBuffer> data = reader.readBulk(records.size());
+      assertEquals(records.size(), data.size());
+
+      // Make sure we got the same 10 records that we wrote.
+      Iterator<ByteBuffer> expectedIter = records.iterator();
+      Iterator<ByteBuffer> actualIter = data.iterator();
+      while (expectedIter.hasNext() && actualIter.hasNext()) {
+        ByteBuffer expected = expectedIter.next();
+        ByteBuffer actual = actualIter.next();
+        assertEquals(expected, actual);
+      }
+    }
+  }
+
+  @Test
+  public void testSeeking() throws Exception {
+    final RaftClient raftClient =
+        RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
+            .build();
+    final LogName logName = LogName.of("log1");
+    final int numRecords = 100;
+    // TODO need API to circumvent metadata service for testing
+    try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+      try (LogWriter writer = logStream.createWriter()) {
+        LOG.info("Writing {} records", numRecords);
+        // Write records 0 through 99 (inclusive)
+        for (int i = 0; i < numRecords; i++) {
+          writer.write(ByteBuffer.wrap(Integer.toString(i).getBytes(StandardCharsets.UTF_8)));
+        }
+      }
+
+      LOG.debug("Seek and read'ing records");
+      try (LogReader reader = logStream.createReader()) {
+        for (int i = 9; i < numRecords; i += 10) {
+          LOG.info("Seeking to {}", i);
+          reader.seek(i);
+          LOG.info("Reading one record");
+          ByteBuffer bb = reader.readNext();
+          assertEquals(i, fromBytes(bb));
+        }
+
+        assertTrue("We're expecting at least two records were written", numRecords > 1);
+        for (int i = numRecords - 2; i >= 0; i -= 6) {
+          LOG.info("Seeking to {}", i);
+          reader.seek(i);
+          LOG.info("Reading one record");
+          ByteBuffer bb = reader.readNext();
+          assertEquals(i, fromBytes(bb));
+        }
+      }
     }
   }
 
@@ -112,4 +163,10 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
   public void tearDown() {
     cluster.shutdown();
   }
+
+  private int fromBytes(ByteBuffer bb) {
+    byte[] bytes = new byte[bb.remaining()];
+    System.arraycopy(bb.array(), bb.arrayOffset(), bytes, 0, bb.remaining());
+    return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
+  }
 }
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index cbdd2b5..47aa60a 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -36,7 +36,9 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.stream.IntStream;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 
@@ -95,8 +97,7 @@ public class TestMetaServer {
         }
 //        assert(stream.getSize() > 0); //TODO: Doesn't work
         LogReader reader = stream.createReader();
-        ByteBuffer res = reader.readNext(); //TODO: first is conf log entry
-        res = reader.readNext();
+        ByteBuffer res = reader.readNext();
         assert(res.array().length > 0);
     }
 


Mime
View raw message