asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [1/2] incubator-asterixdb git commit: Txn Log Replication Optimizations
Date Wed, 01 Jun 2016 19:17:03 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 2d2a2007b -> 0cf7c329f


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index ee872a5..5ba6ad2 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -71,7 +71,7 @@ import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
 import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.logging.ReplicationLogBuffer;
-import org.apache.asterix.replication.logging.ReplicationLogFlusher;
+import org.apache.asterix.replication.logging.TxnLogReplicator;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
@@ -86,6 +86,8 @@ import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
 /**
  * This class is used to process replication jobs and maintain remote replicas states
@@ -93,7 +95,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 public class ReplicationManager implements IReplicationManager {
 
     private static final Logger LOGGER = Logger.getLogger(ReplicationManager.class.getName());
-    private final int INITIAL_REPLICATION_FACTOR = 1;
+    private static final int INITIAL_REPLICATION_FACTOR = 1;
     private final String nodeId;
     private ExecutorService replicationListenerThreads;
     private final Map<Integer, Set<String>> jobCommitAcks;
@@ -114,7 +116,7 @@ public class ReplicationManager implements IReplicationManager {
     private final AtomicBoolean replicationSuspended;
     private AtomicBoolean terminateJobsReplication;
     private AtomicBoolean jobsReplicationSuspended;
-    private final static int INITIAL_BUFFER_SIZE = 4000; //4KB
+    private static final int INITIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
     private final Set<String> shuttingDownReplicaIds;
     //replication threads
     private ReplicationJobsProccessor replicationJobsProcessor;
@@ -128,9 +130,10 @@ public class ReplicationManager implements IReplicationManager {
     private LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
     private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
     protected ReplicationLogBuffer currentTxnLogBuffer;
-    private ReplicationLogFlusher txnlogsReplicator;
+    private TxnLogReplicator txnlogReplicator;
     private Future<? extends Object> txnLogReplicatorTask;
-    private Map<String, SocketChannel> logsReplicaSockets = null;
+    private SocketChannel[] logsRepSockets;
+    private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
 
     //TODO this class needs to be refactored by moving its private classes to separate files
     //and possibly using MessageBroker to send/receive remote replicas events.
@@ -143,15 +146,15 @@ public class ReplicationManager implements IReplicationManager {
         this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
         this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
         this.logManager = logManager;
-        replicationJobsQ = new LinkedBlockingQueue<IReplicationJob>();
-        replicaEventsQ = new LinkedBlockingQueue<ReplicaEvent>();
+        replicationJobsQ = new LinkedBlockingQueue<>();
+        replicaEventsQ = new LinkedBlockingQueue<>();
         terminateJobsReplication = new AtomicBoolean(false);
         jobsReplicationSuspended = new AtomicBoolean(true);
         replicationSuspended = new AtomicBoolean(true);
-        replicas = new HashMap<String, Replica>();
-        jobCommitAcks = new ConcurrentHashMap<Integer, Set<String>>();
-        replicationJobsPendingAcks = new ConcurrentHashMap<Integer, ILogRecord>();
-        shuttingDownReplicaIds = new HashSet<String>();
+        replicas = new HashMap<>();
+        jobCommitAcks = new ConcurrentHashMap<>();
+        replicationJobsPendingAcks = new ConcurrentHashMap<>();
+        shuttingDownReplicaIds = new HashSet<>();
         dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
 
         //Used as async listeners from replicas
@@ -179,13 +182,14 @@ public class ReplicationManager implements IReplicationManager {
             clientPartitonsSet.addAll(clientPartitions);
             replica2PartitionsMap.put(replica.getId(), clientPartitonsSet);
         }
-        int numLogBuffers = logManager.getNumLogPages();
-        emptyLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
-        pendingFlushLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
+        int numLogBuffers = replicationProperties.getLogBufferNumOfPages();
+        emptyLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers);
+        pendingFlushLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers);
 
-        int logBufferSize = logManager.getLogPageSize();
+        int logBufferSize = replicationProperties.getLogBufferPageSize();
         for (int i = 0; i < numLogBuffers; i++) {
-            emptyLogBuffersQ.offer(new ReplicationLogBuffer(this, logBufferSize));
+            emptyLogBuffersQ
+                    .offer(new ReplicationLogBuffer(this, logBufferSize, replicationProperties.getLogBatchSize()));
         }
     }
 
@@ -200,7 +204,7 @@ public class ReplicationManager implements IReplicationManager {
                     try {
                         replicationSuspended.wait();
                     } catch (InterruptedException e) {
-                        //ignore
+                        Thread.currentThread().interrupt();
                     }
                 }
             }
@@ -209,16 +213,12 @@ public class ReplicationManager implements IReplicationManager {
     }
 
     @Override
-    public void replicateLog(ILogRecord logRecord) {
+    public void replicateLog(ILogRecord logRecord) throws InterruptedException {
         if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
             //if replication is suspended, wait until it is resumed.
             while (replicationSuspended.get()) {
                 synchronized (replicationSuspended) {
-                    try {
-                        replicationSuspended.wait();
-                    } catch (InterruptedException e) {
-                        //ignore
-                    }
+                    replicationSuspended.wait();
                 }
             }
             Set<String> replicaIds = Collections.synchronizedSet(new HashSet<String>());
@@ -232,29 +232,23 @@ public class ReplicationManager implements IReplicationManager {
     protected void getAndInitNewLargePage(int pageSize) {
         // for now, alloc a new buffer for each large page
         // TODO: consider pooling large pages
-        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize);
-        currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
+        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize, replicationProperties.getLogBufferPageSize());
         pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
     }
 
-    protected void getAndInitNewPage() {
+    protected void getAndInitNewPage() throws InterruptedException {
         currentTxnLogBuffer = null;
         while (currentTxnLogBuffer == null) {
-            try {
-                currentTxnLogBuffer = emptyLogBuffersQ.take();
-            } catch (InterruptedException e) {
-                //ignore
-            }
+            currentTxnLogBuffer = emptyLogBuffersQ.take();
         }
         currentTxnLogBuffer.reset();
-        currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
         pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
     }
 
-    private synchronized void appendToLogBuffer(ILogRecord logRecord) {
+    private synchronized void appendToLogBuffer(ILogRecord logRecord) throws InterruptedException {
         if (!currentTxnLogBuffer.hasSpace(logRecord)) {
             currentTxnLogBuffer.isFull(true);
-            if (logRecord.getLogSize() > logManager.getLogPageSize()) {
+            if (logRecord.getLogSize() > getLogPageSize()) {
                 getAndInitNewLargePage(logRecord.getLogSize());
             } else {
                 getAndInitNewPage();
@@ -326,7 +320,10 @@ public class ReplicationManager implements IReplicationManager {
                             long fileSize = fileChannel.size();
 
                             if (LSMComponentJob != null) {
-                                //since this is LSM_COMPONENT REPLICATE job, the job will contain only the component being replicated.
+                                /**
+                                 * since this is LSM_COMPONENT REPLICATE job, the job will contain
+                                 * only the component being replicated.
+                                 */
                                 ILSMComponent diskComponent = LSMComponentJob.getLSMIndexOperationContext()
                                         .getComponentsToBeReplicated().get(0);
                                 long LSNByteOffset = AsterixLSMIndexUtil.getComponentFileLSNOffset(
@@ -362,7 +359,7 @@ public class ReplicationManager implements IReplicationManager {
                                         }
                                     }
                                 } catch (IOException e) {
-                                    reportFailedReplica(entry.getKey());
+                                    handleReplicationFailure(socketChannel, e);
                                     iterator.remove();
                                 } finally {
                                     requestBuffer.position(0);
@@ -392,7 +389,7 @@ public class ReplicationManager implements IReplicationManager {
                                     waitForResponse(socketChannel, responseBuffer);
                                 }
                             } catch (IOException e) {
-                                reportFailedReplica(entry.getKey());
+                                handleReplicationFailure(socketChannel, e);
                                 iterator.remove();
                             } finally {
                                 requestBuffer.position(0);
@@ -458,7 +455,7 @@ public class ReplicationManager implements IReplicationManager {
     }
 
     /**
-     * Suspends proccessing replication jobs.
+     * Suspends processing replication jobs/logs.
      *
      * @param force
      *            a flag indicates if replication should be suspended right away or when the pending jobs are completed.
@@ -477,60 +474,134 @@ public class ReplicationManager implements IReplicationManager {
                     try {
                         jobsReplicationSuspended.wait();
                     } catch (InterruptedException e) {
-                        //ignore
+                        Thread.currentThread().interrupt();
                     }
                 }
             }
         }
 
         //suspend logs replication
-        if (txnlogsReplicator != null) {
-            terminateTxnLogsReplicator();
+        if (txnlogReplicator != null) {
+            endTxnLogReplicationHandshake();
         }
     }
 
     /**
      * Opens a new connection with Active remote replicas and starts a listen thread per connection.
      */
-    private void establishTxnLogsReplicationConnection() {
-        logsReplicaSockets = getActiveRemoteReplicasSockets();
+    private void establishTxnLogReplicationHandshake() {
+        Map<String, SocketChannel> activeRemoteReplicasSockets = getActiveRemoteReplicasSockets();
+        logsRepSockets = new SocketChannel[activeRemoteReplicasSockets.size()];
+        int i = 0;
         //start a listener thread per connection
-        for (Entry<String, SocketChannel> entry : logsReplicaSockets.entrySet()) {
+        for (Entry<String, SocketChannel> entry : activeRemoteReplicasSockets.entrySet()) {
+            logsRepSockets[i] = entry.getValue();
             replicationListenerThreads
                     .execute(new TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue()));
+            i++;
+        }
+
+        /**
+         * establish log replication handshake
+         */
+        ByteBuffer handshakeBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE)
+                .putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+        handshakeBuffer.flip();
+        //send handshake request
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                NetworkingUtil.transferBufferToChannel(replicaSocket, handshakeBuffer);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                handshakeBuffer.position(0);
+            }
         }
     }
 
+    private void handleReplicationFailure(SocketChannel socketChannel, Throwable t) {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.log(Level.WARNING, "Could not complete replication request.", t);
+        }
+        if (socketChannel.isOpen()) {
+            try {
+                socketChannel.close();
+            } catch (IOException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Could not close socket.", e);
+                }
+            }
+        }
+        reportFailedReplica(getReplicaIdBySocket(socketChannel));
+    }
+
     /**
-     * Stops ReplicationFlusherThread and closes the sockets used to replicate logs.
+     * Stops TxnLogReplicator and closes the sockets used to replicate logs.
      */
-    private void terminateTxnLogsReplicator() {
-        LOGGER.log(Level.INFO, "Terminating ReplicationLogFlusher thread ...");
-        txnlogsReplicator.terminate();
+    private void endTxnLogReplicationHandshake() {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Terminating TxnLogReplicator thread ...");
+        }
+        txnlogReplicator.terminate();
         try {
             txnLogReplicatorTask.get();
         } catch (ExecutionException | InterruptedException e) {
-            LOGGER.log(Level.WARNING, "RepicationLogFlusher thread terminated abnormally");
-            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, "TxnLogReplicator thread terminated abnormally", e);
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("TxnLogReplicator thread was terminated.");
+        }
+
+        /**
+         * End log replication handshake (by sending a dummy log with a single byte)
+         */
+        ByteBuffer endLogRepHandshake = ByteBuffer.allocate(Integer.SIZE + 1).putInt(1).put((byte) 0);
+        endLogRepHandshake.flip();
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                NetworkingUtil.transferBufferToChannel(replicaSocket, endLogRepHandshake);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                endLogRepHandshake.position(0);
+            }
         }
-        LOGGER.log(Level.INFO, "LogFlusher thread is terminated.");
 
-        if (logsReplicaSockets != null) {
-            //wait for any ACK to arrive before closing sockets.
+        //wait for any ACK to arrive before closing sockets.
+        if (logsRepSockets != null) {
             synchronized (jobCommitAcks) {
-                while (jobCommitAcks.size() != 0) {
-                    try {
+                try {
+                    while (jobCommitAcks.size() != 0) {
                         jobCommitAcks.wait();
-                    } catch (InterruptedException e) {
-                        //ignore
                     }
+                } catch (InterruptedException e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.log(Level.SEVERE, "Interrupted while waiting for jobs ACK", e);
+                    }
+                    Thread.currentThread().interrupt();
                 }
             }
+        }
 
-            //close log replication sockets
-            closeReplicaSockets(logsReplicaSockets);
-            logsReplicaSockets = null;
+        /**
+         * Close log replication sockets
+         */
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                //send goodbye to remote replica
+                NetworkingUtil.transferBufferToChannel(replicaSocket, goodbyeBuffer);
+                replicaSocket.close();
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                goodbyeBuffer.position(0);
+            }
         }
+        logsRepSockets = null;
     }
 
     /**
@@ -567,14 +638,7 @@ public class ReplicationManager implements IReplicationManager {
             try {
                 NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer);
             } catch (IOException e) {
-                if (clientSocket.isOpen()) {
-                    try {
-                        clientSocket.close();
-                    } catch (IOException e2) {
-                        e2.printStackTrace();
-                    }
-                }
-                reportFailedReplica(replicaSocket.getKey());
+                handleReplicationFailure(clientSocket, e);
                 iterator.remove();
             } finally {
                 requestBuffer.position(0);
@@ -600,7 +664,7 @@ public class ReplicationManager implements IReplicationManager {
                 try {
                     clientSocket.close();
                 } catch (IOException e) {
-                    e.printStackTrace();
+                    handleReplicationFailure(clientSocket, e);
                 }
             }
         }
@@ -636,7 +700,7 @@ public class ReplicationManager implements IReplicationManager {
                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e) {
-                    e.printStackTrace();
+                    Thread.currentThread().interrupt();
                 }
             }
         }
@@ -651,8 +715,10 @@ public class ReplicationManager implements IReplicationManager {
      *            The new state of the replica.
      * @param suspendReplication
      *            a flag indicating whether to suspend replication on state change or not.
+     * @throws InterruptedException
      */
-    public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication) {
+    public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication)
+            throws InterruptedException {
         Replica replica = replicas.get(replicaId);
 
         if (replica.getState() == newState) {
@@ -680,10 +746,8 @@ public class ReplicationManager implements IReplicationManager {
 
         if (newState == ReplicaState.ACTIVE) {
             replicationFactor++;
-        } else if (newState == ReplicaState.DEAD) {
-            if (replicationFactor > INITIAL_REPLICATION_FACTOR) {
-                replicationFactor--;
-            }
+        } else if (newState == ReplicaState.DEAD && replicationFactor > INITIAL_REPLICATION_FACTOR) {
+            replicationFactor--;
         }
 
         LOGGER.log(Level.WARNING, "Replica " + replicaId + " state changed to: " + newState.name()
@@ -702,22 +766,24 @@ public class ReplicationManager implements IReplicationManager {
      *            The remote replica id the ACK received from.
      */
     private void addAckToJob(int jobId, String replicaId) {
-        //add ACK to the job
-        if (jobCommitAcks.containsKey(jobId)) {
-            Set<String> replicaIds = jobCommitAcks.get(jobId);
-            replicaIds.add(replicaId);
-        } else {
-            throw new IllegalStateException("Job ID not found in pending job commits  " + jobId);
-        }
+        synchronized (jobCommitAcks) {
+            //add ACK to the job
+            if (jobCommitAcks.containsKey(jobId)) {
+                Set<String> replicaIds = jobCommitAcks.get(jobId);
+                replicaIds.add(replicaId);
+            } else {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Invalid job replication ACK received for jobId(" + jobId + ")");
+                }
+                return;
+            }
 
-        //if got ACKs from all remote replicas, notify pending jobs if any
-        if (jobCommitAcks.get(jobId).size() == replicationFactor) {
-            synchronized (replicationJobsPendingAcks) {
-                if (replicationJobsPendingAcks.containsKey(jobId)) {
-                    ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
-                    synchronized (pendingLog) {
-                        pendingLog.notify();
-                    }
+            //if got ACKs from all remote replicas, notify pending jobs if any
+
+            if (jobCommitAcks.get(jobId).size() == replicationFactor && replicationJobsPendingAcks.containsKey(jobId)) {
+                ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
+                synchronized (pendingLog) {
+                    pendingLog.notify();
                 }
             }
         }
@@ -725,26 +791,25 @@ public class ReplicationManager implements IReplicationManager {
 
     @Override
     public boolean hasBeenReplicated(ILogRecord logRecord) {
-        if (jobCommitAcks.containsKey(logRecord.getJobId())) {
-            //check if all ACKs have been received
-            if (jobCommitAcks.get(logRecord.getJobId()).size() == replicationFactor) {
-                jobCommitAcks.remove(logRecord.getJobId());
+        int jobId = logRecord.getJobId();
+        if (jobCommitAcks.containsKey(jobId)) {
+            synchronized (jobCommitAcks) {
+                //check if all ACKs have been received
+                if (jobCommitAcks.get(jobId).size() == replicationFactor) {
+                    jobCommitAcks.remove(jobId);
 
-                if (replicationJobsPendingAcks.containsKey(logRecord.getJobId())) {
-                    replicationJobsPendingAcks.remove(logRecord);
-                }
+                    //remove from pending jobs if exists
+                    replicationJobsPendingAcks.remove(jobId);
 
-                //notify any threads waiting for all jobs to finish
-                if (jobCommitAcks.size() == 0) {
-                    synchronized (jobCommitAcks) {
+                    //notify any threads waiting for all jobs to finish
+                    if (jobCommitAcks.size() == 0) {
                         jobCommitAcks.notifyAll();
                     }
+                    return true;
+                } else {
+                    replicationJobsPendingAcks.putIfAbsent(jobId, logRecord);
+                    return false;
                 }
-
-                return true;
-            } else {
-                replicationJobsPendingAcks.putIfAbsent(logRecord.getJobId(), logRecord);
-                return false;
             }
         }
 
@@ -753,13 +818,16 @@ public class ReplicationManager implements IReplicationManager {
     }
 
     private Map<String, SocketChannel> getActiveRemoteReplicasSockets() {
-        Map<String, SocketChannel> replicaNodesSockets = new HashMap<String, SocketChannel>();
+        Map<String, SocketChannel> replicaNodesSockets = new HashMap<>();
         for (Replica replica : replicas.values()) {
             if (replica.getState() == ReplicaState.ACTIVE) {
                 try {
                     SocketChannel sc = getReplicaSocket(replica.getId());
                     replicaNodesSockets.put(replica.getId(), sc);
                 } catch (IOException e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Could not get replica socket", e);
+                    }
                     reportFailedReplica(replica.getId());
                 }
             }
@@ -776,7 +844,7 @@ public class ReplicationManager implements IReplicationManager {
      * @throws IOException
      */
     private SocketChannel getReplicaSocket(String replicaId) throws IOException {
-        Replica replica = replicas.get(replicaId);
+        Replica replica = replicationProperties.getReplicaById(replicaId);
         SocketChannel sc = SocketChannel.open();
         sc.configureBlocking(true);
         InetSocketAddress address = replica.getAddress(replicationProperties);
@@ -786,7 +854,7 @@ public class ReplicationManager implements IReplicationManager {
 
     @Override
     public Set<String> getDeadReplicasIds() {
-        Set<String> replicasIds = new HashSet<String>();
+        Set<String> replicasIds = new HashSet<>();
         for (Replica replica : replicas.values()) {
             if (replica.getState() == ReplicaState.DEAD) {
                 replicasIds.add(replica.getNode().getId());
@@ -797,7 +865,7 @@ public class ReplicationManager implements IReplicationManager {
 
     @Override
     public Set<String> getActiveReplicasIds() {
-        Set<String> replicasIds = new HashSet<String>();
+        Set<String> replicasIds = new HashSet<>();
         for (Replica replica : replicas.values()) {
             if (replica.getState() == ReplicaState.ACTIVE) {
                 replicasIds.add(replica.getNode().getId());
@@ -823,40 +891,35 @@ public class ReplicationManager implements IReplicationManager {
 
     /**
      * Called during NC shutdown to notify remote replicas about the shutdown
-     * and wait for remote replicas shutdown notification then closes the local replication channel.
+     * and wait for remote replicas shutdown notification then closes the local
+     * replication channel.
      */
     @Override
     public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
-        try {
-            //stop replication thread afters all jobs/logs have been processed
-            suspendReplication(false);
-            //send shutdown event to remote replicas
-            sendShutdownNotifiction();
-            //wait until all shutdown events come from all remote replicas
-            synchronized (shuttingDownReplicaIds) {
-                while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) {
-                    try {
-                        shuttingDownReplicaIds.wait(1000);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
+        //stop replication thread afters all jobs/logs have been processed
+        suspendReplication(false);
+        //send shutdown event to remote replicas
+        sendShutdownNotifiction();
+        //wait until all shutdown events come from all remote replicas
+        synchronized (shuttingDownReplicaIds) {
+            while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) {
+                try {
+                    shuttingDownReplicaIds.wait(1000);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                 }
             }
-            LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas");
-            //close replication channel
-            asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
-
-            LOGGER.log(Level.INFO, "Replication manager stopped.");
-        } catch (Exception e) {
-            e.printStackTrace();
         }
+        LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas");
+        //close replication channel
+        asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
+
+        LOGGER.log(Level.INFO, "Replication manager stopped.");
     }
 
     @Override
     public void reportReplicaEvent(ReplicaEvent event) {
-        synchronized (replicaEventsQ) {
-            replicaEventsQ.offer(event);
-        }
+        replicaEventsQ.offer(event);
     }
 
     /**
@@ -867,6 +930,9 @@ public class ReplicationManager implements IReplicationManager {
      */
     public void reportFailedReplica(String replicaId) {
         Replica replica = replicas.get(replicaId);
+        if (replica == null) {
+            return;
+        }
         if (replica.getState() == ReplicaState.DEAD) {
             return;
         }
@@ -878,16 +944,28 @@ public class ReplicationManager implements IReplicationManager {
         reportReplicaEvent(event);
     }
 
+    private String getReplicaIdBySocket(SocketChannel socketChannel) {
+        InetSocketAddress socketAddress = NetworkingUtil.getSocketAddress(socketChannel);
+        for (Replica replica : replicas.values()) {
+            InetSocketAddress replicaAddress = replica.getAddress(replicationProperties);
+            if (replicaAddress.getHostName().equals(socketAddress.getHostName())
+                    && replicaAddress.getPort() == socketAddress.getPort()) {
+                return replica.getId();
+            }
+        }
+        return null;
+    }
+
     @Override
-    public void startReplicationThreads() {
+    public void startReplicationThreads() throws InterruptedException {
         replicationJobsProcessor = new ReplicationJobsProccessor();
 
         //start/continue processing jobs/logs
-        if (logsReplicaSockets == null) {
-            establishTxnLogsReplicationConnection();
+        if (logsRepSockets == null) {
+            establishTxnLogReplicationHandshake();
             getAndInitNewPage();
-            txnlogsReplicator = new ReplicationLogFlusher(emptyLogBuffersQ, pendingFlushLogBuffersQ);
-            txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogsReplicator);
+            txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ, pendingFlushLogBuffersQ);
+            txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogReplicator);
         }
 
         replicationJobsProcessor.start();
@@ -936,7 +1014,11 @@ public class ReplicationManager implements IReplicationManager {
                     ReplicationProtocol.sendGoodbye(socketChannel);
                 }
 
-                //4. update the LSN_MAP for indexes that were not flushed to the current append LSN to indicate no operations happend.
+                /**
+                 * 4. update the LSN_MAP for indexes that were not flushed
+                 * to the current append LSN to indicate no operations happened
+                 * since the checkpoint start.
+                 */
                 if (laggingIndexesResponse != null) {
                     for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
                         String indexPath = laggingIndexes.get(resouceId);
@@ -955,7 +1037,7 @@ public class ReplicationManager implements IReplicationManager {
         long maxRemoteLSN = 0;
 
         ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
-        Map<String, SocketChannel> replicaSockets = new HashMap<String, SocketChannel>();
+        Map<String, SocketChannel> replicaSockets = new HashMap<>();
         try {
             for (String replicaId : remoteReplicas) {
                 replicaSockets.put(replicaId, getReplicaSocket(replicaId));
@@ -1037,7 +1119,42 @@ public class ReplicationManager implements IReplicationManager {
     }
 
     public int getLogPageSize() {
-        return logManager.getLogPageSize();
+        return replicationProperties.getLogBufferPageSize();
+    }
+
+    @Override
+    public void replicateTxnLogBatch(final ByteBuffer buffer) {
+        //if replication is suspended, wait until it is resumed
+        try {
+            while (replicationSuspended.get()) {
+                synchronized (replicationSuspended) {
+                    replicationSuspended.wait();
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        //prepare the batch size buffer
+        txnLogsBatchSizeBuffer.clear();
+        txnLogsBatchSizeBuffer.putInt(buffer.remaining());
+        txnLogsBatchSizeBuffer.flip();
+
+        buffer.mark();
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                //send batch size
+                NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer);
+                //send log
+                NetworkingUtil.transferBufferToChannel(replicaSocket, buffer);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                txnLogsBatchSizeBuffer.position(0);
+                buffer.reset();
+            }
+        }
+        //move the buffer position to the sent limit
+        buffer.position(buffer.limit());
     }
 
     //supporting classes
@@ -1068,12 +1185,12 @@ public class ReplicationManager implements IReplicationManager {
                             break;
                     }
                 } catch (InterruptedException e) {
-                    //ignore
+                    Thread.currentThread().interrupt();
                 }
             }
         }
 
-        public void handleReplicaFailure(String replicaId) {
+        public void handleReplicaFailure(String replicaId) throws InterruptedException {
             Replica replica = replicas.get(replicaId);
 
             if (replica.getState() == ReplicaState.DEAD) {
@@ -1127,12 +1244,16 @@ public class ReplicationManager implements IReplicationManager {
                     processJob(job, replicaSockets, reusableBuffer);
 
                     //if no more jobs to process, close sockets
-                    if (replicationJobsQ.size() == 0) {
+                    if (replicationJobsQ.isEmpty()) {
                         LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas");
                         closeSockets();
                     }
-                } catch (Exception e) {
-                    e.printStackTrace();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (IOException e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Couldn't complete processing replication job", e);
+                    }
                 }
             }
 
@@ -1169,25 +1290,25 @@ public class ReplicationManager implements IReplicationManager {
             Thread.currentThread().setName("TxnLogs Replication Listener Thread");
             LOGGER.log(Level.INFO, "Started listening on socket: " + replicaSocket.socket().getRemoteSocketAddress());
 
-            try {
-                BufferedReader incomingResponse = new BufferedReader(
-                        new InputStreamReader(replicaSocket.socket().getInputStream()));
-                String responseLine = "";
+            try (BufferedReader incomingResponse = new BufferedReader(
+                    new InputStreamReader(replicaSocket.socket().getInputStream()))) {
                 while (true) {
-                    responseLine = incomingResponse.readLine();
+                    String responseLine = incomingResponse.readLine();
                     if (responseLine == null) {
                         break;
                     }
                     //read ACK for job commit log
-                    String replicaId = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
+                    String ackFrom = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
                     int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
-                    addAckToJob(jobId, replicaId);
+                    addAckToJob(jobId, ackFrom);
+                }
+            } catch (AsynchronousCloseException e) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId, e);
                 }
-            } catch (AsynchronousCloseException e1) {
-                LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId);
-            } catch (IOException e2) {
-                reportFailedReplica(replicaId);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 47e60b2..4da5fd4 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -63,8 +63,8 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
         Set<String> nodes = replicationProperties.getNodeReplicationClients(localNodeId);
 
-        Map<String, Set<String>> recoveryCandidates = new HashMap<String, Set<String>>();
-        Map<String, Integer> candidatesScore = new HashMap<String, Integer>();
+        Map<String, Set<String>> recoveryCandidates = new HashMap<>();
+        Map<String, Integer> candidatesScore = new HashMap<>();
 
         //2. identify which nodes has backup per lost node data
         for (String node : nodes) {
@@ -80,7 +80,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
             }
 
             //no active replicas to recover from
-            if (locations.size() == 0) {
+            if (locations.isEmpty()) {
                 throw new IllegalStateException("Could not find any ACTIVE replica to recover " + node + " data.");
             }
 
@@ -94,7 +94,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
             recoveryCandidates.put(node, locations);
         }
 
-        Map<String, Set<String>> recoveryList = new HashMap<String, Set<String>>();
+        Map<String, Set<String>> recoveryList = new HashMap<>();
 
         //3. find best candidate to recover from per lost replica data
         for (Entry<String, Set<String>> entry : recoveryCandidates.entrySet()) {
@@ -113,7 +113,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
             if (recoveryList.containsKey(winner)) {
                 recoveryList.get(winner).add(entry.getKey());
             } else {
-                Set<String> nodesToRecover = new HashSet<String>();
+                Set<String> nodesToRecover = new HashSet<>();
                 nodesToRecover.add(entry.getKey());
                 recoveryList.put(winner, nodesToRecover);
             }
@@ -196,15 +196,16 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                 }
                 break;
             } catch (IOException e) {
-                e.printStackTrace();
-                LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...");
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...", e);
+                }
                 maxRecoveryAttempts--;
             }
         }
     }
 
     @Override
-    public void completeFailbackProcess() throws IOException {
+    public void completeFailbackProcess() throws IOException, InterruptedException {
         ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
         ReplicaResourcesManager replicaResourcesManager = (ReplicaResourcesManager) runtimeContext
                 .getReplicaResourcesManager();
@@ -237,7 +238,9 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
              * in case of failure during failback completion process we need to construct a new plan
              * and get all the files from the start since the remote replicas will change in the new plan.
              */
-            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "Failed during completing failback. Restarting failback process...", e);
+            }
             startFailbackProcess();
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 872adcd..1245674 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -76,9 +76,9 @@ public class LogBuffer implements ILogBuffer {
         appendOffset = 0;
         flushOffset = 0;
         isLastPage = false;
-        syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
-        flushQ = new LinkedBlockingQueue<ILogRecord>();
-        remoteJobsQ = new LinkedBlockingQueue<ILogRecord>();
+        syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
+        flushQ = new LinkedBlockingQueue<>();
+        remoteJobsQ = new LinkedBlockingQueue<>();
         reusableDsId = new DatasetId(-1);
         reusableJobId = new JobId(-1);
     }
@@ -113,7 +113,7 @@ public class LogBuffer implements ILogBuffer {
 
     @Override
     public void appendWithReplication(ILogRecord logRecord, long appendLSN) {
-        logRecord.writeLogRecord(appendBuffer, appendLSN);
+        logRecord.writeLogRecord(appendBuffer);
 
         if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
                 && logRecord.getLogType() != LogType.WAIT) {
@@ -135,10 +135,9 @@ public class LogBuffer implements ILogBuffer {
                     logRecord.isFlushed(false);
                     flushQ.offer(logRecord);
                 }
-            } else if (logRecord.getLogSource() == LogSource.REMOTE) {
-                if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
-                    remoteJobsQ.offer(logRecord);
-                }
+            } else if (logRecord.getLogSource() == LogSource.REMOTE
+                    && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
+                remoteJobsQ.offer(logRecord);
             }
             this.notify();
         }
@@ -347,11 +346,7 @@ public class LogBuffer implements ILogBuffer {
         IReplicationThread replicationThread = logRecord.getReplicationThread();
 
         if (replicationThread != null) {
-            try {
-                replicationThread.notifyLogReplicationRequester(logRecord);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+            replicationThread.notifyLogReplicationRequester(logRecord);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0cf7c329/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index cacd036..0c4cb88 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -37,10 +37,6 @@ public class LogManagerWithReplication extends LogManager {
 
     @Override
     public void log(ILogRecord logRecord) throws ACIDException {
-        if (logRecord.getLogSize() > logPageSize) {
-            throw new IllegalStateException();
-        }
-
         //only locally generated logs should be replicated
         logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT);
 
@@ -58,7 +54,11 @@ public class LogManagerWithReplication extends LogManager {
         syncAppendToLogTail(logRecord);
 
         if (logRecord.isReplicated()) {
-            replicationManager.replicateLog(logRecord);
+            try {
+                replicationManager.replicateLog(logRecord);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
         }
 
         if (logRecord.getLogSource() == LogSource.LOCAL) {
@@ -69,7 +69,7 @@ public class LogManagerWithReplication extends LogManager {
                         try {
                             logRecord.wait();
                         } catch (InterruptedException e) {
-                            //ignore
+                            Thread.currentThread().interrupt();
                         }
                     }
 
@@ -79,7 +79,7 @@ public class LogManagerWithReplication extends LogManager {
                             try {
                                 logRecord.wait();
                             } catch (InterruptedException e) {
-                                //ignore
+                                Thread.currentThread().interrupt();
                             }
                         }
                     }



Mime
View raw message