asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [2/6] incubator-asterixdb git commit: Asterix NCs Failback Support
Date Thu, 18 Feb 2016 09:54:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 06a1957..ef2b498 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -34,8 +34,8 @@ import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFa
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
-public class SecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
implements
-        IModificationOperationCallbackFactory {
+public class SecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
+        implements IModificationOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
@@ -48,7 +48,7 @@ public class SecondaryIndexModificationOperationCallbackFactory extends
Abstract
 
     @Override
     public IModificationOperationCallback createModificationOperationCallback(String resourcePath,
long resourceId,
-            Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+            int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException
{
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
@@ -60,8 +60,8 @@ public class SecondaryIndexModificationOperationCallbackFactory extends
Abstract
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId,
false);
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId,
-                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
resourceId, resourceType,
-                    indexOp);
+                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
resourceId,
+                    resourcePartition, resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback)
modCallback, false);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
index 69aad24..32d3461 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
@@ -39,8 +39,9 @@ public class TempDatasetIndexModificationOperationCallback extends AbstractIndex
 
     public TempDatasetIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem,
long resourceId,
-            byte resourceType, IndexOperation indexOp) {
-        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId,
resourceType, indexOp);
+            int resourcePartition, byte resourceType, IndexOperation indexOp) {
+        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId,
resourcePartition,
+                resourceType, indexOp);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index f2a6820..b08798c 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -49,7 +49,7 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory
extends
 
     @Override
     public IModificationOperationCallback createModificationOperationCallback(String resourcePath,
long resourceId,
-            Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+            int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException
{
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
@@ -61,8 +61,8 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory
extends
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId,
false);
             IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(datasetId,
-                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
resourceId, resourceType,
-                    indexOp);
+                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
resourceId,
+                    resourcePartition, resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback)
modCallback, true);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 8d838a3..403d68d 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -49,7 +49,7 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory
exten
 
     @Override
     public IModificationOperationCallback createModificationOperationCallback(String resourcePath,
long resourceId,
-            Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+            int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException
{
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
@@ -61,8 +61,8 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory
exten
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId,
false);
             IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(datasetId,
-                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
resourceId, resourceType,
-                    indexOp);
+                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
resourceId,
+                    resourcePartition, resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback)
modCallback, false);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
index dfc622a..f98083a 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
@@ -31,9 +31,10 @@ public class UpsertOperationCallback extends AbstractIndexModificationOperationC
         implements IModificationOperationCallback {
 
     public UpsertOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext
txnCtx,
-            ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
byte resourceType,
-            IndexOperation indexOp) {
-        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId,
resourceType, indexOp);
+            ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
int resourcePartition,
+            byte resourceType, IndexOperation indexOp) {
+        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId,
resourcePartition,
+                resourceType, indexOp);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 0c83ab5..707f986 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -47,7 +47,7 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
 
     @Override
     public IModificationOperationCallback createModificationOperationCallback(String resourceName,
long resourceId,
-            Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+            int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException
{
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
@@ -60,7 +60,8 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId,
false);
             IModificationOperationCallback modCallback = new UpsertOperationCallback(datasetId,
primaryKeyFields,
-                    txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
indexOp);
+                    txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourcePartition,
resourceType,
+                    indexOp);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback)
modCallback, true);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 655fd2a..9cb456f 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -28,11 +28,13 @@ import java.io.ObjectOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -40,6 +42,7 @@ import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.replication.AsterixReplicationJob;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IODeviceHandle;
@@ -67,6 +70,9 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
     private final SortedMap<Integer, ClusterPartition> clusterPartitions;
+    private final Set<Integer> nodeOriginalPartitions;
+    private final Set<Integer> nodeActivePartitions;
+    private Set<Integer> nodeInactivePartitions;
 
     public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId,
             AsterixMetadataProperties metadataProperties) throws HyracksDataException {
@@ -86,6 +92,15 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             }
         }
         resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
+
+        ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+        //initially the node active partitions are the same as the original partitions
+        nodeOriginalPartitions = new HashSet<>(nodePartitions.length);
+        nodeActivePartitions = new HashSet<>(nodePartitions.length);
+        for (ClusterPartition partition : nodePartitions) {
+            nodeOriginalPartitions.add(partition.getPartitionId());
+            nodeActivePartitions.add(partition.getPartitionId());
+        }
     }
 
     private static String getStorageMetadataDirPath(String mountPoint, String nodeId, int
ioDeviceId) {
@@ -301,6 +316,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     }
 
     private static final FilenameFilter METADATA_FILES_FILTER = new FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
             if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
                 return true;
@@ -316,6 +332,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
 
         if (isReplicationEnabled) {
             filesToBeReplicated = new HashSet<String>();
+            nodeInactivePartitions = ConcurrentHashMap.newKeySet();
         }
     }
 
@@ -404,4 +421,98 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
         //currently each partition is replicated on the same IO device number on all NCs.
         return mountPoints[clusterPartitions.get(partition).getIODeviceNum()];
     }
-}
+
+    /**
+     * @return a read-only view of the node's active partitions
+     */
+    public Set<Integer> getActivePartitions() {
+        //NOTE(review): this is a live view of a plain HashSet that is mutated under this object's lock;
+        //readers on other threads are not synchronized with those updates -- confirm this is acceptable
+        return Collections.unmodifiableSet(nodeActivePartitions);
+    }
+
+    /**
+     * @return a read-only view of the node's inactive partitions; empty when the inactive set has not
+     *         been created, which only happens once replication is enabled
+     */
+    public Set<Integer> getInactivePartitions() {
+        if (nodeInactivePartitions == null) {
+            return Collections.emptySet();
+        }
+        return Collections.unmodifiableSet(nodeInactivePartitions);
+    }
+
+    /**
+     * @return a read-only view of the partitions originally assigned to this node
+     */
+    public Set<Integer> getNodeOrignalPartitions() {
+        return Collections.unmodifiableSet(nodeOriginalPartitions);
+    }
+
+    /**
+     * Marks a partition as active on this node and drops it from the inactive set, if one exists.
+     *
+     * @param partitonId
+     *            id of the partition to activate
+     */
+    public synchronized void addActivePartition(int partitonId) {
+        nodeActivePartitions.add(partitonId);
+        //the inactive set is null until replication is enabled
+        if (nodeInactivePartitions != null) {
+            nodeInactivePartitions.remove(partitonId);
+        }
+    }
+
+    /**
+     * Marks a partition as inactive on this node and drops it from the active set.
+     *
+     * @param partitonId
+     *            id of the partition to deactivate
+     * @throws IllegalStateException
+     *             if the inactive set has not been created (it is only created when replication is enabled)
+     */
+    public synchronized void addInactivePartition(int partitonId) {
+        if (nodeInactivePartitions == null) {
+            throw new IllegalStateException("Inactive partitions are not tracked: replication is not enabled");
+        }
+        nodeInactivePartitions.add(partitonId);
+        nodeActivePartitions.remove(partitonId);
+    }
+
+    /**
+     * @param resourceAbsolutePath
+     *            absolute path of a resource file, ending in partition/dataverse/idx/fileName
+     * @return the resource relative path starting from the partition directory
+     */
+    public static String getResourceRelativePath(String resourceAbsolutePath) {
+        String[] tokens = splitResourcePath(resourceAbsolutePath);
+        //partition/dataverse/idx/fileName
+        return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
+                + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
+    }
+
+    /**
+     * @param resourceAbsolutePath
+     *            absolute path of a resource file, ending in partition/dataverse/idx/fileName
+     * @return the partition number that {@link StoragePathUtil#getPartitonNumFromName} extracts from the
+     *         partition directory name
+     */
+    public static int getResourcePartition(String resourceAbsolutePath) {
+        String[] tokens = splitResourcePath(resourceAbsolutePath);
+        //partition/dataverse/idx/fileName
+        return StoragePathUtil.getPartitonNumFromName(tokens[tokens.length - 4]);
+    }
+
+    /**
+     * Splits a resource path into its components, checking that it is at least four levels deep.
+     */
+    private static String[] splitResourcePath(String resourceAbsolutePath) {
+        //String.split() takes a regex, so the separator must be quoted ("\" alone is not a valid pattern)
+        String[] tokens = resourceAbsolutePath.split(java.util.regex.Pattern.quote(File.separator));
+        if (tokens.length < 4) {
+            throw new IllegalArgumentException(
+                    "Expected a path ending in partition/dataverse/idx/fileName but got: " + resourceAbsolutePath);
+        }
+        return tokens;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 1966c39..5649710 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -41,6 +41,9 @@ public class LogManagerWithReplication extends LogManager {
             throw new IllegalStateException();
         }
 
+        //only locally generated logs should be replicated
+        logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL);
+
         //Remote flush logs do not need to be flushed separately since they may not trigger
local flush
         if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() ==
LogSource.LOCAL) {
             flushLogsQ.offer(logRecord);
@@ -54,7 +57,7 @@ public class LogManagerWithReplication extends LogManager {
     protected void appendToLogTail(ILogRecord logRecord) throws ACIDException {
         syncAppendToLogTail(logRecord);
 
-        if (logRecord.getLogSource() == LogSource.LOCAL) {
+        if (logRecord.isReplicated()) {
             replicationManager.replicateLog(logRecord);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index e0cddee..a018dc2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -26,11 +26,10 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogReader;
 import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ILogRecord.RECORD_STATUS;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.MutableLong;
 
-import static org.apache.asterix.common.transactions.LogRecord.*;
-
 /**
  * NOTE: Many method calls of this class are not thread safe.
  * Be very cautious using it in a multithreaded context.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
new file mode 100644
index 0000000..8f88321
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.logging;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILogReader;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ILogRecord.RECORD_STATUS;
+import org.apache.asterix.common.transactions.LogRecord;
+
+/**
+ * Forward-only {@link ILogReader} that decodes log records from a {@link FileChannel} using
+ * {@link ILogRecord#readRemoteLog}. Random access via {@link #read(long)} is not supported.
+ * <p>
+ * A single {@link ILogRecord} instance is reused for every record returned by {@link #next()}, so
+ * callers must finish with (or copy) a record before asking for the next one. The class does no
+ * synchronization of its own.
+ */
+public class RemoteLogReader implements ILogReader {
+
+    private final FileChannel fileChannel;
+    private final ILogRecord logRecord;
+    private final ByteBuffer readBuffer;
+    private final int logPageSize;
+    //file offset of the next record to decode; also where the next buffer refill starts
+    private long readLSN;
+
+    /**
+     * @param fileChannel
+     *            channel of the log file to scan; closed by {@link #close()}
+     * @param logFileSize
+     *            not used by this reader; kept so that existing callers keep compiling
+     * @param logPageSize
+     *            capacity in bytes of the read buffer, i.e. the most that is read per refill
+     */
+    public RemoteLogReader(FileChannel fileChannel, long logFileSize, int logPageSize) {
+        this.fileChannel = fileChannel;
+        this.logPageSize = logPageSize;
+        logRecord = new LogRecord();
+        readBuffer = ByteBuffer.allocate(logPageSize);
+    }
+
+    /**
+     * Starts a scan at {@code beginLSN} and loads the first page into the read buffer.
+     */
+    @Override
+    public void initializeScan(long beginLSN) throws ACIDException {
+        readLSN = beginLSN;
+        fillLogReadBuffer();
+    }
+
+    /**
+     * Refills the read buffer with up to {@code logPageSize} bytes starting at file offset
+     * {@code readLSN}, leaving the buffer ready for reading from position 0.
+     *
+     * @return {@code false} if end of file was hit before any byte could be read, {@code true} otherwise
+     * @throws ACIDException
+     *             wrapping any {@link IOException} raised by the channel
+     */
+    private boolean fillLogReadBuffer() throws ACIDException {
+        int size = 0;
+        int read = 0;
+        readBuffer.position(0);
+        readBuffer.limit(logPageSize);
+        try {
+            fileChannel.position(readLSN);
+            //read() may return 0 while waiting on IO, so keep reading until the buffer is full or
+            //end of file is reached
+            while (size < logPageSize && read != -1) {
+                read = fileChannel.read(readBuffer);
+                if (read > 0) {
+                    size += read;
+                }
+            }
+        } catch (IOException e) {
+            throw new ACIDException(e);
+        }
+        readBuffer.position(0);
+        readBuffer.limit(size);
+        //false only at EOF with nothing read
+        return size > 0 || read != -1;
+    }
+
+    /**
+     * Not supported: this reader can only scan forward.
+     */
+    @Override
+    public ILogRecord read(long LSN) throws ACIDException {
+        throw new UnsupportedOperationException("Random read is not supported.");
+    }
+
+    /**
+     * Decodes the next record of the scan.
+     *
+     * @return the shared record instance holding the decoded record, or {@code null} when the scan ends:
+     *         end of file, a record that is still truncated after one buffer refill, or a record that
+     *         fails its checksum
+     */
+    @Override
+    public ILogRecord next() throws ACIDException {
+        if (readBuffer.position() == readBuffer.limit() && !fillLogReadBuffer()) {
+            return null;
+        }
+
+        RECORD_STATUS status = logRecord.readRemoteLog(readBuffer, true);
+        if (status == RECORD_STATUS.TRUNCATED) {
+            //the record may run past the end of the buffer: refill from readLSN (the start of this
+            //record) and try once more
+            if (!fillLogReadBuffer()) {
+                return null;
+            }
+            status = logRecord.readRemoteLog(readBuffer, true);
+        }
+        if (status != RECORD_STATUS.OK) {
+            //BAD_CHKSUM on either attempt, or still TRUNCATED after the refill: never hand back a
+            //record that was not decoded successfully
+            return null;
+        }
+
+        readLSN += logRecord.getSerializedLogSize();
+        return logRecord;
+    }
+
+    /**
+     * Closes the underlying channel if it is still open.
+     */
+    @Override
+    public void close() throws ACIDException {
+        try {
+            if (fileChannel != null && fileChannel.isOpen()) {
+                fileChannel.close();
+            }
+        } catch (IOException e) {
+            throw new ACIDException(e);
+        }
+    }
+
+}


Mime
View raw message