asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject asterixdb git commit: [NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta Recovery
Date Wed, 21 Feb 2018 10:24:53 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master b82f6dfb8 -> 877a36de4


[NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta Recovery

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Exclude non-replicated datasets files from
  delta recovery.
- Fix used read buffer for large replication
  requests.

Change-Id: Ic734af7becf26082e79fae52bd2c01ba567c1c99
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2412
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/877a36de
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/877a36de
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/877a36de

Branch: refs/heads/master
Commit: 877a36de4e7f148bd859e8e5ed49e7d5fe26fb59
Parents: b82f6df
Author: Murtadha Hubail <mhubail@apache.org>
Authored: Wed Feb 21 04:30:01 2018 +0300
Committer: Murtadha Hubail <mhubail@apache.org>
Committed: Wed Feb 21 02:24:22 2018 -0800

----------------------------------------------------------------------
 .../messaging/PartitionResourcesListTask.java   |  9 +++--
 .../messaging/ReplicationProtocol.java          |  7 ++--
 .../sync/ReplicaFilesSynchronizer.java          |  7 ++--
 .../PersistentLocalResourceRepository.java      | 35 ++++++++++++++------
 4 files changed, 40 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/877a36de/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index 54d3a02..b2b1ad1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -26,9 +26,10 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -48,8 +49,10 @@ public class PartitionResourcesListTask implements IReplicaTask {
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         localResourceRepository.cleanup(partition);
-        final List<String> partitionResources = localResourceRepository.getPartitionIndexesFiles(partition).stream()
-                .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+        final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        final List<String> partitionResources =
+                localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
+                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
         final PartitionResourcesListResponse response =
                 new PartitionResourcesListResponse(partition, partitionResources);
         ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/877a36de/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
index 280a2d4..41e7d9e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
@@ -72,7 +72,7 @@ public class ReplicationProtocol {
         final ByteBuffer buf = ensureSize(dataBuffer, requestSize);
         // read request
         NetworkingUtil.readBytes(socketChannel, buf, requestSize);
-        return dataBuffer;
+        return buf;
     }
 
     public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer
byteBuffer)
@@ -135,6 +135,7 @@ public class ReplicationProtocol {
             requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
             requestBuffer.flip();
             NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
+            channel.socket().getOutputStream().flush();
         } catch (IOException e) {
             throw new ReplicationException(e);
         }
@@ -148,9 +149,9 @@ public class ReplicationProtocol {
     public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel
socketChannel,
             ByteBuffer buffer) {
         try {
-            ReplicationProtocol.readRequest(socketChannel, buffer);
+            final ByteBuffer requestBuf = ReplicationProtocol.readRequest(socketChannel,
buffer);
             final ByteArrayInputStream input =
-                    new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+                    new ByteArrayInputStream(requestBuf.array(), requestBuf.position(), requestBuf.limit());
             try (DataInputStream dis = new DataInputStream(input)) {
                 switch (type) {
                     case PARTITION_RESOURCES_REQUEST:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/877a36de/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 5658779..fae6ed6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
@@ -52,8 +53,10 @@ public class ReplicaFilesSynchronizer {
         final Set<String> replicaFiles = getReplicaFiles(partition);
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
-        final Set<String> masterFiles = localResourceRepository.getPartitionIndexesFiles(partition).stream()
-                .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+        final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        final Set<String> masterFiles =
+                localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
+                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
         // find files on master and not on replica
         final List<String> replicaMissingFiles =
                 masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/877a36de/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index ca22a84..7206382 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -52,6 +52,7 @@ import java.util.stream.Stream;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -342,18 +343,32 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
         });
     }
 
-    public List<String> getPartitionIndexesFiles(int partition) throws HyracksDataException
{
-        List<String> partitionFiles = new ArrayList<>();
-        Set<File> partitionIndexes = getPartitionIndexes(partition);
-        for (File indexDir : partitionIndexes) {
-            if (indexDir.isDirectory()) {
-                File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
-                if (indexFiles != null) {
-                    Stream.of(indexFiles).map(File::getAbsolutePath).forEach(partitionFiles::add);
-                }
+    public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy
strategy)
+            throws HyracksDataException {
+        final List<String> partitionReplicatedFiles = new ArrayList<>();
+        final Set<File> replicatedIndexes = new HashSet<>();
+        final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
+        for (LocalResource lr : partitionResources.values()) {
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
+            if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+                replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile());
+            }
+        }
+        for (File indexDir : replicatedIndexes) {
+            partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
+        }
+        return partitionReplicatedFiles;
+    }
+
+    private List<String> getIndexFiles(File indexDir) {
+        final List<String> indexFiles = new ArrayList<>();
+        if (indexDir.isDirectory()) {
+            File[] indexFilteredFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
+            if (indexFilteredFiles != null) {
+                Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add);
             }
         }
-        return partitionFiles;
+        return indexFiles;
     }
 
     private void createStorageRoots() {


Mime
View raw message