From: mhubail@apache.org
To: commits@asterixdb.incubator.apache.org
Date: Thu, 12 Nov 2015 20:51:30 -0000
Message-Id: <9f78d57ee06e4e27876f84f385d381a1@git.apache.org>
In-Reply-To: <28024909d31d44f5b9133a2ffcf9359d@git.apache.org>
References: <28024909d31d44f5b9133a2ffcf9359d@git.apache.org>
Subject: [3/6] incubator-asterixdb git commit: Introducing Data Replication To AsterixDB

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
new file mode 100644
index 0000000..eaef9be
--- /dev/null
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -0,0 +1,1247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.management; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.FileChannel; +import java.nio.channels.SocketChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.config.AsterixReplicationProperties; +import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.replication.AsterixReplicationJob; +import org.apache.asterix.common.replication.IReplicaResourcesManager; +import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.replication.Replica; +import org.apache.asterix.common.replication.Replica.ReplicaState; +import org.apache.asterix.common.replication.ReplicaEvent; +import org.apache.asterix.common.replication.ReplicaEvent.ReplicaEventType; +import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; +import org.apache.asterix.common.transactions.ILogManager; +import org.apache.asterix.common.transactions.ILogRecord; +import org.apache.asterix.common.transactions.LogRecord; +import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.event.schema.cluster.Node; +import org.apache.asterix.replication.functions.AsterixReplicationProtocol; +import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType; +import org.apache.asterix.replication.functions.ReplicaFilesRequest; +import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest; +import org.apache.asterix.replication.functions.ReplicaLogsRequest; +import org.apache.asterix.replication.logging.ReplicationLogBuffer; +import org.apache.asterix.replication.logging.ReplicationLogFlusher; +import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties; +import 
org.apache.asterix.replication.storage.LSMComponentProperties; +import org.apache.asterix.replication.storage.ReplicaResourcesManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.replication.IReplicationJob; +import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType; +import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType; +import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; + +/** + * This class is used to process replication jobs and maintain remote replicas states + */ +public class ReplicationManager implements IReplicationManager { + + private static final Logger LOGGER = Logger.getLogger(ReplicationManager.class.getName()); + private final int INITIAL_REPLICATION_FACTOR = 1; + private final String nodeId; + private ExecutorService replicationListenerThreads; + private final Map> jobCommitAcks; + private final Map replicationJobsPendingAcks; + private ByteBuffer dataBuffer; + + private final LinkedBlockingQueue replicationJobsQ; + private final LinkedBlockingQueue replicaEventsQ; + + private int replicationFactor = 1; + private final ReplicaResourcesManager replicaResourcesManager; + private final ILogManager logManager; + private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider; + private final AsterixReplicationProperties replicationProperties; + private final Map replicas; + + private final AtomicBoolean replicationSuspended; + private AtomicBoolean terminateJobsReplication; + private AtomicBoolean jobsReplicationSuspended; + private final static int INITIAL_BUFFER_SIZE = 4000; //4KB + private final Set shuttingDownReplicaIds; + //replication threads + private ReplicationJobsProccessor replicationJobsProcessor; + private final ReplicasEventsMonitor replicationMonitor; + //dummy job used to stop ReplicationJobsProccessor thread. 
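+    //(poison-pill pattern: a sentinel STOP job is queued so ReplicationJobsProccessor can unblock from replicationJobsQ.take() and exit its loop)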
+ private static final IReplicationJob replicationJobPoisonPill = new AsterixReplicationJob( + ReplicationJobType.METADATA, ReplicationOperation.STOP, ReplicationExecutionType.ASYNC, null); + //used to identify the correct IP address when the node has multiple network interfaces + private String hostIPAddressFirstOctet = null; + + private LinkedBlockingQueue emptyLogBuffersQ; + private LinkedBlockingQueue pendingFlushLogBuffersQ; + protected ReplicationLogBuffer currentTxnLogBuffer; + private ReplicationLogFlusher txnlogsReplicator; + private Future txnLogReplicatorTask; + private Map logsReplicaSockets = null; + + public ReplicationManager(String nodeId, AsterixReplicationProperties replicationProperties, + IReplicaResourcesManager remoteResoucesManager, ILogManager logManager, + IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider) { + this.nodeId = nodeId; + this.replicationProperties = replicationProperties; + this.replicaResourcesManager = (ReplicaResourcesManager) remoteResoucesManager; + this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider; + this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3); + this.logManager = logManager; + replicationJobsQ = new LinkedBlockingQueue(); + replicaEventsQ = new LinkedBlockingQueue(); + terminateJobsReplication = new AtomicBoolean(false); + jobsReplicationSuspended = new AtomicBoolean(true); + replicationSuspended = new AtomicBoolean(true); + + replicas = new HashMap(); + jobCommitAcks = new ConcurrentHashMap>(); + replicationJobsPendingAcks = new ConcurrentHashMap(); + shuttingDownReplicaIds = new HashSet(); + dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); + + //Used as async listeners from replicas + replicationListenerThreads = Executors.newCachedThreadPool(); + replicationJobsProcessor = new ReplicationJobsProccessor(); + replicationMonitor = new ReplicasEventsMonitor(); + + //add list of replicas from configurations (To be read from another source e.g. Zookeeper) + Set replicaNodes = replicationProperties.getRemoteReplicas(nodeId); + if (replicaNodes != null) { + for (Replica replica : replicaNodes) { + replicas.put(replica.getNode().getId(), replica); + } + } + int numLogBuffers = logManager.getNumLogPages(); + emptyLogBuffersQ = new LinkedBlockingQueue(numLogBuffers); + pendingFlushLogBuffersQ = new LinkedBlockingQueue(numLogBuffers); + + int logBufferSize = logManager.getLogPageSize(); + for (int i = 0; i < numLogBuffers; i++) { + emptyLogBuffersQ.offer(new ReplicationLogBuffer(this, logBufferSize)); + } + } + + /** + * Accepts a replication job. If the job execution type is ASYNC, it is queued. + * Otherwise, it is processed immediately. + */ + @Override + public void submitJob(IReplicationJob job) throws IOException { + if (job.getExecutionType() == ReplicationExecutionType.ASYNC) { + replicationJobsQ.offer(job); + } else { + //wait until replication is resumed + while (replicationSuspended.get()) { + synchronized (replicationSuspended) { + try { + replicationSuspended.wait(); + } catch (InterruptedException e) { + //ignore + } + } + } + processJob(job, null, null); + } + } + + @Override + public void replicateLog(ILogRecord logRecord) { + if (logRecord.getLogType() == LogType.JOB_COMMIT) { + //if replication is suspended, wait until it is resumed. 
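+            //(this wait pairs with startReplicationThreads(), which clears replicationSuspended and calls notifyAll() on it; the JOB_COMMIT log then registers its ACK set in jobCommitAcks before being appended)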
+ while (replicationSuspended.get()) { + synchronized (replicationSuspended) { + try { + replicationSuspended.wait(); + } catch (InterruptedException e) { + //ignore + } + } + } + Set replicaIds = Collections.synchronizedSet(new HashSet()); + replicaIds.add(nodeId); + jobCommitAcks.put(logRecord.getJobId(), replicaIds); + } + + appendToLogBuffer(logRecord); + } + + protected void getAndInitNewPage() { + currentTxnLogBuffer = null; + while (currentTxnLogBuffer == null) { + try { + currentTxnLogBuffer = emptyLogBuffersQ.take(); + } catch (InterruptedException e) { + //ignore + } + } + currentTxnLogBuffer.reset(); + currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets); + pendingFlushLogBuffersQ.offer(currentTxnLogBuffer); + } + + private synchronized void appendToLogBuffer(ILogRecord logRecord) { + if (!currentTxnLogBuffer.hasSpace(logRecord)) { + currentTxnLogBuffer.isFull(true); + getAndInitNewPage(); + } + + currentTxnLogBuffer.append(logRecord); + } + + /** + * Processes the replication job based on its specifications + * + * @param job + * The replication job + * @param replicasSockets + * The remote replicas sockets to send the request to. + * @param requestBuffer + * The buffer to use to send the request. + * @throws IOException + */ + private void processJob(IReplicationJob job, Map replicasSockets, ByteBuffer requestBuffer) + throws IOException { + boolean isLSMComponentFile; + ByteBuffer responseBuffer = null; + AsterixLSMIndexFileProperties asterixFileProperties = new AsterixLSMIndexFileProperties(); + if (requestBuffer == null) { + requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); + } + + isLSMComponentFile = job.getJobType() == ReplicationJobType.LSM_COMPONENT ? true : false; + try { + //if there isn't already a connection, establish a new one + if (replicasSockets == null) { + replicasSockets = getActiveRemoteReplicasSockets(); + } + + int remainingFiles = job.getJobFiles().size(); + + if (job.getOperation() == ReplicationOperation.REPLICATE) { + try { + //if the replication job is an LSM_COMPONENT, its properties are sent first, then its files. 
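+                    //(wire order per replica: the LSM component properties request is sent once, then a REPLICATE_FILE header followed by the raw file bytes for each file; an ACK is awaited only for the last file, since requiresAck is set when remainingFiles == 0)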
+ ILSMIndexReplicationJob LSMComponentJob = null; + if (job.getJobType() == ReplicationJobType.LSM_COMPONENT) { + //send LSMComponent properties + LSMComponentJob = (ILSMIndexReplicationJob) job; + LSMComponentProperties lsmCompProp = new LSMComponentProperties(LSMComponentJob, nodeId); + requestBuffer = AsterixReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp, + requestBuffer); + sendRequest(replicasSockets, requestBuffer); + } + + for (String filePath : job.getJobFiles()) { + remainingFiles--; + Path path = Paths.get(filePath); + if (Files.notExists(path)) { + LOGGER.log(Level.SEVERE, "File deleted before replication: " + filePath); + continue; + } + + LOGGER.log(Level.INFO, "Replicating file: " + filePath); + //open file for reading + try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r"); + FileChannel fileChannel = fromFile.getChannel();) { + + long fileSize = fileChannel.size(); + + if (LSMComponentJob != null) { + boolean requireLSNSync = AsterixLSMIndexUtil.lsmComponentFileHasLSN( + (AbstractLSMIndex) LSMComponentJob.getLSMIndex(), filePath); + asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile, + requireLSNSync, remainingFiles == 0); + } else { + asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile, false, + remainingFiles == 0); + } + + requestBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer, + asterixFileProperties, ReplicationRequestType.REPLICATE_FILE); + + Iterator> iterator = replicasSockets.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + SocketChannel socketChannel = entry.getValue(); + //transfer request header & file + try { + NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer); + NetworkingUtil.sendFile(fileChannel, socketChannel); + if (asterixFileProperties.requiresAck()) { + ReplicationRequestType responseType = waitForResponse(socketChannel, + responseBuffer); + if (responseType != ReplicationRequestType.ACK) { + throw new IOException( + "Could not receive ACK from replica " + entry.getKey()); + } + } + } catch (IOException e) { + reportFailedReplica(entry.getKey()); + iterator.remove(); + } finally { + requestBuffer.position(0); + } + } + } + } + } finally { + if (job instanceof ILSMIndexReplicationJob) { + //exit the replicated LSM components + ILSMIndexReplicationJob aJob = (ILSMIndexReplicationJob) job; + aJob.endReplication(); + } + } + } else if (job.getOperation() == ReplicationOperation.DELETE) { + for (String filePath : job.getJobFiles()) { + remainingFiles--; + asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, false, + remainingFiles == 0); + AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties, + ReplicationRequestType.DELETE_FILE); + + Iterator> iterator = replicasSockets.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + SocketChannel socketChannel = entry.getValue(); + try { + sendRequest(replicasSockets, requestBuffer); + if (asterixFileProperties.requiresAck()) { + waitForResponse(socketChannel, responseBuffer); + } + } catch (IOException e) { + reportFailedReplica(entry.getKey()); + iterator.remove(); + } finally { + requestBuffer.position(0); + } + } + } + } + } finally { + //if sync, close sockets with replicas since they wont be reused + if (job.getExecutionType() == ReplicationExecutionType.SYNC) { + closeReplicaSockets(replicasSockets); + } + } + } + + /** + * Waits 
and reads a response from a remote replica + * + * @param socketChannel + * The socket to read the response from + * @param responseBuffer + * The response buffer to read the response to. + * @return The response type. + * @throws IOException + */ + private static ReplicationRequestType waitForResponse(SocketChannel socketChannel, ByteBuffer responseBuffer) + throws IOException { + if (responseBuffer == null) { + responseBuffer = ByteBuffer.allocate(AsterixReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE); + } else { + responseBuffer.clear(); + } + + //read response from remote replicas + ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel, + responseBuffer); + return responseFunction; + } + + @Override + public boolean isReplicationEnabled() { + return replicationProperties.isReplicationEnabled(); + } + + @Override + public synchronized void updateReplicaInfo(Replica replicaNode) { + Replica replica = replicas.get(replicaNode.getNode().getId()); + //should not update the info of an active replica + if (replica.getState() == ReplicaState.ACTIVE) { + return; + } + + replica.getNode().setClusterIp(replicaNode.getNode().getClusterIp()); + + /* + * This could be used to reconnect to replica without needing the Cluster notifications + if (replica.getState() == ReplicaState.DEAD) { + reportFailedReplica(replica.getNode().getId()); + } else if (replica.getState() == ReplicaState.ACTIVE) { + checkReplicaState(replica.getNode().getId(), true); + } + */ + } + + /** + * Suspends proccessing replication jobs. + * + * @param force + * a flag indicates if replication should be suspended right away or when the pending jobs are completed. + */ + private void suspendReplication(boolean force) { + //suspend replication jobs processing + if (replicationJobsProcessor != null && replicationJobsProcessor.isAlive()) { + if (force) { + terminateJobsReplication.set(true); + } + replicationJobsQ.offer(replicationJobPoisonPill); + + //wait until the jobs are suspended + synchronized (jobsReplicationSuspended) { + while (!jobsReplicationSuspended.get()) { + try { + jobsReplicationSuspended.wait(); + } catch (InterruptedException e) { + //ignore + } + } + } + } + + //suspend logs replication + if (txnlogsReplicator != null) { + terminateTxnLogsReplicator(); + } + } + + /** + * Opens a new connection with Active remote replicas and starts a listen thread per connection. + */ + private void establishTxnLogsReplicationConnection() { + logsReplicaSockets = getActiveRemoteReplicasSockets(); + //start a listener thread per connection + for (Entry entry : logsReplicaSockets.entrySet()) { + replicationListenerThreads + .execute(new TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue())); + } + } + + /** + * Stops ReplicationFlusherThread and closes the sockets used to replicate logs. + */ + private void terminateTxnLogsReplicator() { + LOGGER.log(Level.INFO, "Terminating ReplicationLogFlusher thread ..."); + txnlogsReplicator.terminate(); + try { + txnLogReplicatorTask.get(); + } catch (ExecutionException | InterruptedException e) { + LOGGER.log(Level.WARNING, "RepicationLogFlusher thread terminated abnormally"); + e.printStackTrace(); + } + LOGGER.log(Level.INFO, "LogFlusher thread is terminated."); + + if (logsReplicaSockets != null) { + //wait for any ACK to arrive before closing sockets. 
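+            //(blocks until jobCommitAcks is empty; hasBeenReplicated() calls jobCommitAcks.notifyAll() once the last pending job commit has been fully acknowledged)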
+ synchronized (jobCommitAcks) { + while (jobCommitAcks.size() != 0) { + try { + jobCommitAcks.wait(); + } catch (InterruptedException e) { + //ignore + } + } + } + + //close log replication sockets + closeReplicaSockets(logsReplicaSockets); + logsReplicaSockets = null; + } + } + + @Override + public void broadcastNewIPAddress() throws IOException { + String orignalIPAddress = replicationProperties.getReplicaIPAddress(nodeId); + String newAddress = NetworkingUtil.getHostAddress(hostIPAddressFirstOctet); + + //IP Address didn't change after failure + if (orignalIPAddress.equals(newAddress)) { + return; + } + + Node node = new Node(); + node.setId(nodeId); + node.setClusterIp(newAddress); + Replica replica = new Replica(node); + + ByteBuffer buffer = AsterixReplicationProtocol.writeUpdateReplicaRequest(replica); + Map replicaSockets = getActiveRemoteReplicasSockets(); + sendRequest(replicaSockets, buffer); + closeReplicaSockets(replicaSockets); + } + + /** + * Sends a shutdown event to remote replicas notifying them + * no more logs/files will be sent from this local replica. + * + * @throws IOException + */ + private void sendShutdownNotifiction() throws IOException { + Node node = new Node(); + node.setId(nodeId); + node.setClusterIp(NetworkingUtil.getHostAddress(hostIPAddressFirstOctet)); + Replica replica = new Replica(node); + ReplicaEvent event = new ReplicaEvent(replica, ReplicaEventType.SHUTDOWN); + ByteBuffer buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event); + Map replicaSockets = getActiveRemoteReplicasSockets(); + sendRequest(replicaSockets, buffer); + closeReplicaSockets(replicaSockets); + } + + /** + * Sends a request to remote replicas + * + * @param replicaSockets + * The sockets to send the request to. + * @param requestBuffer + * The buffer that contains the request. + */ + private void sendRequest(Map replicaSockets, ByteBuffer requestBuffer) { + Iterator> iterator = replicaSockets.entrySet().iterator(); + while (iterator.hasNext()) { + Entry replicaSocket = iterator.next(); + SocketChannel clientSocket = replicaSocket.getValue(); + try { + NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer); + } catch (IOException e) { + if (clientSocket.isOpen()) { + try { + clientSocket.close(); + } catch (IOException e2) { + e2.printStackTrace(); + } + } + reportFailedReplica(replicaSocket.getKey()); + iterator.remove(); + } finally { + requestBuffer.position(0); + } + } + } + + /** + * Closes the passed replication sockets by sending GOODBYE request to remote replicas. + * + * @param replicaSockets + */ + private void closeReplicaSockets(Map replicaSockets) { + //send goodbye + ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer(); + sendRequest(replicaSockets, goodbyeBuffer); + + Iterator> iterator = replicaSockets.entrySet().iterator(); + while (iterator.hasNext()) { + Entry replicaSocket = iterator.next(); + SocketChannel clientSocket = replicaSocket.getValue(); + if (clientSocket.isOpen()) { + try { + clientSocket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + @Override + public void initializeReplicasState() { + for (Replica replica : replicas.values()) { + checkReplicaState(replica.getNode().getId(), false, false); + } + } + + /** + * Checks the state of a remote replica by trying to ping it. + * + * @param replicaId + * The replica to check the state for. + * @param async + * a flag indicating whether to wait for the result or not. 
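+     *            (when false, the caller polls the submitted ReplicaStateChecker Future until the check completes)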
+ * @param suspendReplication + * a flag indicating whether to suspend replication on replica state change or not. + */ + private void checkReplicaState(String replicaId, boolean async, boolean suspendReplication) { + Replica replica = replicas.get(replicaId); + + ReplicaStateChecker connector = new ReplicaStateChecker(replica, replicationProperties.getReplicationTimeOut(), + this, replicationProperties, suspendReplication); + Future ft = asterixAppRuntimeContextProvider.getThreadExecutor().submit(connector); + + if (!async) { + //wait until task is done + while (!ft.isDone()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + /** + * Updates the state of a remote replica. + * + * @param replicaId + * The replica id to update. + * @param newState + * The new state of the replica. + * @param suspendReplication + * a flag indicating whether to suspend replication on state change or not. + */ + public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication) { + Replica replica = replicas.get(replicaId); + + if (replica.getState() == newState) { + return; + } + + if (suspendReplication) { + //prevent new jobs/logs from coming in + replicationSuspended.set(true); + + if (newState == ReplicaState.DEAD) { + //assume the dead replica ACK has been received for all pending jobs + synchronized (jobCommitAcks) { + for (Integer jobId : jobCommitAcks.keySet()) { + addAckToJob(jobId, replicaId); + } + } + } + + //force replication threads to stop in order to change the replication factor + suspendReplication(true); + } + + replica.setState(newState); + + if (newState == ReplicaState.ACTIVE) { + replicationFactor++; + //TODO Extra check: make sure newly added replica is in sync. + //Since in the current design the whole cluster becomes UNUSABLE, + //no new jobs could start before the failed node rejoins + } else if (newState == ReplicaState.DEAD) { + if (replicationFactor > INITIAL_REPLICATION_FACTOR) { + replicationFactor--; + } + } + + LOGGER.log(Level.WARNING, "Replica " + replicaId + " state changed to: " + newState.name() + + ". Replication factor changed to: " + replicationFactor); + + if (suspendReplication) { + startReplicationThreads(); + } + } + + /** + * When an ACK for a JOB_COMMIT is received, it is added to the corresponding job. + * + * @param jobId + * @param replicaId + * The remote replica id the ACK received from. 
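+     * Once the ACK count for the job reaches the current replication factor, any log record parked in replicationJobsPendingAcks for that job is notified.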
+ */ + private void addAckToJob(int jobId, String replicaId) { + //add ACK to the job + if (jobCommitAcks.containsKey(jobId)) { + Set replicaIds = jobCommitAcks.get(jobId); + replicaIds.add(replicaId); + } else { + throw new IllegalStateException("Job ID not found in pending job commits " + jobId); + } + + //if got ACKs from all remote replicas, notify pending jobs if any + if (jobCommitAcks.get(jobId).size() == replicationFactor) { + synchronized (replicationJobsPendingAcks) { + if (replicationJobsPendingAcks.containsKey(jobId)) { + ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId); + synchronized (pendingLog) { + pendingLog.notify(); + } + } + } + } + } + + @Override + public boolean hasBeenReplicated(ILogRecord logRecord) { + if (jobCommitAcks.containsKey(logRecord.getJobId())) { + //check if all ACKs have been received + if (jobCommitAcks.get(logRecord.getJobId()).size() == replicationFactor) { + jobCommitAcks.remove(logRecord.getJobId()); + + if (replicationJobsPendingAcks.containsKey(logRecord.getJobId())) { + replicationJobsPendingAcks.remove(logRecord); + } + + //notify any threads waiting for all jobs to finish + if (jobCommitAcks.size() == 0) { + synchronized (jobCommitAcks) { + jobCommitAcks.notifyAll(); + } + } + + return true; + } else { + if (!replicationJobsPendingAcks.containsKey(logRecord.getJobId())) { + synchronized (replicationJobsPendingAcks) { + replicationJobsPendingAcks.put(logRecord.getJobId(), logRecord); + } + } + return false; + } + } + + //presume replicated + return true; + } + + private Map getActiveRemoteReplicasSockets() { + Map replicaNodesSockets = new HashMap(); + for (Replica replica : replicas.values()) { + if (replica.getState() == ReplicaState.ACTIVE) { + try { + SocketChannel sc = getReplicaSocket(replica.getId()); + replicaNodesSockets.put(replica.getId(), sc); + } catch (IOException e) { + reportFailedReplica(replica.getId()); + } + } + } + return replicaNodesSockets; + } + + /** + * Establishes a connection with a remote replica. + * + * @param replicaId + * The replica to connect to. + * @return The socket of the remote replica + * @throws IOException + */ + private SocketChannel getReplicaSocket(String replicaId) throws IOException { + Replica replica = replicas.get(replicaId); + SocketChannel sc = SocketChannel.open(); + sc.configureBlocking(true); + InetSocketAddress address = replica.getAddress(replicationProperties); + sc.connect(new InetSocketAddress(address.getHostString(), address.getPort())); + return sc; + } + + @Override + public Set getDeadReplicasIds() { + Set replicasIds = new HashSet(); + for (Replica replica : replicas.values()) { + if (replica.getState() == ReplicaState.DEAD) { + replicasIds.add(replica.getNode().getId()); + } + } + return replicasIds; + } + + @Override + public Set getActiveReplicasIds() { + Set replicasIds = new HashSet(); + for (Replica replica : replicas.values()) { + if (replica.getState() == ReplicaState.ACTIVE) { + replicasIds.add(replica.getNode().getId()); + } + } + return replicasIds; + } + + @Override + public int getActiveReplicasCount() { + return getActiveReplicasIds().size(); + } + + @Override + public void start() { + //do nothing + } + + @Override + public void dumpState(OutputStream os) throws IOException { + //do nothing + } + + /** + * Called during NC shutdown to notify remote replicas about the shutdown + * and wait for remote replicas shutdown notification then closes the local replication channel. 
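+     * Shutdown handshake: suspend replication, send a SHUTDOWN ReplicaEvent to the active replicas, then wait until shuttingDownReplicaIds contains all of them before closing the replication channel.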
+ */ + @Override + public void stop(boolean dumpState, OutputStream ouputStream) throws IOException { + try { + //stop replication thread afters all jobs/logs have been processed + suspendReplication(false); + //send shutdown event to remote replicas + sendShutdownNotifiction(); + //wait until all shutdown events come from all remote replicas + synchronized (shuttingDownReplicaIds) { + while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) { + try { + shuttingDownReplicaIds.wait(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas"); + //close replication channel + asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close(); + + LOGGER.log(Level.INFO, "Replication manager stopped."); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void reportReplicaEvent(ReplicaEvent event) { + synchronized (replicaEventsQ) { + replicaEventsQ.offer(event); + } + } + + /** + * Suspends replications and sends a remote replica failure event to ReplicasEventsMonitor. + * + * @param replicaId + * the failed replica id. + */ + public void reportFailedReplica(String replicaId) { + Replica replica = replicas.get(replicaId); + if (replica.getState() == ReplicaState.DEAD) { + return; + } + + //need to stop processing any new logs or jobs + terminateJobsReplication.set(true); + + ReplicaEvent event = new ReplicaEvent(replica, ReplicaEventType.FAIL); + reportReplicaEvent(event); + } + + @Override + public void startReplicationThreads() { + replicationJobsProcessor = new ReplicationJobsProccessor(); + + //start/continue processing jobs/logs + if (logsReplicaSockets == null) { + establishTxnLogsReplicationConnection(); + getAndInitNewPage(); + txnlogsReplicator = new ReplicationLogFlusher(emptyLogBuffersQ, pendingFlushLogBuffersQ); + txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogsReplicator); + } + + replicationJobsProcessor.start(); + + if (!replicationMonitor.isAlive()) { + replicationMonitor.start(); + } + + //notify any waiting threads that replication has been resumed + synchronized (replicationSuspended) { + LOGGER.log(Level.INFO, "Replication started/resumed"); + replicationSuspended.set(false); + replicationSuspended.notifyAll(); + } + } + + @Override + public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException { + long startLSN = logManager.getAppendLSN(); + Set replicaIds = getActiveReplicasIds(); + ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); + for (String replicaId : replicaIds) { + //1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN. + HashMap laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId, + nonSharpCheckpointTargetLSN); + + if (laggingIndexes.size() > 0) { + //2. send request to remote replicas that have lagging indexes. + ReplicaIndexFlushRequest laggingIndexesResponse = null; + try (SocketChannel socketChannel = getReplicaSocket(replicaId)) { + ReplicaIndexFlushRequest laggingIndexesRequest = new ReplicaIndexFlushRequest( + laggingIndexes.keySet()); + requestBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer, + laggingIndexesRequest); + NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer); + + //3. remote replicas will respond with indexes that were not flushed. 
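+                    //(a FLUSH_INDEX response lists the resource ids that were not flushed; step 4 below sets their LSN maps to the append LSN captured at the start of this call)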
+ ReplicationRequestType responseFunction = waitForResponse(socketChannel, requestBuffer); + + if (responseFunction == ReplicationRequestType.FLUSH_INDEX) { + requestBuffer = AsterixReplicationProtocol.readRequest(socketChannel, requestBuffer); + //returning the indexes that were not flushed + laggingIndexesResponse = AsterixReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer); + } + //send goodbye + AsterixReplicationProtocol.sendGoodbye(socketChannel); + } + + //4. update the LSN_MAP for indexes that were not flushed to the current append LSN to indicate no operations happend. + if (laggingIndexesResponse != null) { + for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) { + String indexPath = laggingIndexes.get(resouceId); + HashMap indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath); + indexLSNMap.put(ReplicaResourcesManager.REPLICA_INDEX_CREATION_LSN, startLSN); + replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, indexLSNMap); + } + } + } + } + } + + //Recovery Method + @Override + public long getMaxRemoteLSN(Set remoteReplicas) throws IOException { + long maxRemoteLSN = 0; + + AsterixReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer); + Map replicaSockets = new HashMap(); + try { + for (String replicaId : remoteReplicas) { + replicaSockets.put(replicaId, getReplicaSocket(replicaId)); + } + + //send request + Iterator> iterator = replicaSockets.entrySet().iterator(); + while (iterator.hasNext()) { + Entry replicaSocket = iterator.next(); + SocketChannel clientSocket = replicaSocket.getValue(); + NetworkingUtil.transferBufferToChannel(clientSocket, dataBuffer); + dataBuffer.position(0); + } + + iterator = replicaSockets.entrySet().iterator(); + while (iterator.hasNext()) { + Entry replicaSocket = iterator.next(); + SocketChannel clientSocket = replicaSocket.getValue(); + //read response + NetworkingUtil.readBytes(clientSocket, dataBuffer, Long.BYTES); + maxRemoteLSN = Math.max(maxRemoteLSN, dataBuffer.getLong()); + } + } finally { + closeReplicaSockets(replicaSockets); + } + + return maxRemoteLSN; + } + + //Recovery Method + @Override + public void requestReplicaFiles(String selectedReplicaId, Set replicasDataToRecover) throws IOException { + ReplicaFilesRequest request = new ReplicaFilesRequest(replicasDataToRecover); + AsterixReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request); + + try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId)) { + + //transfer request + NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer); + + String destFolder; + String destFilePath; + + ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel, + dataBuffer); + AsterixLSMIndexFileProperties fileProperties; + while (responseFunction != ReplicationRequestType.GOODBYE) { + dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer); + + fileProperties = AsterixReplicationProtocol.readFileReplicationRequest(dataBuffer); + destFolder = replicaResourcesManager.getIndexPath(fileProperties.getNodeId(), + fileProperties.getIoDeviceNum(), fileProperties.getDataverse(), fileProperties.getIdxName()); + destFilePath = destFolder + File.separator + fileProperties.getFileName(); + + //create file + File destFile = new File(destFilePath); + destFile.createNewFile(); + + try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw"); + FileChannel fileChannel = fileOutputStream.getChannel();) { + 
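+                    //pre-size the destination file to the advertised length, stream its contents from the replica socket, then force it to disk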
fileOutputStream.setLength(fileProperties.getFileSize()); + + NetworkingUtil.downloadFile(fileChannel, socketChannel); + fileChannel.force(true); + } + + //we need to create LSN map for .metadata files that belong to remote replicas + if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) { + //replica index + replicaResourcesManager.initializeReplicaIndexLSNMap(destFolder, logManager.getAppendLSN()); + } + + responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer); + } + + //send goodbye + AsterixReplicationProtocol.sendGoodbye(socketChannel); + } + } + + //Recovery Method + @Override + public long requestReplicaMinLSN(String selectedReplicaId) throws IOException { + long minLSN = 0; + AsterixReplicationProtocol.writeMinLSNRequest(dataBuffer); + try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId);) { + //transfer request + NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer); + + //read response + NetworkingUtil.readBytes(socketChannel, dataBuffer, Long.BYTES); + minLSN = dataBuffer.getLong(); + + //send goodbye + AsterixReplicationProtocol.sendGoodbye(socketChannel); + } + + return minLSN; + } + + //Recovery Method + @Override + public ArrayList requestReplicaLogs(String remoteNode, Set nodeIdsToRecoverFor, long fromLSN) + throws IOException, ACIDException { + ReplicaLogsRequest request = new ReplicaLogsRequest(nodeIdsToRecoverFor, fromLSN); + dataBuffer = AsterixReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request); + + try (SocketChannel socketChannel = getReplicaSocket(remoteNode)) { + //transfer request + NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer); + + //read response type + ReplicationRequestType responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer); + + ArrayList recoveryLogs = new ArrayList(); + ILogRecord logRecord = new LogRecord(); + while (responseType != ReplicationRequestType.GOODBYE) { + dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer); + logRecord.deserialize(dataBuffer, true, nodeId); + + if (logRecord.getNodeId().equals(nodeId)) { + //store log in memory to replay it for recovery + recoveryLogs.add(logRecord); + //this needs to be a new log object so that it is passed to recovery manager as a different object + logRecord = new LogRecord(); + } else { + //send log to log manager as a remote recovery log + logManager.log(logRecord); + } + + responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer); + } + + //send goodbye + AsterixReplicationProtocol.sendGoodbye(socketChannel); + return recoveryLogs; + } + } + + //supporting classes + /** + * This class is responsible for processing replica events. 
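+     * It consumes ReplicaEvent objects from replicaEventsQ and dispatches FAIL, JOIN, and SHUTDOWN events.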
+ */ + private class ReplicasEventsMonitor extends Thread { + ReplicaEvent event; + + @Override + public void run() { + while (true) { + try { + event = replicaEventsQ.take(); + + switch (event.getEventType()) { + case FAIL: + handleReplicaFailure(event.getReplica().getId()); + break; + case JOIN: + checkReplicaState(event.getReplica().getId(), false, true); + break; + case SHUTDOWN: + handleShutdownEvent(event.getReplica().getId()); + break; + default: + break; + } + } catch (InterruptedException e) { + //ignore + } + } + } + + public void handleReplicaFailure(String replicaId) { + Replica replica = replicas.get(replicaId); + + if (replica.getState() == ReplicaState.DEAD) { + return; + } + + updateReplicaState(replicaId, ReplicaState.DEAD, true); + + //delete any invalid LSMComponents for this replica + try { + replicaResourcesManager.cleanInvalidLSMComponents(replicaId); + } catch (HyracksDataException e) { + e.printStackTrace(); + } + } + + public void handleShutdownEvent(String replicaId) { + synchronized (shuttingDownReplicaIds) { + shuttingDownReplicaIds.add(replicaId); + shuttingDownReplicaIds.notifyAll(); + } + } + } + + /** + * This class process is responsible for processing ASYNC replication job. + */ + private class ReplicationJobsProccessor extends Thread { + Map replicaSockets; + ByteBuffer reusableBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); + + @Override + public void run() { + Thread.currentThread().setName("ReplicationJobsProccessor Thread"); + terminateJobsReplication.set(false); + jobsReplicationSuspended.set(false); + + while (true) { + try { + if (terminateJobsReplication.get()) { + closeSockets(); + break; + } + + IReplicationJob job = replicationJobsQ.take(); + if (job.getOperation() != ReplicationOperation.STOP) { + //if there isn't already a connection, establish a new one + if (replicaSockets == null) { + replicaSockets = getActiveRemoteReplicasSockets(); + } + + processJob(job, replicaSockets, reusableBuffer); + } else { + terminateJobsReplication.set(true); + continue; + } + + //if no more jobs to process, close sockets + if (replicationJobsQ.size() == 0) { + LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas"); + closeSockets(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + synchronized (jobsReplicationSuspended) { + jobsReplicationSuspended.set(true); + jobsReplicationSuspended.notifyAll(); + } + LOGGER.log(Level.INFO, "ReplicationJobsProccessor stopped. "); + } + + private void closeSockets() { + if (replicaSockets != null) { + closeReplicaSockets(replicaSockets); + replicaSockets.clear(); + replicaSockets = null; + } + } + } + + /** + * This class is responsible for listening on sockets that belong to TxnLogsReplicator. 
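+     * Each listener reads JOB_COMMIT ACK messages from one replica socket and registers them via addAckToJob().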
+ */ + private class TxnLogsReplicationResponseListener implements Runnable { + final SocketChannel replicaSocket; + final String replicaId; + + public TxnLogsReplicationResponseListener(String replicaId, SocketChannel replicaSocket) { + this.replicaId = replicaId; + this.replicaSocket = replicaSocket; + } + + @Override + public void run() { + Thread.currentThread().setName("TxnLogs Replication Listener Thread"); + LOGGER.log(Level.INFO, "Started listening on socket: " + replicaSocket.socket().getRemoteSocketAddress()); + + try { + BufferedReader incomingResponse = new BufferedReader( + new InputStreamReader(replicaSocket.socket().getInputStream())); + String responseLine = ""; + while (true) { + responseLine = incomingResponse.readLine(); + if (responseLine == null) { + break; + } + //read ACK for job commit log + String replicaId = AsterixReplicationProtocol.getNodeIdFromLogAckMessage(responseLine); + int jobId = AsterixReplicationProtocol.getJobIdFromLogAckMessage(responseLine); + addAckToJob(jobId, replicaId); + } + } catch (AsynchronousCloseException e1) { + LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId); + } catch (IOException e2) { + reportFailedReplica(replicaId); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java new file mode 100644 index 0000000..f424bc3 --- /dev/null +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.replication.recovery; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.config.AsterixReplicationProperties; +import org.apache.asterix.common.context.DatasetLifecycleManager; +import org.apache.asterix.common.replication.IRemoteRecoveryManager; +import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.transactions.ILogManager; +import org.apache.asterix.common.transactions.ILogRecord; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; + +public class RemoteRecoveryManager implements IRemoteRecoveryManager { + + private final IReplicationManager replicationManager; + private final ILogManager logManager; + public static final boolean IS_DEBUG_MODE = false;//true + private static final Logger LOGGER = Logger.getLogger(RemoteRecoveryManager.class.getName()); + private final IAsterixAppRuntimeContext runtimeContext; + private final AsterixReplicationProperties replicationProperties; + + public RemoteRecoveryManager(IReplicationManager replicationManager, IAsterixAppRuntimeContext runtimeContext, + AsterixReplicationProperties replicationProperties) { + this.replicationManager = replicationManager; + this.runtimeContext = runtimeContext; + this.logManager = runtimeContext.getTransactionSubsystem().getLogManager(); + this.replicationProperties = replicationProperties; + } + + @Override + public void performRemoteRecovery() { + //The whole remote recovery process should be atomic. + //Any error happens, we should start the recovery from the start until the recovery is complete or an illegal state is reached (cannot recovery). + int maxRecoveryAttempts = 10; + + while (true) { + //start recovery recovery steps + try { + maxRecoveryAttempts--; + + if (maxRecoveryAttempts == 0) { + //to avoid infinite loop in case of unexpected behavior. + throw new IllegalStateException("Failed to perform remote recovery."); + } + + /*** Prepare for Recovery ***/ + //1. check remote replicas states + replicationManager.initializeReplicasState(); + int activeReplicasCount = replicationManager.getActiveReplicasCount(); + + if (activeReplicasCount == 0) { + throw new IllegalStateException("no ACTIVE remote replica(s) exists to performe remote recovery"); + } + + //2. clean any memory data that could've existed from previous failed recovery attempt + IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager(); + datasetLifeCycleManager.closeAllDatasets(); + + //3. remove any existing storage data + runtimeContext.getReplicaResourcesManager().deleteAsterixStorageData(); + + //4. select remote replicas to recover from per lost replica data + Map> selectedRemoteReplicas = constructRemoteRecoveryPlan(); + + //5. get max LSN from selected remote replicas + long maxRemoteLSN = 0; + maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet()); + + //6. 
force LogManager to start from a partition > maxLSN in selected remote replicas + logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN); + + /*** Start Recovery Per Lost Replica ***/ + for (Entry> remoteReplica : selectedRemoteReplicas.entrySet()) { + String replicaId = remoteReplica.getKey(); + Set replicasDataToRecover = remoteReplica.getValue(); + + //1. Request indexes metadata and LSM components + replicationManager.requestReplicaFiles(replicaId, replicasDataToRecover); + + //2. Initialize local resources based on the newly received files (if we are recovering the primary replica on this node) + if (replicasDataToRecover.contains(logManager.getNodeId())) { + ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository()).initialize( + logManager.getNodeId(), + runtimeContext.getReplicaResourcesManager().getLocalStorageFolder()); + //initialize resource id factor to correct max resource id + runtimeContext.initializeResourceIdFactory(); + } + + //3. Get min LSN to start requesting logs from + long minLSN = replicationManager.requestReplicaMinLSN(replicaId); + + //4. Request remote logs from selected remote replicas + ArrayList remoteRecoveryLogs = replicationManager.requestReplicaLogs(replicaId, + replicasDataToRecover, minLSN); + + //5. Replay remote logs using recovery manager + if (replicasDataToRecover.contains(logManager.getNodeId())) { + if (remoteRecoveryLogs.size() > 0) { + runtimeContext.getTransactionSubsystem().getRecoveryManager() + .replayRemoteLogs(remoteRecoveryLogs); + } + remoteRecoveryLogs.clear(); + } + } + + LOGGER.log(Level.INFO, "Completed remote recovery successfully!"); + break; + } catch (Exception e) { + e.printStackTrace(); + LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again..."); + } + } + } + + private Map> constructRemoteRecoveryPlan() { + + //1. identify which replicas reside in this node + String localNodeId = logManager.getNodeId(); + Set nodes = replicationProperties.getNodeReplicasIds(localNodeId); + + Map> recoveryCandidates = new HashMap>(); + Map candidatesScore = new HashMap(); + + //2. identify which nodes has backup per lost node data + for (String node : nodes) { + Set locations = replicationProperties.getNodeReplicasIds(node); + + //since the local node just started, remove it from candidates + locations.remove(localNodeId); + + //remove any dead replicas + Set deadReplicas = replicationManager.getDeadReplicasIds(); + for (String deadReplica : deadReplicas) { + locations.remove(deadReplica); + } + + //no active replicas to recover from + if (locations.size() == 0) { + throw new IllegalStateException("Could not find any ACTIVE replica to recover " + node + " data."); + } + + for (String location : locations) { + if (candidatesScore.containsKey(location)) { + candidatesScore.put(location, candidatesScore.get(location) + 1); + } else { + candidatesScore.put(location, 1); + } + } + recoveryCandidates.put(node, locations); + } + + Map> recoveryList = new HashMap>(); + + //3. 
find best candidate to recover from per lost replica data + for (Entry> entry : recoveryCandidates.entrySet()) { + + int winnerScore = -1; + String winner = ""; + for (String node : entry.getValue()) { + + int nodeScore = candidatesScore.get(node); + + if (nodeScore > winnerScore) { + winnerScore = nodeScore; + winner = node; + } + } + + if (recoveryList.containsKey(winner)) { + recoveryList.get(winner).add(entry.getKey()); + } else { + Set nodesToRecover = new HashSet(); + nodesToRecover.add(entry.getKey()); + recoveryList.put(winner, nodesToRecover); + } + + } + + return recoveryList; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java ---------------------------------------------------------------------- diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java new file mode 100644 index 0000000..67b39c4 --- /dev/null +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.storage; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; + +public class AsterixFilesUtil { + + public static void deleteFolder(String folderPath) throws IOException { + File folder = new File(folderPath); + if (folder.exists()) { + //delete files inside the folder + while (deleteDirecotryFiles(folderPath) != true) { + //if there is a file being written (e.g. 
LSM Component), wait and try again to delete the file + try { + Thread.sleep(500); + } catch (InterruptedException e) { + //ignore + } + } + + //delete the folder itself + folder.delete(); + } + } + + private static boolean deleteDirecotryFiles(String dirPath) throws IOException { + try { + Path directory = Paths.get(dirPath); + Files.walkFileTree(directory, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + Files.delete(file); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + Files.delete(dir); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException { + return FileVisitResult.CONTINUE; + } + + }); + return true; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java ---------------------------------------------------------------------- diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java new file mode 100644 index 0000000..30f2afc --- /dev/null +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.replication.storage; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil; + +public class AsterixLSMIndexFileProperties { + + private String fileName; + private long fileSize; + private String nodeId; + private String dataverse; + private int ioDeviceNum; + private String idxName; + private boolean lsmComponentFile; + private boolean requireLSNSync; + private String filePath; + private boolean requiresAck = false; + + public AsterixLSMIndexFileProperties() { + } + + public AsterixLSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, + boolean requireLSNSync, boolean requiresAck) { + initialize(filePath, fileSize, nodeId, lsmComponentFile, requireLSNSync, requiresAck); + } + + public AsterixLSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) { + initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, false, false); + } + + public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, + boolean requireLSNSync, boolean requiresAck) { + this.filePath = filePath; + this.fileSize = fileSize; + this.nodeId = nodeId; + String[] tokens = filePath.split(File.separator); + + int arraySize = tokens.length; + this.fileName = tokens[arraySize - 1]; + this.ioDeviceNum = getDeviceIONumFromName(tokens[arraySize - 2]); + this.idxName = tokens[arraySize - 3]; + this.dataverse = tokens[arraySize - 4]; + this.lsmComponentFile = lsmComponentFile; + this.requireLSNSync = requireLSNSync; + this.requiresAck = requiresAck; + } + + public static int getDeviceIONumFromName(String name) { + return Integer.parseInt(name.substring(IndexFileNameUtil.IO_DEVICE_NAME_PREFIX.length())); + } + + public void serialize(OutputStream out) throws IOException { + DataOutputStream dos = new DataOutputStream(out); + dos.writeUTF(nodeId); + dos.writeUTF(filePath); + dos.writeLong(fileSize); + dos.writeBoolean(lsmComponentFile); + dos.writeBoolean(requireLSNSync); + dos.writeBoolean(requiresAck); + } + + public static AsterixLSMIndexFileProperties create(DataInput input) throws IOException { + String nodeId = input.readUTF(); + String filePath = input.readUTF(); + long fileSize = input.readLong(); + boolean lsmComponentFile = input.readBoolean(); + boolean requireLSNSync = input.readBoolean(); + boolean requiresAck = input.readBoolean(); + AsterixLSMIndexFileProperties fileProp = new AsterixLSMIndexFileProperties(); + fileProp.initialize(filePath, fileSize, nodeId, lsmComponentFile, requireLSNSync, requiresAck); + return fileProp; + } + + public String getFilePath() { + return filePath; + } + + public void setFilePath(String filePath) { + this.filePath = filePath; + } + + public long getFileSize() { + return fileSize; + } + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public String getDataverse() { + return dataverse; + } + + public void setDataverse(String dataverse) { + this.dataverse = dataverse; + } + + public void setFileSize(long fileSize) { + this.fileSize = fileSize; + } + + public int getIoDeviceNum() { + return ioDeviceNum; + } + + public void setIoDeviceNum(int ioDeviceNum) {
this.ioDeviceNum = ioDeviceNum; + } + + public String getIdxName() { + return idxName; + } + + public void setIdxName(String idxName) { + this.idxName = idxName; + } + + public boolean isLSMComponentFile() { + return lsmComponentFile; + } + + public void setLsmComponentFile(boolean lsmComponentFile) { + this.lsmComponentFile = lsmComponentFile; + } + + public boolean isRequireLSNSync() { + return requireLSNSync; + } + + public void setRequireLSNSync(boolean requireLSNSync) { + this.requireLSNSync = requireLSNSync; + } + + public boolean requiresAck() { + return requiresAck; + } + + public void setRequiresAck(boolean requiresAck) { + this.requiresAck = requiresAck; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("File Name: " + fileName + " "); + sb.append("File Size: " + fileSize + " "); + sb.append("Node ID: " + nodeId + " "); + sb.append("I/O Device: " + ioDeviceNum + " "); + sb.append("IDX Name: " + idxName + " "); + sb.append("isLSMComponentFile : " + lsmComponentFile + " "); + sb.append("Dataverse: " + dataverse); + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java ---------------------------------------------------------------------- diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java new file mode 100644 index 0000000..69f7d07 --- /dev/null +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.asterix.replication.storage; + +public class LSMComponentLSNSyncTask { + private String componentFilePath; + private String componentId; + + public LSMComponentLSNSyncTask(String componentId, String componentFilePath) { + this.componentId = componentId; + this.componentFilePath = componentFilePath; + } + + public String getComponentFilePath() { + return componentFilePath; + } + + public void setComponentFilePath(String componentFilePath) { + this.componentFilePath = componentFilePath; + } + + public String getComponentId() { + return componentId; + } + + public void setComponentId(String componentId) { + this.componentId = componentId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/209f3907/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java ---------------------------------------------------------------------- diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java new file mode 100644 index 0000000..84d5dbe --- /dev/null +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.replication.storage; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.frames.LIFOMetaDataFrame; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; + +public class LSMComponentProperties { + + private AtomicInteger numberOfFiles; + private String componentId; + private long LSNOffset; + private long originalLSN; + private String nodeId; + private Long replicaLSN; + private String maskPath = null; + private String replicaPath = null; + private LSMOperationType opType; + + public LSMComponentProperties(ILSMIndexReplicationJob job, String nodeId) { + this.nodeId = nodeId; + componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0], nodeId); + numberOfFiles = new AtomicInteger(job.getJobFiles().size()); + originalLSN = getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), job.getLSMIndexOperationContext()); + //TODO this should be changed to a dynamic value when append only LSM indexes are implemented + LSNOffset = LIFOMetaDataFrame.lsnOff; + opType = job.getLSMOpType(); + } + + public LSMComponentProperties() { + + } + + public long getLSMComponentLSN(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext ctx) { + long componentLSN = -1; + try { + componentLSN = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()).getComponentLSN(ctx + .getComponentsToBeReplicated()); + } catch (HyracksDataException e) { + e.printStackTrace(); + } + if (componentLSN < 0) { + componentLSN = 0; + } + return componentLSN; + } + + public void serialize(OutputStream out) throws IOException { + DataOutputStream dos = new DataOutputStream(out); + dos.writeUTF(componentId); + dos.writeUTF(nodeId); + dos.writeInt(numberOfFiles.get()); + dos.writeLong(originalLSN); + dos.writeLong(LSNOffset); + dos.writeInt(opType.ordinal()); + } + + public static LSMComponentProperties create(DataInput input) throws IOException { + LSMComponentProperties lsmCompProp = new LSMComponentProperties(); + lsmCompProp.componentId = input.readUTF(); + lsmCompProp.nodeId = input.readUTF(); + lsmCompProp.numberOfFiles = new AtomicInteger(input.readInt()); + lsmCompProp.originalLSN = input.readLong(); + lsmCompProp.LSNOffset = input.readLong(); + lsmCompProp.opType = LSMOperationType.values()[input.readInt()]; + return lsmCompProp; + } + + public String getMaskPath(ReplicaResourcesManager resourceManager) { + if (maskPath == null) { + AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this); + maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName() + + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX; + } + return maskPath; + } + + public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) { + if (replicaPath == null) { + AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this); + replicaPath = 
resourceManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(), + afp.getIdxName()); + } + return replicaPath; + } + + /** + * @param filePath + * any file of the LSM component + * @param nodeId + * the id of the node that owns the component + * @return a unique id based on the timestamp of the component + */ + public static String getLSMComponentID(String filePath, String nodeId) { + String[] tokens = filePath.split(File.separator); + + int arraySize = tokens.length; + String fileName = tokens[arraySize - 1]; + String ioDeviceName = tokens[arraySize - 2]; + String idxName = tokens[arraySize - 3]; + String dataverse = tokens[arraySize - 4]; + + StringBuilder componentId = new StringBuilder(); + componentId.append(nodeId); + componentId.append(File.separator); + componentId.append(dataverse); + componentId.append(File.separator); + componentId.append(idxName); + componentId.append(File.separator); + componentId.append(ioDeviceName); + componentId.append(File.separator); + componentId.append(fileName.substring(0, fileName.lastIndexOf(AbstractLSMIndexFileManager.SPLIT_STRING))); + return componentId.toString(); + } + + public String getComponentId() { + return componentId; + } + + public void setComponentId(String componentId) { + this.componentId = componentId; + } + + public long getLSNOffset() { + return LSNOffset; + } + + public void setLSNOffset(long lSNOffset) { + LSNOffset = lSNOffset; + } + + public long getOriginalLSN() { + return originalLSN; + } + + public void setOriginalLSN(long lSN) { + originalLSN = lSN; + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public int getNumberOfFiles() { + return numberOfFiles.get(); + } + + public int markFileComplete() { + return numberOfFiles.decrementAndGet(); + } + + public void setNumberOfFiles(AtomicInteger numberOfFiles) { + this.numberOfFiles = numberOfFiles; + } + + public Long getReplicaLSN() { + return replicaLSN; + } + + public void setReplicaLSN(Long replicaLSN) { + this.replicaLSN = replicaLSN; + } + + public LSMOperationType getOpType() { + return opType; + } + + public void setOpType(LSMOperationType opType) { + this.opType = opType; + } +}
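----------------------------------------------------------------------
A minimal usage sketch, not taken from the patch above, of how the two storage classes just introduced appear intended to work together. Every concrete value in it is an assumption made for illustration only: the class name ReplicaFilePropertiesExample, the node id "nc1", the dataverse "Metadata", the index name "Dataset_idx", the file size and the timestamp-style component file name. It also assumes a Unix-style File.separator and that the asterix-replication classes above plus the hyracks classes they reference (IndexFileNameUtil, AbstractLSMIndexFileManager) are on the classpath.

package org.apache.asterix.replication.storage;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.File;

import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;

public class ReplicaFilePropertiesExample {

    public static void main(String[] args) throws Exception {
        String sep = File.separator;
        // Hypothetical replica file path laid out the way initialize() expects it:
        // .../<dataverse>/<index>/<io device dir>/<component file>
        String fileName = "2015-11-12-18-00-00-000" + AbstractLSMIndexFileManager.SPLIT_STRING + "b";
        String filePath = sep + "storage" + sep + "Metadata" + sep + "Dataset_idx" + sep
                + IndexFileNameUtil.IO_DEVICE_NAME_PREFIX + "0" + sep + fileName;

        // Describe the file and serialize the description as it would be sent to a remote replica.
        AsterixLSMIndexFileProperties props =
                new AsterixLSMIndexFileProperties(filePath, 1024L, "nc1", true, true, false);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        props.serialize(bos);

        // On the receiving side the same description is rebuilt from the stream.
        AsterixLSMIndexFileProperties received = AsterixLSMIndexFileProperties
                .create(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(received); // dataverse, index name, I/O device, etc. parsed back from the path

        // The component id groups all files of one LSM component under a single key.
        System.out.println(LSMComponentProperties.getLSMComponentID(filePath, "nc1"));
    }
}

Note that serialize() ships only the node id, the full file path, the size, and the three flags; the dataverse, index name, and I/O device number are re-derived from the path by initialize() on the receiving side. That keeps the wire format small, but it ties the replication protocol to the on-disk directory layout that getLSMComponentID() also relies on.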