asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [2/2] incubator-asterixdb git commit: Txn Log Replication Optimizations
Date Wed, 01 Jun 2016 19:17:04 GMT
Txn Log Replication Optimizations

- Add configurations for txn log replication.
- Establish handshake for continuous log replication.
- Replicate logs in batches instead of one by one.
- Eliminate HashMap iterator object creation per log batch.
- Fix synchronization for jobs' ACKs from remote replicas.
- Bug fixes for replication of big objects.
- Some SONAR fixes for old code.

Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Reviewed-on: https://asterix-gerrit.ics.uci.edu/883
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/0cf7c329
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/0cf7c329
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/0cf7c329

Branch: refs/heads/master
Commit: 0cf7c329f65bc893966e9d35c2c8cb1cd393e988
Parents: 2d2a200
Author: Murtadha Hubail <mhubail@uci.edu>
Authored: Tue May 31 23:48:50 2016 -0700
Committer: Murtadha Hubail <hubailmor@gmail.com>
Committed: Wed Jun 1 12:16:25 2016 -0700

----------------------------------------------------------------------
 .../bootstrap/NCApplicationEntryPoint.java      |   2 +-
 .../resources/asterix-build-configuration.xml   |   6 -
 .../config/AsterixReplicationProperties.java    |  39 +-
 .../replication/IRemoteRecoveryManager.java     |   3 +-
 .../common/replication/IReplicationManager.java |  17 +-
 .../asterix/common/replication/Replica.java     |   3 +-
 .../asterix/common/transactions/ILogRecord.java |  12 +-
 .../asterix/common/transactions/LogRecord.java  | 176 +++-----
 .../logging/ReplicationLogBuffer.java           | 106 +++--
 .../logging/ReplicationLogFlusher.java          | 107 -----
 .../replication/logging/TxnLogReplicator.java   | 108 +++++
 .../replication/management/NetworkingUtil.java  |  14 +-
 .../management/ReplicationChannel.java          | 247 ++++++-----
 .../management/ReplicationManager.java          | 437 ++++++++++++-------
 .../recovery/RemoteRecoveryManager.java         |  21 +-
 .../management/service/logging/LogBuffer.java   |  21 +-
 .../logging/LogManagerWithReplication.java      |  14 +-
 17 files changed, 732 insertions(+), 601 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 0a6d62d..c71d77e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -150,7 +150,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         }
     }
 
-    private void startReplicationService() {
+    private void startReplicationService() throws InterruptedException {
         //Open replication channel
         runtimeContext.getReplicationChannel().start();
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index b1d0649..1d693f9 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -98,10 +98,4 @@
     <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
     </description>
   </property>
-  <property>
-    <name>log.level</name>
-    <value>WARNING</value>
-    <description>The minimum log level to be displayed. (Default = INFO)
-    </description>
-  </property>
 </asterixConfiguration>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 5d31d9a..7f51bbd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -26,19 +26,31 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
 public class AsterixReplicationProperties extends AbstractAsterixProperties {
 
     private static final Logger LOGGER = Logger.getLogger(AsterixReplicationProperties.class.getName());
 
-    private static int REPLICATION_DATAPORT_DEFAULT = 2000;
-    private static int REPLICATION_FACTOR_DEFAULT = 1;
-    private static int REPLICATION_TIME_OUT_DEFAULT = 15;
+    private static final int REPLICATION_DATAPORT_DEFAULT = 2000;
+    private static final int REPLICATION_FACTOR_DEFAULT = 1;
+    private static final int REPLICATION_TIME_OUT_DEFAULT = 15;
     private static final int MAX_REMOTE_RECOVERY_ATTEMPTS = 5;
     private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
     private final String NODE_NAME_PREFIX;
     private final Cluster cluster;
 
+    private static final String REPLICATION_LOG_BATCH_SIZE_KEY = "replication.log.batchsize";
+    private static final int REPLICATION_LOG_BATCH_SIZE_DEFAULT = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+
+    private static final String REPLICATION_LOG_BUFFER_NUM_PAGES_KEY = "replication.log.buffer.numpages";
+    private static final int REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT = 8;
+
+    private static final String REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY = "replication.log.buffer.pagesize";
+    private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = StorageUtil.getSizeInBytes(128,
+            StorageUnit.KILOBYTE);
+
     public AsterixReplicationProperties(AsterixPropertiesAccessor accessor, Cluster cluster) {
         super(accessor);
         this.cluster = cluster;
@@ -90,7 +102,7 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
     }
 
     public Set<Replica> getRemoteReplicas(String nodeId) {
-        Set<Replica> remoteReplicas = new HashSet<Replica>();;
+        Set<Replica> remoteReplicas = new HashSet<>();;
 
         int numberOfRemoteReplicas = getReplicationFactor() - 1;
         //Using chained-declustering
@@ -161,7 +173,7 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
     }
 
     public Set<String> getRemoteReplicasIds(String nodeId) {
-        Set<String> remoteReplicasIds = new HashSet<String>();
+        Set<String> remoteReplicasIds = new HashSet<>();
         Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
 
         for (Replica replica : remoteReplicas) {
@@ -176,7 +188,7 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
     }
 
     public Set<String> getNodeReplicasIds(String nodeId) {
-        Set<String> replicaIds = new HashSet<String>();
+        Set<String> replicaIds = new HashSet<>();
         replicaIds.add(nodeId);
         replicaIds.addAll(getRemoteReplicasIds(nodeId));
         return replicaIds;
@@ -245,4 +257,19 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
     public int getMaxRemoteRecoveryAttempts() {
         return MAX_REMOTE_RECOVERY_ATTEMPTS;
     }
+
+    public int getLogBufferPageSize() {
+        return accessor.getProperty(REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY, REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+
+    public int getLogBufferNumOfPages() {
+        return accessor.getProperty(REPLICATION_LOG_BUFFER_NUM_PAGES_KEY, REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+
+    public int getLogBatchSize() {
+        return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, REPLICATION_LOG_BATCH_SIZE_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index a2738e8..9f9d74b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -43,6 +43,7 @@ public interface IRemoteRecoveryManager {
      * Requests the remaining LSM disk components files from active remote replicas.
      *
      * @throws IOException
+     * @throws InterruptedException
      */
-    public void completeFailbackProcess() throws IOException;
+    public void completeFailbackProcess() throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 755fbbd..6bd1505 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.replication;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Set;
 
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -31,8 +32,9 @@ public interface IReplicationManager extends IIOReplicationManager {
      *
      * @param logRecord
      *            The log record to be replicated,
+     * @throws InterruptedException
      */
-    public void replicateLog(ILogRecord logRecord);
+    public void replicateLog(ILogRecord logRecord) throws InterruptedException;
 
     /**
      * Checks whether a log record has been replicated
@@ -79,8 +81,10 @@ public interface IReplicationManager extends IIOReplicationManager {
 
     /**
      * Starts processing of ASYNC replication jobs as well as Txn logs.
+     *
+     * @throws InterruptedException
      */
-    public void startReplicationThreads();
+    public void startReplicationThreads() throws InterruptedException;
 
     /**
      * Checks and sets each remote replica state.
@@ -114,4 +118,13 @@ public interface IReplicationManager extends IIOReplicationManager {
      * @throws IOException
      */
     public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException;
+
+    /**
+     * Transfers the contents of the {@code buffer} to active remote replicas.
+     * The transfer starts from the {@code buffer} current position to its limit.
+     * After the transfer, the {@code buffer} position will be its limit.
+     *
+     * @param buffer
+     */
+    public void replicateTxnLogBatch(ByteBuffer buffer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
index 4c3f728..b8fe4b2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -62,8 +62,7 @@ public class Replica {
     public InetSocketAddress getAddress(AsterixReplicationProperties asterixReplicationProperties) {
         String replicaIPAddress = node.getClusterIp();
         int replicationPort = asterixReplicationProperties.getDataReplicationPort(node.getId());
-        InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
-        return replicaAddress;
+        return InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
     }
 
     public static Replica create(DataInput input) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 3738cd1..1992a00 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -110,9 +110,7 @@ public interface ILogRecord {
 
     public String getNodeId();
 
-    public int writeRemoteRecoveryLog(ByteBuffer buffer);
-
-    public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog);
+    public void readRemoteLog(ByteBuffer buffer);
 
     public void setReplicationThread(IReplicationThread replicationThread);
 
@@ -120,11 +118,7 @@ public interface ILogRecord {
 
     public byte getLogSource();
 
-    public int getSerializedLogSize();
-
-    public void writeLogRecord(ByteBuffer buffer, long appendLSN);
-
-    public ByteBuffer getSerializedLog();
+    public int getRemoteLogSize();
 
     public void setNodeId(String nodeId);
 
@@ -138,4 +132,6 @@ public interface ILogRecord {
      * @return a flag indicating whether the log record should be sent to remote replicas
      */
     public boolean isReplicated();
+
+    public void writeRemoteLogRecord(ByteBuffer buffer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index fd56913..4823a92 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -29,7 +29,7 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
 
-/*
+/**
  * == LogRecordFormat ==
  * ---------------------------
  * [Header1] (6 bytes) : for all log types
@@ -44,8 +44,7 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
  * PKValueSize(4)
  * PKValue(PKValueSize)
  * ---------------------------
- * [Header3] (20 bytes) : only for update log type
- * PrevLSN(8)
+ * [Header3] (12 bytes) : only for update log type
  * ResourceId(8) //stored in .metadata of the corresponding index in NC node
  * LogRecordSize(4)
  * ---------------------------
@@ -61,17 +60,35 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
  * = LogSize =
  * 1) JOB_COMMIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
  * 2) ENTITY_COMMIT || UPSERT_ENTITY_COMMIT: (Header1(6) + Header2(16) + Tail(8)) + PKValueSize
- *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 30
+ * --> ENTITY_COMMIT_LOG_BASE_SIZE = 30
  * 3) UPDATE: (Header1(6) + Header2(16) + + Header3(20) + Body(9) + Tail(8)) + PKValueSize + NewValueSize
- *    --> UPDATE_LOG_BASE_SIZE = 59
+ * --> UPDATE_LOG_BASE_SIZE = 59
  * 4) FLUSH: 18 bytes (Header1(6) + DatasetId(4) + Tail(8))
  * 5) WAIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
- *    --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol
- *        it also includes LogSource and JobId fields.
+ * --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol
+ * it also includes LogSource and JobId fields.
  */
 
 public class LogRecord implements ILogRecord {
 
+    private static final int LOG_SOURCE_LEN = Byte.BYTES;
+    private static final int TYPE_LEN = Byte.BYTES;
+    public static final int PKHASH_LEN = Integer.BYTES;
+    public static final int PKSZ_LEN = Integer.BYTES;
+    private static final int RS_PARTITION_LEN = Integer.BYTES;
+    private static final int RSID_LEN = Long.BYTES;
+    private static final int LOGRCD_SZ_LEN = Integer.BYTES;
+    private static final int FLDCNT_LEN = Integer.BYTES;
+    private static final int NEWOP_LEN = Byte.BYTES;
+    private static final int NEWVALSZ_LEN = Integer.BYTES;
+    private static final int CHKSUM_LEN = Long.BYTES;
+
+    private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
+    private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
+            + PKSZ_LEN;
+    private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+    private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+
     // ------------- fields in a log record (begin) ------------//
     private byte logSource;
     private byte logType;
@@ -101,9 +118,8 @@ public class LogRecord implements ILogRecord {
     private int[] PKFields;
     private PrimaryIndexOperationTracker opTracker;
     private IReplicationThread replicationThread;
-    private ByteBuffer serializedLog;
     /**
-     * The fields (numOfFlushedIndexes and nodeId) are used for serialized flush logs only
+     * The fields (numOfFlushedIndexes and nodeId) are used for remote flush logs only
      * to indicate the source of the log and how many indexes were flushed using its LSN.
      */
     private int numOfFlushedIndexes;
@@ -119,25 +135,6 @@ public class LogRecord implements ILogRecord {
         logSource = LogSource.LOCAL;
     }
 
-    private final static int LOG_SOURCE_LEN = Byte.BYTES;
-    private final static int TYPE_LEN = Byte.BYTES;
-    public final static int PKHASH_LEN = Integer.BYTES;
-    public final static int PKSZ_LEN = Integer.BYTES;
-    private final static int RS_PARTITION_LEN = Integer.BYTES;
-    private final static int RSID_LEN = Long.BYTES;
-    private final static int LOGRCD_SZ_LEN = Integer.BYTES;
-    private final static int FLDCNT_LEN = Integer.BYTES;
-    private final static int NEWOP_LEN = Byte.BYTES;
-    private final static int NEWVALSZ_LEN = Integer.BYTES;
-    private final static int CHKSUM_LEN = Long.BYTES;
-
-    private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
-    private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
-            + PKSZ_LEN;
-    private final static int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
-    private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
-    private final static int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
-
     private void writeLogRecordCommonFields(ByteBuffer buffer) {
         buffer.put(logSource);
         buffer.put(logType);
@@ -173,39 +170,15 @@ public class LogRecord implements ILogRecord {
         buffer.putLong(checksum);
     }
 
-    // this method is used when replication is enabled to include the log record LSN in the serialized version
     @Override
-    public void writeLogRecord(ByteBuffer buffer, long appendLSN) {
-        int beginOffset = buffer.position();
+    public void writeRemoteLogRecord(ByteBuffer buffer) {
         writeLogRecordCommonFields(buffer);
-
-        if (replicated) {
-            //copy the serialized log to send it to replicas
-            int serializedLogSize = getSerializedLogSize();
-            if (serializedLog == null || serializedLog.capacity() < serializedLogSize) {
-                serializedLog = ByteBuffer.allocate(serializedLogSize);
-            } else {
-                serializedLog.clear();
-            }
-
-            int currentPosition = buffer.position();
-            int currentLogSize = (currentPosition - beginOffset);
-
-            buffer.position(beginOffset);
-            buffer.get(serializedLog.array(), 0, currentLogSize);
-            serializedLog.position(currentLogSize);
-            if (logType == LogType.FLUSH) {
-                serializedLog.putLong(appendLSN);
-                serializedLog.putInt(numOfFlushedIndexes);
-                serializedLog.putInt(nodeId.length());
-                serializedLog.put(nodeId.getBytes());
-            }
-            serializedLog.flip();
-            buffer.position(currentPosition);
+        if (logType == LogType.FLUSH) {
+            buffer.putLong(LSN);
+            buffer.putInt(numOfFlushedIndexes);
+            buffer.putInt(nodeId.length());
+            buffer.put(nodeId.getBytes());
         }
-
-        checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
-        buffer.putLong(checksum);
     }
 
     private void writePKValue(ByteBuffer buffer) {
@@ -221,8 +194,12 @@ public class LogRecord implements ILogRecord {
     }
 
     private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int size) {
-        tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
-        // writeTuple() doesn't change the position of the buffer.
+        if (logSource == LogSource.LOCAL) {
+            tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+        } else {
+            //since the tuple is already serialized in remote logs, just copy it from beginning to end.
+            System.arraycopy(tuple.getFieldData(0), 0, buffer.array(), buffer.position(), size);
+        }
         buffer.position(buffer.position() + size);
     }
 
@@ -323,47 +300,19 @@ public class LogRecord implements ILogRecord {
     }
 
     @Override
-    public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog) {
-        int beginOffset = buffer.position();
-
+    public void readRemoteLog(ByteBuffer buffer) {
         //read common fields
-        RecordReadStatus status = readLogCommonFields(buffer);
-        if (status != RecordReadStatus.OK) {
-            buffer.position(beginOffset);
-            return status;
-        }
+        readLogCommonFields(buffer);
 
         if (logType == LogType.FLUSH) {
-            if (buffer.remaining() >= REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN) {
-                LSN = buffer.getLong();
-                numOfFlushedIndexes = buffer.getInt();
-                //read serialized node id
-                int nodeIdLength = buffer.getInt();
-                if (buffer.remaining() >= nodeIdLength) {
-                    byte[] nodeIdBytes = new byte[nodeIdLength];
-                    buffer.get(nodeIdBytes);
-                    nodeId = new String(nodeIdBytes);
-                } else {
-                    buffer.position(beginOffset);
-                    return RecordReadStatus.TRUNCATED;
-                }
-            } else {
-                buffer.position(beginOffset);
-                return RecordReadStatus.TRUNCATED;
-            }
-        }
-
-        //remote recovery logs need to have the LSN to check which should be replayed
-        if (remoteRecoveryLog) {
-            if (buffer.remaining() >= Long.BYTES) {
-                LSN = buffer.getLong();
-            } else {
-                buffer.position(beginOffset);
-                return RecordReadStatus.TRUNCATED;
-            }
+            LSN = buffer.getLong();
+            numOfFlushedIndexes = buffer.getInt();
+            //read serialized node id
+            int nodeIdLength = buffer.getInt();
+            byte[] nodeIdBytes = new byte[nodeIdLength];
+            buffer.get(nodeIdBytes);
+            nodeId = new String(nodeIdBytes);
         }
-
-        return RecordReadStatus.OK;
     }
 
     private ITupleReference readPKValue(ByteBuffer buffer) {
@@ -445,16 +394,6 @@ public class LogRecord implements ILogRecord {
         return builder.toString();
     }
 
-    @Override
-    public int writeRemoteRecoveryLog(ByteBuffer buffer) {
-        int bufferBegin = buffer.position();
-        writeLogRecordCommonFields(buffer);
-        //FLUSH logs should not included in remote recovery
-        //LSN must be included in all remote recovery logs
-        buffer.putLong(LSN);
-        return buffer.position() - bufferBegin;
-    }
-
     ////////////////////////////////////////////
     // getter and setter methods
     ////////////////////////////////////////////
@@ -535,18 +474,18 @@ public class LogRecord implements ILogRecord {
     }
 
     @Override
-    public int getSerializedLogSize() {
-        int serilizedSize = logSize;
+    public int getRemoteLogSize() {
+        int remoteLogSize = logSize;
         if (logType == LogType.FLUSH) {
             //LSN
-            serilizedSize += Long.BYTES;
+            remoteLogSize += Long.BYTES;
             //num of indexes
-            serilizedSize += Integer.BYTES;
+            remoteLogSize += Integer.BYTES;
             //serialized node id String
-            serilizedSize += Integer.BYTES + nodeId.length();
+            remoteLogSize += Integer.BYTES + nodeId.length();
         }
-        serilizedSize -= CHKSUM_LEN;
-        return serilizedSize;
+        remoteLogSize -= CHKSUM_LEN;
+        return remoteLogSize;
     }
 
     @Override
@@ -631,15 +570,6 @@ public class LogRecord implements ILogRecord {
     }
 
     @Override
-    public ByteBuffer getSerializedLog() {
-        return serializedLog;
-    }
-
-    public void setSerializedLog(ByteBuffer serializedLog) {
-        this.serializedLog = serializedLog;
-    }
-
-    @Override
     public String getNodeId() {
         return nodeId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 588968c..283f69f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -18,44 +18,37 @@
  */
 package org.apache.asterix.replication.logging;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.management.ReplicationManager;
 
 public class ReplicationLogBuffer {
     private final int logBufferSize;
     private final AtomicBoolean full;
     private int appendOffset;
-    private int flushOffset;
+    private int replicationOffset;
     private final ByteBuffer appendBuffer;
-    private final ByteBuffer flushBuffer;
+    private final ByteBuffer replicationBuffer;
     private boolean stop;
-    private Map<String, SocketChannel> replicaSockets;
     private ReplicationManager replicationManager;
+    private final int batchSize;
 
-    public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize) {
+    public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize, int batchSize) {
         this.replicationManager = replicationManager;
         this.logBufferSize = logBufferSize;
+        this.batchSize = batchSize;
         appendBuffer = ByteBuffer.allocate(logBufferSize);
-        flushBuffer = appendBuffer.duplicate();
+        replicationBuffer = appendBuffer.duplicate();
         full = new AtomicBoolean(false);
         appendOffset = 0;
-        flushOffset = 0;
+        replicationOffset = 0;
     }
 
     public void append(ILogRecord logRecord) {
-        appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
-        appendBuffer.putInt(logRecord.getSerializedLogSize());
-        appendBuffer.put(logRecord.getSerializedLog());
+        appendBuffer.putInt(logRecord.getRemoteLogSize());
+        logRecord.writeRemoteLogRecord(appendBuffer);
 
         synchronized (this) {
             appendOffset += getLogReplicationSize(logRecord);
@@ -63,10 +56,6 @@ public class ReplicationLogBuffer {
         }
     }
 
-    public void setReplicationSockets(Map<String, SocketChannel> replicaSockets) {
-        this.replicaSockets = replicaSockets;
-    }
-
     public synchronized void isFull(boolean full) {
         this.full.set(full);
         this.notify();
@@ -77,18 +66,18 @@ public class ReplicationLogBuffer {
     }
 
     private static int getLogReplicationSize(ILogRecord logRecord) {
-        //request type + request length + serialized log length
-        return Integer.BYTES + Integer.BYTES + logRecord.getSerializedLogSize();
+        //log length (4 bytes) + remote log size
+        return Integer.BYTES + logRecord.getRemoteLogSize();
     }
 
     public void reset() {
         appendBuffer.position(0);
         appendBuffer.limit(logBufferSize);
-        flushBuffer.position(0);
-        flushBuffer.limit(logBufferSize);
+        replicationBuffer.position(0);
+        replicationBuffer.limit(logBufferSize);
         full.set(false);
         appendOffset = 0;
-        flushOffset = 0;
+        replicationOffset = 0;
         stop = false;
     }
 
@@ -96,57 +85,66 @@ public class ReplicationLogBuffer {
         int endOffset;
         while (!full.get()) {
             synchronized (this) {
-                if (appendOffset - flushOffset == 0 && !full.get()) {
+                if (appendOffset - replicationOffset == 0 && !full.get()) {
                     try {
                         if (stop) {
                             break;
                         }
                         this.wait();
                     } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
                         continue;
                     }
                 }
                 endOffset = appendOffset;
             }
-            internalFlush(flushOffset, endOffset);
+            internalFlush(replicationOffset, endOffset);
         }
-
-        internalFlush(flushOffset, appendOffset);
+        internalFlush(replicationOffset, appendOffset);
     }
 
     private void internalFlush(int beginOffset, int endOffset) {
         if (endOffset > beginOffset) {
-            int begingPos = flushBuffer.position();
-            flushBuffer.limit(endOffset);
-            sendRequest(replicaSockets, flushBuffer);
-            flushBuffer.position(begingPos + (endOffset - beginOffset));
-            flushOffset = endOffset;
+            int begingPos = replicationBuffer.position();
+            replicationBuffer.limit(endOffset);
+            transferBuffer(replicationBuffer);
+            replicationBuffer.position(begingPos + (endOffset - beginOffset));
+            replicationOffset = endOffset;
         }
     }
 
-    private void sendRequest(Map<String, SocketChannel> replicaSockets, ByteBuffer requestBuffer) {
-        Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
-        int begin = requestBuffer.position();
-        while (iterator.hasNext()) {
-            Entry<String, SocketChannel> replicaSocket = iterator.next();
-            SocketChannel clientSocket = replicaSocket.getValue();
-            try {
-                NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer);
-            } catch (IOException e) {
-                if (clientSocket.isOpen()) {
-                    try {
-                        clientSocket.close();
-                    } catch (IOException e2) {
-                        e2.printStackTrace();
-                    }
+    private void transferBuffer(ByteBuffer buffer) {
+        if (buffer.remaining() <= batchSize) {
+            //the current batch can be sent as it is
+            replicationManager.replicateTxnLogBatch(buffer);
+            return;
+        }
+        /**
+         * break the batch into smaller batches
+         */
+        int totalTransferLimit = buffer.limit();
+        while (buffer.hasRemaining()) {
+            if (buffer.remaining() > batchSize) {
+
+                //mark the beginning of this batch
+                buffer.mark();
+                int currentBatchSize = 0;
+                while (currentBatchSize < batchSize) {
+                    int logSize = replicationBuffer.getInt();
+                    //add the size of the log record itself + 4 bytes for its size
+                    currentBatchSize += logSize + Integer.BYTES;
+                    //go to the beginning of the next log
+                    buffer.position(buffer.position() + logSize);
                 }
-                replicationManager.reportFailedReplica(replicaSocket.getKey());
-                iterator.remove();
-            } finally {
-                requestBuffer.position(begin);
+                //set the limit to the end of this batch
+                buffer.limit(buffer.position());
+                //return to the beginning of the batch position
+                buffer.reset();
             }
+            replicationManager.replicateTxnLogBatch(buffer);
+            //return the original limit to check the new remaining size
+            buffer.limit(totalTransferLimit);
         }
-
     }
 
     public boolean isStop() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
deleted file mode 100644
index 3312cb1..0000000
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.logging;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.transactions.LogRecord;
-
-/**
- * This class is responsible for sending transactions logs to remote replicas.
- */
-public class ReplicationLogFlusher implements Callable<Boolean> {
-    private final static Logger LOGGER = Logger.getLogger(ReplicationLogFlusher.class.getName());
-    private final static ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null,
-            LogRecord.JOB_TERMINATE_LOG_SIZE);
-    private final LinkedBlockingQueue<ReplicationLogBuffer> emptyQ;
-    private final LinkedBlockingQueue<ReplicationLogBuffer> flushQ;
-    private ReplicationLogBuffer flushPage;
-    private final AtomicBoolean isStarted;
-    private final AtomicBoolean terminateFlag;
-
-    public ReplicationLogFlusher(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
-            LinkedBlockingQueue<ReplicationLogBuffer> flushQ) {
-        this.emptyQ = emptyQ;
-        this.flushQ = flushQ;
-        flushPage = null;
-        isStarted = new AtomicBoolean(false);
-        terminateFlag = new AtomicBoolean(false);
-    }
-
-    public void terminate() {
-        //make sure the LogFlusher thread started before terminating it.
-        synchronized (isStarted) {
-            while (!isStarted.get()) {
-                try {
-                    isStarted.wait();
-                } catch (InterruptedException e) {
-                    //ignore
-                }
-            }
-        }
-
-        terminateFlag.set(true);
-        if (flushPage != null) {
-            synchronized (flushPage) {
-                flushPage.isStop(true);
-                flushPage.notify();
-            }
-        }
-        //[Notice]
-        //The return value doesn't need to be checked
-        //since terminateFlag will trigger termination if the flushQ is full.
-        flushQ.offer(POISON_PILL);
-    }
-
-    @Override
-    public Boolean call() {
-        Thread.currentThread().setName("Replication Log Flusher");
-        synchronized (isStarted) {
-            isStarted.set(true);
-            isStarted.notify();
-        }
-        try {
-            while (true) {
-                flushPage = null;
-                try {
-                    flushPage = flushQ.take();
-                    if (flushPage == POISON_PILL || terminateFlag.get()) {
-                        return true;
-                    }
-                } catch (InterruptedException e) {
-                    if (flushPage == null) {
-                        continue;
-                    }
-                }
-                flushPage.flush();
-                // TODO: pool large pages
-                if (flushPage.getLogBufferSize() == flushPage.getReplicationManager().getLogPageSize()) {
-                    emptyQ.offer(flushPage);
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.severe("ReplicationLogFlusher is terminating abnormally. Logs Replication Stopped.");
-            throw e;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
new file mode 100644
index 0000000..118fde6
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.logging;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This class is responsible for sending transactions logs to remote replicas.
+ */
+public class TxnLogReplicator implements Callable<Boolean> {
+    private static final Logger LOGGER = Logger.getLogger(TxnLogReplicator.class.getName());
+    private static final ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null, 0, 0);
+    private final LinkedBlockingQueue<ReplicationLogBuffer> emptyQ;
+    private final LinkedBlockingQueue<ReplicationLogBuffer> flushQ;
+    private ReplicationLogBuffer flushPage;
+    private final AtomicBoolean isStarted;
+    private final AtomicBoolean terminateFlag;
+
+    public TxnLogReplicator(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
+            LinkedBlockingQueue<ReplicationLogBuffer> flushQ) {
+        this.emptyQ = emptyQ;
+        this.flushQ = flushQ;
+        flushPage = null;
+        isStarted = new AtomicBoolean(false);
+        terminateFlag = new AtomicBoolean(false);
+    }
+
+    public void terminate() {
+        //make sure the LogFlusher thread started before terminating it.
+        synchronized (isStarted) {
+            while (!isStarted.get()) {
+                try {
+                    isStarted.wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        terminateFlag.set(true);
+        if (flushPage != null) {
+            synchronized (flushPage) {
+                flushPage.isStop(true);
+                flushPage.notify();
+            }
+        }
+        //[Notice]
+        //The return value doesn't need to be checked
+        //since terminateFlag will trigger termination if the flushQ is full.
+        flushQ.offer(POISON_PILL);
+    }
+
+    @Override
+    public Boolean call() {
+        Thread.currentThread().setName("TxnLog Replicator");
+        synchronized (isStarted) {
+            isStarted.set(true);
+            isStarted.notify();
+        }
+
+        while (true) {
+            try {
+                if (terminateFlag.get()) {
+                    return true;
+                }
+
+                flushPage = null;
+                flushPage = flushQ.take();
+                if (flushPage == POISON_PILL) {
+                    continue;
+                }
+                flushPage.flush();
+                // TODO: pool large pages
+                if (flushPage.getLogBufferSize() == flushPage.getReplicationManager().getLogPageSize()) {
+                    emptyQ.offer(flushPage);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.log(Level.SEVERE, "TxnLogReplicator is terminating abnormally. Logs Replication Stopped.",
+                            e);
+                }
+                throw e;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 6023cb1..62c1e4a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -21,6 +21,7 @@ package org.apache.asterix.replication.management;
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
@@ -31,6 +32,10 @@ import java.util.Enumeration;
 
 public class NetworkingUtil {
 
+    private NetworkingUtil() {
+        throw new AssertionError("This util class should not be initialized.");
+    }
+
     public static void readBytes(SocketChannel socketChannel, ByteBuffer byteBuffer, int length) throws IOException {
         byteBuffer.clear();
         byteBuffer.limit(length);
@@ -88,7 +93,8 @@ public class NetworkingUtil {
         return hostName;
     }
 
-    public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer) throws IOException {
+    public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer)
+            throws IOException {
         while (requestBuffer.hasRemaining()) {
             socketChannel.write(requestBuffer);
         }
@@ -107,4 +113,10 @@ public class NetworkingUtil {
         long fileSize = fileChannel.size();
         fileChannel.transferFrom(socketChannel, pos, fileSize);
     }
+
+    public static InetSocketAddress getSocketAddress(SocketChannel socketChannel) {
+        String hostAddress = socketChannel.socket().getInetAddress().getHostAddress();
+        int port = socketChannel.socket().getPort();
+        return InetSocketAddress.createUnresolved(hostAddress, port);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index a152f6c..cabfc77 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -83,6 +83,7 @@ import org.apache.hyracks.util.StorageUtil.StorageUnit;
 public class ReplicationChannel extends Thread implements IReplicationChannel {
 
     private static final Logger LOGGER = Logger.getLogger(ReplicationChannel.class.getName());
+    private static final int LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE = 1;
     private final ExecutorService replicationThreads;
     private final String localNodeID;
     private final ILogManager logManager;
@@ -91,8 +92,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
     private ServerSocketChannel serverSocketChannel = null;
     private final IReplicationManager replicationManager;
     private final AsterixReplicationProperties replicationProperties;
-    private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
-    private final static int INTIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+    private final IAsterixAppRuntimeContextProvider appContextProvider;
+    private static final int INTIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
     private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
     private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
     private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
@@ -110,10 +111,10 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         this.replicaResourcesManager = (ReplicaResourcesManager) replicaResoucesManager;
         this.replicationManager = replicationManager;
         this.replicationProperties = replicationProperties;
-        this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
-        lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<LSMComponentLSNSyncTask>();
+        this.appContextProvider = asterixAppRuntimeContextProvider;
+        lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<>();
         pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
-        lsmComponentId2PropertiesMap = new ConcurrentHashMap<String, LSMComponentProperties>();
+        lsmComponentId2PropertiesMap = new ConcurrentHashMap<>();
         replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationNotifier = new ReplicationNotifier();
@@ -166,16 +167,17 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
 
         //clean up when all the LSM component files have been received.
         if (remainingFile == 0) {
-            if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null) {
-                //if this LSN wont be used for any other index, remove it
-                if (replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
-                    int remainingIndexes = replicaUniqueLSN2RemoteMapping
-                            .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
-                    if (remainingIndexes == 0) {
-                        //Note: there is a chance that this is never deleted because some index in the dataset was not flushed because it is empty.
-                        //This could be solved by passing only the number of successfully flushed indexes
-                        replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN());
-                    }
+            if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null
+                    && replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
+                int remainingIndexes = replicaUniqueLSN2RemoteMapping
+                        .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
+                if (remainingIndexes == 0) {
+                    /**
+                     * Note: there is a chance that this will never be removed because some
+                     * index in the dataset was not flushed because it is empty. This could
+                     * be solved by passing only the number of successfully flushed indexes.
+                     */
+                    replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN());
                 }
             }
 
@@ -242,20 +244,23 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                         case FLUSH_INDEX:
                             handleFlushIndex();
                             break;
-                        default: {
+                        default:
                             throw new IllegalStateException("Unknown replication request");
-                        }
                     }
                     replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                 }
             } catch (Exception e) {
-                e.printStackTrace();
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Unexpectedly error during replication.", e);
+                }
             } finally {
                 if (socketChannel.isOpen()) {
                     try {
                         socketChannel.close();
                     } catch (IOException e) {
-                        e.printStackTrace();
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.log(Level.WARNING, "Filed to close replication socket.", e);
+                        }
                     }
                 }
             }
@@ -263,15 +268,17 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
 
         private void handleFlushIndex() throws IOException {
             inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-            //1. read which indexes are requested to be flushed from remote replica
+            //read which indexes are requested to be flushed from remote replica
             ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
             Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
 
-            //2. check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component)
-            IDatasetLifecycleManager datasetLifeCycleManager = asterixAppRuntimeContextProvider
-                    .getDatasetLifecycleManager();
+            /**
+             * check which indexes can be flushed (open indexes) and which cannot be
+             * flushed (closed or have empty memory component).
+             */
+            IDatasetLifecycleManager datasetLifeCycleManager = appContextProvider.getDatasetLifecycleManager();
             List<IndexInfo> openIndexesInfo = datasetLifeCycleManager.getOpenIndexesInfo();
-            Set<Integer> datasetsToForceFlush = new HashSet<Integer>();
+            Set<Integer> datasetsToForceFlush = new HashSet<>();
             for (IndexInfo iInfo : openIndexesInfo) {
                 if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) {
                     AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
@@ -281,7 +288,10 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                         //remove index to indicate that it will be flushed
                         requestedIndexesToBeFlushed.remove(iInfo.getResourceId());
                     } else if (!((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty()) {
-                        //if an index has something to be flushed, then the request to flush it will succeed and we need to schedule it to be flushed.
+                        /**
+                         * if an index has something to be flushed, then the request to flush it
+                         * will succeed and we need to schedule it to be flushed.
+                         */
                         datasetsToForceFlush.add(iInfo.getDatasetId());
                         //remove index to indicate that it will be flushed
                         requestedIndexesToBeFlushed.remove(iInfo.getResourceId());
@@ -289,13 +299,13 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                 }
             }
 
-            //3. force flush datasets requested to be flushed
+            //schedule flush for datasets requested to be flushed
             for (int datasetId : datasetsToForceFlush) {
                 datasetLifeCycleManager.flushDataset(datasetId, true);
             }
 
             //the remaining indexes in the requested set are those which cannot be flushed.
-            //4. respond back to the requester that those indexes cannot be flushed
+            //respond back to the requester that those indexes cannot be flushed
             ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed);
             outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
             NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
@@ -363,7 +373,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             List<String> filesList;
             Set<String> replicaIds = request.getReplicaIds();
             Set<String> requesterExistingFiles = request.getExistingFiles();
-            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) appContextProvider
                     .getAppContext()).getMetadataProperties().getNodePartitions();
             for (String replicaId : replicaIds) {
                 //get replica partitions
@@ -414,50 +424,70 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         }
 
         private void handleLogReplication() throws IOException, ACIDException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            //set initial buffer size to a log buffer page size
+            inBuffer = ByteBuffer.allocate(logManager.getLogPageSize());
+            while (true) {
+                //read a batch of logs
+                inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+                //check if it is end of handshake (a single byte log)
+                if (inBuffer.remaining() == LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE) {
+                    break;
+                }
 
-            //Deserialize log
-            remoteLog.readRemoteLog(inBuffer, false);
-            remoteLog.setLogSource(LogSource.REMOTE);
+                processLogsBatch(inBuffer);
+            }
+        }
 
-            switch (remoteLog.getLogType()) {
-                case LogType.UPDATE:
-                case LogType.ENTITY_COMMIT:
-                case LogType.UPSERT_ENTITY_COMMIT:
-                    //if the log partition belongs to a partitions hosted on this node, replicate it
-                    if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
+        private void processLogsBatch(ByteBuffer buffer) throws ACIDException {
+            while (buffer.hasRemaining()) {
+                //get rid of log size
+                inBuffer.getInt();
+                //Deserialize log
+                remoteLog.readRemoteLog(inBuffer);
+                remoteLog.setLogSource(LogSource.REMOTE);
+
+                switch (remoteLog.getLogType()) {
+                    case LogType.UPDATE:
+                    case LogType.ENTITY_COMMIT:
+                    case LogType.UPSERT_ENTITY_COMMIT:
+                        //if the log partition belongs to a partitions hosted on this node, replicate it
+                        if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
+                            logManager.log(remoteLog);
+                        }
+                        break;
+                    case LogType.JOB_COMMIT:
+                    case LogType.ABORT:
+                        LogRecord jobTerminationLog = new LogRecord();
+                        TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
+                                remoteLog.getLogType() == LogType.JOB_COMMIT);
+                        jobTerminationLog.setReplicationThread(this);
+                        jobTerminationLog.setLogSource(LogSource.REMOTE);
+                        logManager.log(jobTerminationLog);
+                        break;
+                    case LogType.FLUSH:
+                        //store mapping information for flush logs to use them in incoming LSM components.
+                        RemoteLogMapping flushLogMap = new RemoteLogMapping();
+                        flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
+                        flushLogMap.setRemoteLSN(remoteLog.getLSN());
                         logManager.log(remoteLog);
-                    }
-                    break;
-                case LogType.JOB_COMMIT:
-                case LogType.ABORT:
-                    LogRecord jobTerminationLog = new LogRecord();
-                    TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
-                            remoteLog.getLogType() == LogType.JOB_COMMIT);
-                    jobTerminationLog.setReplicationThread(this);
-                    jobTerminationLog.setLogSource(LogSource.REMOTE);
-                    logManager.log(jobTerminationLog);
-                    break;
-                case LogType.FLUSH:
-                    //store mapping information for flush logs to use them in incoming LSM components.
-                    RemoteLogMapping flushLogMap = new RemoteLogMapping();
-                    flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
-                    flushLogMap.setRemoteLSN(remoteLog.getLSN());
-                    logManager.log(remoteLog);
-                    //the log LSN value is updated by logManager.log(.) to a local value
-                    flushLogMap.setLocalLSN(remoteLog.getLSN());
-                    flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
-                    replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
-                    synchronized (flushLogslock) {
-                        flushLogslock.notify();
-                    }
-                    break;
-                default:
-                    LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
+                        //the log LSN value is updated by logManager.log(.) to a local value
+                        flushLogMap.setLocalLSN(remoteLog.getLSN());
+                        flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
+                        replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+                        synchronized (flushLogslock) {
+                            flushLogslock.notify();
+                        }
+                        break;
+                    default:
+                        LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
+                }
             }
         }
 
-        //this method is called sequentially by LogPage (notifyReplicationTerminator) for JOB_COMMIT and JOB_ABORT log types.
+        /**
+         * this method is called sequentially by LogPage (notifyReplicationTerminator)
+         * for JOB_COMMIT and JOB_ABORT log types.
+         */
         @Override
         public void notifyLogReplicationRequester(LogRecord logRecord) {
             pendingNotificationRemoteLogsQ.offer(logRecord);
@@ -480,24 +510,27 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                 try {
                     LogRecord logRecord = pendingNotificationRemoteLogsQ.take();
                     //send ACK to requester
-                    try {
-                        logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
-                                .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId()
-                                        + System.lineSeparator()).getBytes());
-                    } catch (IOException e) {
-                        LOGGER.severe("Failed to send job replication ACK " + logRecord.getLogRecordForDisplay());
+                    logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
+                            .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId()
+                                    + System.lineSeparator()).getBytes());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (IOException e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Failed to send job replication ACK", e);
                     }
-                } catch (InterruptedException e1) {
-                    LOGGER.severe("ReplicationNotifier Thread interrupted.");
                 }
             }
         }
     }
 
     /**
-     * This thread is responsible for synchronizing the LSN of the received LSM components to a local LSN.
+     * This thread is responsible for synchronizing the LSN of
+     * the received LSM components to a local LSN.
      */
     private class LSMComponentsSyncService extends Thread {
+        private static final int BULKLOAD_LSN = 0;
+
         @Override
         public void run() {
             Thread.currentThread().setName("LSMComponentsSyncService Thread");
@@ -506,23 +539,24 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                 try {
                     LSMComponentLSNSyncTask syncTask = lsmComponentRemoteLSN2LocalLSNMappingTaskQ.take();
                     LSMComponentProperties lsmCompProp = lsmComponentId2PropertiesMap.get(syncTask.getComponentId());
-                    try {
-                        syncLSMComponentFlushLSN(lsmCompProp, syncTask);
-                        updateLSMComponentRemainingFiles(lsmCompProp.getComponentId());
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
+                    syncLSMComponentFlushLSN(lsmCompProp, syncTask);
+                    updateLSMComponentRemainingFiles(lsmCompProp.getComponentId());
                 } catch (InterruptedException e) {
-                    //ignore
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.log(Level.SEVERE, "Unexpected exception during LSN synchronization", e);
+                    }
                 }
+
             }
         }
 
         private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask)
-                throws Exception {
+                throws InterruptedException, IOException {
             long remoteLSN = lsmCompProp.getOriginalLSN();
             //LSN=0 (bulkload) does not need to be updated and there is no flush log corresponding to it
-            if (remoteLSN == 0) {
+            if (remoteLSN == BULKLOAD_LSN) {
                 //since this is the first LSM component of this index,
                 //then set the mapping in the LSN_MAP to the current log LSN because
                 //no other log could've been received for this index since bulkload replication is synchronous.
@@ -536,16 +570,21 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                 if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) {
                     //need to look up LSN mapping from memory
                     RemoteLogMapping remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
-                    if (remoteLogMap == null) {
+                    //wait until flush log arrives, and verify the LSM component file still exists
+                    //The component file could be deleted if its NC fails.
+                    while (remoteLogMap == null && Files.exists(path)) {
                         synchronized (flushLogslock) {
-                            remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
-                            //wait until flush log arrives, and verify the LSM component file still exists
-                            //The component file could be deleted if its NC fails.
-                            while (remoteLogMap == null && Files.exists(path)) {
-                                flushLogslock.wait();
-                                remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
-                            }
+                            flushLogslock.wait();
                         }
+                        remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
+                    }
+
+                    /**
+                     * file has been deleted due to its remote primary replica failure
+                     * before its LSN could've been synchronized.
+                     */
+                    if (remoteLogMap == null) {
+                        return;
                     }
                     lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN());
                 } else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) {
@@ -554,13 +593,14 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                             .getReplicaIndexLSNMap(lsmCompProp.getReplicaComponentPath(replicaResourcesManager));
                     Long mappingLSN = lsmMap.get(lsmCompProp.getOriginalLSN());
                     if (mappingLSN == null) {
-                        /*
-                         * this shouldn't happen unless this node just recovered and the first component it received
-                         * is a merged component due to an on-going merge operation while recovery on the remote replica.
-                         * In this case, we use the current append LSN since no new records exist for this index,
-                         * otherwise they would've been flushed.
-                         * This could be prevented by waiting for any IO to finish on the remote replica during recovery.
-                         *
+                        /**
+                         * this shouldn't happen unless this node just recovered and
+                         * the first component it received is a merged component due
+                         * to an on-going merge operation while recovery on the remote
+                         * replica. In this case, we use the current append LSN since
+                         * no new records exist for this index, otherwise they would've
+                         * been flushed. This could be prevented by waiting for any IO
+                         * to finish on the remote replica during recovery.
                          */
                         mappingLSN = logManager.getAppendLSN();
                     }
@@ -569,9 +609,10 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             }
 
             if (Files.notExists(path)) {
-                /*
-                 * This could happen when a merged component arrives and deletes the flushed
-                 * component (which we are trying to update) before its flush log arrives since logs and components are received
+                /**
+                 * This could happen when a merged component arrives and deletes
+                 * the flushed component (which we are trying to update) before
+                 * its flush log arrives since logs and components are received
                  * on different threads.
                  */
                 return;
@@ -594,4 +635,4 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             }
         }
     }
-}
\ No newline at end of file
+}


Mime
View raw message