hbase-commits mailing list archives

From jmhs...@apache.org
Subject [49/50] [abbrv] hbase git commit: Merge branch 'apache/master' (4/16/15) into hbase-11339
Date Fri, 01 May 2015 15:28:35 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index fcc93db,91c406c..17c1ee3
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@@ -44,16 -42,14 +43,19 @@@ import org.apache.hadoop.hbase.exceptio
  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
  import org.apache.hadoop.hbase.ipc.ServerRpcController;
 +import org.apache.hadoop.hbase.mob.MobUtils;
  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+ import org.apache.hadoop.hbase.procedure2.Procedure;
+ import org.apache.hadoop.hbase.procedure2.ProcedureResult;
  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
  import org.apache.hadoop.hbase.protobuf.RequestConverter;
  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
  import org.apache.hadoop.hbase.protobuf.generated.*;
 +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
 +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
 +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
 +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+ import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@@ -1311,108 -1352,11 +1358,116 @@@ public class MasterRpcServices extends 
      return response.build();
    }
  
 +  /**
 +   * Compact a region on the master.
 +   *
 +   * @param controller the RPC controller
 +   * @param request the request
 +   * @throws ServiceException
 +   */
 +  @Override
 +  @QosPriority(priority=HConstants.ADMIN_QOS)
 +  public CompactRegionResponse compactRegion(final RpcController controller,
 +    final CompactRegionRequest request) throws ServiceException {
 +    try {
 +      master.checkInitialized();
 +      byte[] regionName = request.getRegion().getValue().toByteArray();
 +      TableName tableName = HRegionInfo.getTable(regionName);
 +      // if the region is a mob region, do the mob file compaction.
 +      if (MobUtils.isMobRegionName(tableName, regionName)) {
 +        return compactMob(request, tableName);
 +      } else {
 +        return super.compactRegion(controller, request);
 +      }
 +    } catch (IOException ie) {
 +      throw new ServiceException(ie);
 +    }
 +  }
 +
 +  @Override
 +  @QosPriority(priority=HConstants.ADMIN_QOS)
 +  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
 +    final GetRegionInfoRequest request) throws ServiceException {
 +    try {
 +      master.checkInitialized();
 +      byte[] regionName = request.getRegion().getValue().toByteArray();
 +      TableName tableName = HRegionInfo.getTable(regionName);
 +      if (MobUtils.isMobRegionName(tableName, regionName)) {
 +        // return a dummy region info that carries the mob compaction state.
 +        HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(tableName);
 +        GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
 +        builder.setRegionInfo(HRegionInfo.convert(mobRegionInfo));
 +        if (request.hasCompactionState() && request.getCompactionState()) {
 +          builder.setCompactionState(master.getMobCompactionState(tableName));
 +        }
 +        return builder.build();
 +      } else {
 +        return super.getRegionInfo(controller, request);
 +      }
 +    } catch (IOException ie) {
 +      throw new ServiceException(ie);
 +    }
 +  }
 +
 +  /**
 +   * Compacts the mob files in the current table.
 +   * @param request the request.
 +   * @param tableName the current table name.
 +   * @return The response of the mob file compaction.
 +   * @throws IOException
 +   */
 +  private CompactRegionResponse compactMob(final CompactRegionRequest request,
 +    TableName tableName) throws IOException {
 +    if (!master.getTableStateManager().isTableState(tableName, TableState.State.ENABLED)) {
 +      throw new DoNotRetryIOException("Table " + tableName + " is not enabled");
 +    }
 +    boolean isForceAllFiles = false;
 +    List<HColumnDescriptor> compactedColumns = new ArrayList<HColumnDescriptor>();
 +    HColumnDescriptor[] hcds = master.getTableDescriptors().get(tableName).getColumnFamilies();
 +    byte[] family = null;
 +    if (request.hasFamily()) {
 +      family = request.getFamily().toByteArray();
 +      for (HColumnDescriptor hcd : hcds) {
 +        if (Bytes.equals(family, hcd.getName())) {
 +          if (!hcd.isMobEnabled()) {
 +            LOG.error("Column family " + hcd.getNameAsString() + " is not a mob column family");
 +            throw new DoNotRetryIOException("Column family " + hcd.getNameAsString()
-               + " is not a mob column family");
++                    + " is not a mob column family");
 +          }
 +          compactedColumns.add(hcd);
 +        }
 +      }
 +    } else {
 +      for (HColumnDescriptor hcd : hcds) {
 +        if (hcd.isMobEnabled()) {
 +          compactedColumns.add(hcd);
 +        }
 +      }
 +    }
 +    if (compactedColumns.isEmpty()) {
 +      LOG.error("No mob column families are assigned in the mob file compaction");
 +      throw new DoNotRetryIOException(
-         "No mob column families are assigned in the mob file compaction");
++              "No mob column families are assigned in the mob file compaction");
 +    }
 +    if (request.hasMajor() && request.getMajor()) {
 +      isForceAllFiles = true;
 +    }
 +    String familyLogMsg = (family != null) ? Bytes.toString(family) : "";
 +    if (LOG.isTraceEnabled()) {
 +      LOG.trace("User-triggered mob file compaction requested for table: "
-         + tableName.getNameAsString() + " for column family: " + familyLogMsg);
++              + tableName.getNameAsString() + " for column family: " + familyLogMsg);
 +    }
 +    master.mobFileCompactThread.requestMobFileCompaction(master.getConfiguration(),
-       master.getFileSystem(), tableName, compactedColumns,
-       master.getTableLockManager(), isForceAllFiles);
++            master.getFileSystem(), tableName, compactedColumns,
++            master.getTableLockManager(), isForceAllFiles);
 +    return CompactRegionResponse.newBuilder().build();
 +  }
++
+   @Override
+   public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
+       IsBalancerEnabledRequest request) throws ServiceException {
+     IsBalancerEnabledResponse.Builder response = IsBalancerEnabledResponse.newBuilder();
+     response.setEnabled(master.isBalancerOn());
+     return response.build();
+   }
  }
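
As an illustration (not part of this patch), the compactRegion() override above only routes to compactMob() when the request names the synthetic mob region. Below is a minimal sketch of how a caller might build such a request, assuming the MobUtils and RequestConverter APIs visible in this diff; the class and method names are hypothetical and the RPC plumbing to the master is omitted.

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.util.Bytes;

public final class MobCompactRequestSketch {
  /**
   * Builds a CompactRegionRequest that names the dummy mob region, so that the
   * MasterRpcServices.compactRegion() override shown above dispatches it to compactMob().
   */
  public static CompactRegionRequest buildMobCompactRequest(TableName table, String family,
      boolean major) {
    // The same dummy region info the master-side code uses to represent the mob area.
    HRegionInfo mobRegion = MobUtils.getMobRegionInfo(table);
    return RequestConverter.buildCompactRegionRequest(
        mobRegion.getRegionName(), major, Bytes.toBytes(family));
  }
}

A request naming a normal region still falls through to super.compactRegion() as before.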

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
index 0664a55,d729cfa..cbff5dd
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
@@@ -27,9 -27,15 +27,10 @@@ import org.apache.commons.logging.Log
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
 -import org.apache.hadoop.hbase.CoordinatedStateException;
 -import org.apache.hadoop.hbase.HRegionInfo;
 -import org.apache.hadoop.hbase.HTableDescriptor;
 -import org.apache.hadoop.hbase.MetaTableAccessor;
 -import org.apache.hadoop.hbase.Server;
 -import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.*;
  import org.apache.hadoop.hbase.backup.HFileArchiver;
  import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.ClusterConnection;
  import org.apache.hadoop.hbase.client.Delete;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.ResultScanner;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index 0000000,2582a1e..7809e55
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@@ -1,0 -1,422 +1,450 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hbase.master.procedure;
+ 
+ import java.io.InputStream;
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.security.PrivilegedExceptionAction;
+ import java.util.ArrayList;
+ import java.util.List;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.hbase.*;
+ import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.Path;
 -import org.apache.hadoop.hbase.TableName;
 -import org.apache.hadoop.hbase.TableNotDisabledException;
 -import org.apache.hadoop.hbase.TableNotFoundException;
 -import org.apache.hadoop.hbase.HRegionInfo;
+ import org.apache.hadoop.hbase.backup.HFileArchiver;
 -import org.apache.hadoop.hbase.MetaTableAccessor;
+ import org.apache.hadoop.hbase.client.ClusterConnection;
+ import org.apache.hadoop.hbase.client.Delete;
+ import org.apache.hadoop.hbase.client.Result;
+ import org.apache.hadoop.hbase.client.ResultScanner;
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.client.Table;
+ import org.apache.hadoop.hbase.exceptions.HBaseException;
++import org.apache.hadoop.hbase.mob.MobConstants;
++import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.regionserver.HRegion;
+ import org.apache.hadoop.hbase.master.AssignmentManager;
+ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+ import org.apache.hadoop.hbase.master.MasterFileSystem;
+ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+ import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
+ import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
+ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+ import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+ import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
+ import org.apache.hadoop.hbase.util.FSUtils;
+ import org.apache.hadoop.security.UserGroupInformation;
+ 
+ @InterfaceAudience.Private
+ public class DeleteTableProcedure
+     extends StateMachineProcedure<MasterProcedureEnv, DeleteTableState>
+     implements TableProcedureInterface {
+   private static final Log LOG = LogFactory.getLog(DeleteTableProcedure.class);
+ 
+   private List<HRegionInfo> regions;
+   private UserGroupInformation user;
+   private TableName tableName;
+ 
+   // used for compatibility with old clients
+   private final ProcedurePrepareLatch syncLatch;
+ 
+   public DeleteTableProcedure() {
+     // Required by the Procedure framework to create the procedure on replay
+     syncLatch = null;
+   }
+ 
+   public DeleteTableProcedure(final MasterProcedureEnv env, final TableName tableName)
+       throws IOException {
+     this(env, tableName, null);
+   }
+ 
+   public DeleteTableProcedure(final MasterProcedureEnv env, final TableName tableName,
+       final ProcedurePrepareLatch syncLatch) throws IOException {
+     this.tableName = tableName;
+     this.user = env.getRequestUser().getUGI();
+ 
+     // used for compatibility with clients without procedures
+     // they need a sync TableNotFoundException, TableNotDisabledException, ...
+     this.syncLatch = syncLatch;
+   }
+ 
+   @Override
+   protected Flow executeFromState(final MasterProcedureEnv env, DeleteTableState state) {
+     if (LOG.isTraceEnabled()) {
+       LOG.trace(this + " execute state=" + state);
+     }
+     try {
+       switch (state) {
+         case DELETE_TABLE_PRE_OPERATION:
+           // Verify if we can delete the table
+           boolean deletable = prepareDelete(env);
+           ProcedurePrepareLatch.releaseLatch(syncLatch, this);
+           if (!deletable) {
+             assert isFailed() : "the delete should have an exception here";
+             return Flow.NO_MORE_STATE;
+           }
+ 
+           // TODO: Move out... in the acquireLock()
+           LOG.debug("waiting for '" + getTableName() + "' regions in transition");
+           regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
+           assert regions != null && !regions.isEmpty() : "unexpected 0 regions";
+           ProcedureSyncWait.waitRegionInTransition(env, regions);
+ 
+           // Call coprocessors
+           preDelete(env);
+ 
+           setNextState(DeleteTableState.DELETE_TABLE_REMOVE_FROM_META);
+           break;
+         case DELETE_TABLE_REMOVE_FROM_META:
+           LOG.debug("delete '" + getTableName() + "' regions from META");
+           DeleteTableProcedure.deleteFromMeta(env, getTableName(), regions);
+           setNextState(DeleteTableState.DELETE_TABLE_CLEAR_FS_LAYOUT);
+           break;
+         case DELETE_TABLE_CLEAR_FS_LAYOUT:
+           LOG.debug("delete '" + getTableName() + "' from filesystem");
+           DeleteTableProcedure.deleteFromFs(env, getTableName(), regions, true);
+           setNextState(DeleteTableState.DELETE_TABLE_UPDATE_DESC_CACHE);
+           regions = null;
+           break;
+         case DELETE_TABLE_UPDATE_DESC_CACHE:
+           LOG.debug("delete '" + getTableName() + "' descriptor");
+           DeleteTableProcedure.deleteTableDescriptorCache(env, getTableName());
+           setNextState(DeleteTableState.DELETE_TABLE_UNASSIGN_REGIONS);
+           break;
+         case DELETE_TABLE_UNASSIGN_REGIONS:
+           LOG.debug("delete '" + getTableName() + "' assignment state");
+           DeleteTableProcedure.deleteAssignmentState(env, getTableName());
+           setNextState(DeleteTableState.DELETE_TABLE_POST_OPERATION);
+           break;
+         case DELETE_TABLE_POST_OPERATION:
+           postDelete(env);
+           LOG.debug("delete '" + getTableName() + "' completed");
+           return Flow.NO_MORE_STATE;
+         default:
+           throw new UnsupportedOperationException("unhandled state=" + state);
+       }
+     } catch (HBaseException|IOException e) {
+       LOG.warn("Retriable error trying to delete table=" + getTableName() + " state=" + state, e);
+     } catch (InterruptedException e) {
+       // if the interrupt is real, the executor will be stopped.
+       LOG.warn("Interrupted trying to delete table=" + getTableName() + " state=" + state, e);
+     }
+     return Flow.HAS_MORE_STATE;
+   }
+ 
+   @Override
+   protected void rollbackState(final MasterProcedureEnv env, final DeleteTableState state) {
+     if (state == DeleteTableState.DELETE_TABLE_PRE_OPERATION) {
+       // nothing to rollback, pre-delete is just table-state checks.
+       // We can fail if the table does not exist or is not disabled.
+       ProcedurePrepareLatch.releaseLatch(syncLatch, this);
+       return;
+     }
+ 
+     // The delete doesn't have a rollback. The execution will succeed, at some point.
+     throw new UnsupportedOperationException("unhandled state=" + state);
+   }
+ 
+   @Override
+   protected DeleteTableState getState(final int stateId) {
+     return DeleteTableState.valueOf(stateId);
+   }
+ 
+   @Override
+   protected int getStateId(final DeleteTableState state) {
+     return state.getNumber();
+   }
+ 
+   @Override
+   protected DeleteTableState getInitialState() {
+     return DeleteTableState.DELETE_TABLE_PRE_OPERATION;
+   }
+ 
+   @Override
+   public TableName getTableName() {
+     return tableName;
+   }
+ 
+   @Override
+   public TableOperationType getTableOperationType() {
+     return TableOperationType.DELETE;
+   }
+ 
+   @Override
+   public boolean abort(final MasterProcedureEnv env) {
+     // TODO: We may be able to abort if the procedure is not started yet.
+     return false;
+   }
+ 
+   @Override
+   protected boolean acquireLock(final MasterProcedureEnv env) {
+     if (!env.isInitialized()) return false;
+     return env.getProcedureQueue().tryAcquireTableWrite(getTableName(), "delete table");
+   }
+ 
+   @Override
+   protected void releaseLock(final MasterProcedureEnv env) {
+     env.getProcedureQueue().releaseTableWrite(getTableName());
+   }
+ 
+   @Override
+   public void toStringClassDetails(StringBuilder sb) {
+     sb.append(getClass().getSimpleName());
+     sb.append(" (table=");
+     sb.append(getTableName());
+     sb.append(") user=");
+     sb.append(user);
+   }
+ 
+   @Override
+   public void serializeStateData(final OutputStream stream) throws IOException {
+     super.serializeStateData(stream);
+ 
+     MasterProcedureProtos.DeleteTableStateData.Builder state =
+       MasterProcedureProtos.DeleteTableStateData.newBuilder()
+         .setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user))
+         .setTableName(ProtobufUtil.toProtoTableName(tableName));
+     if (regions != null) {
+       for (HRegionInfo hri: regions) {
+         state.addRegionInfo(HRegionInfo.convert(hri));
+       }
+     }
+     state.build().writeDelimitedTo(stream);
+   }
+ 
+   @Override
+   public void deserializeStateData(final InputStream stream) throws IOException {
+     super.deserializeStateData(stream);
+ 
+     MasterProcedureProtos.DeleteTableStateData state =
+       MasterProcedureProtos.DeleteTableStateData.parseDelimitedFrom(stream);
+     user = MasterProcedureUtil.toUserInfo(state.getUserInfo());
+     tableName = ProtobufUtil.toTableName(state.getTableName());
+     if (state.getRegionInfoCount() == 0) {
+       regions = null;
+     } else {
+       regions = new ArrayList<HRegionInfo>(state.getRegionInfoCount());
+       for (HBaseProtos.RegionInfo hri: state.getRegionInfoList()) {
+         regions.add(HRegionInfo.convert(hri));
+       }
+     }
+   }
+ 
+   private boolean prepareDelete(final MasterProcedureEnv env) throws IOException {
+     try {
+       env.getMasterServices().checkTableModifiable(tableName);
+     } catch (TableNotFoundException|TableNotDisabledException e) {
+       setFailure("master-delete-table", e);
+       return false;
+     }
+     return true;
+   }
+ 
+   private boolean preDelete(final MasterProcedureEnv env)
+       throws IOException, InterruptedException {
+     final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+     if (cpHost != null) {
+       final TableName tableName = this.tableName;
+       user.doAs(new PrivilegedExceptionAction<Void>() {
+         @Override
+         public Void run() throws Exception {
+           cpHost.preDeleteTableHandler(tableName);
+           return null;
+         }
+       });
+     }
+     return true;
+   }
+ 
+   private void postDelete(final MasterProcedureEnv env)
+       throws IOException, InterruptedException {
+     deleteTableStates(env, tableName);
+ 
+     final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+     if (cpHost != null) {
+       final TableName tableName = this.tableName;
+       user.doAs(new PrivilegedExceptionAction<Void>() {
+         @Override
+         public Void run() throws Exception {
+           cpHost.postDeleteTableHandler(tableName);
+           return null;
+         }
+       });
+     }
+   }
+ 
+   protected static void deleteFromFs(final MasterProcedureEnv env,
+       final TableName tableName, final List<HRegionInfo> regions,
+       final boolean archive) throws IOException {
+     final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+     final FileSystem fs = mfs.getFileSystem();
+     final Path tempdir = mfs.getTempDir();
+ 
+     final Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName);
+     final Path tempTableDir = FSUtils.getTableDir(tempdir, tableName);
+ 
+     if (fs.exists(tableDir)) {
+       // Ensure temp exists
+       if (!fs.exists(tempdir) && !fs.mkdirs(tempdir)) {
+         throw new IOException("HBase temp directory '" + tempdir + "' creation failure.");
+       }
+ 
+       // Ensure parent exists
+       if (!fs.exists(tempTableDir.getParent()) && !fs.mkdirs(tempTableDir.getParent())) {
+         throw new IOException("HBase temp directory '" + tempdir + "' creation failure.");
+       }
+ 
+       // Move the table in /hbase/.tmp
+       if (!fs.rename(tableDir, tempTableDir)) {
+         if (fs.exists(tempTableDir)) {
+           // TODO
+           // what's in this dir? something old? probably something manual from the user...
+           // let's get rid of this stuff...
+           FileStatus[] files = fs.listStatus(tempdir);
+           if (files != null && files.length > 0) {
+             for (int i = 0; i < files.length; ++i) {
+               if (!files[i].isDir()) continue;
+               HFileArchiver.archiveRegion(fs, mfs.getRootDir(), tempTableDir, files[i].getPath());
+             }
+           }
+           fs.delete(tempdir, true);
+         }
+         throw new IOException("Unable to move '" + tableDir + "' to temp '" + tempTableDir + "'");
+       }
+     }
+ 
+     // Archive regions from FS (temp directory)
+     if (archive) {
+       for (HRegionInfo hri : regions) {
+         LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from FS");
+         HFileArchiver.archiveRegion(fs, mfs.getRootDir(),
+             tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName()));
+       }
+       LOG.debug("Table '" + tableName + "' archived!");
+     }
+ 
++    // Archive the mob data if there is a mob-enabled column
++    HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(tableName);
++    HColumnDescriptor[] hcds = htd.getColumnFamilies();
++    boolean hasMob = false;
++    for (HColumnDescriptor hcd : hcds) {
++      if (hcd.isMobEnabled()) {
++        hasMob = true;
++        break;
++      }
++    }
++    Path mobTableDir = null;
++    if (hasMob) {
++      // Archive mob data
++      mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME),
++              tableName);
++      Path regionDir =
++              new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
++      if (fs.exists(regionDir)) {
++        HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
++      }
++    }
++
++
+     // Delete table directory from FS (temp directory)
+     if (!fs.delete(tempTableDir, true) && fs.exists(tempTableDir)) {
+       throw new IOException("Couldn't delete " + tempTableDir);
+     }
++
++    // Delete the table directory where the mob files are saved
++    if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) {
++      if (!fs.delete(mobTableDir, true)) {
++        LOG.error("Couldn't delete " + mobTableDir);
++      }
++    }
+   }
+ 
+   /**
+    * There may be items for this table still up in hbase:meta in the case where the
+    * info:regioninfo column was empty because of some write error. Remove ALL rows from hbase:meta
+    * that have to do with this table. See HBASE-12980.
+    * @throws IOException
+    */
+   private static void cleanAnyRemainingRows(final MasterProcedureEnv env,
+       final TableName tableName) throws IOException {
+     ClusterConnection connection = env.getMasterServices().getConnection();
+     Scan tableScan = MetaTableAccessor.getScanForTableName(connection, tableName);
+     try (Table metaTable =
+         connection.getTable(TableName.META_TABLE_NAME)) {
+       List<Delete> deletes = new ArrayList<Delete>();
+       try (ResultScanner resScanner = metaTable.getScanner(tableScan)) {
+         for (Result result : resScanner) {
+           deletes.add(new Delete(result.getRow()));
+         }
+       }
+       if (!deletes.isEmpty()) {
+         LOG.warn("Deleting some vestigal " + deletes.size() + " rows of " + tableName +
+           " from " + TableName.META_TABLE_NAME);
+         metaTable.delete(deletes);
+       }
+     }
+   }
+ 
+   protected static void deleteFromMeta(final MasterProcedureEnv env,
+       final TableName tableName, List<HRegionInfo> regions) throws IOException {
+     MetaTableAccessor.deleteRegions(env.getMasterServices().getConnection(), regions);
+ 
+     // Clean any remaining rows for this table.
+     cleanAnyRemainingRows(env, tableName);
+   }
+ 
+   protected static void deleteAssignmentState(final MasterProcedureEnv env,
+       final TableName tableName) throws IOException {
+     AssignmentManager am = env.getMasterServices().getAssignmentManager();
+ 
+     // Clean up regions of the table in RegionStates.
+     LOG.debug("Removing '" + tableName + "' from region states.");
+     am.getRegionStates().tableDeleted(tableName);
+ 
+     // If entry for this table states, remove it.
+     LOG.debug("Marking '" + tableName + "' as deleted.");
+     am.getTableStateManager().setDeletedTable(tableName);
+   }
+ 
+   protected static void deleteTableDescriptorCache(final MasterProcedureEnv env,
+       final TableName tableName) throws IOException {
+     LOG.debug("Removing '" + tableName + "' descriptor.");
+     env.getMasterServices().getTableDescriptors().remove(tableName);
+   }
+ 
+   protected static void deleteTableStates(final MasterProcedureEnv env, final TableName tableName)
+       throws IOException {
+     getMasterQuotaManager(env).removeTableFromNamespaceQuota(tableName);
+   }
+ 
+   private static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env)
+       throws IOException {
+     return ProcedureSyncWait.waitFor(env, "quota manager to be available",
+         new ProcedureSyncWait.Predicate<MasterQuotaManager>() {
+       @Override
+       public MasterQuotaManager evaluate() throws IOException {
+         return env.getMasterServices().getMasterQuotaManager();
+       }
+     });
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
index c2abc7c,0000000..d54dca4
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
@@@ -1,308 -1,0 +1,304 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Date;
 +import java.util.List;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.Tag;
 +import org.apache.hadoop.hbase.TagType;
 +import org.apache.hadoop.hbase.client.Scan;
- import org.apache.hadoop.hbase.regionserver.HMobStore;
- import org.apache.hadoop.hbase.regionserver.HStore;
- import org.apache.hadoop.hbase.regionserver.InternalScanner;
- import org.apache.hadoop.hbase.regionserver.MobCompactionStoreScanner;
- import org.apache.hadoop.hbase.regionserver.ScanType;
- import org.apache.hadoop.hbase.regionserver.Store;
- import org.apache.hadoop.hbase.regionserver.StoreFile;
++import org.apache.hadoop.hbase.regionserver.*;
 +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
- import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 +import org.apache.hadoop.hbase.util.Bytes;
 +
 +/**
 + * Compact passed set of files in the mob-enabled column family.
 + */
 +@InterfaceAudience.Private
 +public class DefaultMobCompactor extends DefaultCompactor {
 +
 +  private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class);
 +  private long mobSizeThreshold;
 +  private HMobStore mobStore;
 +  public DefaultMobCompactor(Configuration conf, Store store) {
 +    super(conf, store);
 +    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
 +    // During the compaction, the compactor reads the cells from the mob files and
 +    // possibly creates new mob files. All of these operations are handled by HMobStore,
 +    // so we need to cast the Store to HMobStore.
 +    if (!(store instanceof HMobStore)) {
 +      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
 +    }
 +    mobStore = (HMobStore) store;
 +    mobSizeThreshold = store.getFamily().getMobThreshold();
 +  }
 +
 +  /**
 +   * Creates a writer for a new file in a temporary directory.
 +   * @param fd The file details.
 +   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
 +   * @return Writer for a new StoreFile in the tmp dir.
 +   * @throws IOException
 +   */
 +  @Override
 +  protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
 +    // make this writer with tags always because of possible new cells with tags.
 +    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
 +        true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
 +    return writer;
 +  }
 +
 +  @Override
 +  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
 +      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
 +    Scan scan = new Scan();
 +    scan.setMaxVersions(store.getFamily().getMaxVersions());
 +    if (scanType == ScanType.COMPACT_DROP_DELETES) {
 +      scanType = ScanType.COMPACT_RETAIN_DELETES;
 +      return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
 +          scanType, smallestReadPoint, earliestPutTs, true);
 +    } else {
 +      return new MobCompactionStoreScanner(store, store.getScanInfo(), scan, scanners,
 +          scanType, smallestReadPoint, earliestPutTs, false);
 +    }
 +  }
 +
 +  // TODO refactor to take advantage of the throughput controller.
 +
 +  /**
 +   * Performs compaction on a column family with the mob flag enabled.
 +   * This is for when the mob threshold size has changed or if the mob
 +   * column family mode has been toggled via an alter table statement.
 +   * Compacts the files by the following rules.
 +   * 1. If the cell has a mob reference tag, the cell's value is the path of the mob file.
 +   * <ol>
 +   * <li>
 +   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob;
 +   * directly copy the cell (with its mob tag) into the new store file.
 +   * </li>
 +   * <li>
 +   * Otherwise, retrieve the mob cell from the mob file, and write a copy of the cell into
 +   * the new store file.
 +   * </li>
 +   * </ol>
 +   * 2. If the cell doesn't have a reference tag.
 +   * <ol>
 +   * <li>
 +   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
 +   * write this cell to a mob file, and write the path of this mob file to the store file.
 +   * </li>
 +   * <li>
 +   * Otherwise, directly write this cell into the store file.
 +   * </li>
 +   * </ol>
 +   * In the mob compaction, the {@link MobCompactionStoreScanner} is used as a scanner
 +   * which could output the normal cells and delete markers together when required.
 +   * After the major compaction on the normal hfiles, we have a guarantee that we have purged all
 +   * deleted or old version mob refs, and the delete markers are written to a del file with the
 +   * suffix _del. Because of this, it is safe to use the del file in the mob compaction.
 +   * The mob compaction doesn't take place in the normal hfiles; it occurs directly in the
 +   * mob files. When the small mob files are merged into bigger ones, the del file is added into
 +   * the scanner to filter the deleted cells.
 +   * @param fd File details
 +   * @param scanner Where to read from.
 +   * @param writer Where to write to.
 +   * @param smallestReadPoint Smallest read point.
 +   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
 +   * @param major Is a major compaction.
 +   * @return Whether compaction ended; false if it was interrupted for any reason.
 +   */
 +  @Override
 +  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
 +      long smallestReadPoint, boolean cleanSeqId,
 +      CompactionThroughputController throughputController,  boolean major) throws IOException {
 +    if (!(scanner instanceof MobCompactionStoreScanner)) {
 +      throw new IllegalArgumentException(
 +          "The scanner should be an instance of MobCompactionStoreScanner");
 +    }
 +    MobCompactionStoreScanner compactionScanner = (MobCompactionStoreScanner) scanner;
 +    int bytesWritten = 0;
 +    // Since scanner.next() can return 'false' but still be delivering data,
 +    // we have to use a do/while loop.
 +    List<Cell> cells = new ArrayList<Cell>();
 +    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
 +    int closeCheckInterval = HStore.getCloseCheckInterval();
 +    boolean hasMore;
 +    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
 +    byte[] fileName = null;
 +    StoreFile.Writer mobFileWriter = null;
 +    StoreFile.Writer delFileWriter = null;
 +    long mobCells = 0;
 +    long deleteMarkersCount = 0;
 +    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
 +            .getName());
 +    long mobCompactedIntoMobCellsCount = 0;
 +    long mobCompactedFromMobCellsCount = 0;
 +    long mobCompactedIntoMobCellsSize = 0;
 +    long mobCompactedFromMobCellsSize = 0;
 +    try {
 +      try {
 +        // If the mob file writer could not be created, directly write the cell to the store file.
 +        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
 +            store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
 +        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
 +      } catch (IOException e) {
 +        LOG.error(
 +            "Fail to create mob writer, "
 +                + "we will continue the compaction by writing MOB cells directly in store files",
 +            e);
 +      }
 +      delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
 +          store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
++      ScannerContext scannerContext =
++              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
++
++
 +      do {
-         hasMore = compactionScanner.next(cells, compactionKVMax);
++        hasMore = compactionScanner.next(cells, scannerContext);
 +        // output to writer:
 +        for (Cell c : cells) {
 +          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
 +            CellUtil.setSequenceId(c, 0);
 +          }
 +          if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) {
 +            delFileWriter.append(c);
 +            deleteMarkersCount++;
 +          } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
 +            // If the mob file writer is null or the kv type is not put, directly write the cell
 +            // to the store file.
 +            writer.append(c);
 +          } else if (MobUtils.isMobReferenceCell(c)) {
 +            if (MobUtils.hasValidMobRefCellValue(c)) {
 +              int size = MobUtils.getMobValueLength(c);
 +              if (size > mobSizeThreshold) {
 +                // If the value size is larger than the threshold, it's regarded as a mob. Since
 +                // its value is already in the mob file, directly write this cell to the store file
 +                writer.append(c);
 +              } else {
 +                // If the value is not larger than the threshold, it's not regarded as a mob. Retrieve
 +                // the mob cell from the mob file, and write it back to the store file.
 +                Cell mobCell = mobStore.resolve(c, false);
 +                if (mobCell.getValueLength() != 0) {
 +                  // put the mob data back to the store file
-                   // KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell);
 +                  CellUtil.setSequenceId(mobCell, c.getSequenceId());
 +                  writer.append(mobCell);
 +                  mobCompactedFromMobCellsCount++;
 +                  mobCompactedFromMobCellsSize += mobCell.getValueLength();
 +                } else {
 +                  // If the resolved mob value is empty, there might be issues when retrieving it;
 +                  // directly write the cell to the store file, and leave it to be handled by the
 +                  // next compaction.
 +                  writer.append(c);
 +                }
 +              }
 +            } else {
 +              LOG.warn("The value format of the KeyValue " + c
 +                  + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
 +              writer.append(c);
 +            }
 +          } else if (c.getValueLength() <= mobSizeThreshold) {
 +            // If the value size of a cell is not larger than the threshold, directly write it to
 +            // the store file.
 +            writer.append(c);
 +          } else {
 +            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
 +            // write this cell to a mob file, and write the path to the store file.
 +            mobCells++;
 +            // append the original keyValue in the mob file.
 +            mobFileWriter.append(c);
 +            KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag);
 +            // write the cell whose value is the path of a mob file to the store file.
 +            writer.append(reference);
 +            mobCompactedIntoMobCellsCount++;
 +            mobCompactedIntoMobCellsSize += c.getValueLength();
 +          }
 +          ++progress.currentCompactedKVs;
 +
 +          // check periodically to see if a system stop is requested
 +          if (closeCheckInterval > 0) {
 +            bytesWritten += KeyValueUtil.length(c);
 +            if (bytesWritten > closeCheckInterval) {
 +              bytesWritten = 0;
 +              if (!store.areWritesEnabled()) {
 +                progress.cancel();
 +                return false;
 +              }
 +            }
 +          }
 +        }
 +        cells.clear();
 +      } while (hasMore);
 +    } finally {
 +      if (mobFileWriter != null) {
 +        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
 +        mobFileWriter.close();
 +      }
 +      if (delFileWriter != null) {
 +        delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
 +        delFileWriter.close();
 +      }
 +    }
 +    if (mobFileWriter != null) {
 +      if (mobCells > 0) {
 +        // If the mob file is not empty, commit it.
 +        mobStore.commitFile(mobFileWriter.getPath(), path);
 +      } else {
 +        try {
 +          // If the mob file is empty, delete it instead of committing.
 +          store.getFileSystem().delete(mobFileWriter.getPath(), true);
 +        } catch (IOException e) {
 +          LOG.error("Fail to delete the temp mob file", e);
 +        }
 +      }
 +    }
 +    if (delFileWriter != null) {
 +      if (deleteMarkersCount > 0) {
 +        // If the del file is not empty, commit it.
 +        // If the commit fails, the compaction is re-performed again.
 +        mobStore.commitFile(delFileWriter.getPath(), path);
 +      } else {
 +        try {
 +          // If the del file is empty, delete it instead of committing.
 +          store.getFileSystem().delete(delFileWriter.getPath(), true);
 +        } catch (IOException e) {
 +          LOG.error("Fail to delete the temp del file", e);
 +        }
 +      }
 +    }
 +    mobStore.updateMobCompactedFromMobCellsCount(mobCompactedFromMobCellsCount);
 +    mobStore.updateMobCompactedIntoMobCellsCount(mobCompactedIntoMobCellsCount);
 +    mobStore.updateMobCompactedFromMobCellsSize(mobCompactedFromMobCellsSize);
 +    mobStore.updateMobCompactedIntoMobCellsSize(mobCompactedIntoMobCellsSize);
 +    progress.complete();
 +    return true;
 +  }
 +}
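
The mobSizeThreshold that drives the branching above is read from the column family descriptor (store.getFamily().getMobThreshold()). The following is a hedged sketch of the client-side counterpart, assuming the HColumnDescriptor mob setters present on this branch; the table and family names are made up.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public final class MobFamilySetupSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("mob_table")); // hypothetical table
      HColumnDescriptor hcd = new HColumnDescriptor("f");                          // hypothetical family
      hcd.setMobEnabled(true);         // counterpart of the isMobEnabled() checks in this patch
      hcd.setMobThreshold(100 * 1024); // cell values above ~100 KB are treated as mobs
      htd.addFamily(hcd);
      admin.createTable(htd);
    }
  }
}

With such a family, DefaultMobCompactor keeps large values in mob files (leaving reference cells in the store file) and pulls values at or under the threshold back inline, per the rules documented above.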

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index 00b3421,0000000..44387f5
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@@ -1,222 -1,0 +1,222 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Date;
 +import java.util.List;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.Tag;
 +import org.apache.hadoop.hbase.TagType;
 +import org.apache.hadoop.hbase.monitoring.MonitoredTask;
- import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
- import org.apache.hadoop.hbase.regionserver.HMobStore;
- import org.apache.hadoop.hbase.regionserver.InternalScanner;
- import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
- import org.apache.hadoop.hbase.regionserver.Store;
- import org.apache.hadoop.hbase.regionserver.StoreFile;
++import org.apache.hadoop.hbase.regionserver.*;
 +import org.apache.hadoop.hbase.util.Bytes;
++import org.apache.hadoop.util.StringUtils;
 +
 +/**
 + * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher.
 + * If the store is not a mob store, the flusher flushes the MemStore the same way as
 + * DefaultStoreFlusher does.
 + * If the store is a mob store, the flusher flushes the MemStore into two places.
 + * One is the store files of HBase, the other is the mob files.
 + * <ol>
 + * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li>
 + * <li>If the size of a cell value is larger than a threshold, it'll be flushed
 + * to a mob file, and another cell with the path of this file will be flushed to HBase.</li>
 + * <li>If the size of a cell value is smaller than or equal to a threshold, it'll be flushed to
 + * HBase directly.</li>
 + * </ol>
 + *
 + */
 +@InterfaceAudience.Private
 +public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
 +
 +  private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class);
 +  private final Object flushLock = new Object();
 +  private long mobCellValueSizeThreshold = 0;
 +  private Path targetPath;
 +  private HMobStore mobStore;
 +
 +  public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException {
 +    super(conf, store);
 +    mobCellValueSizeThreshold = store.getFamily().getMobThreshold();
 +    this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
 +        store.getColumnFamilyName());
 +    if (!this.store.getFileSystem().exists(targetPath)) {
 +      this.store.getFileSystem().mkdirs(targetPath);
 +    }
 +    this.mobStore = (HMobStore) store;
 +  }
 +
 +  /**
 +   * Flushes the snapshot of the MemStore.
 +   * If this store is not a mob store, flush the cells in the snapshot to store files of HBase.
 +   * If the store is a mob one, the flusher flushes the MemStore into two places.
 +   * One is the store files of HBase, the other is the mob files.
 +   * <ol>
 +   * <li>Cells that are not PUT type or have the delete mark will be directly flushed to
 +   * HBase.</li>
 +   * <li>If the size of a cell value is larger than a threshold, it'll be
 +   * flushed to a mob file, and another cell with the path of this file will be flushed to HBase.</li>
 +   * <li>If the size of a cell value is smaller than or equal to a threshold, it'll be flushed to
 +   * HBase directly.</li>
 +   * </ol>
 +   */
 +  @Override
 +  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
 +      MonitoredTask status) throws IOException {
 +    ArrayList<Path> result = new ArrayList<Path>();
 +    int cellsCount = snapshot.getCellsCount();
 +    if (cellsCount == 0) return result; // don't flush if there are no entries
 +
 +    // Use a store scanner to find which rows to flush.
 +    long smallestReadPoint = store.getSmallestReadPoint();
 +    InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
 +    if (scanner == null) {
 +      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
 +    }
 +    StoreFile.Writer writer;
 +    try {
 +      // TODO: We can fail in the below block before we complete adding this flush to
 +      // list of store files. Add cleanup of anything put on filesystem if we fail.
 +      synchronized (flushLock) {
 +        status.setStatus("Flushing " + store + ": creating writer");
 +        // Write the map out to the disk
 +        writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
 +            false, true, true);
 +        writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
 +        try {
 +          // It's a mob store, flush the cells in a mob way. This is the difference of flushing
 +          // between a normal and a mob store.
 +          performMobFlush(snapshot, cacheFlushId, scanner, writer, status);
 +        } finally {
 +          finalizeWriter(writer, cacheFlushId, status);
 +        }
 +      }
 +    } finally {
 +      scanner.close();
 +    }
 +    LOG.info("Flushed, sequenceid=" + cacheFlushId + ", memsize="
-         + snapshot.getSize() + ", hasBloomFilter=" + writer.hasGeneralBloom()
-         + ", into tmp file " + writer.getPath());
++        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
++        ", hasBloomFilter=" + writer.hasGeneralBloom() +
++        ", into tmp file " + writer.getPath());
 +    result.add(writer.getPath());
 +    return result;
 +  }
 +
 +  /**
 +   * Flushes the cells in the mob store.
 +   * <ol>In the mob store, cells with PUT type might or might not have mob tags.
 +   * <li>If a cell does not have a mob tag, flushing the cell to different files depends
 +   * on the value length. If the length is larger than a threshold, it's flushed to a
 +   * mob file and the mob file is flushed to a store file in HBase. Otherwise, directly
 +   * flush the cell to a store file in HBase.</li>
 +   * <li>If a cell has a mob tag, its value is a mob file name; directly flush it
 +   * to a store file in HBase.</li>
 +   * </ol>
 +   * @param snapshot Memstore snapshot.
 +   * @param cacheFlushId Log cache flush sequence number.
 +   * @param scanner The scanner of memstore snapshot.
 +   * @param writer The store file writer.
 +   * @param status Task that represents the flush operation and may be updated with status.
 +   * @throws IOException
 +   */
 +  protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
 +      InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException {
 +    StoreFile.Writer mobFileWriter = null;
 +    int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
 +        HConstants.COMPACTION_KV_MAX_DEFAULT);
 +    long mobCount = 0;
 +    long mobSize = 0;
 +    long time = snapshot.getTimeRangeTracker().getMaximumTimestamp();
 +    mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
 +        store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
 +    // the target path is {tableName}/.mob/{cfName}/mobFiles
 +    // the relative path is mobFiles
 +    byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
 +    try {
 +      Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
 +          .getName());
 +      List<Cell> cells = new ArrayList<Cell>();
 +      boolean hasMore;
++      ScannerContext scannerContext =
++              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
++
 +      do {
-         hasMore = scanner.next(cells, compactionKVMax);
++        hasMore = scanner.next(cells, scannerContext);
 +        if (!cells.isEmpty()) {
 +          for (Cell c : cells) {
 +            // If we know that this KV is going to be included always, then let us
 +            // set its memstoreTS to 0. This will help us save space when writing to
 +            // disk.
 +            KeyValue kv = KeyValueUtil.ensureKeyValue(c);
 +            if (kv.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(kv)
 +                || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
 +              writer.append(kv);
 +            } else {
 +              // append the original keyValue in the mob file.
 +              mobFileWriter.append(kv);
 +              mobSize += kv.getValueLength();
 +              mobCount++;
 +
 +              // append the tags to the KeyValue.
 +              // The key is same, the value is the filename of the mob file
 +              KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
 +              writer.append(reference);
 +            }
 +          }
 +          cells.clear();
 +        }
 +      } while (hasMore);
 +    } finally {
 +      status.setStatus("Flushing mob file " + store + ": appending metadata");
 +      mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
 +      status.setStatus("Flushing mob file " + store + ": closing flushed file");
 +      mobFileWriter.close();
 +    }
 +
 +    if (mobCount > 0) {
 +      // commit the mob file from temp folder to target folder.
 +      // If the mob file is committed successfully but the store file is not,
 +      // the committed mob file will be handled by the sweep tool as an unused
 +      // file.
 +      mobStore.commitFile(mobFileWriter.getPath(), targetPath);
 +      mobStore.updateMobFlushCount();
 +      mobStore.updateMobFlushedCellsCount(mobCount);
 +      mobStore.updateMobFlushedCellsSize(mobSize);
 +    } else {
 +      try {
 +        // If the mob file is empty, delete it instead of committing.
 +        store.getFileSystem().delete(mobFileWriter.getPath(), true);
 +      } catch (IOException e) {
 +        LOG.error("Fail to delete the temp mob file", e);
 +      }
 +    }
 +  }
 +}
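
To make the flush rules documented above concrete, here is a hedged walk-through (not part of this commit) of two puts against a family assumed to be mob-enabled with a 100 KB threshold; the table and family names are hypothetical.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public final class MobFlushWalkthrough {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("mob_table"))) { // hypothetical table
      byte[] family = Bytes.toBytes("f");   // assumed mob-enabled, threshold 100 KB
      byte[] qualifier = Bytes.toBytes("q");

      // 10 KB value: at flush time DefaultMobStoreFlusher writes it straight into the store file.
      table.put(new Put(Bytes.toBytes("row1"))
          .addColumn(family, qualifier, new byte[10 * 1024]));

      // 2 MB value: at flush time the value goes to a mob file, and a reference cell
      // (the mob file name plus a MOB_TABLE_NAME_TAG_TYPE tag) goes to the store file.
      table.put(new Put(Bytes.toBytes("row2"))
          .addColumn(family, qualifier, new byte[2 * 1024 * 1024]));
    }
  }
}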

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e20bbf6/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
index 0778ac1,0000000..718b513
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
@@@ -1,646 -1,0 +1,643 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.filecompactions;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.Tag;
 +import org.apache.hadoop.hbase.TagType;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Scan;
++import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.io.HFileLink;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobFileName;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
- import org.apache.hadoop.hbase.regionserver.BloomType;
- import org.apache.hadoop.hbase.regionserver.HStore;
- import org.apache.hadoop.hbase.regionserver.ScanInfo;
- import org.apache.hadoop.hbase.regionserver.ScanType;
- import org.apache.hadoop.hbase.regionserver.StoreFile;
++import org.apache.hadoop.hbase.regionserver.*;
 +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
- import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
- import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
- import org.apache.hadoop.hbase.regionserver.StoreScanner;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.Pair;
 +
 +/**
 + * An implementation of {@link MobFileCompactor} that compacts the mob files in partitions.
 + */
 +@InterfaceAudience.Private
 +public class PartitionedMobFileCompactor extends MobFileCompactor {
 +
 +  private static final Log LOG = LogFactory.getLog(PartitionedMobFileCompactor.class);
 +  protected long mergeableSize;
 +  protected int delFileMaxCount;
 +  /** The number of files compacted in a batch */
 +  protected int compactionBatchSize;
 +  protected int compactionKVMax;
 +
 +  private Path tempPath;
 +  private Path bulkloadPath;
 +  private CacheConfig compactionCacheConfig;
 +  private Tag tableNameTag;
 +
 +  public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
 +    HColumnDescriptor column, ExecutorService pool) {
 +    super(conf, fs, tableName, column, pool);
 +    mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
 +    delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
 +      MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
 +    // default is 100
 +    compactionBatchSize = conf.getInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
 +    tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
 +    bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
 +      tableName.getNamespaceAsString(), tableName.getQualifierAsString())));
 +    compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
 +      HConstants.COMPACTION_KV_MAX_DEFAULT);
 +    Configuration copyOfConf = new Configuration(conf);
 +    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
 +    compactionCacheConfig = new CacheConfig(copyOfConf);
 +    tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
 +  }
 +
 +  @Override
 +  public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) throws IOException {
 +    if (files == null || files.isEmpty()) {
 +      LOG.info("No candidate mob files");
 +      return null;
 +    }
 +    LOG.info("isForceAllFiles: " + isForceAllFiles);
 +    // find the files to compact.
 +    PartitionedMobFileCompactionRequest request = select(files, isForceAllFiles);
 +    // compact the files.
 +    return performCompaction(request);
 +  }
 +
 +  /**
 +   * Selects the mob files and del files to compact.
 +   * Iterates over the candidates to find all the del files and the small mob files.
 +   * @param candidates All the candidate files.
 +   * @param isForceAllFiles Whether to add all the mob files into the compaction.
 +   * @return A compaction request.
 +   * @throws IOException
 +   */
 +  protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates,
 +    boolean isForceAllFiles) throws IOException {
 +    Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
 +    Map<CompactionPartitionId, CompactionPartition> filesToCompact =
 +      new HashMap<CompactionPartitionId, CompactionPartition>();
 +    int selectedFileCount = 0;
 +    int irrelevantFileCount = 0;
 +    for (FileStatus file : candidates) {
 +      if (!file.isFile()) {
 +        irrelevantFileCount++;
 +        continue;
 +      }
 +      // group the del files and small files.
 +      FileStatus linkedFile = file;
 +      if (HFileLink.isHFileLink(file.getPath())) {
 +        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
 +        linkedFile = getLinkedFileStatus(link);
 +        if (linkedFile == null) {
 +          // If the linked file cannot be found, regard it as an irrelevant file.
 +          irrelevantFileCount++;
 +          continue;
 +        }
 +      }
 +      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
 +        allDelFiles.add(file);
 +      } else if (isForceAllFiles || linkedFile.getLen() < mergeableSize) {
 +        // add all files if isForceAllFiles is true,
 +        // otherwise add the small files to the merge pool
 +        MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
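 +        // partition the files by the start key and the date encoded in the mob file name.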
 +        CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
 +          fileName.getDate());
 +        CompactionPartition compactionPartition = filesToCompact.get(id);
 +        if (compactionPartition == null) {
 +          compactionPartition = new CompactionPartition(id);
 +          compactionPartition.addFile(file);
 +          filesToCompact.put(id, compactionPartition);
 +        } else {
 +          compactionPartition.addFile(file);
 +        }
 +        selectedFileCount++;
 +      }
 +    }
 +    PartitionedMobFileCompactionRequest request = new PartitionedMobFileCompactionRequest(
 +      filesToCompact.values(), allDelFiles);
 +    if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
 +      // all the files are selected
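 +      // an ALL_FILES compaction also allows the del files to be archived once it finishes.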
 +      request.setCompactionType(CompactionType.ALL_FILES);
 +    }
 +    LOG.info("The compaction type is " + request.getCompactionType() + ", the request has "
 +      + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and "
 +      + irrelevantFileCount + " irrelevant files");
 +    return request;
 +  }
 +
 +  /**
 +   * Performs the compaction on the selected files.
 +   * <ol>
 +   * <li>Compacts the del files.</li>
 +   * <li>Compacts the selected small mob files and all the del files.</li>
 +   * <li>If all the candidates are selected, delete the del files.</li>
 +   * </ol>
 +   * @param request The compaction request.
 +   * @return The paths of new mob files generated in the compaction.
 +   * @throws IOException
 +   */
 +  protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
 +    throws IOException {
 +    // merge the del files
 +    List<Path> delFilePaths = new ArrayList<Path>();
 +    for (FileStatus delFile : request.delFiles) {
 +      delFilePaths.add(delFile.getPath());
 +    }
 +    List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
 +    List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
 +    for (Path newDelPath : newDelPaths) {
 +      StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
 +      newDelFiles.add(sf);
 +    }
 +    LOG.info("After merging, there are " + newDelFiles.size() + " del files");
 +    // compact the mob files by partitions.
 +    List<Path> paths = compactMobFiles(request, newDelFiles);
 +    LOG.info("After compaction, there are " + paths.size() + " mob files");
 +    // archive the del files if all the mob files are selected.
 +    if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
 +      LOG.info("After a mob file compaction with all files selected, archiving the del files "
 +        + newDelFiles);
 +      try {
 +        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
 +      } catch (IOException e) {
 +        LOG.error("Failed to archive the del files " + newDelFiles, e);
 +      }
 +    }
 +    return paths;
 +  }
 +
 +  /**
 +   * Compacts the selected small mob files and all the del files.
 +   * @param request The compaction request.
 +   * @param delFiles The del files.
 +   * @return The paths of new mob files after compactions.
 +   * @throws IOException
 +   */
 +  protected List<Path> compactMobFiles(final PartitionedMobFileCompactionRequest request,
 +    final List<StoreFile> delFiles) throws IOException {
 +    Collection<CompactionPartition> partitions = request.compactionPartitions;
 +    if (partitions == null || partitions.isEmpty()) {
 +      LOG.info("No partitions of mob files");
 +      return Collections.emptyList();
 +    }
 +    List<Path> paths = new ArrayList<Path>();
-     final HTable table = new HTable(conf, tableName);
++    Connection c = ConnectionFactory.createConnection(conf);
++    final Table table = c.getTable(tableName);
 +    try {
 +      Map<CompactionPartitionId, Future<List<Path>>> results =
 +        new HashMap<CompactionPartitionId, Future<List<Path>>>();
 +      // compact the mob files by partitions in parallel.
 +      for (final CompactionPartition partition : partitions) {
 +        results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
 +          @Override
 +          public List<Path> call() throws Exception {
 +            LOG.info("Compacting mob files for partition " + partition.getPartitionId());
 +            return compactMobFilePartition(request, partition, delFiles, table);
 +          }
 +        }));
 +      }
 +      // wait for the partition compactions to finish and collect the results.
 +      boolean hasFailure = false;
 +      for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
 +        try {
 +          paths.addAll(result.getValue().get());
 +        } catch (Exception e) {
 +          // just log the error
 +          LOG.error("Failed to compact the partition " + result.getKey(), e);
 +          hasFailure = true;
 +        }
 +      }
 +      if (hasFailure) {
 +        // if any partition fails in the compaction, directly throw an exception.
 +        throw new IOException("Failed to compact the partitions");
 +      }
 +    } finally {
 +      try {
 +        table.close();
 +      } catch (IOException e) {
 +        LOG.error("Failed to close the Table", e);
 +      }
++      try {
++        c.close();
++      } catch (IOException e) {
++        LOG.error("Failed to close the Connection", e);
++      }
 +    }
 +    return paths;
 +  }
 +
 +  /**
 +   * Compacts a partition of selected small mob files and all the del files.
 +   * @param request The compaction request.
 +   * @param partition A compaction partition.
 +   * @param delFiles The del files.
 +   * @param table The current table.
 +   * @return The paths of new mob files after compactions.
 +   * @throws IOException
 +   */
 +  private List<Path> compactMobFilePartition(PartitionedMobFileCompactionRequest request,
-     CompactionPartition partition, List<StoreFile> delFiles, HTable table) throws IOException {
++    CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
 +    List<Path> newFiles = new ArrayList<Path>();
 +    List<FileStatus> files = partition.listFiles();
 +    int offset = 0;
 +    Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
 +    Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
 +    while (offset < files.size()) {
 +      int batch = compactionBatchSize;
 +      if (files.size() - offset < compactionBatchSize) {
 +        batch = files.size() - offset;
 +      }
 +      if (batch == 1 && delFiles.isEmpty()) {
 +        // only one file left and no del files, do not compact it,
 +        // and directly add it to the new files.
 +        newFiles.add(files.get(offset).getPath());
 +        offset++;
 +        continue;
 +      }
 +      // clean the bulkload directory to avoid loading old files.
 +      fs.delete(bulkloadPathOfPartition, true);
 +      // add the selected mob files and del files into filesToCompact
 +      List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
 +      for (int i = offset; i < batch + offset; i++) {
 +        StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
 +          BloomType.NONE);
 +        filesToCompact.add(sf);
 +      }
 +      filesToCompact.addAll(delFiles);
 +      // compact the mob files in a batch.
 +      compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
 +        bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
 +      // move to the next batch.
 +      offset += batch;
 +    }
 +    LOG.info("Compaction is finished. The number of mob files is changed from " + files.size()
 +      + " to " + newFiles.size());
 +    return newFiles;
 +  }
 +
 +  /**
 +   * Compacts a partition of selected small mob files and all the del files in a batch.
 +   * @param request The compaction request.
 +   * @param partition A compaction partition.
 +   * @param table The current table.
 +   * @param filesToCompact The files to be compacted.
 +   * @param batch The number of mob files to be compacted in a batch.
 +   * @param bulkloadPathOfPartition The directory where the bulkload files of the current
 +   *        partition are saved.
 +   * @param bulkloadColumnPath The directory where the bulkload files of the current column
 +   *        family in this partition are saved.
 +   * @param newFiles The paths of new mob files after compactions.
 +   * @throws IOException
 +   */
 +  private void compactMobFilesInBatch(PartitionedMobFileCompactionRequest request,
-     CompactionPartition partition, HTable table, List<StoreFile> filesToCompact, int batch,
++    CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
 +    Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
 +    throws IOException {
 +    // open scanner to the selected mob files and del files.
 +    StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
 +    // the mob files to be compacted, not including the del files.
 +    List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
 +    // Pair(maxSeqId, cellsCount)
 +    Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
 +    // open writers for the mob files and new ref store files.
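 +    // the ref file holds reference cells that point at the new mob file and is bulk loaded
 +    // into the table once the batch succeeds.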
 +    Writer writer = null;
 +    Writer refFileWriter = null;
 +    Path filePath = null;
 +    Path refFilePath = null;
 +    long mobCells = 0;
 +    try {
 +      writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
 +        tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
 +          .getStartKey(), compactionCacheConfig);
 +      filePath = writer.getPath();
 +      byte[] fileName = Bytes.toBytes(filePath.getName());
 +      // create a temp file and open a writer for it in the bulkloadPath
 +      refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
 +        .getSecond().longValue(), compactionCacheConfig);
 +      refFilePath = refFileWriter.getPath();
 +      List<Cell> cells = new ArrayList<Cell>();
 +      boolean hasMore = false;
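++      // cap the number of cells returned by each call to scanner.next() at compactionKVMax.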
++      ScannerContext scannerContext =
++              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
 +      do {
-         hasMore = scanner.next(cells, compactionKVMax);
++        hasMore = scanner.next(cells, scannerContext);
 +        for (Cell cell : cells) {
 +          // TODO remove this after the new code is introduced.
 +          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
 +          // write the mob cell to the mob file.
 +          writer.append(kv);
 +          // write the new reference cell to the store file.
 +          KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
 +          refFileWriter.append(reference);
 +          mobCells++;
 +        }
 +        cells.clear();
 +      } while (hasMore);
 +    } finally {
 +      // close the scanner.
 +      scanner.close();
 +      // append metadata to the mob file, and close the mob file writer.
 +      closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
 +      // append metadata and bulkload info to the ref mob file, and close the writer.
 +      closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
 +    }
 +    if (mobCells > 0) {
 +      // commit mob file
 +      MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
 +      // bulkload the ref file
 +      bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
 +      newFiles.add(new Path(mobFamilyDir, filePath.getName()));
 +    } else {
 +      // remove the new files
 +      // the mob file is empty, delete it instead of committing.
 +      deletePath(filePath);
 +      // the ref file is empty, delete it instead of committing.
 +      deletePath(refFilePath);
 +    }
 +    // archive the old mob files, do not archive the del files.
 +    try {
 +      MobUtils
 +        .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
 +    } catch (IOException e) {
 +      LOG.error("Failed to archive the files " + mobFilesToCompact, e);
 +    }
 +  }
 +
 +  /**
 +   * Compacts the del files in batches, which avoids opening too many files at the same time.
 +   * @param request The compaction request.
 +   * @param delFilePaths The paths of the del files.
 +   * @return The paths of new del files after merging or the original files if no merging
 +   *         is necessary.
 +   * @throws IOException
 +   */
 +  protected List<Path> compactDelFiles(PartitionedMobFileCompactionRequest request,
 +    List<Path> delFilePaths) throws IOException {
 +    if (delFilePaths.size() <= delFileMaxCount) {
 +      return delFilePaths;
 +    }
 +    // when there are more del files than allowed, merge them first.
 +    int offset = 0;
 +    List<Path> paths = new ArrayList<Path>();
 +    while (offset < delFilePaths.size()) {
 +      // get the batch
 +      int batch = compactionBatchSize;
 +      if (delFilePaths.size() - offset < compactionBatchSize) {
 +        batch = delFilePaths.size() - offset;
 +      }
 +      List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
 +      if (batch == 1) {
 +        // only one file left, do not compact it, directly add it to the new files.
 +        paths.add(delFilePaths.get(offset));
 +        offset++;
 +        continue;
 +      }
 +      for (int i = offset; i < batch + offset; i++) {
 +        batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
 +          BloomType.NONE));
 +      }
 +      // compact the del files in a batch.
 +      paths.add(compactDelFilesInBatch(request, batchedDelFiles));
 +      // move to the next batch.
 +      offset += batch;
 +    }
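 +    // recurse until the number of del files is no more than delFileMaxCount.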
 +    return compactDelFiles(request, paths);
 +  }
 +
 +  /**
 +   * Compacts the del file in a batch.
 +   * @param request The compaction request.
 +   * @param delFiles The del files.
 +   * @return The path of new del file after merging.
 +   * @throws IOException
 +   */
 +  private Path compactDelFilesInBatch(PartitionedMobFileCompactionRequest request,
 +    List<StoreFile> delFiles) throws IOException {
 +    // create a scanner for the del files.
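 +    // retain the delete markers since they may still apply to mob files that are not compacted yet.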
 +    StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
 +    Writer writer = null;
 +    Path filePath = null;
 +    try {
 +      writer = MobUtils.createDelFileWriter(conf, fs, column,
 +        MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
 +        column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig);
 +      filePath = writer.getPath();
 +      List<Cell> cells = new ArrayList<Cell>();
 +      boolean hasMore = false;
++      ScannerContext scannerContext =
++              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
 +      do {
-         hasMore = scanner.next(cells, compactionKVMax);
++        hasMore = scanner.next(cells, scannerContext);
 +        for (Cell cell : cells) {
 +          // TODO remove this after the new code is introduced.
 +          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
 +          writer.append(kv);
 +        }
 +        cells.clear();
 +      } while (hasMore);
 +    } finally {
 +      scanner.close();
 +      if (writer != null) {
 +        try {
 +          writer.close();
 +        } catch (IOException e) {
 +          LOG.error("Failed to close the writer of the file " + filePath, e);
 +        }
 +      }
 +    }
 +    // commit the new del file
 +    Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
 +    // archive the old del files
 +    try {
 +      MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
 +    } catch (IOException e) {
 +      LOG.error("Failed to archive the old del files " + delFiles, e);
 +    }
 +    return path;
 +  }
 +
 +  /**
 +   * Creates a store scanner.
 +   * @param filesToCompact The files to be compacted.
 +   * @param scanType The scan type.
 +   * @return The store scanner.
 +   * @throws IOException
 +   */
 +  private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
 +    throws IOException {
 +    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
 +      false, true, false, null, HConstants.LATEST_TIMESTAMP);
 +    Scan scan = new Scan();
 +    scan.setMaxVersions(column.getMaxVersions());
 +    long ttl = HStore.determineTTLFromFamily(column);
 +    ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR);
 +    StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
 +      HConstants.LATEST_TIMESTAMP);
 +    return scanner;
 +  }
 +
 +  /**
 +   * Bulkloads the current file.
 +   * @param table The current table.
 +   * @param bulkloadDirectory The path of bulkload directory.
 +   * @param fileName The current file name.
 +   * @throws IOException
 +   */
-   private void bulkloadRefFile(HTable table, Path bulkloadDirectory, String fileName)
++  private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName)
 +    throws IOException {
 +    // bulkload the ref file
 +    try {
 +      LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
-       bulkload.doBulkLoad(bulkloadDirectory, table);
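++      // the Table is cast to HTable because doBulkLoad expects an HTable in this version.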
++      bulkload.doBulkLoad(bulkloadDirectory, (HTable)table);
 +    } catch (Exception e) {
 +      // delete the committed mob file
 +      deletePath(new Path(mobFamilyDir, fileName));
 +      throw new IOException(e);
 +    } finally {
 +      // delete the bulkload files in bulkloadPath
 +      deletePath(bulkloadDirectory);
 +    }
 +  }
 +
 +  /**
 +   * Closes the mob file writer.
 +   * @param writer The mob file writer.
 +   * @param maxSeqId Maximum sequence id.
 +   * @param mobCellsCount The number of mob cells.
 +   * @throws IOException
 +   */
 +  private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
 +    throws IOException {
 +    if (writer != null) {
 +      writer.appendMetadata(maxSeqId, false, mobCellsCount);
 +      try {
 +        writer.close();
 +      } catch (IOException e) {
 +        LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Closes the ref file writer.
 +   * @param writer The ref file writer.
 +   * @param maxSeqId Maximum sequence id.
 +   * @param bulkloadTime The timestamp at which the bulk load file is created.
 +   * @throws IOException
 +   */
 +  private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
 +    throws IOException {
 +    if (writer != null) {
 +      writer.appendMetadata(maxSeqId, false);
 +      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
 +      try {
 +        writer.close();
 +      } catch (IOException e) {
 +        LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Gets the max seqId and number of cells of the store files.
 +   * @param storeFiles The store files.
 +   * @return The pair of the max seqId and number of cells of the store files.
 +   * @throws IOException
 +   */
 +  private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
 +    long maxSeqId = 0;
 +    long maxKeyCount = 0;
 +    for (StoreFile sf : storeFiles) {
 +      // the readers will be closed later after the merge.
 +      maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
 +      byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
 +      if (count != null) {
 +        maxKeyCount += Bytes.toLong(count);
 +      }
 +    }
 +    return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
 +  }
 +
 +  /**
 +   * Deletes a file.
 +   * @param path The path of the file to be deleted.
 +   */
 +  private void deletePath(Path path) {
 +    try {
 +      if (path != null) {
 +        fs.delete(path, true);
 +      }
 +    } catch (IOException e) {
 +      LOG.error("Failed to delete the file " + path, e);
 +    }
 +  }
 +
 +  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
 +    Path[] locations = link.getLocations();
 +    for (Path location : locations) {
 +      FileStatus file = getFileStatus(location);
 +      if (file != null) {
 +        return file;
 +      }
 +    }
 +    return null;
 +  }
 +
 +  private FileStatus getFileStatus(Path path) throws IOException {
 +    try {
 +      if (path != null) {
 +        FileStatus file = fs.getFileStatus(path);
 +        return file;
 +      }
 +    } catch (FileNotFoundException e) {
 +      LOG.warn("The file " + path + " can not be found", e);
 +    }
 +    return null;
 +  }
 +}

