ratis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [incubator-ratis] branch master updated: RATIS-481. Encapsulate the RaftLog reading in its own class
Date Mon, 11 Feb 2019 23:28:28 GMT
This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 386f702  RATIS-481. Encapsulate the RaftLog reading in its own class
386f702 is described below

commit 386f70284277a87214115e31bc92752b443bd2e6
Author: Josh Elser <elserj@apache.org>
AuthorDate: Fri Feb 8 19:40:28 2019 -0500

    RATIS-481. Encapsulate the RaftLog reading in its own class
    
    The LogStateMachine was getting really ugly with all of the code
    to seek and read the RaftLog, unwrapping the RaftLog index to the
    LogService index.
    
    This will help encapsulate the changes for RATIS-477
    
    Closes #12
---
 .../org/apache/ratis/logservice/api/LogReader.java |   4 +-
 .../org/apache/ratis/logservice/api/LogWriter.java |   4 +-
 .../ratis/logservice/impl/LogReaderImpl.java       |   8 +-
 .../ratis/logservice/impl/LogWriterImpl.java       |  10 +-
 .../logservice/server/LogServiceRaftLogReader.java | 160 +++++++++++++++++++++
 .../ratis/logservice/server/LogStateMachine.java   | 103 +++----------
 .../ratis/logservice/LogServiceReadWriteBase.java  | 103 ++++++++++++-
 .../src/test/resources/log4j.properties            |   1 +
 8 files changed, 290 insertions(+), 103 deletions(-)

diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
index 6db3b6e..b3bb618 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
@@ -71,10 +71,10 @@ public interface LogReader extends AutoCloseable {
    * Each provided buffer must be capable of holding one complete record from the Log. If
the provided buffer is
    * too small, an exception will be thrown.
    *
-   * @param buffers A non-empty list of non-null ByteBuffers.
+   * @param buffers A non-empty array of non-null ByteBuffers.
    * @return The number of records returns, equivalent to the number of filled buffers.
    */
-  int readBulk(List<ByteBuffer> buffers) throws IOException;
+  int readBulk(ByteBuffer[] buffers) throws IOException;
 
   /**
    * Returns the current position of this Reader. The position is a {@code recordId}.
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java
index 52b487a..fb201d9 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java
@@ -40,9 +40,9 @@ public interface LogWriter extends AutoCloseable {
    * to have been written.
    *
    * @param records Records to append
-   * @return The largest recordId assigned to the records written
+   * @return The recordIds assigned to the records written
    */
-  long write(List<ByteBuffer> records) throws IOException;
+  List<Long> write(List<ByteBuffer> records) throws IOException;
 
   /**
    * Guarantees that all previous data appended by {@link #write(ByteBuffer)} are persisted
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
index 0e4600a..cbbf82d 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
@@ -149,13 +149,13 @@ public class LogReaderImpl implements LogReader {
   }
 
   @Override
-  public int readBulk(List<ByteBuffer> buffers) throws IOException {
+  public int readBulk(ByteBuffer[] buffers) throws IOException {
     Preconditions.checkNotNull(buffers, "list of buffers is NULL" );
-    Preconditions.checkArgument(buffers.size() > 0, "list of buffers is empty");
+    Preconditions.checkArgument(buffers.length > 0, "list of buffers is empty");
 
     try {
       RaftClientReply reply = raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
-          .toReadLogRequestProto(parent.getName(), currentRecordId, buffers.size()).toByteString()));
+          .toReadLogRequestProto(parent.getName(), currentRecordId, buffers.length).toByteString()));
       ReadLogReplyProto proto = ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
       if (proto.hasException()) {
         LogServiceException e = proto.getException();
@@ -165,7 +165,7 @@ public class LogReaderImpl implements LogReader {
       int n = proto.getLogRecordCount();
       currentRecordId += n;
       for (int i = 0; i < n; i++) {
-        buffers.get(i).put(proto.getLogRecord(i).toByteArray());
+        buffers[i] = ByteBuffer.wrap(proto.getLogRecord(i).toByteArray());
       }
       return n;
     } catch (Exception e) {
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
index e408d1e..d4a0f2d 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
@@ -20,6 +20,7 @@ package org.apache.ratis.logservice.impl;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -59,13 +60,12 @@ public class LogWriterImpl implements LogWriter {
 
   @Override
   public long write(ByteBuffer data) throws IOException {
-    List<ByteBuffer> list = new ArrayList<ByteBuffer>();
-    list.add(data);
-     return write(list);
+    // One record to write is always one recordId returned
+    return write(Collections.singletonList(data)).get(0);
   }
 
   @Override
-  public long write(List<ByteBuffer> list) throws IOException {
+  public List<Long> write(List<ByteBuffer> list) throws IOException {
     try {
       RaftClientReply reply = raftClient.send(
           Message.valueOf(LogServiceProtoUtil.toAppendBBEntryLogRequestProto(parent.getName(),
list).toByteString()));
@@ -76,7 +76,7 @@ public class LogWriterImpl implements LogWriter {
       }
       List<Long> ids = proto.getRecordIdList();
       // The above call Always returns one id (regardless of a batch size)
-      return ids.get(0);
+      return ids;
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
new file mode 100644
index 0000000..ef0eb83
--- /dev/null
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.server;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.NoSuchElementException;
+
+import org.apache.ratis.logservice.proto.LogServiceProtos.AppendLogEntryRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto.RequestCase;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftLogIOException;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reader for the {@link RaftLog} which is accessed using LogService recordId's instead
+ * of Raft log indexes. Not thread-safe.
+ */
+public class LogServiceRaftLogReader {
+  private static final Logger LOG = LoggerFactory.getLogger(LogServiceRaftLogReader.class);
+  private final RaftLog raftLog;
+
+  private long currentRecordId = -1;
+  private long currentRaftIndex = -1;
+  private AppendLogEntryRequestProto currentLogEntry = null;
+  private int currentLogEntryOffset = -1;
+  private ByteString currentRecord = null;
+
+  public LogServiceRaftLogReader(RaftLog raftLog) {
+    this.raftLog = requireNonNull(raftLog);
+  }
+
+  /**
+   * Positions this reader just before the current recordId. Use {@link #next()} to get that
+   * element, but take care to check if a value is present using {@link #hasNext()} first.
+   */
+  public void seek(long recordId) throws RaftLogIOException, InvalidProtocolBufferException
{
+    LOG.debug("Seeking to recordId={}", recordId);
+    // RaftLog starting index
+    currentRaftIndex = raftLog.getStartIndex();
+    currentRecordId = 0;
+
+    currentLogEntry = null;
+    currentLogEntryOffset = -1;
+    currentRecord = null;
+
+    loadNext();
+    while (currentRecordId < recordId && hasNext()) {
+      next();
+      currentRecordId++;
+    }
+  }
+
+  /**
+   * Returns true if there is a log entry to read.
+   */
+  public boolean hasNext() throws RaftLogIOException, InvalidProtocolBufferException {
+    return currentRecord != null;
+  }
+
+  /**
+   * Returns the next log entry. Ensure {@link #hasNext()} returns true before
+   * calling this method.
+   */
+  public ByteString next() throws RaftLogIOException, InvalidProtocolBufferException {
+    if (currentRecord == null) {
+      throw new NoSuchElementException();
+    }
+    ByteString current = currentRecord;
+    currentRecord = null;
+    loadNext();
+    return current;
+  }
+
+  /**
+   * Finds the next record from the RaftLog and sets it as {@link #currentRecord}.
+   */
+  private void loadNext() throws RaftLogIOException, InvalidProtocolBufferException {
+    // Clear the old "current" record
+    currentRecord = null;
+    LOG.debug("Loading next value: raftIndex={}, recordId={}, proto='{}', offset={}",
+        currentRaftIndex, currentRecordId,
+        currentLogEntry == null ? "null" : TextFormat.shortDebugString(currentLogEntry),
+            currentLogEntryOffset);
+    // Continue iterating over the current entry.
+    if (currentLogEntry != null) {
+      assert currentLogEntryOffset != -1;
+      currentLogEntryOffset++;
+
+      // We have an element to read from our current entry
+      if (currentLogEntryOffset < currentLogEntry.getDataCount()) {
+        currentRecord = currentLogEntry.getData(currentLogEntryOffset);
+        return;
+      }
+      // We don't have an element in our current entry so null it out.
+      currentLogEntry = null;
+      currentLogEntryOffset = -1;
+      // Also, increment to the next element in the RaftLog
+      currentRaftIndex++;
+    }
+
+    // Make sure we don't read off the end of the Raft log
+    for (; currentRaftIndex < raftLog.getLastCommittedIndex(); currentRaftIndex++) {
+      try {
+        LogEntryProto entry = raftLog.get(currentRaftIndex);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Raft Index: {} Entry: {}", currentRaftIndex,
+              TextFormat.shortDebugString(entry));
+        }
+        if (entry == null || entry.hasConfigurationEntry()) {
+          continue;
+        }
+
+        LogServiceRequestProto logServiceProto =
+            LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
+        // TODO is it possible to get LogService messages that aren't appends?
+        if (RequestCase.APPENDREQUEST != logServiceProto.getRequestCase()) {
+          continue;
+        }
+
+        currentLogEntry = logServiceProto.getAppendRequest();
+        currentLogEntryOffset = 0;
+        if (currentLogEntry.getDataCount() > 0) {
+          currentRecord = currentLogEntry.getData(currentLogEntryOffset);
+          return;
+        }
+        currentLogEntry = null;
+        currentLogEntryOffset = -1;
+      } catch (RaftLogIOException e) {
+        LOG.error("Caught exception reading from RaftLog", e);
+        throw e;
+      } catch (InvalidProtocolBufferException e) {
+        LOG.error("Caught exception reading LogService protobuf from RaftLog", e);
+        throw e;
+      }
+    }
+    // If we make it here, we've read off the end of the RaftLog.
+  }
+}
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index a8aecd8..7095e44 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -27,6 +27,9 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -38,7 +41,6 @@ import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLengthRequestPro
 import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogSizeRequestProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.GetStateRequestProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto;
-import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto.RequestCase;
 import org.apache.ratis.logservice.proto.LogServiceProtos.ReadLogRequestProto;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
 import org.apache.ratis.proto.RaftProtos;
@@ -51,14 +53,12 @@ import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.impl.ServerState;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.server.storage.RaftLogIOException;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.slf4j.Logger;
@@ -289,92 +289,29 @@ public class LogStateMachine extends BaseStateMachine {
    * @return reply message
    */
   private CompletableFuture<Message> processReadRequest(LogServiceRequestProto proto)
{
-
     ReadLogRequestProto msgProto = proto.getReadNextQuery();
+    // Get the recordId the user wants to start reading at
     long startRecordId = msgProto.getStartRecordId();
+    // And the number of records they want to read
     int numRecordsToRead = msgProto.getNumRecords();
     Throwable t = verifyState(State.OPEN);
-    List<byte[]> list = new ArrayList<byte[]>();
-    LOG.info("Start Index: {}", startRecordId);
-    LOG.info("Total to read: {}", numRecordsToRead);
-    long raftLogIndex = log.getStartIndex();
-    if (t == null) {
-      // Seek to first entry
-      long logServiceIndex = 0;
-      while (logServiceIndex < startRecordId) {
-        try {
-          LogEntryProto entry = log.get(raftLogIndex);
-          // Skip "meta" entries
-          if (entry == null || entry.hasConfigurationEntry()) {
-            raftLogIndex++;
-            continue;
-          }
-
-          LogServiceRequestProto logServiceProto =
-              LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
-          // TODO is it possible to get LogService messages that aren't appends?
-          if (RequestCase.APPENDREQUEST != logServiceProto.getRequestCase()) {
-            raftLogIndex++;
-            continue;
-          }
+    List<byte[]> list = null;
 
-          AppendLogEntryRequestProto append = logServiceProto.getAppendRequest();
-          int numRecordsInAppend = append.getDataCount();
-          if (logServiceIndex + numRecordsInAppend > startRecordId) {
-            // The starting record is within this raft log entry.
-            break;
-          }
-          // We didn't find the log record, increment the logService record counter
-          logServiceIndex += numRecordsInAppend;
-          // And increment the raft log index
-          raftLogIndex++;
-        } catch (RaftLogIOException e) {
-          t = e;
-          list = null;
-          break;
-        } catch (InvalidProtocolBufferException e) {
-          LOG.error("Failed to read LogService protobuf from Raft log", e);
-          t = e;
-          list = null;
-          break;
-        }
-      }
-    }
-    LOG.debug("Starting to read {} logservice records starting at raft log index {}", numRecordsToRead,
raftLogIndex);
     if (t == null) {
-      // Make sure we don't read off the end of the Raft log
-      for (long index = raftLogIndex; index < log.getLastCommittedIndex(); index++) {
-        try {
-          LogEntryProto entry = log.get(index);
-          LOG.trace("Index: {} Entry: {}", index, entry);
-          if (entry == null || entry.hasConfigurationEntry()) {
-            continue;
-          }
-
-          LogServiceRequestProto logServiceProto =
-              LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
-          // TODO is it possible to get LogService messages that aren't appends?
-          if (RequestCase.APPENDREQUEST != logServiceProto.getRequestCase()) {
-            continue;
-          }
-
-          AppendLogEntryRequestProto append = logServiceProto.getAppendRequest();
-          for (int i = 0; i < append.getDataCount() && list.size() < numRecordsToRead;
i++) {
-            list.add(append.getData(i).toByteArray());
-          }
-          if (list.size() == numRecordsToRead) {
+      LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
+      list = new ArrayList<byte[]>();
+      try {
+        reader.seek(startRecordId);
+        for (int i = 0; i < numRecordsToRead; i++) {
+          if (!reader.hasNext()) {
             break;
           }
-        } catch (RaftLogIOException e) {
-          t = e;
-          list = null;
-          break;
-        } catch (InvalidProtocolBufferException e) {
-          LOG.error("Failed to read LogService protobuf from Raft log", e);
-          t = e;
-          list = null;
-          break;
+          list.add(reader.next().toByteArray());
         }
+      } catch (Exception e) {
+        LOG.error("Failed to execute ReadNextQuery", e);
+        t = e;
+        list = null;
       }
     }
     return CompletableFuture.completedFuture(
@@ -404,20 +341,20 @@ public class LogStateMachine extends BaseStateMachine {
     final long index = entry.getIndex();
     long newSize = 0;
     Throwable t = verifyState(State.OPEN);
+    final List<Long> ids = new ArrayList<Long>();
     if (t == null) {
       try (final AutoCloseableLock writeLock = writeLock()) {
           List<byte[]> entries = LogServiceProtoUtil.toListByteArray(proto.getDataList());
           for (byte[] bb : entries) {
+            ids.add(this.length);
             newSize += bb.length;
+            this.length++;
           }
           this.dataRecordsSize += newSize;
-          this.length += entries.size();
           // TODO do we need this for other write request (close, sync)
           updateLastAppliedTermIndex(entry.getTerm(), index);
       }
     }
-    List<Long> ids = new ArrayList<Long>();
-    ids.add(index);
     final CompletableFuture<Message> f =
         CompletableFuture.completedFuture(
           Message.valueOf(LogServiceProtoUtil.toAppendLogReplyProto(ids, t).toByteString()));
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
index 047a9ee..e378068 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 import java.util.Iterator;
 import java.util.List;
 
@@ -90,8 +93,8 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
 
       // Add some records
       List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
-      long id = writer.write(records);
-      LOG.info("id {}", id);
+      List<Long> ids = writer.write(records);
+      LOG.info("ids {}", ids);
       // Check log size and length
       assertEquals(10 * 100, logStream.getSize());
       assertEquals(10, logStream.getLength());
@@ -121,6 +124,47 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
   }
 
   @Test
+  public void testReadAllRecords() throws Exception {
+    final RaftClient raftClient =
+        RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
+            .build();
+    final LogName logName = LogName.of("log1");
+    final int numRecords = 25;
+    // TODO need API to circumvent metadata service for testing
+    try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+      try (LogWriter writer = logStream.createWriter()) {
+        LOG.info("Writing {} records", numRecords);
+        // Write records 0 through 99 (inclusive)
+        for (int i = 0; i < numRecords; i++) {
+          writer.write(toBytes(i));
+        }
+      }
+
+      try (LogReader reader = logStream.createReader()) {
+        reader.seek(0);
+        for (int i = 0; i < numRecords; i++) {
+          assertEquals(i, fromBytes(reader.readNext()));
+        }
+
+        reader.seek(0);
+        List<ByteBuffer> records = reader.readBulk(numRecords);
+        assertEquals(numRecords, records.size());
+        for (int i = 0; i < numRecords; i++) {
+          ByteBuffer record = records.get(i);
+          assertEquals(i, fromBytes(record));
+        }
+
+        reader.seek(0);
+        ByteBuffer[] arr = new ByteBuffer[numRecords];
+        reader.readBulk(arr);
+        for (int i = 0; i < numRecords; i++) {
+          assertEquals(i, fromBytes(arr[i]));
+        }
+      }
+    }
+  }
+
+  @Test
   public void testSeeking() throws Exception {
     final RaftClient raftClient =
         RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
@@ -133,7 +177,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
         LOG.info("Writing {} records", numRecords);
         // Write records 0 through 99 (inclusive)
         for (int i = 0; i < numRecords; i++) {
-          writer.write(ByteBuffer.wrap(Integer.toString(i).getBytes(StandardCharsets.UTF_8)));
+          writer.write(toBytes(i));
         }
       }
 
@@ -143,8 +187,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
           LOG.info("Seeking to {}", i);
           reader.seek(i);
           LOG.info("Reading one record");
-          ByteBuffer bb = reader.readNext();
-          assertEquals(i, fromBytes(bb));
+          assertEquals(i, fromBytes(reader.readNext()));
         }
 
         assertTrue("We're expecting at least two records were written", numRecords > 1);
@@ -152,8 +195,50 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
           LOG.info("Seeking to {}", i);
           reader.seek(i);
           LOG.info("Reading one record");
-          ByteBuffer bb = reader.readNext();
-          assertEquals(i, fromBytes(bb));
+          assertEquals(i, fromBytes(reader.readNext()));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSeekFromWrite() throws Exception {
+    final RaftClient raftClient =
+        RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
+            .build();
+    final LogName logName = LogName.of("log1");
+    final int numRecords = 10;
+    try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+      final List<Long> recordIds;
+      try (LogWriter writer = logStream.createWriter()) {
+        LOG.info("Writing {} records", numRecords);
+        List<ByteBuffer> records = new ArrayList<>(numRecords * 2);
+        // Write records 0 through 10 (inclusive) as one batch
+        for (int i = 0; i < numRecords; i++) {
+          records.add(toBytes(i));
+        }
+        recordIds = new ArrayList<>(writer.write(records));
+        // Then, write another 10 records, individually.
+        for (int i = numRecords; i < numRecords*2; i++) {
+          recordIds.add(writer.write(toBytes(i)));
+        }
+      }
+
+      // We should have numRecords recordIds
+      assertEquals(numRecords * 2, recordIds.size());
+      // We should have monotonically increasing recordIds because we're the only one
+      // writing to this log.
+      assertEquals(LongStream.range(0, numRecords * 2).boxed().collect(Collectors.toList()),
+          recordIds);
+
+      try (LogReader reader = logStream.createReader()) {
+        int i = 0;
+        // We should be able to seek to the recordId given for each record
+        // we wrote and read it back.
+        for (long recordId : recordIds) {
+          reader.seek(recordId);
+          int readValue = fromBytes(reader.readNext());
+          assertEquals("Seeked to " + recordId + " but got " + readValue, i++, readValue);
         }
       }
     }
@@ -164,6 +249,10 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
     cluster.shutdown();
   }
 
+  private ByteBuffer toBytes(int i) {
+    return ByteBuffer.wrap(Integer.toString(i).getBytes(StandardCharsets.UTF_8));
+  }
+
   private int fromBytes(ByteBuffer bb) {
     byte[] bytes = new byte[bb.remaining()];
     System.arraycopy(bb.array(), bb.arrayOffset(), bytes, 0, bb.remaining());
diff --git a/ratis-logservice/src/test/resources/log4j.properties b/ratis-logservice/src/test/resources/log4j.properties
index ced0687..99f1b7c 100644
--- a/ratis-logservice/src/test/resources/log4j.properties
+++ b/ratis-logservice/src/test/resources/log4j.properties
@@ -16,3 +16,4 @@ log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+log4j.logger.org.apache.ratis.logservice=DEBUG
\ No newline at end of file


Mime
View raw message