hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramkris...@apache.org
Subject [2/3] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)
Date Thu, 10 Dec 2015 07:42:00 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 00d20aa..ead2d25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1740,7 +1740,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         List<WALEntry> entries = request.getEntryList();
         CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
-        regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
+        regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
+          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
+          request.getSourceHFileArchiveDirPath()); // source cluster info for hfile replication
         regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
         return ReplicateWALEntryResponse.newBuilder().build();
       } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
index 5f96bf7..836d3aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
@@ -36,7 +36,13 @@ public interface ReplicationSinkService extends ReplicationService {
    * Carry on the list of log entries down to the sink
    * @param entries list of WALEntries to replicate
    * @param cells Cells that the WALEntries refer to (if cells is non-null)
+   * @param replicationClusterId Id which uniquely identifies the source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDirPath Path to the source cluster base namespace directory,
+   *          required for replicating hfiles
+   * @param sourceHFileArchiveDirPath Path to the source cluster hfile archive directory
    * @throws IOException
    */
-  void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException;
+  void replicateLogEntries(List<WALEntry> entries, CellScanner cells, String replicationClusterId,
+      String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index 457d859..db98083 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -85,17 +85,16 @@ public interface WALActionsListener {
   );
 
   /**
-   *
    * @param htd
    * @param logKey
-   * @param logEdit
-   * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)}
-   * It only exists to get scope when replicating.  Scope should be in the WALKey and not need
-   * us passing in a <code>htd</code>.
+   * @param logEdit the WAL edit. TODO: Retire this method in favor of
+   *          {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)}. It only exists to
+   *          get scope when replicating. Scope should be in the WALKey and not need us passing
+   *          in a <code>htd</code>.
+   * @throws IOException if the WALEdit could not be parsed
    */
-  void visitLogEntryBeforeWrite(
-    HTableDescriptor htd, WALKey logKey, WALEdit logEdit
-  );
+  void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+      throws IOException;
 
   /**
    * For notification post append to the writer.  Used by metrics system at least.
@@ -136,7 +135,9 @@ public interface WALActionsListener {
     public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {}
 
     @Override
-    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {}
+    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+        throws IOException { // default implementation does nothing
+    }
 
     @Override
     public void postAppend(final long entryLen, final long elapsedTimeMillis) {}

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 166dc37..516ab8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -18,11 +18,20 @@
 
 package org.apache.hadoop.hbase.replication;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.NavigableMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
@@ -31,6 +40,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 @InterfaceAudience.Private
 public class ScopeWALEntryFilter implements WALEntryFilter {
+  private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);
 
   @Override
   public Entry filter(Entry entry) {
@@ -40,13 +50,27 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
     }
     ArrayList<Cell> cells = entry.getEdit().getCells();
     int size = cells.size();
+    byte[] fam;
     for (int i = size - 1; i >= 0; i--) {
       Cell cell = cells.get(i);
-      // The scope will be null or empty if
-      // there's nothing to replicate in that WALEdit
-      if (!scopes.containsKey(cell.getFamily())
-          || scopes.get(cell.getFamily()) == HConstants.REPLICATION_SCOPE_LOCAL) {
-        cells.remove(i);
+      // A bulk load entry is kept only for the stores whose family has a replication scope,
+      // i.e. the user has enabled replication of bulk loaded hfiles for that family.
+      // TODO Similar logic exists in TableCfWALEntryFilter, but the data structures differ and
+      // cannot be refactored into one yet; revisit whether the two can be unified.
+      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+        Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
+        if (filteredBulkLoadEntryCell != null) {
+          cells.set(i, filteredBulkLoadEntryCell);
+        } else {
+          cells.remove(i);
+        }
+      } else {
+        // Drop the cell if its family has no replication scope or only a local scope,
+        // i.e. there is nothing to replicate for it
+        fam = CellUtil.cloneFamily(cell);
+        if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+          cells.remove(i);
+        }
       }
     }
     if (cells.size() < size / 2) {
@@ -55,4 +79,41 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
     return entry;
   }
 
+  private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
+    byte[] fam;
+    BulkLoadDescriptor bld = null;
+    try {
+      bld = WALEdit.getBulkLoadDescriptor(cell);
+    } catch (IOException e) {
+      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+      return cell; // NOTE(review): an unparseable descriptor is replicated unfiltered
+    }
+    List<StoreDescriptor> storesList = bld.getStoresList();
+    // Copy the StoreDescriptor list before editing it because storesList is unmodifiable
+    List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+    boolean anyStoreRemoved = false;
+    while (copiedStoresListIterator.hasNext()) {
+      StoreDescriptor sd = copiedStoresListIterator.next();
+      fam = sd.getFamilyName().toByteArray();
+      if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+        copiedStoresListIterator.remove();
+        anyStoreRemoved = true;
+      }
+    }
+
+    if (!anyStoreRemoved) { // every store is replicable: keep the original cell
+      return cell;
+    } else if (copiedStoresList.isEmpty()) { // no replicable store left: caller drops the cell
+      return null;
+    }
+    BulkLoadDescriptor.Builder newDesc =
+        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+            .setEncodedRegionName(bld.getEncodedRegionName())
+            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+    newDesc.addAllStores(copiedStoresList); // only the stores that survived filtering
+    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+    return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+      cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
index b892512..6c2a752 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -18,14 +18,20 @@
 
 package org.apache.hadoop.hbase.replication;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -51,19 +57,37 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
           ", degenerate as if it's not configured by keeping tableCFs==null");
     }
     int size = cells.size();
-
+
+    // A null tableCFs means the user has not configured any table CFs, so the data of every
+    // table is replicable
+    if (tableCFs == null) {
+      return entry;
+    }
     // return null(prevent replicating) if logKey's table isn't in this peer's
-    // replicable table list (empty tableCFs means all table are replicable)
-    if (tableCFs != null && !tableCFs.containsKey(tabName)) {
+    // replicable table list
+    if (!tableCFs.containsKey(tabName)) {
       return null;
     } else {
-      List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
+      List<String> cfs = tableCFs.get(tabName);
       for (int i = size - 1; i >= 0; i--) {
         Cell cell = cells.get(i);
-        // ignore(remove) kv if its cf isn't in the replicable cf list
-        // (empty cfs means all cfs of this table are replicable)
-        if ((cfs != null && !cfs.contains(Bytes.toString(cell.getFamily())))) {
-          cells.remove(i);
+        // TODO Similar logic exists in ScopeWALEntryFilter, but the data structures differ and
+        // cannot be refactored into one yet; revisit whether the two can be unified.
+        // Filter bulk load entries separately
+        if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+          Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell);
+          if (filteredBulkLoadEntryCell != null) {
+            cells.set(i, filteredBulkLoadEntryCell);
+          } else {
+            cells.remove(i);
+          }
+        } else {
+          // ignore(remove) kv if its cf isn't in the replicable cf list
+          // (a null cfs means all cfs of this table are replicable)
+          if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(),
+            cell.getFamilyOffset(), cell.getFamilyLength()))) {
+            cells.remove(i);
+          }
         }
       }
     }
@@ -73,4 +97,41 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
     return entry;
   }
 
+  private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) {
+    byte[] fam;
+    BulkLoadDescriptor bld = null;
+    try {
+      bld = WALEdit.getBulkLoadDescriptor(cell);
+    } catch (IOException e) {
+      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+      return cell; // NOTE(review): an unparseable descriptor is replicated unfiltered
+    }
+    List<StoreDescriptor> storesList = bld.getStoresList();
+    // Copy the StoreDescriptor list before editing it because storesList is unmodifiable
+    List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+    boolean anyStoreRemoved = false;
+    while (copiedStoresListIterator.hasNext()) {
+      StoreDescriptor sd = copiedStoresListIterator.next();
+      fam = sd.getFamilyName().toByteArray();
+      if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+        copiedStoresListIterator.remove();
+        anyStoreRemoved = true;
+      }
+    }
+
+    if (!anyStoreRemoved) { // every store is replicable: keep the original cell
+      return cell;
+    } else if (copiedStoresList.isEmpty()) { // no replicable store left: caller drops the cell
+      return null;
+    }
+    BulkLoadDescriptor.Builder newDesc =
+        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+            .setEncodedRegionName(bld.getEncodedRegionName())
+            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+    newDesc.addAllStores(copiedStoresList); // only the stores that survived filtering
+    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+    return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+      cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
new file mode 100644
index 0000000..9bfea4b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Implementation of a file cleaner that checks if an hfile is still scheduled for replication
+ * before deleting it from the hfile archive directory.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
+  private static final Log LOG = LogFactory.getLog(ReplicationHFileCleaner.class);
+  private ZooKeeperWatcher zkw;
+  private ReplicationQueuesClient rqc;
+  private boolean stopped = false;
+  private boolean aborted;
+
+  /**
+   * Filters out every hfile that is still referenced from an hfile-refs replication queue in
+   * ZooKeeper. All files are returned when this cleaner is disabled, and none are returned when
+   * the references cannot be read, so a referenced hfile is not deleted by mistake.
+   */
+  @Override
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    // all members of this class are null if replication is disabled,
+    // so we cannot filter the files
+    if (this.getConf() == null) {
+      return files;
+    }
+    // setConf() may have failed to create the ZK-backed queues client; keep every file rather
+    // than failing with a NullPointerException below.
+    if (this.rqc == null) {
+      LOG.warn("Replication queues client is not initialized, skipping checking deletable files");
+      return Collections.emptyList();
+    }
+
+    final Set<String> hfileRefs;
+    try {
+      // The concurrently created new hfile entries in ZK may not be included in the return list,
+      // but they won't be deleted because they're not in the checking set.
+      hfileRefs = loadHFileRefsFromPeers();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files",
+        e);
+      return Collections.emptyList();
+    }
+    return Iterables.filter(files, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus file) {
+        String hfile = file.getPath().getName();
+        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
+        if (LOG.isDebugEnabled()) {
+          if (foundHFileRefInQueue) {
+            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
+          } else {
+            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
+          }
+        }
+        return !foundHFileRefInQueue;
+      }
+    });
+  }
+
+  /**
+   * Load all hfile references in all replication queues from ZK. This method guarantees to return a
+   * snapshot which contains all hfile references in the zookeeper at the start of this call.
+   * However, some newly created hfile references during the call may not be included.
+   * The read is retried until the hfile-refs node cversion is the same before and after it.
+   */
+  private Set<String> loadHFileRefsFromPeers() throws KeeperException {
+    Set<String> hfileRefs = Sets.newHashSet();
+    List<String> listOfPeers;
+    for (int retry = 0;; retry++) {
+      int v0 = rqc.getHFileRefsNodeChangeVersion();
+      hfileRefs.clear();
+      listOfPeers = rqc.getAllPeersFromHFileRefsQueue();
+      if (listOfPeers == null) {
+        LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions.");
+        return ImmutableSet.of();
+      }
+      for (String id : listOfPeers) {
+        List<String> peerHFileRefs = rqc.getReplicableHFiles(id);
+        if (peerHFileRefs != null) {
+          hfileRefs.addAll(peerHFileRefs);
+        }
+      }
+      int v1 = rqc.getHFileRefsNodeChangeVersion();
+      if (v0 == v1) {
+        return hfileRefs;
+      }
+      LOG.debug(String.format("Replication hfile references node cversion changed from "
+          + "%d to %d, retry = %d", v0, v1, retry));
+    }
+  }
+
+  @Override
+  public void setConf(Configuration config) {
+    // If either replication or replication of bulk load hfiles is disabled, keep all members null
+    boolean replicationEnabled = config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    boolean bulkLoadReplicationEnabled = config.getBoolean(
+      HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    if (!(replicationEnabled && bulkLoadReplicationEnabled)) {
+      // Report the setting that is actually off rather than always blaming replication itself.
+      String disabledKey = replicationEnabled ? HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
+          : HConstants.REPLICATION_ENABLE_KEY;
+      LOG.warn(disabledKey
+          + " is not enabled so allowing all hfile references to be deleted. Better to remove "
+          + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
+          + " configuration.");
+      return;
+    }
+    // Make my own Configuration. Then I'll have my own connection to zk that
+    // I can close myself when time comes.
+    Configuration conf = new Configuration(config);
+    super.setConf(conf);
+    try {
+      initReplicationQueuesClient(conf);
+    } catch (IOException e) {
+      LOG.error("Error while configuring " + this.getClass().getName(), e);
+    }
+  }
+
+  private void initReplicationQueuesClient(Configuration conf)
+      throws ZooKeeperConnectionException, IOException {
+    this.zkw = new ZooKeeperWatcher(conf, "replicationHFileCleaner", null);
+    this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
+  }
+
+  @Override
+  public void stop(String why) {
+    if (this.stopped) {
+      return;
+    }
+    this.stopped = true;
+    if (this.zkw != null) {
+      LOG.info("Stopping " + this.zkw);
+      this.zkw.close();
+    }
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
+    this.aborted = true;
+    stop(why);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return this.aborted;
+  }
+
+  /**
+   * Returns true if the given hfile is not referenced from any hfile-refs replication queue in
+   * ZooKeeper, or whenever this cleaner is disabled. Returns false if that cannot be determined.
+   */
+  @Override
+  public boolean isFileDeletable(FileStatus fStat) {
+    Set<String> hfileRefsFromQueue;
+    // all members of this class are null if replication is disabled,
+    // so do not stop from deleting the file
+    if (getConf() == null) {
+      return true;
+    }
+    // setConf() may have failed to create the ZK-backed queues client; keep the file to be safe.
+    if (this.rqc == null) {
+      return false;
+    }
+
+    try {
+      hfileRefsFromQueue = loadHFileRefsFromPeers();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
+          + "file for " + fStat.getPath(), e);
+      return false;
+    }
+    return !hfileRefsFromQueue.contains(fStat.getPath().getName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
new file mode 100644
index 0000000..8d5c6d4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This will load all the xml configuration files for the source cluster replication ID from
+ * user configured replication configuration directory.
+ */
+@InterfaceAudience.Private
+public class DefaultSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
+  private static final Log LOG = LogFactory.getLog(DefaultSourceFSConfigurationProvider.class);
+  // Map containing all the source clusters configurations against their replication cluster id.
+  // It is read outside the lock in getConf(), so it must be safe for concurrent access.
+  private final Map<String, Configuration> sourceClustersConfs = new ConcurrentHashMap<>();
+  private static final String XML = ".xml";
+
+  /**
+   * Returns the file system client configuration of the given source cluster. It is loaded on
+   * first use and cached afterwards. Only the *.xml files directly under
+   * {@code <replication conf dir>/<replicationClusterId>} are loaded into it.
+   * @param sinkConf configuration of this cluster, used to locate the replication conf directory
+   * @param replicationClusterId id of the source cluster; also the name of its conf sub-directory
+   * @return the cached source cluster configuration
+   * @throws IOException if listing the source cluster conf directory fails
+   */
+  @Override
+  public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+      throws IOException {
+    Configuration sourceClusterConf = sourceClustersConfs.get(replicationClusterId);
+    if (sourceClusterConf == null) {
+      synchronized (this.sourceClustersConfs) {
+        // Re-check under the lock so each source cluster conf is loaded only once.
+        sourceClusterConf = sourceClustersConfs.get(replicationClusterId);
+        if (sourceClusterConf == null) {
+          sourceClusterConf = loadSourceClusterConf(sinkConf, replicationClusterId);
+          this.sourceClustersConfs.put(replicationClusterId, sourceClusterConf);
+        }
+      }
+    }
+    return sourceClusterConf;
+  }
+
+  /**
+   * Builds a configuration from only the user provided *.xml files of the given source cluster.
+   */
+  private Configuration loadSourceClusterConf(Configuration sinkConf, String replicationClusterId)
+      throws IOException {
+    LOG.info("Loading source cluster FS client conf for cluster " + replicationClusterId);
+    // Load only user provided client configurations.
+    Configuration sourceClusterConf = new Configuration(false);
+
+    String replicationConfDir = sinkConf.get(HConstants.REPLICATION_CONF_DIR);
+    if (replicationConfDir == null) {
+      LOG.debug(HConstants.REPLICATION_CONF_DIR + " is not configured.");
+      URL resource = HBaseConfiguration.class.getClassLoader().getResource("hbase-site.xml");
+      if (resource != null) {
+        String path = resource.getPath();
+        replicationConfDir = path.substring(0, path.lastIndexOf("/"));
+      } else {
+        replicationConfDir = System.getenv("HBASE_CONF_DIR");
+      }
+    }
+
+    LOG.info("Loading source cluster " + replicationClusterId
+        + " file system configurations from xml files under directory " + replicationConfDir);
+    File confDir = new File(replicationConfDir, replicationClusterId);
+    String[] listofConfFiles = FileUtil.list(confDir);
+    for (String confFile : listofConfFiles) {
+      if (new File(confDir, confFile).isFile() && confFile.endsWith(XML)) {
+        // Add all the user provided client conf files
+        sourceClusterConf.addResource(new Path(confDir.getPath(), confFile));
+      }
+    }
+    return sourceClusterConf;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index d351927..22646db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -33,26 +33,29 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.ipc.RemoteException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} 
  * implementation for replicating to another HBase cluster.
@@ -84,8 +87,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   // Handles connecting to peer region servers
   private ReplicationSinkManager replicationSinkMgr;
   private boolean peersSelected = false;
+  private String replicationClusterId = ""; // only set when bulk load replication is enabled
   private ThreadPoolExecutor exec;
   private int maxThreads;
+  private Path baseNamespaceDir; // <root dir>/<BASE_NAMESPACE_DIR>, set in init()
+  private Path hfileArchiveDir; // <root dir>/<HFILE_ARCHIVE_DIRECTORY>/<BASE_NAMESPACE_DIR>
+  private boolean replicationBulkLoadDataEnabled;
 
   @Override
   public void init(Context context) throws IOException {
@@ -108,7 +115,19 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
     this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
-      new SynchronousQueue<Runnable>());
+        new SynchronousQueue<Runnable>());
+
+    this.replicationBulkLoadDataEnabled =
+        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    if (this.replicationBulkLoadDataEnabled) { // NOTE(review): id may be null if unset
+      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
+    }
+    // Construct the base namespace and hfile archive directory paths sent with each batch
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
+    baseNamespaceDir = new Path(rootDir, baseNSDir);
+    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
   }
 
   private void decorateConf() {
@@ -317,8 +336,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       try {
         sinkPeer = replicationSinkMgr.getReplicationSink();
         BlockingInterface rrs = sinkPeer.getRegionServer();
-        ReplicationProtbufUtil.replicateWALEntry(rrs,
-            entries.toArray(new Entry[entries.size()]));
+        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
         replicationSinkMgr.reportSinkSuccess(sinkPeer);
         return ordinal;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
new file mode 100644
index 0000000..17f6780
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Replicates bulk loaded hfiles into this (sink) cluster. All the hfiles are first copied in
+ * parallel from the source cluster file system to a staging directory on the sink file system;
+ * {@link LoadIncrementalHFiles} is then used to prepare a collection of {@link LoadQueueItem}
+ * which is finally loaded (replicated) into the corresponding table of this cluster.
+ * <p>
+ * An instance is single use: the copy thread pool is shut down at the end of the copy phase of
+ * {@link #replicate()}.
+ */
+@InterfaceAudience.Private
+public class HFileReplicator {
+  /** Maximum number of threads to allow in pool to copy hfiles during replication */
+  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
+      "hbase.replication.bulkload.copy.maxthreads";
+  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
+  /** Number of hfiles to copy per thread during replication */
+  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
+      "hbase.replication.bulkload.copy.hfiles.perthread";
+  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;
+
+  private static final Log LOG = LogFactory.getLog(HFileReplicator.class);
+  private static final String UNDERSCORE = "_";
+  private static final FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
+  private static final SecureRandom RANDOM = new SecureRandom();
+  // Bit width and radix of the random suffix appended to per-table staging directory names
+  private static final int RANDOM_WIDTH = 320;
+  private static final int RANDOM_RADIX = 32;
+
+  // Configuration used to reach the source cluster file system
+  private final Configuration sourceClusterConf;
+  private final String sourceBaseNamespaceDirPath;
+  private final String sourceHFileArchiveDirPath;
+  // table name -> list of (column family, hfile paths relative to the source directories)
+  private final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
+  private final FileSystem sinkFs;
+  private final FsDelegationToken fsDelegationToken;
+  private final UserProvider userProvider;
+  private final Configuration conf;
+  private final Connection connection;
+  private final String hbaseStagingDir;
+  private final ThreadPoolExecutor exec;
+  private final int maxCopyThreads;
+  private final int copiesPerThread;
+
+  /**
+   * @param sourceClusterConf configuration used to access the source cluster file system; the
+   *          file system cache for the source scheme is disabled on this object while copying
+   * @param sourceBaseNamespaceDirPath source cluster base namespace directory; hfile paths in
+   *          {@code tableQueueMap} are resolved against it
+   * @param sourceHFileArchiveDirPath source cluster hfile archive directory, used as a fallback
+   *          location for hfiles no longer present under the base namespace directory
+   * @param tableQueueMap table name to list of (column family, hfile paths) pairs to replicate
+   * @param conf configuration of this (sink) cluster
+   * @param connection connection to this (sink) cluster
+   * @throws IOException if the sink file system cannot be obtained
+   */
+  public HFileReplicator(Configuration sourceClusterConf,
+      String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
+      Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
+      Connection connection) throws IOException {
+    this.sourceClusterConf = sourceClusterConf;
+    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
+    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
+    this.bulkLoadHFileMap = tableQueueMap;
+    this.conf = conf;
+    this.connection = connection;
+
+    this.userProvider = UserProvider.instantiate(conf);
+    this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+    this.hbaseStagingDir = conf.get("hbase.bulkload.staging.dir");
+    this.maxCopyThreads =
+        this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
+          REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("HFileReplicationCallable-%1$d");
+    // Core size must equal the maximum size: with an unbounded work queue a ThreadPoolExecutor
+    // never starts more than corePoolSize threads, so a core size of 1 would run every copy
+    // task on a single thread. Idle core threads are still allowed to time out.
+    this.exec =
+        new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(), builder.build());
+    this.exec.allowCoreThreadTimeOut(true);
+    this.copiesPerThread =
+        conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
+          REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);
+
+    this.sinkFs = FileSystem.get(conf);
+  }
+
+  /**
+   * Copies the hfiles of every table to a staging directory on the sink file system and bulk
+   * loads them into the corresponding table. Each table is processed even if an earlier table
+   * had nothing to load, and a table's staging directory is deleted once its load attempt
+   * finishes, whether or not it succeeded.
+   *
+   * @return always {@code null}
+   * @throws IOException if copying or loading the hfiles fails
+   */
+  public Void replicate() throws IOException {
+    // Copy all the hfiles to the local file system
+    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();
+
+    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+
+    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
+      String tableNameString = tableStagingDir.getKey();
+      Path stagingDir = tableStagingDir.getValue();
+
+      Table table = null;
+      try {
+        LoadIncrementalHFiles loadHFiles = createLoadIncrementalHFiles();
+        TableName tableName = TableName.valueOf(tableNameString);
+        table = this.connection.getTable(tableName);
+
+        // Prepare collection of queue of hfiles to be loaded(replicated)
+        Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
+        loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);
+
+        if (queue.isEmpty()) {
+          LOG.warn("Replication process did not find any files to replicate in directory "
+              + stagingDir.toUri());
+          // Nothing to load for this table, but the remaining tables still need to be
+          // replicated; the finally block releases this table's resources.
+          continue;
+        }
+
+        try (RegionLocator locator = connection.getRegionLocator(tableName)) {
+          fsDelegationToken.acquireDelegationToken(sinkFs);
+
+          // Set the staging directory which will be used by LoadIncrementalHFiles for loading
+          // the data
+          loadHFiles.setBulkToken(stagingDir.toString());
+
+          doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
+        }
+      } finally {
+        // Runs for every outcome (loaded, nothing to load, or failure) so that the table is
+        // closed and the staging directory deleted
+        cleanup(stagingDir.toString(), table);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a {@link LoadIncrementalHFiles} whose configuration forbids creating the target
+   * table.
+   *
+   * @throws IOException wrapping any failure to construct the loader
+   */
+  private LoadIncrementalHFiles createLoadIncrementalHFiles() throws IOException {
+    LoadIncrementalHFiles loadHFiles;
+    try {
+      loadHFiles = new LoadIncrementalHFiles(conf);
+    } catch (Exception e) {
+      LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
+          + " data.", e);
+      throw new IOException(e);
+    }
+    Configuration newConf = HBaseConfiguration.create(conf);
+    newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
+    loadHFiles.setConf(newConf);
+    return loadHFiles;
+  }
+
+  /**
+   * Loads the queued hfiles into {@code table}, re-reading the region start/end keys before each
+   * attempt. Loops while {@code queue} is not empty; this assumes loadHFileQueue removes the
+   * items it loads and leaves behind those needing another attempt.
+   *
+   * @param maxRetries maximum number of attempts; 0 means no limit
+   * @throws IOException if the queue is still not empty after {@code maxRetries} attempts, or a
+   *           load attempt fails
+   */
+  private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table,
+      Deque<LoadQueueItem> queue, RegionLocator locator, int maxRetries) throws IOException {
+    int count = 0;
+    Pair<byte[][], byte[][]> startEndKeys;
+    while (!queue.isEmpty()) {
+      // need to reload split keys each iteration.
+      startEndKeys = locator.getStartEndKeys();
+      if (count != 0) {
+        LOG.warn("Error occured while replicating HFiles, retry attempt " + count + " with "
+            + queue.size() + " files still remaining to replicate.");
+      }
+
+      if (maxRetries != 0 && count >= maxRetries) {
+        throw new IOException("Retry attempted " + count
+            + " times without completing, bailing out.");
+      }
+      count++;
+
+      // Try bulk load
+      loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
+    }
+  }
+
+  /**
+   * Releases the file system delegation token, deletes the staging directory and closes the
+   * table. Failures to delete the directory or close the table are logged, not thrown, so they
+   * do not mask the outcome of the load.
+   *
+   * @param stagingDir staging directory to delete; may be {@code null}
+   * @param table table to close; may be {@code null}
+   */
+  private void cleanup(String stagingDir, Table table) {
+    // Release the file system delegation token
+    fsDelegationToken.releaseDelegationToken();
+    // Delete the staging directory
+    if (stagingDir != null) {
+      try {
+        sinkFs.delete(new Path(stagingDir), true);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
+      }
+    }
+    // Do not close sinkFs: it comes from FileSystem.get, which may return a cached instance
+    // shared with other users of the same file system.
+
+    // Close the table
+    if (table != null) {
+      try {
+        table.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close the table.", e);
+      }
+    }
+  }
+
+  /**
+   * Copies every hfile listed in the bulk load map from the source cluster file system into a
+   * newly created per-table staging directory (one sub directory per column family) on the sink
+   * file system. The source file system is closed and the copy thread pool shut down afterwards.
+   *
+   * @return map of table name to the staging directory holding that table's copied hfiles
+   * @throws IOException if the source file system cannot be accessed or a copy fails
+   */
+  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
+    Map<String, Path> mapOfCopiedHFiles = new HashMap<String, Path>();
+    FileSystem sourceFs = null;
+
+    try {
+      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
+      /*
+       * Path#getFileSystem will by default get the FS from cache. If both source and sink cluster
+       * has same FS name service then it will return peer cluster FS. To avoid this we explicitly
+       * disable the loading of FS from cache, so that a new FS is created with source cluster
+       * configuration.
+       */
+      String sourceScheme = sourceClusterPath.toUri().getScheme();
+      String disableCacheName = String.format("fs.%s.impl.disable.cache", sourceScheme);
+      sourceClusterConf.setBoolean(disableCacheName, true);
+
+      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);
+
+      User user = userProvider.getCurrent();
+      // For each table name in the map
+      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
+          .entrySet()) {
+        String tableName = tableEntry.getKey();
+
+        // Create staging directory for each table
+        Path stagingDir =
+            createStagingDir(new Path(hbaseStagingDir), user, TableName.valueOf(tableName));
+
+        // For each (family, hfile paths) pair in the table
+        for (Pair<byte[], List<String>> familyHFilePathsPair : tableEntry.getValue()) {
+          Path familyStagingDir =
+              new Path(stagingDir, Bytes.toString(familyHFilePathsPair.getFirst()));
+          copyHFiles(sourceFs, familyStagingDir, familyHFilePathsPair.getSecond());
+        }
+        // Add the staging directory to this table. Staging directory contains all the hfiles
+        // belonging to this table
+        mapOfCopiedHFiles.put(tableName, stagingDir);
+      }
+      return mapOfCopiedHFiles;
+    } finally {
+      if (sourceFs != null) {
+        sourceFs.close();
+      }
+      exec.shutdown();
+    }
+  }
+
+  /**
+   * Copies the hfiles of one column family into {@code familyStagingDir} on the copy thread
+   * pool, giving each task at most {@code copiesPerThread} files, and waits for all of them.
+   *
+   * @throws IOException if a copy task fails or the wait is interrupted
+   */
+  private void copyHFiles(FileSystem sourceFs, Path familyStagingDir, List<String> hfilePaths)
+      throws IOException {
+    // A non-positive configured batch size would never advance through the list
+    int batchSize = Math.max(1, this.copiesPerThread);
+    int totalNoOfHFiles = hfilePaths.size();
+    List<Future<Void>> futures = new ArrayList<Future<Void>>();
+
+    // Copy the hfiles in parallel, one batch per task
+    int start = 0;
+    while (start < totalNoOfHFiles) {
+      // Written as start + min(...) so the bound cannot overflow
+      int end = start + Math.min(batchSize, totalNoOfHFiles - start);
+      Callable<Void> copier =
+          new Copier(sourceFs, familyStagingDir, hfilePaths.subList(start, end));
+      futures.add(exec.submit(copier));
+      start = end;
+    }
+
+    for (Future<Void> f : futures) {
+      try {
+        f.get();
+      } catch (InterruptedException e) {
+        InterruptedIOException iioe =
+            new InterruptedIOException(
+                "Failed to copy HFiles to local file system. This will be retried again "
+                    + "by the source cluster.");
+        iioe.initCause(e);
+        throw iioe;
+      } catch (ExecutionException e) {
+        throw new IOException("Failed to copy HFiles to local file system. This will "
+            + "be retried again by the source cluster.", e);
+      }
+    }
+  }
+
+  /**
+   * Creates a world-accessible staging directory for {@code tableName} under {@code baseDir},
+   * named {@code <user short name>__<table name with ':' replaced by '_'>__<random suffix>}.
+   */
+  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
+    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
+    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
+    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
+        + (new BigInteger(RANDOM_WIDTH, RANDOM).toString(RANDOM_RADIX));
+    return createStagingDir(baseDir, user, randomDir);
+  }
+
+  /** Creates {@code baseDir/randomDir} on the sink file system with full access for everyone. */
+  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
+    Path p = new Path(baseDir, randomDir);
+    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
+    // The permission given to mkdirs may be narrowed by the umask, so set it explicitly as well
+    sinkFs.setPermission(p, PERM_ALL_ACCESS);
+    return p;
+  }
+
+  /**
+   * Copies the given hfiles from the source cluster file system into a staging directory on the
+   * sink file system. Each hfile is looked up under the source base namespace directory first and
+   * under the source hfile archive directory if it is not found there; an hfile missing from both
+   * locations is logged and skipped while the remaining hfiles are still copied.
+   */
+  private class Copier implements Callable<Void> {
+    private final FileSystem sourceFs;
+    private final Path stagingDir;
+    private final List<String> hfiles;
+
+    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles) {
+      this.sourceFs = sourceFs;
+      this.stagingDir = stagingDir;
+      this.hfiles = hfiles;
+    }
+
+    @Override
+    public Void call() throws IOException {
+      for (String hfile : hfiles) {
+        Path sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfile);
+        Path localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
+        try {
+          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
+          // If any other exception other than FNFE then we will fail the replication requests and
+          // source will retry to replicate these data.
+        } catch (FileNotFoundException e) {
+          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+              + ". Trying to copy from hfile archive directory.",
+            e);
+          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfile);
+
+          try {
+            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
+          } catch (FileNotFoundException e1) {
+            // The hfile does not exist anywhere in the source cluster FS, so there is nothing to
+            // copy. Skip only this hfile; the rest of the batch still has to be copied.
+            LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+                + ". Hence ignoring this hfile from replication..",
+              e1);
+            continue;
+          }
+        }
+        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
+      }
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index 37dc1dd..f308daf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -47,7 +47,7 @@ public class MetricsSink {
     if (lastTimestampForAge != timestamp) {
       lastTimestampForAge = timestamp;
       age = System.currentTimeMillis() - lastTimestampForAge;
-    } 
+    }
     mss.setLastAppliedOpAge(age);
     return age;
   }
@@ -72,6 +72,17 @@ public class MetricsSink {
   }
 
   /**
+   * Convenience method to change metrics when a batch of operations is applied, additionally
+   * counting the hfiles applied with it.
+   *
+   * @param batchSize total number of mutations that are applied/replicated
+   * @param hfileSize total number of hfiles that are applied/replicated
+   */
+  public void applyBatch(long batchSize, long hfileSize) {
+    applyBatch(batchSize);
+    mss.incrAppliedHFiles(hfileSize);
+  }
+
+  /**
    * Get the Age of Last Applied Op
    * @return ageOfLastAppliedOp
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index f9f7001..9687af7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -40,11 +40,13 @@ public class MetricsSource {
   // tracks last shipped timestamp for each wal group
   private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
   private int lastQueueSize = 0;
+  private long lastHFileRefsQueueSize = 0;
   private String id;
 
   private final MetricsReplicationSourceSource singleSourceSource;
   private final MetricsReplicationSourceSource globalSourceSource;
 
+
   /**
    * Constructor used to register the metrics
    *
@@ -143,6 +145,18 @@ public class MetricsSource {
     globalSourceSource.incrShippedKBs(sizeInKB);
   }
 
+  /**
+   * Convenience method to apply changes to metrics due to shipping a batch of logs, additionally
+   * counting the hfiles shipped with it on both the per-source and the global metrics.
+   *
+   * @param batchSize the size of the batch that was shipped to sinks.
+   * @param sizeInKB the size of the shipped batch in kilobytes.
+   * @param hfiles total number of hfiles shipped to sinks.
+   */
+  public void shipBatch(long batchSize, int sizeInKB, long hfiles) {
+    shipBatch(batchSize, sizeInKB);
+    singleSourceSource.incrHFilesShipped(hfiles);
+    globalSourceSource.incrHFilesShipped(hfiles);
+  }
+
   /** increase the byte number read by source from log file */
   public void incrLogReadInBytes(long readInBytes) {
     singleSourceSource.incrLogReadInBytes(readInBytes);
@@ -153,8 +167,10 @@ public class MetricsSource {
   public void clear() {
     singleSourceSource.clear();
+    // Remove this source's remaining contributions from the global queue size gauges, then
+    // reset the locally tracked values so they are not subtracted twice.
     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
+    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
     lastTimeStamps.clear();
     lastQueueSize = 0;
+    lastHFileRefsQueueSize = 0;
   }
 
   /**
@@ -194,4 +210,19 @@ public class MetricsSource {
   public String getPeerID() {
     return id;
   }
+
+  /**
+   * Increments the hfile references queue size gauges of this source and of the global source by
+   * {@code size}, and accumulates the increment locally so that {@link #clear()} can later remove
+   * this source's whole contribution from the global gauge.
+   *
+   * @param size amount by which the hfile references queue size grows
+   */
+  public void incrSizeOfHFileRefsQueue(long size) {
+    singleSourceSource.incrSizeOfHFileRefsQueue(size);
+    globalSourceSource.incrSizeOfHFileRefsQueue(size);
+    // Accumulate rather than overwrite: the global gauge was incremented by every call, and
+    // decrSizeOfHFileRefsQueue/clear() subtract from this running total.
+    lastHFileRefsQueueSize += size;
+  }
+
+  /**
+   * Decrements the hfile references queue size gauges of this source and of the global source by
+   * {@code size}; the locally tracked size shrinks by the same amount but never goes below zero.
+   *
+   * @param size amount by which the hfile references queue size shrinks
+   */
+  public void decrSizeOfHFileRefsQueue(int size) {
+    singleSourceSource.decrSizeOfHFileRefsQueue(size);
+    globalSourceSource.decrSizeOfHFileRefsQueue(size);
+    lastHFileRefsQueueSize = Math.max(0L, lastHFileRefsQueueSize - size);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index db3b87d..1b4e0aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -650,8 +650,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
 
         // set the region name for the target region replica
         Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
-            ReplicationProtbufUtil.buildReplicateWALEntryRequest(
-              entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
+            ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray,
+              location.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
         try {
           PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
           controller.setCallTimeout(timeout);

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 78bb92e..7110273 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,13 +41,16 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
-import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -56,8 +58,10 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.zookeeper.KeeperException;
 
@@ -72,6 +76,7 @@ public class Replication extends WALActionsListener.Base implements
   private static final Log LOG =
       LogFactory.getLog(Replication.class);
   private boolean replication;
+  private boolean replicationForBulkLoadData;
   private ReplicationSourceManager replicationManager;
   private ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
@@ -85,7 +90,6 @@ public class Replication extends WALActionsListener.Base implements
   private int statsThreadPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;
-
   /**
    * Instantiate the replication management (if rep is enabled).
    * @param server Hosting server
@@ -110,11 +114,20 @@ public class Replication extends WALActionsListener.Base implements
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.replication = isReplication(this.conf);
+    this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
       new ThreadFactoryBuilder()
         .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
         .setDaemon(true)
         .build());
+    if (this.replicationForBulkLoadData) {
+      if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
+          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
+        throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
+            + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
+            + " is set to true.");
+      }
+    }
     if (replication) {
       try {
         this.replicationQueues =
@@ -159,6 +172,15 @@ public class Replication extends WALActionsListener.Base implements
     return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
   }
 
+  /**
+   * Reports whether replication of bulk loaded data is switched on.
+   * @param c configuration to read {@link HConstants#REPLICATION_BULKLOAD_ENABLE_KEY} from
+   * @return {@code true} when bulk load data replication is enabled; when the key is unset the
+   *         value of {@link HConstants#REPLICATION_BULKLOAD_ENABLE_DEFAULT} is returned
+   */
+  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
+    final String key = HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
+    final boolean fallback = HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT;
+    return c.getBoolean(key, fallback);
+  }
+
    /*
     * Returns an object to listen to new wal changes
     **/
@@ -188,14 +210,22 @@ public class Replication extends WALActionsListener.Base implements
   /**
    * Carry on the list of log entries down to the sink
    * @param entries list of entries to replicate
-   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
-   * do not contain the Cells we are replicating; they are passed here on the side in this
-   * CellScanner).
+   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
+   *          contain the Cells we are replicating; they are passed here on the side in this
+   *          CellScanner).
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
+   *          directory required for replicating hfiles
+   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
+   *          directory
    * @throws IOException
    */
-  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
+  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
+      String replicationClusterId, String sourceBaseNamespaceDirPath,
+      String sourceHFileArchiveDirPath) throws IOException {
     if (this.replication) {
-      this.replicationSink.replicateEntries(entries, cells);
+      this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
+        sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
     }
   }
 
@@ -227,34 +257,44 @@ public class Replication extends WALActionsListener.Base implements
   }
 
   @Override
-  public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
-                                       WALEdit logEdit) {
-    scopeWALEdits(htd, logKey, logEdit);
+  public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+      throws IOException {
+    // conf decides whether bulk load markers get scoped; the manager records their hfile refs
+    scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
   }
 
   /**
-   * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
-   * from compaction WAL edits and if the scope is local.
+   * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
+   * compaction WAL edits and if the scope is local.
    * @param htd Descriptor used to find the scope to use
    * @param logKey Key that may get scoped according to its edits
    * @param logEdit Edits used to lookup the scopes
+   * @param replicationManager Manager used to add bulk load events hfile references
+   * @throws IOException If failed to parse the WALEdit
    */
-  public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey,
-                                   WALEdit logEdit) {
-    NavigableMap<byte[], Integer> scopes =
-        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+  public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
+      Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
     byte[] family;
+    boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
     for (Cell cell : logEdit.getCells()) {
-      family = cell.getFamily();
-      // This is expected and the KV should not be replicated
-      if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
-      // Unexpected, has a tendency to happen in unit tests
-      assert htd.getFamily(family) != null;
+      if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+        if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+          scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
+        } else {
+          // Skip the flush/compaction/region events
+          continue;
+        }
+      } else {
+        family = CellUtil.cloneFamily(cell);
+        // Unexpected, has a tendency to happen in unit tests
+        assert htd.getFamily(family) != null;
 
-      int scope = htd.getFamily(family).getScope();
-      if (scope != REPLICATION_SCOPE_LOCAL &&
-          !scopes.containsKey(family)) {
-        scopes.put(family, scope);
+        if (!scopes.containsKey(family)) {
+          int scope = htd.getFamily(family).getScope();
+          if (scope != REPLICATION_SCOPE_LOCAL) {
+            scopes.put(family, scope);
+          }
+        }
       }
     }
     if (!scopes.isEmpty()) {
@@ -262,6 +302,40 @@ public class Replication extends WALActionsListener.Base implements
     }
   }
 
+  /**
+   * Scopes the column families named in a bulk load marker cell. Every family whose scope is not
+   * REPLICATION_SCOPE_LOCAL is added to {@code scopes}, and its store files are queued as hfile
+   * references through the replication manager.
+   * @param htd descriptor used to look up each family's replication scope
+   * @param replicationManager manager that receives the hfile references
+   * @param scopes family-to-scope map being built for the WAL key; updated in place
+   * @param tableName table the bulk load belongs to
+   * @param cell bulk load marker cell
+   * @throws IOException if the marker cannot be parsed or the hfile references cannot be queued
+   */
+  private static void scopeBulkLoadEdits(HTableDescriptor htd,
+      ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
+      TableName tableName, Cell cell) throws IOException {
+    try {
+      BulkLoadDescriptor descriptor = WALEdit.getBulkLoadDescriptor(cell);
+      for (StoreDescriptor store : descriptor.getStoresList()) {
+        byte[] cf = store.getFamilyName().toByteArray();
+        // scopes only ever holds non-local families, so presence alone means "replicate".
+        boolean replicate = scopes.containsKey(cf);
+        if (!replicate) {
+          // NOTE(review): htd.getFamily(cf) is dereferenced unchecked; a family missing from the
+          // descriptor would surface as a NullPointerException here.
+          int cfScope = htd.getFamily(cf).getScope();
+          if (cfScope != REPLICATION_SCOPE_LOCAL) {
+            scopes.put(cf, cfScope);
+            replicate = true;
+          }
+        }
+        if (replicate) {
+          addHFileRefsToQueue(replicationManager, tableName, cf, store);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to get bulk load events information from the wal file.", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Hands the store files of one bulk-loaded family to the replication manager as hfile refs.
+   * @throws IOException wrapping the ReplicationException raised by the manager
+   */
+  private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager,
+      TableName tableName, byte[] family, StoreDescriptor s) throws IOException {
+    List<String> storeFiles = s.getStoreFileList();
+    try {
+      replicationManager.addHFileRefs(tableName, family, storeFiles);
+    } catch (ReplicationException re) {
+      LOG.error("Failed to create hfile references in ZK.", re);
+      throw new IOException(re);
+    }
+  }
+
   @Override
   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
     getReplicationManager().preLogRoll(newPath);
@@ -273,8 +347,7 @@ public class Replication extends WALActionsListener.Base implements
   }
 
   /**
-   * This method modifies the master's configuration in order to inject
-   * replication-related features
+   * This method modifies the master's configuration in order to inject replication-related features
    * @param conf
    */
   public static void decorateMasterConfiguration(Configuration conf) {
@@ -286,6 +359,13 @@ public class Replication extends WALActionsListener.Base implements
     if (!plugins.contains(cleanerClass)) {
       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
     }
+    if (isReplicationForBulkLoadDataEnabled(conf)) {
+      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
+      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
+      if (!plugins.contains(cleanerClass)) {
+        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
+      }
+    }
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hbase/blob/65117d3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 4dd76cb..8f262c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -33,15 +33,16 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
@@ -51,6 +52,11 @@ import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * <p>
@@ -78,6 +84,9 @@ public class ReplicationSink {
   private final MetricsSink metrics;
   private final AtomicLong totalReplicatedEdits = new AtomicLong();
   private final Object sharedHtableConLock = new Object();
+  // Number of hfiles that we successfully replicated
+  private long hfilesReplicated = 0;
+  private SourceFSConfigurationProvider provider;
 
   /**
    * Create a sink for replication
@@ -91,6 +100,18 @@ public class ReplicationSink {
     this.conf = HBaseConfiguration.create(conf);
     decorateConf();
     this.metrics = new MetricsSink();
+
+    String className =
+        conf.get("hbase.replication.source.fs.conf.provider",
+          DefaultSourceFSConfigurationProvider.class.getCanonicalName());
+    try {
+      @SuppressWarnings("rawtypes")
+      Class c = Class.forName(className);
+      this.provider = (SourceFSConfigurationProvider) c.newInstance();
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Configured source fs configuration provider class "
+          + className + " throws error.", e);
+    }
   }
 
   /**
@@ -113,9 +134,16 @@ public class ReplicationSink {
    * operates against raw protobuf type saving on a conversion from pb to pojo.
    * @param entries
    * @param cells
-   * @throws IOException
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
+   *          directory
+   * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
+   * @throws IOException If failed to replicate the data
    */
-  public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException {
+  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
+      String replicationClusterId, String sourceBaseNamespaceDirPath,
+      String sourceHFileArchiveDirPath) throws IOException {
     if (entries.isEmpty()) return;
     if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
     // Very simple optimization where we batch sequences of rows going
@@ -126,6 +154,10 @@ public class ReplicationSink {
       // invocation of this method per table and cluster id.
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
           new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
+
+      // Map of table name Vs list of pair of family and list of hfile paths from its namespace
+      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
+
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -138,33 +170,60 @@ public class ReplicationSink {
             throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
           }
           Cell cell = cells.current();
-          if (isNewRowOrType(previousCell, cell)) {
-            // Create new mutation
-            m = CellUtil.isDelete(cell)?
-              new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
-              new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-            List<UUID> clusterIds = new ArrayList<UUID>();
-            for(HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()){
-              clusterIds.add(toUUID(clusterId));
+          // Handle bulk load hfiles replication
+          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+            if (bulkLoadHFileMap == null) {
+              bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>();
             }
-            m.setClusterIds(clusterIds);
-            addToHashMultiMap(rowMap, table, clusterIds, m);
-          }
-          if (CellUtil.isDelete(cell)) {
-            ((Delete)m).addDeleteMarker(cell);
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
           } else {
-            ((Put)m).add(cell);
+            // Handle wal replication
+            if (isNewRowOrType(previousCell, cell)) {
+              // Create new mutation
+              m =
+                  CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
+                      cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
+                      cell.getRowLength());
+              List<UUID> clusterIds = new ArrayList<UUID>();
+              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
+                clusterIds.add(toUUID(clusterId));
+              }
+              m.setClusterIds(clusterIds);
+              addToHashMultiMap(rowMap, table, clusterIds, m);
+            }
+            if (CellUtil.isDelete(cell)) {
+              ((Delete) m).addDeleteMarker(cell);
+            } else {
+              ((Put) m).add(cell);
+            }
+            previousCell = cell;
           }
-          previousCell = cell;
         }
         totalReplicated++;
       }
-      for (Entry<TableName, Map<List<UUID>,List<Row>>> entry : rowMap.entrySet()) {
-        batch(entry.getKey(), entry.getValue().values());
+
+      // TODO Replicating mutations and bulk loaded data can be made parallel
+      if (!rowMap.isEmpty()) {
+        LOG.debug("Started replicating mutations.");
+        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
+          batch(entry.getKey(), entry.getValue().values());
+        }
+        LOG.debug("Finished replicating mutations.");
+      }
+
+      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+        LOG.debug("Started replicating bulk loaded data.");
+        HFileReplicator hFileReplicator =
+            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+                sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
+                getConnection());
+        hFileReplicator.replicate();
+        LOG.debug("Finished replicating bulk loaded data.");
       }
+
       int size = entries.size();
       this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
-      this.metrics.applyBatch(size);
+      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
       this.totalReplicatedEdits.addAndGet(totalReplicated);
     } catch (IOException ex) {
       LOG.error("Unable to accept edit because:", ex);
@@ -172,6 +231,76 @@ public class ReplicationSink {
     }
   }
 
+  /**
+   * Records every hfile referenced by a bulk load marker cell into {@code bulkLoadHFileMap}.
+   * Entries are keyed by the namespace-qualified table name and grouped per column family. Each
+   * hfile is stored as a path relative to the source cluster's base namespace directory, built by
+   * {@link #getHFilePath}. The number of store files seen is added to {@code hfilesReplicated}.
+   * @param bulkLoadHFileMap table name to (family, hfile paths) pairs; updated in place
+   * @param table table of the WAL entry that carried the marker
+   * @param cell bulk load marker cell holding a serialized {@link BulkLoadDescriptor}
+   * @throws IOException if the descriptor cannot be parsed from the cell
+   */
+  private void buildBulkLoadHFileMap(
+      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
+      Cell cell) throws IOException {
+    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+    // Same map key for every store and file of this marker, so compute it once.
+    String tableName = table.getNameWithNamespaceInclAsString();
+    for (StoreDescriptor storeDescriptor : bld.getStoresList()) {
+      List<String> storeFileList = storeDescriptor.getStoreFileList();
+      // NOTE(review): hfilesReplicated is never reset, but replicateEntries reports it through
+      // metrics.applyBatch(size + hfilesReplicated, hfilesReplicated) on every call, so the
+      // per-batch hfile count it reports is cumulative. It is also a plain long updated without
+      // synchronization; if replicateEntries can run concurrently, updates may be lost.
+      // A per-call counter would fix both.
+      hfilesReplicated += storeFileList.size();
+      // ByteString.toByteArray() makes a copy, so extract the family once per store, not per file.
+      byte[] family = storeDescriptor.getFamilyName().toByteArray();
+      for (String storeFile : storeFileList) {
+        // Build hfile relative path from its namespace
+        String pathToHfileFromNS = getHFilePath(table, bld, storeFile, family);
+        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
+        if (familyHFilePathsList == null) {
+          // First hfile seen for this table: add the table entry into the map
+          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
+          continue;
+        }
+        boolean foundFamily = false;
+        for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
+          if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
+            // Family already present, just add the path to its existing list
+            familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
+            foundFamily = true;
+            break;
+          }
+        }
+        if (!foundFamily) {
+          // Family not found, add this family and its hfile path pair to the list
+          addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
+        }
+      }
+    }
+  }
+
+  /**
+   * Appends a new (family, [hfile path]) pair to a table's per-family list. The path list is
+   * mutable so that later hfiles of the same family can be added to it.
+   */
+  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
+      List<Pair<byte[], List<String>>> familyHFilePathsList) {
+    Pair<byte[], List<String>> entry =
+        new Pair<byte[], List<String>>(family, new ArrayList<String>());
+    entry.getSecond().add(pathToHfileFromNS);
+    familyHFilePathsList.add(entry);
+  }
+
+  /**
+   * Creates the map entry for a table not seen yet in this batch, seeded with a single
+   * (family, [hfile path]) pair. Any existing entry for {@code tableName} is replaced.
+   */
+  private void addNewTableEntryInMap(
+      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
+      String pathToHfileFromNS, String tableName) {
+    List<Pair<byte[], List<String>>> familyPairs = new ArrayList<Pair<byte[], List<String>>>();
+    addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyPairs);
+    bulkLoadHFileMap.put(tableName, familyPairs);
+  }
+
+  /**
+   * Builds an hfile path relative to the namespace base directory, in the form
+   * {@code <namespace>/<table qualifier>/<encoded region>/<family>/<store file>}.
+   */
+  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
+      byte[] family) {
+    String encodedRegion = Bytes.toString(bld.getEncodedRegionName().toByteArray());
+    StringBuilder path = new StringBuilder(100);
+    path.append(table.getNamespaceAsString()).append(Path.SEPARATOR);
+    path.append(table.getQualifierAsString()).append(Path.SEPARATOR);
+    path.append(encodedRegion).append(Path.SEPARATOR);
+    path.append(Bytes.toString(family)).append(Path.SEPARATOR);
+    path.append(storeFile);
+    return path.toString();
+  }
+
   /**
    * @param previousCell
    * @param cell
@@ -241,22 +370,13 @@ public class ReplicationSink {
     }
     Table table = null;
     try {
-      // See https://en.wikipedia.org/wiki/Double-checked_locking
-      Connection connection = this.sharedHtableCon;
-      if (connection == null) {
-        synchronized (sharedHtableConLock) {
-          connection = this.sharedHtableCon;
-          if (connection == null) {
-            connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
-          }
-        }
-      }
+      Connection connection = getConnection();
       table = connection.getTable(tableName);
       for (List<Row> rows : allRows) {
         table.batch(rows);
       }
     } catch (InterruptedException ix) {
-      throw (InterruptedIOException)new InterruptedIOException().initCause(ix);
+      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
     } finally {
       if (table != null) {
         table.close();
@@ -264,6 +384,20 @@ public class ReplicationSink {
     }
   }
 
+  /**
+   * Returns the connection shared by this sink, creating it from {@code conf} on first use.
+   * Uses double-checked locking on {@code sharedHtableConLock}, so the lock is only taken while
+   * no connection has been created yet.
+   * NOTE(review): this pattern is only safe if {@code sharedHtableCon} is declared volatile. Its
+   * declaration is not part of this change, so confirm.
+   * @return the shared connection
+   * @throws IOException if ConnectionFactory.createConnection fails
+   */
+  private Connection getConnection() throws IOException {
+    // See https://en.wikipedia.org/wiki/Double-checked_locking
+    Connection connection = sharedHtableCon;
+    if (connection == null) {
+      synchronized (sharedHtableConLock) {
+        // Re-read under the lock: another thread may have created it meanwhile.
+        connection = sharedHtableCon;
+        if (connection == null) {
+          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
+        }
+      }
+    }
+    return connection;
+  }
+
   /**
    * Get a string representation of this sink's metrics
    * @return string with the total replicated edits count and the date


Mime
View raw message