asterixdb-commits mailing list archives

From: mhub...@apache.org
Subject: [05/10] asterixdb git commit: [ASTERIXDB-2195][REPL] Replace Static Replication
Date: Fri, 05 Jan 2018 16:10:33 GMT
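
The diff below replaces the old node-id based replica state tracking with a map of replication destinations keyed by socket address: registering a partition replica creates or reuses one ReplicationDestination per remote address, and a destination is dropped once its last replica unregisters. The following is a minimal, self-contained sketch of that bookkeeping pattern using only JDK types; the class, field, and method names are illustrative stand-ins, not the actual AsterixDB APIs.

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified sketch of the destination bookkeeping introduced by
 * this change. Names are illustrative only; the real types are
 * ReplicationManager and ReplicationDestination in the diff below.
 */
public class DestinationMapSketch {

    private static final class Destination {
        final InetSocketAddress location;
        final Set<String> replicas = new HashSet<>();

        Destination(InetSocketAddress location) {
            this.location = location;
        }
    }

    private final Map<InetSocketAddress, Destination> dests = new HashMap<>();

    public synchronized void register(InetSocketAddress location, String replicaId) {
        // one destination per remote address, shared by all replicas hosted there
        dests.computeIfAbsent(location, Destination::new).replicas.add(replicaId);
    }

    public synchronized void unregister(InetSocketAddress location, String replicaId) {
        Destination dest = dests.get(location);
        if (dest == null) {
            return; // unknown replica; the real manager just logs a warning
        }
        dest.replicas.remove(replicaId);
        if (dest.replicas.isEmpty()) {
            dests.remove(location); // no replicas left at this address
        }
    }
}

In the actual change, the LogReplicationManager and IndexReplicationManager are additionally registered and unregistered alongside each destination, as shown in the register()/unregister() methods in the diff.
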
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 6445345..727a379 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -18,1342 +18,120 @@
  */
 package org.apache.asterix.replication.management;
 
-import java.io.BufferedReader;
-import java.io.File;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
 
-import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.replication.IPartitionReplica;
-import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.replication.IReplicationDestination;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.Replica.ReplicaState;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.replication.ReplicationStrategyFactory;
-import org.apache.asterix.common.storage.DatasetResourceReference;
-import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.ResourceReference;
-import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
-import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.replication.functions.ReplicaFilesRequest;
-import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
-import org.apache.asterix.replication.logging.ReplicationLogBuffer;
-import org.apache.asterix.replication.logging.TxnLogReplicator;
-import org.apache.asterix.replication.storage.LSMComponentProperties;
-import org.apache.asterix.replication.storage.LSMIndexFileProperties;
-import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-import org.apache.hyracks.api.application.INCServiceContext;
-import org.apache.hyracks.api.config.IApplicationConfig;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.asterix.replication.api.ReplicationDestination;
 import org.apache.hyracks.api.replication.IReplicationJob;
-import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
-import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
-import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
-import org.apache.hyracks.util.StorageUtil;
-import org.apache.hyracks.util.StorageUtil.StorageUnit;
-import org.apache.logging.log4j.Level;
+import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-/**
- * This class is used to process replication jobs and maintain remote replicas states
- */
+@ThreadSafe
 public class ReplicationManager implements IReplicationManager {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final int INITIAL_REPLICATION_FACTOR = 1;
-    private static final int MAX_JOB_COMMIT_ACK_WAIT = 10000;
-    private final String nodeId;
-    private ExecutorService replicationListenerThreads;
-    private final Map<Long, Set<String>> txnCommitAcks;
-    private final Map<Long, ILogRecord> replicationTxnsPendingAcks;
-    private ByteBuffer dataBuffer;
-    private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ;
-    private final LinkedBlockingQueue<ReplicaEvent> replicaEventsQ;
-
-    private int replicationFactor = 1;
-    private final ReplicaResourcesManager replicaResourcesManager;
-    private final ILogManager logManager;
-    private final IAppRuntimeContextProvider asterixAppRuntimeContextProvider;
+    private final Map<InetSocketAddress, ReplicationDestination> dests = new HashMap<>();
     private final ReplicationProperties replicationProperties;
-    private final Map<String, Replica> replicas;
-    private final Map<String, Set<Integer>> replica2PartitionsMap;
-
-    private final AtomicBoolean replicationSuspended;
-    private AtomicBoolean terminateJobsReplication;
-    private AtomicBoolean jobsReplicationSuspended;
-    private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUnit.KILOBYTE);
-    private final Set<String> shuttingDownReplicaIds;
-    //replication threads
-    private ReplicationJobsProccessor replicationJobsProcessor;
-    private final ReplicasEventsMonitor replicationMonitor;
-    //dummy job used to stop ReplicationJobsProccessor thread.
-    private static final IReplicationJob REPLICATION_JOB_POISON_PILL = new ReplicationJob(ReplicationJobType.METADATA,
-            ReplicationOperation.REPLICATE, ReplicationExecutionType.ASYNC, null);
-    //used to identify the correct IP address when the node has multiple network interfaces
-    private String hostIPAddressFirstOctet = null;
+    private final IReplicationStrategy strategy;
+    private final INcApplicationContext appCtx;
+    private final LogReplicationManager logReplicationManager;
+    private final IndexReplicationManager lsnIndexReplicationManager;
 
-    private LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
-    private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
-    protected ReplicationLogBuffer currentTxnLogBuffer;
-    private TxnLogReplicator txnlogReplicator;
-    private Future<? extends Object> txnLogReplicatorTask;
-    private SocketChannel[] logsRepSockets;
-    private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
-    private IReplicationStrategy replicationStrategy;
-    private final PersistentLocalResourceRepository localResourceRepo;
-    private NCConfig ncConfig;
-    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
-
-    //TODO this class needs to be refactored by moving its private classes to separate files
-    //and possibly using MessageBroker to send/receive remote replicas events.
-    public ReplicationManager(String nodeId, ReplicationProperties replicationProperties,
-            IReplicaResourcesManager remoteResoucesManager, ILogManager logManager,
-            IAppRuntimeContextProvider asterixAppRuntimeContextProvider, INCServiceContext ncServiceContext) {
-        this.nodeId = nodeId;
-        this.ncConfig = ((NodeControllerService) ncServiceContext.getControllerService()).getConfiguration();
+    public ReplicationManager(INcApplicationContext appCtx, ReplicationProperties replicationProperties) {
         this.replicationProperties = replicationProperties;
-        try {
-            replicationStrategy = ReplicationStrategyFactory.create(replicationProperties.getReplicationStrategy(),
-                    replicationProperties, ncConfig.getConfigManager());
-        } catch (HyracksDataException e) {
-            LOGGER.log(Level.WARN, "Couldn't initialize replication strategy", e);
-        }
-        this.replicaResourcesManager = (ReplicaResourcesManager) remoteResoucesManager;
-        this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
-        this.logManager = logManager;
-        localResourceRepo =
-                (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider.getLocalResourceRepository();
-        this.hostIPAddressFirstOctet = ncConfig.getPublicAddress().substring(0, 3);
-        this.indexCheckpointManagerProvider =
-                asterixAppRuntimeContextProvider.getAppContext().getIndexCheckpointManagerProvider();
-        replicas = new HashMap<>();
-        replicationJobsQ = new LinkedBlockingQueue<>();
-        replicaEventsQ = new LinkedBlockingQueue<>();
-        terminateJobsReplication = new AtomicBoolean(false);
-        jobsReplicationSuspended = new AtomicBoolean(true);
-        replicationSuspended = new AtomicBoolean(true);
-        txnCommitAcks = new ConcurrentHashMap<>();
-        replicationTxnsPendingAcks = new ConcurrentHashMap<>();
-        shuttingDownReplicaIds = new HashSet<>();
-        dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
-        replicationMonitor = new ReplicasEventsMonitor();
-        //add list of replicas from configurations (To be read from another source e.g. Zookeeper)
-        Set<Replica> replicaNodes = replicationStrategy.getRemoteReplicas(nodeId);
-
-        //Used as async listeners from replicas
-        replicationListenerThreads = Executors.newCachedThreadPool();
-        replicationJobsProcessor = new ReplicationJobsProccessor();
-
-        Map<String, ClusterPartition[]> nodePartitions =
-                asterixAppRuntimeContextProvider.getAppContext().getMetadataProperties().getNodePartitions();
-        replica2PartitionsMap = new HashMap<>(replicaNodes.size());
-        for (Replica replica : replicaNodes) {
-            replicas.put(replica.getId(), replica);
-            //for each remote replica, get the list of replication clients
-            Set<Replica> nodeReplicationClients = replicationStrategy.getRemotePrimaryReplicas(replica.getId());
-            //get the partitions of each client
-            List<Integer> clientPartitions = new ArrayList<>();
-            for (Replica client : nodeReplicationClients) {
-                for (ClusterPartition clusterPartition : nodePartitions.get(client.getId())) {
-                    clientPartitions.add(clusterPartition.getPartitionId());
-                }
-            }
-            Set<Integer> clientPartitonsSet = new HashSet<>(clientPartitions.size());
-            clientPartitonsSet.addAll(clientPartitions);
-            replica2PartitionsMap.put(replica.getId(), clientPartitonsSet);
-        }
-        int numLogBuffers = replicationProperties.getLogBufferNumOfPages();
-        emptyLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers);
-        pendingFlushLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers);
-
-        int logBufferSize = replicationProperties.getLogBufferPageSize();
-        for (int i = 0; i < numLogBuffers; i++) {
-            emptyLogBuffersQ
-                    .offer(new ReplicationLogBuffer(this, logBufferSize, replicationProperties.getLogBatchSize()));
-        }
+        this.appCtx = appCtx;
+        strategy = ReplicationStrategyFactory.create(replicationProperties.getReplicationStrategy());
+        logReplicationManager = new LogReplicationManager(appCtx, this);
+        lsnIndexReplicationManager = new IndexReplicationManager(appCtx, this);
     }
 
     @Override
-    public void submitJob(IReplicationJob job) throws IOException {
-        if (job.getExecutionType() == ReplicationExecutionType.ASYNC) {
-            replicationJobsQ.offer(job);
-        } else {
-            //wait until replication is resumed
-            while (replicationSuspended.get()) {
-                synchronized (replicationSuspended) {
-                    try {
-                        replicationSuspended.wait();
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            }
-            processJob(job, null, null);
+    public void register(IPartitionReplica replica) {
+        synchronized (dests) {
+            final InetSocketAddress location = replica.getIdentifier().getLocation();
+            final ReplicationDestination replicationDest = dests.computeIfAbsent(location, ReplicationDestination::at);
+            replicationDest.add(replica);
+            logReplicationManager.register(replicationDest);
+            lsnIndexReplicationManager.register(replicationDest);
         }
     }
 
     @Override
-    public void replicateLog(ILogRecord logRecord) throws InterruptedException {
-        if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
-            //if replication is suspended, wait until it is resumed.
-            while (replicationSuspended.get()) {
-                synchronized (replicationSuspended) {
-                    replicationSuspended.wait();
-                }
-            }
-            Set<String> replicaIds = Collections.synchronizedSet(new HashSet<String>());
-            replicaIds.add(nodeId);
-            txnCommitAcks.put(logRecord.getTxnId(), replicaIds);
-        }
-
-        appendToLogBuffer(logRecord);
-    }
-
-    protected void getAndInitNewLargePage(int pageSize) {
-        // for now, alloc a new buffer for each large page
-        // TODO: consider pooling large pages
-        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize, replicationProperties.getLogBufferPageSize());
-        pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
-    }
-
-    protected void getAndInitNewPage() throws InterruptedException {
-        currentTxnLogBuffer = null;
-        while (currentTxnLogBuffer == null) {
-            currentTxnLogBuffer = emptyLogBuffersQ.take();
-        }
-        currentTxnLogBuffer.reset();
-        pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
-    }
-
-    private synchronized void appendToLogBuffer(ILogRecord logRecord) throws InterruptedException {
-        if (!currentTxnLogBuffer.hasSpace(logRecord)) {
-            currentTxnLogBuffer.isFull(true);
-            if (logRecord.getLogSize() > getLogPageSize()) {
-                getAndInitNewLargePage(logRecord.getLogSize());
-            } else {
-                getAndInitNewPage();
-            }
-        }
-        currentTxnLogBuffer.append(logRecord);
-    }
-
-    /**
-     * Processes the replication job based on its specifications
-     *
-     * @param job
-     *            The replication job
-     * @param replicasSockets
-     *            The remote replicas sockets to send the request to.
-     * @param requestBuffer
-     *            The buffer to use to send the request.
-     * @throws IOException
-     */
-    private void processJob(IReplicationJob job, Map<String, SocketChannel> replicasSockets, ByteBuffer requestBuffer)
-            throws IOException {
-        try {
-
-            //all of the job's files belong to a single storage partition.
-            //get any of them to determine the partition from the file path.
-            String jobFile = job.getJobFiles().iterator().next();
-            DatasetResourceReference indexFileRef = localResourceRepo.getLocalResourceReference(jobFile);
-            if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
+    public void unregister(IPartitionReplica replica) {
+        synchronized (dests) {
+            final InetSocketAddress location = replica.getIdentifier().getLocation();
+            final ReplicationDestination dest = dests.get(location);
+            if (dest == null) {
+                LOGGER.warn(() -> "Asked to unregister unknown replica " + replica);
                 return;
             }
-            int jobPartitionId = indexFileRef.getPartitionId();
-
-            ByteBuffer responseBuffer = null;
-            LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties();
-            if (requestBuffer == null) {
-                requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
-            }
-
-            boolean isLSMComponentFile = job.getJobType() == ReplicationJobType.LSM_COMPONENT;
-            try {
-                //if there isn't already a connection, establish a new one
-                if (replicasSockets == null) {
-                    replicasSockets = getActiveRemoteReplicasSockets();
-                }
-
-                int remainingFiles = job.getJobFiles().size();
-                if (job.getOperation() == ReplicationOperation.REPLICATE) {
-                    //if the replication job is an LSM_COMPONENT, its properties are sent first, then its files.
-                    ILSMIndexReplicationJob LSMComponentJob = null;
-                    if (job.getJobType() == ReplicationJobType.LSM_COMPONENT) {
-                        //send LSMComponent properties
-                        LSMComponentJob = (ILSMIndexReplicationJob) job;
-                        LSMComponentProperties lsmCompProp = new LSMComponentProperties(LSMComponentJob, nodeId);
-                        requestBuffer = ReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp, //NOSONAR
-                                requestBuffer);
-                        sendRequest(replicasSockets, requestBuffer);
-                    }
-
-                    for (String filePath : job.getJobFiles()) {
-                        remainingFiles--;
-                        Path path = Paths.get(filePath);
-                        if (Files.notExists(path)) {
-                            LOGGER.log(Level.ERROR, "File deleted before replication: " + filePath);
-                            continue;
-                        }
-
-                        LOGGER.log(Level.INFO, "Replicating file: " + filePath);
-                        //open file for reading
-                        try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
-                                FileChannel fileChannel = fromFile.getChannel();) {
-
-                            long fileSize = fileChannel.size();
-                            asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile,
-                                    remainingFiles == 0);
-                            requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer,
-                                    asterixFileProperties, ReplicationRequestType.REPLICATE_FILE);
-                            Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
-                            while (iterator.hasNext()) {
-                                Map.Entry<String, SocketChannel> entry = iterator.next();
-                                //if the remote replica is not interested in this partition, skip it.
-                                if (!replica2PartitionsMap.get(entry.getKey()).contains(jobPartitionId)) {
-                                    continue;
-                                }
-                                SocketChannel socketChannel = entry.getValue();
-                                //transfer request header & file
-                                try {
-                                    NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer);
-                                    NetworkingUtil.sendFile(fileChannel, socketChannel);
-                                    if (asterixFileProperties.requiresAck()) {
-                                        ReplicationRequestType responseType =
-                                                waitForResponse(socketChannel, responseBuffer);
-                                        if (responseType != ReplicationRequestType.ACK) {
-                                            throw new IOException(
-                                                    "Could not receive ACK from replica " + entry.getKey());
-                                        }
-                                    }
-                                } catch (IOException e) {
-                                    handleReplicationFailure(socketChannel, e);
-                                    iterator.remove();
-                                } finally {
-                                    requestBuffer.position(0);
-                                }
-                            }
-                        }
-                    }
-                } else if (job.getOperation() == ReplicationOperation.DELETE) {
-                    for (String filePath : job.getJobFiles()) {
-                        remainingFiles--;
-                        asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, remainingFiles == 0);
-                        ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
-                                ReplicationRequestType.DELETE_FILE);
-
-                        Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
-                        while (iterator.hasNext()) {
-                            Map.Entry<String, SocketChannel> entry = iterator.next();
-                            //if the remote replica is not interested in this partition, skip it.
-                            if (!replica2PartitionsMap.get(entry.getKey()).contains(jobPartitionId)) {
-                                continue;
-                            }
-                            SocketChannel socketChannel = entry.getValue();
-                            try {
-                                sendRequest(replicasSockets, requestBuffer);
-                                if (asterixFileProperties.requiresAck()) {
-                                    waitForResponse(socketChannel, responseBuffer);
-                                }
-                            } catch (IOException e) {
-                                handleReplicationFailure(socketChannel, e);
-                                iterator.remove();
-                            } finally {
-                                requestBuffer.position(0);
-                            }
-                        }
-                    }
-                }
-            } finally {
-                //if sync, close sockets with replicas since they won't be reused
-                if (job.getExecutionType() == ReplicationExecutionType.SYNC) {
-                    closeReplicaSockets(replicasSockets);
-                }
-            }
-        } finally {
-            exitReplicatedLSMComponent(job);
-        }
-    }
-
-    private static void exitReplicatedLSMComponent(IReplicationJob job) throws HyracksDataException {
-        if (job.getOperation() == ReplicationOperation.REPLICATE && job instanceof ILSMIndexReplicationJob) {
-            //exit the replicated LSM components
-            ILSMIndexReplicationJob aJob = (ILSMIndexReplicationJob) job;
-            aJob.endReplication();
-        }
-    }
-
-    /**
-     * Waits and reads a response from a remote replica
-     *
-     * @param socketChannel
-     *            The socket to read the response from
-     * @param responseBuffer
-     *            The response buffer to read the response to.
-     * @return The response type.
-     * @throws IOException
-     */
-    private static ReplicationRequestType waitForResponse(SocketChannel socketChannel, ByteBuffer responseBuffer)
-            throws IOException {
-        if (responseBuffer == null) {
-            responseBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
-        } else {
-            responseBuffer.clear();
-        }
-
-        //read response from remote replicas
-        ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel, responseBuffer);
-        return responseFunction;
-    }
-
-    @Override
-    public boolean isReplicationEnabled() {
-        return replicationStrategy.isParticipant(nodeId);
-    }
-
-    @Override
-    public synchronized void updateReplicaInfo(Replica replicaNode) {
-        Replica replica = replicas.get(replicaNode.getId());
-        //should not update the info of an active replica
-        if (replica.getState() == ReplicaState.ACTIVE) {
-            return;
-        }
-        replica.setClusterIp(replicaNode.getClusterIp());
-    }
-
-    /**
-     * Suspends processing replication jobs/logs.
-     *
-     * @param force
-     *            a flag indicates if replication should be suspended right away or when the pending jobs are completed.
-     */
-    private void suspendReplication(boolean force) {
-        //suspend replication jobs processing
-        if (replicationJobsProcessor != null && replicationJobsProcessor.isAlive()) {
-            if (force) {
-                terminateJobsReplication.set(true);
-            }
-            replicationJobsQ.offer(REPLICATION_JOB_POISON_PILL);
-
-            //wait until the jobs are suspended
-            synchronized (jobsReplicationSuspended) {
-                while (!jobsReplicationSuspended.get()) {
-                    try {
-                        jobsReplicationSuspended.wait();
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            }
-        }
-
-        //suspend logs replication
-        if (txnlogReplicator != null) {
-            endTxnLogReplicationHandshake();
-        }
-    }
-
-    /**
-     * Opens a new connection with Active remote replicas and starts a listen thread per connection.
-     */
-    private void establishTxnLogReplicationHandshake() {
-        Map<String, SocketChannel> activeRemoteReplicasSockets = getActiveRemoteReplicasSockets();
-        logsRepSockets = new SocketChannel[activeRemoteReplicasSockets.size()];
-        int i = 0;
-        //start a listener thread per connection
-        for (Entry<String, SocketChannel> entry : activeRemoteReplicasSockets.entrySet()) {
-            logsRepSockets[i] = entry.getValue();
-            replicationListenerThreads
-                    .execute(new TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue()));
-            i++;
-        }
-
-        /*
-         * establish log replication handshake
-         */
-        ByteBuffer handshakeBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE)
-                .putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
-        handshakeBuffer.flip();
-        //send handshake request
-        for (SocketChannel replicaSocket : logsRepSockets) {
-            try {
-                NetworkingUtil.transferBufferToChannel(replicaSocket, handshakeBuffer);
-            } catch (IOException e) {
-                handleReplicationFailure(replicaSocket, e);
-            } finally {
-                handshakeBuffer.position(0);
-            }
-        }
-    }
-
-    private void handleReplicationFailure(SocketChannel socketChannel, Throwable t) {
-        if (LOGGER.isWarnEnabled()) {
-            LOGGER.log(Level.WARN, "Could not complete replication request.", t);
-        }
-        if (socketChannel.isOpen()) {
-            try {
-                socketChannel.close();
-            } catch (IOException e) {
-                LOGGER.log(Level.WARN, "Could not close socket.", e);
-            }
-        }
-        reportFailedReplica(getReplicaIdBySocket(socketChannel));
-    }
-
-    /**
-     * Stops TxnLogReplicator and closes the sockets used to replicate logs.
-     */
-    private void endTxnLogReplicationHandshake() {
-        LOGGER.info("Terminating TxnLogReplicator thread ...");
-        txnlogReplicator.terminate();
-        try {
-            txnLogReplicatorTask.get();
-        } catch (ExecutionException | InterruptedException e) {
-            LOGGER.error("TxnLogReplicator thread terminated abnormally", e);
-        }
-        LOGGER.info("TxnLogReplicator thread was terminated.");
-
-        /*
-         * End log replication handshake (by sending a dummy log with a single byte)
-         */
-        ByteBuffer endLogRepHandshake = ByteBuffer.allocate(Integer.SIZE + 1).putInt(1).put((byte) 0);
-        endLogRepHandshake.flip();
-        for (SocketChannel replicaSocket : logsRepSockets) {
-            try {
-                NetworkingUtil.transferBufferToChannel(replicaSocket, endLogRepHandshake);
-            } catch (IOException e) {
-                handleReplicationFailure(replicaSocket, e);
-            } finally {
-                endLogRepHandshake.position(0);
-            }
-        }
-
-        //wait for any ACK to arrive before closing sockets.
-        if (logsRepSockets != null) {
-            synchronized (txnCommitAcks) {
-                try {
-                    long waitStartTime = System.currentTimeMillis();
-                    while (!txnCommitAcks.isEmpty()) {
-                        txnCommitAcks.wait(1000);
-                        long waitDuration = System.currentTimeMillis() - waitStartTime;
-                        if (waitDuration > MAX_JOB_COMMIT_ACK_WAIT) {
-                            LOGGER.log(Level.ERROR,
-                                    "Timeout before receving all job ACKs from replicas. Pending txns ("
-                                            + txnCommitAcks.keySet().toString() + ")");
-                            break;
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    LOGGER.error("Interrupted while waiting for jobs ACK", e);
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-
-        /*
-         * Close log replication sockets
-         */
-        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
-        for (SocketChannel replicaSocket : logsRepSockets) {
-            try {
-                //send goodbye to remote replica
-                NetworkingUtil.transferBufferToChannel(replicaSocket, goodbyeBuffer);
-                replicaSocket.close();
-            } catch (IOException e) {
-                handleReplicationFailure(replicaSocket, e);
-            } finally {
-                goodbyeBuffer.position(0);
-            }
-        }
-        logsRepSockets = null;
-    }
-
-    /**
-     * Sends a shutdown event to remote replicas notifying them
-     * no more logs/files will be sent from this local replica.
-     *
-     * @throws IOException
-     */
-    private void sendShutdownNotifiction() throws IOException {
-        Replica replica = new Replica(nodeId, NetworkingUtil.getHostAddress(hostIPAddressFirstOctet),
-                ncConfig.getReplicationPublicPort());
-        ReplicaEvent event = new ReplicaEvent(replica, ClusterEventType.NODE_SHUTTING_DOWN);
-        ByteBuffer buffer = ReplicationProtocol.writeReplicaEventRequest(event);
-        Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
-        sendRequest(replicaSockets, buffer);
-        closeReplicaSockets(replicaSockets);
-    }
-
-    /**
-     * Sends a request to remote replicas
-     *
-     * @param replicaSockets
-     *            The sockets to send the request to.
-     * @param requestBuffer
-     *            The buffer that contains the request.
-     */
-    private void sendRequest(Map<String, SocketChannel> replicaSockets, ByteBuffer requestBuffer) {
-        Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Entry<String, SocketChannel> replicaSocket = iterator.next();
-            SocketChannel clientSocket = replicaSocket.getValue();
-            try {
-                NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer);
-            } catch (IOException e) {
-                handleReplicationFailure(clientSocket, e);
-                iterator.remove();
-            } finally {
-                requestBuffer.position(0);
-            }
-        }
-    }
-
-    /**
-     * Closes the passed replication sockets by sending GOODBYE request to remote replicas.
-     *
-     * @param replicaSockets
-     */
-    private void closeReplicaSockets(Map<String, SocketChannel> replicaSockets) {
-        //send goodbye
-        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
-        sendRequest(replicaSockets, goodbyeBuffer);
-
-        Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Entry<String, SocketChannel> replicaSocket = iterator.next();
-            SocketChannel clientSocket = replicaSocket.getValue();
-            if (clientSocket.isOpen()) {
-                try {
-                    clientSocket.close();
-                } catch (IOException e) {
-                    handleReplicationFailure(clientSocket, e);
-                }
+            LOGGER.info(() -> "unregister " + replica);
+            dest.remove(replica);
+            if (dest.getReplicas().isEmpty()) {
+                LOGGER.info(() -> "Removing destination with no replicas " + dest);
+                logReplicationManager.unregister(dest);
+                lsnIndexReplicationManager.unregister(dest);
+                dests.remove(location);
             }
         }
     }
 
     @Override
-    public void initializeReplicasState() {
-        for (Replica replica : replicas.values()) {
-            checkReplicaState(replica.getId(), false, false);
-        }
-    }
-
-    /**
-     * Checks the state of a remote replica by trying to ping it.
-     *
-     * @param replicaId
-     *            The replica to check the state for.
-     * @param async
-     *            a flag indicating whether to wait for the result or not.
-     * @param suspendReplication
-     *            a flag indicating whether to suspend replication on replica state change or not.
-     */
-    private void checkReplicaState(String replicaId, boolean async, boolean suspendReplication) {
-        Replica replica = replicas.get(replicaId);
-
-        ReplicaStateChecker connector = new ReplicaStateChecker(replica, replicationProperties.getReplicationTimeOut(),
-                this, suspendReplication);
-        Future<? extends Object> ft = asterixAppRuntimeContextProvider.getThreadExecutor().submit(connector);
-
-        if (!async) {
-            //wait until task is done
-            while (!ft.isDone()) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-    }
-
-    /**
-     * Updates the state of a remote replica.
-     *
-     * @param replicaId
-     *            The replica id to update.
-     * @param newState
-     *            The new state of the replica.
-     * @param suspendReplication
-     *            a flag indicating whether to suspend replication on state change or not.
-     * @throws InterruptedException
-     */
-    public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication)
-            throws InterruptedException {
-        Replica replica = replicas.get(replicaId);
-
-        if (replica.getState() == newState) {
-            return;
-        }
-
-        if (suspendReplication) {
-            //prevent new jobs/logs from coming in
-            replicationSuspended.set(true);
-
-            if (newState == ReplicaState.DEAD) {
-                //assume the dead replica ACK has been received for all pending jobs
-                synchronized (txnCommitAcks) {
-                    for (Long txnId : txnCommitAcks.keySet()) {
-                        addAckToJob(txnId, replicaId);
-                    }
-                }
-            }
-
-            //force replication threads to stop in order to change the replication factor
-            suspendReplication(true);
-        }
-
-        replica.setState(newState);
-
-        if (newState == ReplicaState.ACTIVE) {
-            replicationFactor++;
-        } else if (newState == ReplicaState.DEAD && replicationFactor > INITIAL_REPLICATION_FACTOR) {
-            replicationFactor--;
-        }
-
-        if (LOGGER.isWarnEnabled()) {
-            LOGGER.warn("Replica " + replicaId + " state changed to: " + newState.name()
-                    + ". Replication factor changed to: " + replicationFactor);
-        }
-
-        if (suspendReplication) {
-            startReplicationThreads();
-        }
-    }
-
-    /**
-     * When an ACK for a JOB_COMMIT is received, it is added to the corresponding job.
-     *
-     * @param txnId
-     * @param replicaId
-     *            The remote replica id the ACK received from.
-     */
-    private void addAckToJob(long txnId, String replicaId) {
-        synchronized (txnCommitAcks) {
-            //add ACK to the job
-            if (txnCommitAcks.containsKey(txnId)) {
-                Set<String> replicaIds = txnCommitAcks.get(txnId);
-                replicaIds.add(replicaId);
-            } else {
-                if (LOGGER.isWarnEnabled()) {
-                    LOGGER.warn("Invalid job replication ACK received for txnId(" + txnId + ")");
-                }
-                return;
-            }
-
-            //if got ACKs from all remote replicas, notify pending jobs if any
-
-            if (txnCommitAcks.get(txnId).size() == replicationFactor && replicationTxnsPendingAcks.containsKey(txnId)) {
-                ILogRecord pendingLog = replicationTxnsPendingAcks.get(txnId);
-                synchronized (pendingLog) {
-                    pendingLog.notifyAll();
-                }
-            }
-        }
+    public void notifyFailure(IReplicationDestination dest, Exception failure) {
+        LOGGER.info(() -> "processing failure for " + dest);
+        appCtx.getThreadExecutor().execute(() -> {
+            logReplicationManager.unregister(dest);
+            lsnIndexReplicationManager.unregister(dest);
+            dest.notifyFailure(failure);
+        });
     }
 
     @Override
-    public boolean hasBeenReplicated(ILogRecord logRecord) {
-        long txnId = logRecord.getTxnId();
-        if (txnCommitAcks.containsKey(txnId)) {
-            synchronized (txnCommitAcks) {
-                //check if all ACKs have been received
-                if (txnCommitAcks.get(txnId).size() == replicationFactor) {
-                    txnCommitAcks.remove(txnId);
-
-                    //remove from pending jobs if exists
-                    replicationTxnsPendingAcks.remove(txnId);
-
-                    //notify any threads waiting for all jobs to finish
-                    if (txnCommitAcks.size() == 0) {
-                        txnCommitAcks.notifyAll();
-                    }
-                    return true;
-                } else {
-                    replicationTxnsPendingAcks.putIfAbsent(txnId, logRecord);
-                    return false;
-                }
-            }
-        }
-        //presume replicated
-        return true;
-    }
-
-    private Map<String, SocketChannel> getActiveRemoteReplicasSockets() {
-        Map<String, SocketChannel> replicaNodesSockets = new HashMap<>();
-        for (Replica replica : replicas.values()) {
-            if (replica.getState() == ReplicaState.ACTIVE) {
-                try {
-                    SocketChannel sc = getReplicaSocket(replica.getId());
-                    replicaNodesSockets.put(replica.getId(), sc);
-                } catch (IOException e) {
-                    if (LOGGER.isWarnEnabled()) {
-                        LOGGER.log(Level.WARN, "Could not get replica socket", e);
-                    }
-                    reportFailedReplica(replica.getId());
-                }
-            }
-        }
-        return replicaNodesSockets;
-    }
-
-    /**
-     * Establishes a connection with a remote replica.
-     *
-     * @param replicaId
-     *            The replica to connect to.
-     * @return The socket of the remote replica
-     * @throws IOException
-     */
-    private SocketChannel getReplicaSocket(String replicaId) throws IOException {
-        SocketChannel sc = SocketChannel.open();
-        sc.configureBlocking(true);
-        IApplicationConfig config = ncConfig.getConfigManager().getNodeEffectiveConfig(replicaId);
-        sc.connect(new InetSocketAddress(config.getString(NCConfig.Option.REPLICATION_LISTEN_ADDRESS),
-                config.getInt(NCConfig.Option.REPLICATION_LISTEN_PORT)));
-        return sc;
+    public void replicate(ILogRecord logRecord) throws InterruptedException {
+        logReplicationManager.replicate(logRecord);
     }
 
     @Override
-    public Set<String> getDeadReplicasIds() {
-        Set<String> replicasIds = new HashSet<>();
-        for (Replica replica : replicas.values()) {
-            if (replica.getState() == ReplicaState.DEAD) {
-                replicasIds.add(replica.getId());
-            }
-        }
-        return replicasIds;
+    public IReplicationStrategy getReplicationStrategy() {
+        return strategy;
     }
 
     @Override
-    public Set<String> getActiveReplicasIds() {
-        Set<String> replicasIds = new HashSet<>();
-        for (Replica replica : replicas.values()) {
-            if (replica.getState() == ReplicaState.ACTIVE) {
-                replicasIds.add(replica.getId());
-            }
-        }
-        return replicasIds;
+    public void submitJob(IReplicationJob job) {
+        lsnIndexReplicationManager.accept(job);
     }
 
     @Override
-    public int getActiveReplicasCount() {
-        return getActiveReplicasIds().size();
+    public boolean isReplicationEnabled() {
+        return replicationProperties.isReplicationEnabled();
     }
 
     @Override
     public void start() {
-        //do nothing
+        // no op
     }
 
     @Override
-    public void dumpState(OutputStream os) throws IOException {
-        //do nothing
+    public void dumpState(OutputStream os) {
+        // no op
     }
 
-    /**
-     * Called during NC shutdown to notify remote replicas about the shutdown
-     * and wait for remote replicas shutdown notification then closes the local
-     * replication channel.
-     */
     @Override
     public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
-        //stop replication threads after all jobs/logs have been processed
-        suspendReplication(false);
-
-        /*
-         * If this node has any remote replicas, it needs to inform them
-         * that it is shutting down.
-         */
-        if (!replicationStrategy.getRemoteReplicas(nodeId).isEmpty()) {
-            //send shutdown event to remote replicas
-            sendShutdownNotifiction();
-        }
-
-        /*
-         * If this node has any remote primary replicas, then it needs to wait
-         * until all of them send the shutdown notification.
-         */
-        // find active remote primary replicas
-        Set<String> activeRemotePrimaryReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
-                .map(Replica::getId).filter(getActiveReplicasIds()::contains).collect(Collectors.toSet());
-
-        if (!activeRemotePrimaryReplicas.isEmpty()) {
-            //wait until all shutdown events come from all remote primary replicas
-            synchronized (shuttingDownReplicaIds) {
-                while (!shuttingDownReplicaIds.containsAll(activeRemotePrimaryReplicas)) {
-                    try {
-                        shuttingDownReplicaIds.wait();
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            }
-        }
-
-        LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas");
-        //close replication channel
-        asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
-
-        LOGGER.log(Level.INFO, "Replication manager stopped.");
-    }
-
-    @Override
-    public void reportReplicaEvent(ReplicaEvent event) {
-        replicaEventsQ.offer(event);
-    }
-
-    /**
-     * Suspends replications and sends a remote replica failure event to ReplicasEventsMonitor.
-     *
-     * @param replicaId
-     *            the failed replica id.
-     */
-    public void reportFailedReplica(String replicaId) {
-        Replica replica = replicas.get(replicaId);
-        if (replica == null) {
-            return;
-        }
-        if (replica.getState() == ReplicaState.DEAD) {
-            return;
-        }
-
-        //need to stop processing any new logs or jobs
-        terminateJobsReplication.set(true);
-
-        ReplicaEvent event = new ReplicaEvent(replica, ClusterEventType.NODE_FAILURE);
-        reportReplicaEvent(event);
-    }
-
-    private String getReplicaIdBySocket(SocketChannel socketChannel) {
-        InetSocketAddress socketAddress = NetworkingUtil.getSocketAddress(socketChannel);
-        for (Replica replica : replicas.values()) {
-            if (replica.getClusterIp().equals(socketAddress.getHostName())
-                    && ncConfig.getReplicationPublicPort() == socketAddress.getPort()) {
-                return replica.getId();
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public void startReplicationThreads() throws InterruptedException {
-        replicationJobsProcessor = new ReplicationJobsProccessor();
-
-        //start/continue processing jobs/logs
-        if (logsRepSockets == null) {
-            establishTxnLogReplicationHandshake();
-            getAndInitNewPage();
-            txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ, pendingFlushLogBuffersQ);
-            txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogReplicator);
-        }
-
-        replicationJobsProcessor.start();
-
-        if (!replicationMonitor.isAlive()) {
-            replicationMonitor.start();
-        }
-
-        //notify any waiting threads that replication has been resumed
-        synchronized (replicationSuspended) {
-            LOGGER.log(Level.INFO, "Replication started/resumed");
-            replicationSuspended.set(false);
-            replicationSuspended.notifyAll();
-        }
-    }
-
-    @Override
-    public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException {
-        long startLSN = logManager.getAppendLSN();
-        Set<String> replicaIds = getActiveReplicasIds();
-        if (replicaIds.isEmpty()) {
-            return;
-        }
-        ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
-        for (String replicaId : replicaIds) {
-            //1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN.
-            Map<Long, DatasetResourceReference> laggingIndexes =
-                    replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId, nonSharpCheckpointTargetLSN);
-
-            if (!laggingIndexes.isEmpty()) {
-                //2. send request to remote replicas that have lagging indexes.
-                ReplicaIndexFlushRequest laggingIndexesResponse = null;
-                try (SocketChannel socketChannel = getReplicaSocket(replicaId)) {
-                    ReplicaIndexFlushRequest laggingIndexesRequest =
-                            new ReplicaIndexFlushRequest(laggingIndexes.keySet());
-                    requestBuffer =
-                            ReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer, laggingIndexesRequest);
-                    NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer);
-
-                    //3. remote replicas will respond with indexes that were not flushed.
-                    ReplicationRequestType responseFunction = waitForResponse(socketChannel, requestBuffer);
-
-                    if (responseFunction == ReplicationRequestType.FLUSH_INDEX) {
-                        requestBuffer = ReplicationProtocol.readRequest(socketChannel, requestBuffer);
-                        //returning the indexes that were not flushed
-                        laggingIndexesResponse = ReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
-                    }
-                    //send goodbye
-                    ReplicationProtocol.sendGoodbye(socketChannel);
-                }
-
-                /*
-                 * 4. update checkpoints for indexes that were not flushed
-                 * to the current append LSN to indicate no operations happened
-                 * since the checkpoint start.
-                 */
-                if (laggingIndexesResponse != null) {
-                    for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
-                        final DatasetResourceReference datasetResourceReference = laggingIndexes.get(resouceId);
-                        indexCheckpointManagerProvider.get(datasetResourceReference).advanceLowWatermark(startLSN);
-                    }
-                }
-            }
-        }
-    }
-
-    //Recovery Method
-    @Override
-    public long getMaxRemoteLSN(Set<String> remoteReplicas) throws IOException {
-        long maxRemoteLSN = 0;
-
-        ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
-        Map<String, SocketChannel> replicaSockets = new HashMap<>();
-        try {
-            for (String replicaId : remoteReplicas) {
-                replicaSockets.put(replicaId, getReplicaSocket(replicaId));
-            }
-
-            //send request
-            Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Entry<String, SocketChannel> replicaSocket = iterator.next();
-                SocketChannel clientSocket = replicaSocket.getValue();
-                NetworkingUtil.transferBufferToChannel(clientSocket, dataBuffer);
-                dataBuffer.position(0);
-            }
-
-            iterator = replicaSockets.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Entry<String, SocketChannel> replicaSocket = iterator.next();
-                SocketChannel clientSocket = replicaSocket.getValue();
-                //read response
-                NetworkingUtil.readBytes(clientSocket, dataBuffer, Long.BYTES);
-                maxRemoteLSN = Math.max(maxRemoteLSN, dataBuffer.getLong());
-            }
-        } finally {
-            closeReplicaSockets(replicaSockets);
-        }
-
-        return maxRemoteLSN;
-    }
-
-    //Recovery Method
-    @Override
-    public void requestReplicaFiles(String selectedReplicaId, Set<Integer> partitionsToRecover,
-            Set<String> existingFiles) throws IOException {
-        ReplicaFilesRequest request = new ReplicaFilesRequest(partitionsToRecover, existingFiles);
-        dataBuffer = ReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
-
-        try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId)) {
-
-            //transfer request
-            NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
-
-            String indexPath;
-            String destFilePath;
-            ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
-            LSMIndexFileProperties fileProperties;
-            while (responseFunction != ReplicationRequestType.GOODBYE) {
-                dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
-
-                fileProperties = ReplicationProtocol.readFileReplicationRequest(dataBuffer);
-
-                //get index path
-                indexPath = replicaResourcesManager.getIndexPath(fileProperties);
-                destFilePath = indexPath + File.separator + fileProperties.getFileName();
-
-                //create file
-                File destFile = new File(destFilePath);
-                destFile.createNewFile();
-
-                try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw");
-                        FileChannel fileChannel = fileOutputStream.getChannel();) {
-                    fileOutputStream.setLength(fileProperties.getFileSize());
-
-                    NetworkingUtil.downloadFile(fileChannel, socketChannel);
-                    fileChannel.force(true);
-                }
-
-                //we need to create initial map for .metadata files that belong to remote replicas
-                if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) {
-                    final ResourceReference indexRef = ResourceReference.of(destFile.getAbsolutePath());
-                    indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN());
-                }
-                responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
-            }
-
-            //send goodbye
-            ReplicationProtocol.sendGoodbye(socketChannel);
-        }
-    }
-
-    public int getLogPageSize() {
-        return replicationProperties.getLogBufferPageSize();
-    }
-
-    @Override
-    public void replicateTxnLogBatch(final ByteBuffer buffer) {
-        //if replication is suspended, wait until it is resumed
-        try {
-            while (replicationSuspended.get()) {
-                synchronized (replicationSuspended) {
-                    replicationSuspended.wait();
-                }
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        //prepare the batch size buffer
-        txnLogsBatchSizeBuffer.clear();
-        txnLogsBatchSizeBuffer.putInt(buffer.remaining());
-        txnLogsBatchSizeBuffer.flip();
-
-        buffer.mark();
-        for (SocketChannel replicaSocket : logsRepSockets) {
-            try {
-                //send batch size
-                NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer);
-                //send log
-                NetworkingUtil.transferBufferToChannel(replicaSocket, buffer);
-            } catch (IOException e) {
-                handleReplicationFailure(replicaSocket, e);
-            } finally {
-                txnLogsBatchSizeBuffer.position(0);
-                buffer.reset();
-            }
-        }
-        //move the buffer position to the sent limit
-        buffer.position(buffer.limit());
-    }
-
-    @Override
-    public void register(IPartitionReplica replica) {
-        // find the replica node based on ip and replication port
-        Optional<Replica> replicaNode = replicationStrategy.getRemoteReplicasAndSelf(nodeId).stream()
-                .filter(node -> node.getClusterIp().equals(replica.getIdentifier().getLocation().getHostString())
-                        && node.getPort() == replica.getIdentifier().getLocation().getPort())
-                .findAny();
-        if (!replicaNode.isPresent()) {
-            throw new IllegalStateException("Couldn't find node for replica: " + replica);
-        }
-        Replica replicaRef = replicaNode.get();
-        final String replicaId = replicaRef.getId();
-        replicas.putIfAbsent(replicaId, replicaRef);
-        replica2PartitionsMap.computeIfAbsent(replicaId, k -> new HashSet<>());
-        replica2PartitionsMap.get(replicaId).add(replica.getIdentifier().getPartition());
-        updateReplicaInfo(replicaRef);
-        checkReplicaState(replicaId, false, true);
-    }
-
-    //supporting classes
-    /**
-     * This class is responsible for processing replica events.
-     */
-    private class ReplicasEventsMonitor extends Thread {
-        ReplicaEvent event;
-
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    event = replicaEventsQ.take();
-
-                    switch (event.getEventType()) {
-                        case NODE_FAILURE:
-                            handleReplicaFailure(event.getReplica().getId());
-                            break;
-                        case NODE_JOIN:
-                            updateReplicaInfo(event.getReplica());
-                            checkReplicaState(event.getReplica().getId(), false, true);
-                            break;
-                        case NODE_SHUTTING_DOWN:
-                            handleShutdownEvent(event.getReplica().getId());
-                            break;
-                        default:
-                            break;
-                    }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-
-        public void handleReplicaFailure(String replicaId) throws InterruptedException {
-            Replica replica = replicas.get(replicaId);
-
-            if (replica.getState() == ReplicaState.DEAD) {
-                return;
-            }
-
-            updateReplicaState(replicaId, ReplicaState.DEAD, true);
-
-            //delete any invalid LSMComponents for this replica
-            replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
-        }
-
-        public void handleShutdownEvent(String replicaId) {
-            synchronized (shuttingDownReplicaIds) {
-                shuttingDownReplicaIds.add(replicaId);
-                shuttingDownReplicaIds.notifyAll();
-            }
-        }
-    }
-
-    /**
-     * This class is responsible for processing ASYNC replication jobs.
-     */
-    private class ReplicationJobsProccessor extends Thread {
-        Map<String, SocketChannel> replicaSockets;
-        ByteBuffer reusableBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
-
-        @Override
-        public void run() {
-            Thread.currentThread().setName("ReplicationJobsProccessor Thread");
-            terminateJobsReplication.set(false);
-            jobsReplicationSuspended.set(false);
-
-            while (true) {
-                try {
-                    if (terminateJobsReplication.get()) {
-                        closeSockets();
-                        break;
-                    }
-
-                    IReplicationJob job = replicationJobsQ.take();
-                    if (job == REPLICATION_JOB_POISON_PILL) {
-                        terminateJobsReplication.set(true);
-                        continue;
-                    }
-
-                    //if there isn't already a connection, establish a new one
-                    if (replicaSockets == null) {
-                        replicaSockets = getActiveRemoteReplicasSockets();
-                    }
-                    processJob(job, replicaSockets, reusableBuffer);
-
-                    //if no more jobs to process, close sockets
-                    if (replicationJobsQ.isEmpty()) {
-                        LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas");
-                        closeSockets();
-                    }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                } catch (IOException e) {
-                    LOGGER.warn("Couldn't complete processing replication job", e);
-                }
-            }
-
-            synchronized (jobsReplicationSuspended) {
-                jobsReplicationSuspended.set(true);
-                jobsReplicationSuspended.notifyAll();
-            }
-            LOGGER.log(Level.INFO, "ReplicationJobsProccessor stopped. ");
-        }
-
-        private void closeSockets() {
-            if (replicaSockets != null) {
-                closeReplicaSockets(replicaSockets);
-                replicaSockets.clear();
-                replicaSockets = null;
-            }
-        }
-    }
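
The jobs processor above is a blocking-queue consumer that is stopped with a poison pill (REPLICATION_JOB_POISON_PILL). A reduced sketch of the same shutdown pattern, using illustrative names rather than the AsterixDB types:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class PoisonPillConsumer implements Runnable {
    static final Runnable POISON_PILL = () -> { };
    private final BlockingQueue<Runnable> jobs = new LinkedBlockingQueue<>();

    void submit(Runnable job) throws InterruptedException {
        jobs.put(job);
    }

    void shutdown() throws InterruptedException {
        jobs.put(POISON_PILL);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable job = jobs.take();
                if (job == POISON_PILL) {
                    break; // stop only after all earlier jobs have been taken
                }
                job.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}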
-
-    /**
-     * This class is responsible for listening on sockets that belong to TxnLogsReplicator.
-     */
-    private class TxnLogsReplicationResponseListener implements Runnable {
-        final SocketChannel replicaSocket;
-        final String replicaId;
-
-        public TxnLogsReplicationResponseListener(String replicaId, SocketChannel replicaSocket) {
-            this.replicaId = replicaId;
-            this.replicaSocket = replicaSocket;
-        }
-
-        @Override
-        public void run() {
-            Thread.currentThread().setName("TxnLogs Replication Listener Thread");
-            LOGGER.log(Level.INFO, "Started listening on socket: " + replicaSocket.socket().getRemoteSocketAddress());
-
-            try (BufferedReader incomingResponse =
-                    new BufferedReader(new InputStreamReader(replicaSocket.socket().getInputStream()))) {
-                while (true) {
-                    String responseLine = incomingResponse.readLine();
-                    if (responseLine == null) {
-                        break;
-                    }
-                    //read ACK for job commit log
-                    String ackFrom = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
-                    int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
-                    addAckToJob(jobId, ackFrom);
-                }
-            } catch (AsynchronousCloseException e) {
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId, e);
-                }
-            } catch (IOException e) {
-                handleReplicationFailure(replicaSocket, e);
-            }
-        }
-    }
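
The listener thread reads one ACK line per replicated job commit and feeds it back into the pending-ACK bookkeeping via addAckToJob. A hedged sketch of such a line-oriented ACK reader; the "nodeId,jobId" line format below is purely an assumption for illustration and does not reflect the actual ReplicationProtocol wire format:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.function.BiConsumer;

final class AckLineReader {
    // Parses lines of the assumed form "nodeId,jobId" and reports each ACK.
    static void readAcks(BufferedReader in, BiConsumer<String, Integer> onAck) throws IOException {
        String line;
        while ((line = in.readLine()) != null) {
            int sep = line.indexOf(',');
            if (sep < 0) {
                continue; // ignore malformed lines in this sketch
            }
            String nodeId = line.substring(0, sep);
            int jobId = Integer.parseInt(line.substring(sep + 1).trim());
            onAck.accept(nodeId, jobId);
        }
    }

    public static void main(String[] args) throws IOException {
        readAcks(new BufferedReader(new StringReader("nc1,42\nnc2,42\n")),
                (node, job) -> System.out.println(node + " acked job " + job));
    }
}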
-
-    @Override
-    public IReplicationStrategy getReplicationStrategy() {
-        return replicationStrategy;
+        LOGGER.info("Closing replication channel");
+        appCtx.getReplicationChannel().close();
+        LOGGER.info("Replication manager stopped");
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 2c1937b..84922cd 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -25,12 +25,11 @@ import java.io.OutputStream;
 import java.util.Collection;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.replication.api.IReplicaTask;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.common.LocalResource;
@@ -47,7 +46,7 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask {
     }
 
     @Override
-    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException {
         final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
                 appCtx.getIndexCheckpointManagerProvider();
         PersistentLocalResourceRepository resRepo =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
new file mode 100644
index 0000000..26c9577
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+/**
+ * A task to create a mask file for an incoming LSM component from the master
+ */
+public class ComponentMaskTask implements IReplicaTask {
+
+    private static final String COMPONENT_MASK_FILE_PREFIX = StorageConstants.MASK_FILE_PREFIX + "C_";
+    private final String file;
+    private final String componentId;
+
+    public ComponentMaskTask(String file, String componentId) {
+        this.file = file;
+        this.componentId = componentId;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
+        try {
+            // create mask
+            final Path maskPath = getComponentMaskPath(appCtx, file);
+            Files.createFile(maskPath);
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public static Path getComponentMaskPath(INcApplicationContext appCtx, String file) throws IOException {
+        final IIOManager ioManager = appCtx.getIoManager();
+        final FileReference localPath = ioManager.resolve(file);
+        final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
+        return Paths.get(resourceDir.toString(), COMPONENT_MASK_FILE_PREFIX + localPath.getFile().getName());
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.LSM_COMPONENT_MASK;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            final DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(file);
+            dos.writeUTF(componentId);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static ComponentMaskTask create(DataInput input) throws IOException {
+        String indexFile = input.readUTF();
+        String componentId = input.readUTF();
+        return new ComponentMaskTask(indexFile, componentId);
+    }
+}
\ No newline at end of file
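
The mask-file convention introduced by ComponentMaskTask pairs with MarkComponentValidTask further down: a mask is created before the component's files arrive and removed only after the component is complete, so a crash in between leaves a mask that marks the partial component as invalid. A small lifecycle sketch with plain java.nio.file; the ".C_" prefix and paths below are placeholder assumptions, the real prefix comes from StorageConstants.MASK_FILE_PREFIX:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class ComponentMaskLifecycle {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("replica-partition");
        Path componentFile = dir.resolve("2018-01-01_b");
        // 1. create the mask before any component file is written
        Path mask = dir.resolve(".C_" + componentFile.getFileName()); // prefix is an assumption
        Files.createFile(mask);
        // 2. receive/write the component files
        Files.write(componentFile, new byte[] { 1, 2, 3 });
        // 3. only after the whole component is on disk, drop the mask to mark it valid
        Files.delete(mask);
    }
}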

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
index d4de3b7..1b5470d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -27,9 +27,8 @@ import java.nio.file.Files;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.replication.api.IReplicaTask;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.logging.log4j.LogManager;
@@ -48,7 +47,7 @@ public class DeleteFileTask implements IReplicaTask {
     }
 
     @Override
-    public void perform(INcApplicationContext appCtx, IReplicationThread worker) {
+    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
         try {
             final IIOManager ioManager = appCtx.getIoManager();
             final File localFile = ioManager.resolve(file).getFile();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
new file mode 100644
index 0000000..b7f0985
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
+
+/**
+ * A task to drop an index that was dropped on the master
+ */
+public class DropIndexTask implements IReplicaTask {
+
+    private static final Logger LOGGER = Logger.getLogger(DeleteFileTask.class.getName());
+    private final String file;
+
+    public DropIndexTask(String file) {
+        this.file = file;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
+        try {
+            final IIOManager ioManager = appCtx.getIoManager();
+            final File indexFile = ioManager.resolve(file).getFile();
+            if (indexFile.exists()) {
+                File indexDir = indexFile.getParentFile();
+                IoUtil.deleteDirectory(indexDir);
+                LOGGER.info(() -> "Deleted index: " + indexFile.getAbsolutePath());
+            } else {
+                LOGGER.warning(() -> "Requested to delete a non-existing index: " + indexFile.getAbsolutePath());
+            }
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.REPLICATE_RESOURCE_FILE;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(file);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static DropIndexTask create(DataInput input) throws IOException {
+        return new DropIndexTask(input.readUTF());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
new file mode 100644
index 0000000..b581321
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+
+/**
+ * A task to mark a replicated LSM component as valid
+ */
+public class MarkComponentValidTask implements IReplicaTask {
+
+    private final long masterLsn;
+    private final String file;
+
+    public MarkComponentValidTask(String file, long masterLsn) {
+        this.file = file;
+        this.masterLsn = masterLsn;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
+        try {
+            if (masterLsn > 0) {
+                ensureComponentLsnFlushed(appCtx);
+            }
+            // delete mask
+            final Path maskPath = ComponentMaskTask.getComponentMaskPath(appCtx, file);
+            Files.delete(maskPath);
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (IOException | InterruptedException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    private void ensureComponentLsnFlushed(INcApplicationContext appCtx)
+            throws HyracksDataException, InterruptedException {
+        final ResourceReference indexRef = ResourceReference.of(file);
+        final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
+        final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
+        long replicationTimeOut = TimeUnit.SECONDS.toMillis(appCtx.getReplicationProperties().getReplicationTimeOut());
+        final long startTime = System.nanoTime();
+        synchronized (indexCheckpointManager) {
+            // wait until the lsn mapping is flushed to disk
+            while (!indexCheckpointManager.isFlushed(masterLsn)) {
+                if (replicationTimeOut <= 0) {
+                    throw new ReplicationException(new TimeoutException("Couldn't receive flush lsn from master"));
+                }
+                indexCheckpointManager.wait(replicationTimeOut);
+                replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+            }
+            final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
+            indexCheckpointManager.replicated(componentEndTime, masterLsn);
+        }
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.MARK_COMPONENT_VALID;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            final DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(file);
+            dos.writeLong(masterLsn);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static MarkComponentValidTask create(DataInput input) throws IOException {
+        final String indexFile = input.readUTF();
+        final long lsn = input.readLong();
+        return new MarkComponentValidTask(indexFile, lsn);
+    }
+}
\ No newline at end of file
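
ensureComponentLsnFlushed above waits on the index checkpoint manager's monitor until the master's LSN mapping has reached disk, bounded by the replication timeout. The general bounded-wait pattern it relies on looks roughly like this; a sketch against a hypothetical condition, not the AsterixDB API:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

final class BoundedWait {
    // Waits on the monitor until the condition holds or the timeout elapses.
    static void awaitCondition(Object monitor, BooleanSupplier condition, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (monitor) {
            while (!condition.getAsBoolean()) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    throw new TimeoutException("condition not met within " + timeoutMillis + " ms");
                }
                monitor.wait(remainingMillis);
            }
        }
    }
}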

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
index 85b7bb9..561a6bf 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.asterix.replication.api.IReplicationMessage;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class PartitionResourcesListResponse implements IReplicationMessage {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index b2b8ac6..b972f32 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -26,10 +26,10 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.IReplicaTask;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
@@ -44,11 +44,12 @@ public class PartitionResourcesListTask implements IReplicaTask {
     }
 
     @Override
-    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException {
         //TODO delete any invalid files with masks
-        final List<String> partitionResources =
-                appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, false).stream()
-                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        final List<String> partitionResources = localResourceRepository.getPartitionIndexesFiles(partition).stream()
+                .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
         final PartitionResourcesListResponse response =
                 new PartitionResourcesListResponse(partition, partitionResources);
         ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 45d8971..99c7256 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -30,10 +30,12 @@ import java.nio.file.Paths;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.replication.api.IReplicaTask;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -49,14 +51,16 @@ public class ReplicateFileTask implements IReplicaTask {
     private static final Logger LOGGER = LogManager.getLogger();
     private final String file;
     private final long size;
+    private final boolean indexMetadata;
 
-    public ReplicateFileTask(String file, long size) {
+    public ReplicateFileTask(String file, long size, boolean indexMetadata) {
         this.file = file;
         this.size = size;
+        this.indexMetadata = indexMetadata;
     }
 
     @Override
-    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
         try {
             final IIOManager ioManager = appCtx.getIoManager();
             // resolve path
@@ -76,6 +80,9 @@ public class ReplicateFileTask implements IReplicaTask {
                 NetworkingUtil.downloadFile(fileChannel, worker.getChannel());
                 fileChannel.force(true);
             }
+            if (indexMetadata) {
+                initIndexCheckpoint(appCtx);
+            }
             //delete mask
             Files.delete(maskPath);
             LOGGER.info(() -> "Replicated file: " + localPath);
@@ -85,6 +92,16 @@ public class ReplicateFileTask implements IReplicaTask {
         }
     }
 
+    private void initIndexCheckpoint(INcApplicationContext appCtx) throws HyracksDataException {
+        final ResourceReference indexRef = ResourceReference.of(file);
+        final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
+        final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
+        final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
+        indexCheckpointManager.delete();
+        indexCheckpointManager.init(currentLSN);
+        LOGGER.info(() -> "Checkpoint index: " + indexRef);
+    }
+
     @Override
     public ReplicationProtocol.ReplicationRequestType getMessageType() {
         return ReplicationProtocol.ReplicationRequestType.REPLICATE_RESOURCE_FILE;
@@ -96,6 +113,7 @@ public class ReplicateFileTask implements IReplicaTask {
             DataOutputStream dos = new DataOutputStream(out);
             dos.writeUTF(file);
             dos.writeLong(size);
+            dos.writeBoolean(indexMetadata);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -104,6 +122,7 @@ public class ReplicateFileTask implements IReplicaTask {
     public static ReplicateFileTask create(DataInput input) throws IOException {
         final String s = input.readUTF();
         final long i = input.readLong();
-        return new ReplicateFileTask(s, i);
+        final boolean isMetadata = input.readBoolean();
+        return new ReplicateFileTask(s, i, isMetadata);
     }
 }
\ No newline at end of file
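
The new indexMetadata flag extends the task's wire format to writeUTF(file), writeLong(size), writeBoolean(indexMetadata), and create() reads the fields back in the same order. A quick roundtrip check of that layout with plain JDK streams; a standalone sketch that does not use the AsterixDB classes:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class ReplicateFileWireFormat {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeUTF("storage/partition_0/idx/file_b"); // illustrative path
            dos.writeLong(4096L);
            dos.writeBoolean(true); // the new indexMetadata flag
        }
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            System.out.println(dis.readUTF() + " size=" + dis.readLong() + " indexMetadata=" + dis.readBoolean());
        }
    }
}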

