Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 869ED200D17 for ; Wed, 30 Aug 2017 17:13:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 850B11693EA; Wed, 30 Aug 2017 15:13:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 343311693DA for ; Wed, 30 Aug 2017 17:13:50 +0200 (CEST) Received: (qmail 43147 invoked by uid 500); 30 Aug 2017 15:13:49 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 43114 invoked by uid 99); 30 Aug 2017 15:13:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Aug 2017 15:13:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 59EFBF5592; Wed, 30 Aug 2017 15:13:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Wed, 30 Aug 2017 15:14:00 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/51] [partial] hbase-site git commit: Published site at . archived-at: Wed, 30 Aug 2017 15:13:52 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/0d6dd914/devapidocs/src-html/org/apache/hadoop/hbase/coprocessor/Export.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/coprocessor/Export.html b/devapidocs/src-html/org/apache/hadoop/hbase/coprocessor/Export.html new file mode 100644 index 0000000..64d366a --- /dev/null +++ b/devapidocs/src-html/org/apache/hadoop/hbase/coprocessor/Export.html @@ -0,0 +1,610 @@ + + + +Source code + + + +
+
001/*
+002 * Licensed to the Apache Software Foundation (ASF) under one
+003 * or more contributor license agreements.  See the NOTICE file
+004 * distributed with this work for additional information
+005 * regarding copyright ownership.  The ASF licenses this file
+006 * to you under the Apache License, Version 2.0 (the
+007 * "License"); you may not use this file except in compliance
+008 * with the License.  You may obtain a copy of the License at
+009 *
+010 *   http://www.apache.org/licenses/LICENSE-2.0
+011 *
+012 * Unless required by applicable law or agreed to in writing,
+013 * software distributed under the License is distributed on an
+014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+015 * KIND, either express or implied.  See the License for the
+016 * specific language governing permissions and limitations
+017 * under the License.
+018 */
+019package org.apache.hadoop.hbase.coprocessor;
+020
+021import com.google.protobuf.RpcCallback;
+022import com.google.protobuf.RpcController;
+023import com.google.protobuf.Service;
+024import java.io.Closeable;
+025import java.io.IOException;
+026import java.security.PrivilegedExceptionAction;
+027import java.util.ArrayList;
+028import java.util.LinkedList;
+029import java.util.List;
+030import java.util.Map;
+031import java.util.TreeMap;
+032import org.apache.commons.logging.Log;
+033import org.apache.commons.logging.LogFactory;
+034import org.apache.hadoop.conf.Configuration;
+035import org.apache.hadoop.fs.FileSystem;
+036import org.apache.hadoop.fs.Path;
+037import org.apache.hadoop.hbase.Cell;
+038import org.apache.hadoop.hbase.Coprocessor;
+039import org.apache.hadoop.hbase.CoprocessorEnvironment;
+040import org.apache.hadoop.hbase.HBaseConfiguration;
+041import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+042import org.apache.hadoop.hbase.HRegionInfo;
+043import org.apache.hadoop.hbase.TableName;
+044import org.apache.hadoop.hbase.classification.InterfaceAudience;
+045import org.apache.hadoop.hbase.classification.InterfaceStability;
+046import org.apache.hadoop.hbase.client.Connection;
+047import org.apache.hadoop.hbase.client.ConnectionFactory;
+048import org.apache.hadoop.hbase.client.Result;
+049import org.apache.hadoop.hbase.client.Scan;
+050import org.apache.hadoop.hbase.client.Table;
+051import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+052import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+053import org.apache.hadoop.hbase.ipc.RpcServer;
+054import org.apache.hadoop.hbase.ipc.ServerRpcController;
+055import org.apache.hadoop.hbase.mapreduce.ExportUtils;
+056import org.apache.hadoop.hbase.mapreduce.Import;
+057import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
+058import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+059import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
+060import org.apache.hadoop.hbase.protobuf.generated.ExportProtos;
+061import org.apache.hadoop.hbase.regionserver.InternalScanner;
+062import org.apache.hadoop.hbase.regionserver.Region;
+063import org.apache.hadoop.hbase.regionserver.RegionScanner;
+064import org.apache.hadoop.hbase.security.User;
+065import org.apache.hadoop.hbase.security.UserProvider;
+066import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+067import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+068import org.apache.hadoop.hbase.util.ArrayUtils;
+069import org.apache.hadoop.hbase.util.ByteStringer;
+070import org.apache.hadoop.hbase.util.Bytes;
+071import org.apache.hadoop.hbase.util.Triple;
+072import org.apache.hadoop.io.SequenceFile;
+073import org.apache.hadoop.io.Text;
+074import org.apache.hadoop.io.compress.CompressionCodec;
+075import org.apache.hadoop.io.compress.DefaultCodec;
+076import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+077import org.apache.hadoop.security.token.Token;
+078import org.apache.hadoop.util.GenericOptionsParser;
+079import org.apache.hadoop.util.ReflectionUtils;
+080
+081/**
+082 * Export an HBase table. Writes content to sequence files up in HDFS. Use
+083 * {@link Import} to read it back in again. It is implemented as a coprocessor
+084 * endpoint.
+085 *
+086 * @see org.apache.hadoop.hbase.mapreduce.Export
+087 */
+088@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+089@InterfaceStability.Evolving
+090public class Export extends ExportProtos.ExportService
+091        implements Coprocessor, CoprocessorService {
+092
+093  private static final Log LOG = LogFactory.getLog(Export.class);
+094  private static final Class<? extends CompressionCodec> DEFAULT_CODEC = DefaultCodec.class;
+095  private static final SequenceFile.CompressionType DEFAULT_TYPE = SequenceFile.CompressionType.RECORD;
+096  private RegionCoprocessorEnvironment env = null;
+097  private UserProvider userProvider;
+098
+099  public static void main(String[] args) throws Throwable {
+100    Map<byte[], Response> response = run(HBaseConfiguration.create(), args);
+101    System.exit(response == null ? -1 : 0);
+102  }
+103
+104  @VisibleForTesting
+105  static Map<byte[], Response> run(final Configuration conf, final String[] args) throws Throwable {
+106    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+107    if (!ExportUtils.isValidArguements(args)) {
+108      ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.length(otherArgs));
+109      return null;
+110    }
+111    Triple<TableName, Scan, Path> arguments = ExportUtils.getArgumentsFromCommandLine(conf, otherArgs);
+112    return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
+113  }
+114
+115  public static Map<byte[], Response> run(final Configuration conf, TableName tableName, Scan scan, Path dir) throws Throwable {
+116    FileSystem fs = dir.getFileSystem(conf);
+117    UserProvider userProvider = UserProvider.instantiate(conf);
+118    checkDir(fs, dir);
+119    FsDelegationToken fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+120    fsDelegationToken.acquireDelegationToken(fs);
+121    try {
+122      final ExportProtos.ExportRequest request = getConfiguredRequest(conf, dir,
+123        scan, fsDelegationToken.getUserToken());
+124      try (Connection con = ConnectionFactory.createConnection(conf);
+125              Table table = con.getTable(tableName)) {
+126        Map<byte[], Response> result = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+127        table.coprocessorService(ExportProtos.ExportService.class,
+128          scan.getStartRow(),
+129          scan.getStopRow(),
+130          (ExportProtos.ExportService service) -> {
+131            ServerRpcController controller = new ServerRpcController();
+132            Map<byte[], ExportProtos.ExportResponse> rval = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+133            CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse>
+134              rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
+135            service.export(controller, request, rpcCallback);
+136            if (controller.failedOnException()) {
+137              throw controller.getFailedOn();
+138            }
+139            return rpcCallback.get();
+140          }).forEach((k, v) -> result.put(k, new Response(v)));
+141        return result;
+142      } catch (Throwable e) {
+143        fs.delete(dir, true);
+144        throw e;
+145      }
+146    } finally {
+147      fsDelegationToken.releaseDelegationToken();
+148    }
+149  }
+150
+151  private static boolean getCompression(final ExportProtos.ExportRequest request) {
+152    if (request.hasCompressed()) {
+153      return request.getCompressed();
+154    } else {
+155      return false;
+156    }
+157  }
+158
+159  private static SequenceFile.CompressionType getCompressionType(final ExportProtos.ExportRequest request) {
+160    if (request.hasCompressType()) {
+161      return SequenceFile.CompressionType.valueOf(request.getCompressType());
+162    } else {
+163      return DEFAULT_TYPE;
+164    }
+165  }
+166
+167  private static CompressionCodec getCompressionCodec(final Configuration conf, final ExportProtos.ExportRequest request) {
+168    try {
+169      Class<? extends CompressionCodec> codecClass;
+170      if (request.hasCompressCodec()) {
+171        codecClass = conf.getClassByName(request.getCompressCodec()).asSubclass(CompressionCodec.class);
+172      } else {
+173        codecClass = DEFAULT_CODEC;
+174      }
+175      return ReflectionUtils.newInstance(codecClass, conf);
+176    } catch (ClassNotFoundException e) {
+177      throw new IllegalArgumentException("Compression codec "
+178              + request.getCompressCodec() + " was not found.", e);
+179    }
+180  }
+181
+182  private static SequenceFile.Writer.Option getOutputPath(final Configuration conf,
+183          final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
+184    Path file = new Path(request.getOutputPath(), "export-" + info.getEncodedName());
+185    FileSystem fs = file.getFileSystem(conf);
+186    if (fs.exists(file)) {
+187      throw new IOException(file + " already exists");
+188    }
+189    return SequenceFile.Writer.file(file);
+190  }
+191
+192  private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
+193          final HRegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
+194    List<SequenceFile.Writer.Option> rval = new LinkedList<>();
+195    rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
+196    rval.add(SequenceFile.Writer.valueClass(Result.class));
+197    rval.add(getOutputPath(conf, info, request));
+198    if (getCompression(request)) {
+199      rval.add(SequenceFile.Writer.compression(getCompressionType(request), getCompressionCodec(conf, request)));
+200    } else {
+201      rval.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
+202    }
+203    return rval;
+204  }
+205
+206  private static ExportProtos.ExportResponse processData(final Region region, final Configuration conf,
+207    final UserProvider userProvider, final Scan scan, final Token userToken,
+208    final List<SequenceFile.Writer.Option> opts) throws IOException {
+209    ScanCoprocessor cp = new ScanCoprocessor(region);
+210    RegionScanner scanner = null;
+211    try (RegionOp regionOp = new RegionOp(region);
+212            SecureWriter out = new SecureWriter(conf, userProvider, userToken, opts)) {
+213      scanner = cp.checkScannerOpen(scan);
+214      ImmutableBytesWritable key = new ImmutableBytesWritable();
+215      long rowCount = 0;
+216      long cellCount = 0;
+217      List<Result> results = new ArrayList<>();
+218      List<Cell> cells = new ArrayList<>();
+219      boolean hasMore;
+220      do {
+221        boolean bypass = cp.preScannerNext(scanner, results, scan.getBatch());
+222        if (bypass) {
+223          hasMore = false;
+224        } else {
+225          hasMore = scanner.nextRaw(cells);
+226          if (cells.isEmpty()) {
+227            continue;
+228          }
+229          Cell firstCell = cells.get(0);
+230          for (Cell cell : cells) {
+231            if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength(),
+232                    cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) != 0) {
+233              throw new IOException("RegionScanner#nextRaw returned cells from different rows:"
+234                      + " first row=" + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength())
+235                      + ", current row=" + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+236            }
+237          }
+238          results.add(Result.create(cells));
+239          cells.clear();
+240          cp.postScannerNext(scanner, results, scan.getBatch(), hasMore);
+241        }
+242        for (Result r : results) {
+243          key.set(r.getRow());
+244          out.append(key, r);
+245          ++rowCount;
+246          cellCount += r.size();
+247        }
+248        results.clear();
+249      } while (hasMore);
+250      return ExportProtos.ExportResponse.newBuilder()
+251              .setRowCount(rowCount)
+252              .setCellCount(cellCount)
+253              .build();
+254    } finally {
+255      cp.checkScannerClose(scanner);
+256    }
+257  }
+258
+259  private static void checkDir(final FileSystem fs, final Path dir) throws IOException {
+260    if (fs.exists(dir)) {
+261      throw new RuntimeException("The output directory " + dir + " already exists");
+262    }
+263    if (!fs.mkdirs(dir)) {
+264      throw new IOException("Failed to create the output directory " + dir);
+265    }
+266  }
+267
+268  private static ExportProtos.ExportRequest getConfiguredRequest(Configuration conf,
+269          Path dir, final Scan scan, final Token<?> userToken) throws IOException {
+270    boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, false);
+271    String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE,
+272            DEFAULT_TYPE.toString());
+273    String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC,
+274            DEFAULT_CODEC.getName());
+275    DelegationToken protoToken = null;
+276    if (userToken != null) {
+277      protoToken = DelegationToken.newBuilder()
+278              .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
+279              .setPassword(ByteStringer.wrap(userToken.getPassword()))
+280              .setKind(userToken.getKind().toString())
+281              .setService(userToken.getService().toString()).build();
+282    }
+283    LOG.info("compressed=" + compressed
+284            + ", compression type=" + compressionType
+285            + ", compression codec=" + compressionCodec
+286            + ", userToken=" + userToken);
+287    ExportProtos.ExportRequest.Builder builder = ExportProtos.ExportRequest.newBuilder()
+288            .setScan(ProtobufUtil.toScan(scan))
+289            .setOutputPath(dir.toString())
+290            .setCompressed(compressed)
+291            .setCompressCodec(compressionCodec)
+292            .setCompressType(compressionType);
+293    if (protoToken != null) {
+294      builder.setFsToken(protoToken);
+295    }
+296    return builder.build();
+297  }
+298
+299
+300  @Override
+301  public void start(CoprocessorEnvironment environment) throws IOException {
+302    if (environment instanceof RegionCoprocessorEnvironment) {
+303      env = (RegionCoprocessorEnvironment) environment;
+304      userProvider = UserProvider.instantiate(env.getConfiguration());
+305    } else {
+306      throw new CoprocessorException("Must be loaded on a table region!");
+307    }
+308  }
+309
+310  @Override
+311  public void stop(CoprocessorEnvironment env) throws IOException {
+312  }
+313
+314  @Override
+315  public Service getService() {
+316    return this;
+317  }
+318
+319  @Override
+320  public void export(RpcController controller, ExportProtos.ExportRequest request,
+321          RpcCallback<ExportProtos.ExportResponse> done) {
+322    Region region = env.getRegion();
+323    Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+324    conf.setStrings("io.serializations", conf.get("io.serializations"), ResultSerialization.class.getName());
+325    try {
+326      Scan scan = validateKey(region.getRegionInfo(), request);
+327      Token userToken = null;
+328      if (userProvider.isHadoopSecurityEnabled() && !request.hasFsToken()) {
+329        LOG.warn("Hadoop security is enabled, but no user token was found in the request");
+330      } else if (userProvider.isHadoopSecurityEnabled()) {
+331        userToken = new Token(request.getFsToken().getIdentifier().toByteArray(),
+332                request.getFsToken().getPassword().toByteArray(),
+333                new Text(request.getFsToken().getKind()),
+334                new Text(request.getFsToken().getService()));
+335      }
+336      ExportProtos.ExportResponse response = processData(region, conf, userProvider,
+337        scan, userToken, getWriterOptions(conf, region.getRegionInfo(), request));
+338      done.run(response);
+339    } catch (IOException e) {
+340      CoprocessorRpcUtils.setControllerException(controller, e);
+341      LOG.error(e);
+342    }
+343  }
+344
+345  private Scan validateKey(final HRegionInfo region, final ExportProtos.ExportRequest request) throws IOException {
+346    Scan scan = ProtobufUtil.toScan(request.getScan());
+347    byte[] regionStartKey = region.getStartKey();
+348    byte[] originStartKey = scan.getStartRow();
+349    if (originStartKey == null
+350            || Bytes.compareTo(originStartKey, regionStartKey) < 0) {
+351      scan.setStartRow(regionStartKey);
+352    }
+353    byte[] regionEndKey = region.getEndKey();
+354    byte[] originEndKey = scan.getStopRow();
+355    if (originEndKey == null
+356            || Bytes.compareTo(originEndKey, regionEndKey) > 0) {
+357      scan.setStopRow(regionEndKey);
+358    }
+359    return scan;
+360  }
+361
+362  private static class RegionOp implements Closeable {
+363
+364    private final Region region;
+365
+366    RegionOp(final Region region) throws IOException {
+367      this.region = region;
+368      region.startRegionOperation();
+369    }
+370
+371    @Override
+372    public void close() throws IOException {
+373      region.closeRegionOperation();
+374    }
+375  }
+376
+377  private static class ScanCoprocessor {
+378
+379    private final Region region;
+380
+381    ScanCoprocessor(final Region region) {
+382      this.region = region;
+383    }
+384
+385    RegionScanner checkScannerOpen(final Scan scan) throws IOException {
+386      RegionScanner scanner;
+387      if (region.getCoprocessorHost() == null) {
+388        scanner = region.getScanner(scan);
+389      } else {
+390        scanner = region.getCoprocessorHost().preScannerOpen(scan);
+391        if (scanner == null) {
+392          scanner = region.getScanner(scan);
+393        }
+394        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+395      }
+396      if (scanner == null) {
+397        throw new IOException("Failed to open region scanner");
+398      }
+399      return scanner;
+400    }
+401
+402    void checkScannerClose(final InternalScanner s) throws IOException {
+403      if (s == null) {
+404        return;
+405      }
+406      if (region.getCoprocessorHost() == null) {
+407        s.close();
+408        return;
+409      }
+410      if (region.getCoprocessorHost().preScannerClose(s)) {
+411        return;
+412      }
+413      try {
+414        s.close();
+415      } finally {
+416        region.getCoprocessorHost().postScannerClose(s);
+417      }
+418    }
+419
+420    boolean preScannerNext(final InternalScanner s,
+421            final List<Result> results, final int limit) throws IOException {
+422      if (region.getCoprocessorHost() == null) {
+423        return false;
+424      } else {
+425        Boolean bypass = region.getCoprocessorHost().preScannerNext(s, results, limit);
+426        return bypass == null ? false : bypass;
+427      }
+428    }
+429
+430    boolean postScannerNext(final InternalScanner s,
+431            final List<Result> results, final int limit, boolean hasMore)
+432            throws IOException {
+433      if (region.getCoprocessorHost() == null) {
+434        return false;
+435      } else {
+436        return region.getCoprocessorHost().postScannerNext(s, results, limit, hasMore);
+437      }
+438    }
+439  }
+440
+441  private static class SecureWriter implements Closeable {
+442    private final PrivilegedWriter privilegedWriter;
+443
+444    SecureWriter(final Configuration conf, final UserProvider userProvider, final Token userToken,
+445            final List<SequenceFile.Writer.Option> opts) throws IOException {
+446      privilegedWriter = new PrivilegedWriter(getActiveUser(userProvider, userToken),
+447        SequenceFile.createWriter(conf, opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
+448    }
+449
+450    void append(final Object key, final Object value) throws IOException {
+451      privilegedWriter.append(key, value);
+452    }
+453
+454    private static User getActiveUser(final UserProvider userProvider, final Token userToken) throws IOException {
+455      User user = RpcServer.getRequestUser();
+456      if (user == null) {
+457        user = userProvider.getCurrent();
+458      }
+459      if (user == null && userToken != null) {
+460        LOG.warn("No user credentials found, but a token was received with the request");
+461      } else if (user != null && userToken != null) {
+462        user.addToken(userToken);
+463      }
+464      return user;
+465    }
+466
+467    @Override
+468    public void close() throws IOException {
+469      privilegedWriter.close();
+470    }
+471  }
+472
+473  private static class PrivilegedWriter implements PrivilegedExceptionAction<Boolean>, Closeable {
+474    private final User user;
+475    private final SequenceFile.Writer out;
+476    private Object key;
+477    private Object value;
+478
+479    PrivilegedWriter(final User user, final SequenceFile.Writer out) {
+480      this.user = user;
+481      this.out = out;
+482    }
+483
+484    void append(final Object key, final Object value) throws IOException {
+485      if (user == null) {
+486        out.append(key, value);
+487      } else {
+488        this.key = key;
+489        this.value = value;
+490        try {
+491          user.runAs(this);
+492        } catch (InterruptedException ex) {
+493          throw new IOException(ex);
+494        }
+495      }
+496    }
+497
+498    @Override
+499    public Boolean run() throws Exception {
+500      out.append(key, value);
+501      return true;
+502    }
+503
+504    @Override
+505    public void close() throws IOException {
+506      out.close();
+507    }
+508  }
+509
+510  public static class Response {
+511
+512    private final long rowCount;
+513    private final long cellCount;
+514
+515    private Response(ExportProtos.ExportResponse r) {
+516      this.rowCount = r.getRowCount();
+517      this.cellCount = r.getCellCount();
+518    }
+519
+520    public long getRowCount() {
+521      return rowCount;
+522    }
+523
+524    public long getCellCount() {
+525      return cellCount;
+526    }
+527
+528    @Override
+529    public String toString() {
+530      StringBuilder builder = new StringBuilder(35);
+531      return builder.append("rowCount=")
+532             .append(rowCount)
+533             .append(", cellCount=")
+534             .append(cellCount)
+535             .toString();
+536    }
+537  }
+538}
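
For readers of the listing above, here is a minimal client-side sketch of driving the endpoint. It uses only the public pieces shown in this file (Export.run(Configuration, TableName, Scan, Path) and the nested Response class) and assumes the Export coprocessor is already loaded on the target table; the table name and output path are illustrative placeholders, not values from the commit.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.Export;

public class EndpointExportExample {
  public static void main(String[] args) throws Throwable {
    Configuration conf = HBaseConfiguration.create();
    // Placeholder table and (not yet existing) HDFS output directory.
    TableName table = TableName.valueOf("test_table");
    Path outputDir = new Path("/tmp/endpoint-export");
    // Full-table scan; narrow the start/stop rows or add filters as needed.
    Scan scan = new Scan();
    // Invokes ExportService on every region covered by the scan and collects
    // one Response (row/cell counts) per region, keyed by region start key.
    Map<byte[], Export.Response> perRegion = Export.run(conf, table, scan, outputDir);
    long rows = 0, cells = 0;
    for (Export.Response r : perRegion.values()) {
      rows += r.getRowCount();
      cells += r.getCellCount();
    }
    System.out.println("exported rows=" + rows + ", cells=" + cells);
  }
}

Each map entry corresponds to one region; per the getOutputPath method above, every region writes its own SequenceFile named export-<encoded region name> under the output directory.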
+ + http://git-wip-us.apache.org/repos/asf/hbase-site/blob/0d6dd914/devapidocs/src-html/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.html b/devapidocs/src-html/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.html index 0a0c06d..6b98f3c 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.html @@ -135,13 +135,13 @@ 127 128 private void setStreamOptions(FSDataInputStream in) { 129 try { -130 this.stream.setDropBehind(dropBehind); +130 in.setDropBehind(dropBehind); 131 } catch (Exception e) { 132 // Skipped. 133 } 134 if (readahead >= 0) { 135 try { -136 this.stream.setReadahead(readahead); +136 in.setReadahead(readahead); 137 } catch (Exception e) { 138 // Skipped. 139 } http://git-wip-us.apache.org/repos/asf/hbase-site/blob/0d6dd914/devapidocs/src-html/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.html b/devapidocs/src-html/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.html index 3b57398..abcc841 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.html @@ -370,18 +370,16 @@ 362 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job); 363 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses( 364 job, -365 // when making changes here, consider also mapreduce.TableMapReduceUtil -366 // pull job classes -367 job.getMapOutputKeyClass(), -368 job.getMapOutputValueClass(), -369 job.getOutputKeyClass(), -370 job.getOutputValueClass(), -371 job.getPartitionerClass(), -372 job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class), -373 job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class), -374 job.getCombinerClass()); -375 } -376} +365 job.getMapOutputKeyClass(), +366 job.getMapOutputValueClass(), +367 job.getOutputKeyClass(), +368 job.getOutputValueClass(), +369 job.getPartitionerClass(), +370 job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class), +371 job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class), +372 job.getCombinerClass()); +373 } +374} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/0d6dd914/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/Driver.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/Driver.html b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/Driver.html index 8cad623..7cd733d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/Driver.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/Driver.html @@ -59,9 +59,9 @@ 051 pgd.addClass(CopyTable.NAME, CopyTable.class, 052 "Export a table from local cluster to peer cluster."); 053 pgd.addClass(VerifyReplication.NAME, VerifyReplication.class, "Compare" + -054 " the data from tables in two different clusters. 
WARNING: It" + -055 " doesn't work for incrementColumnValues'd cells since the" + -056 " timestamp is changed after being appended to the log."); +054 " data from tables in two different clusters. It" + +055 " doesn't work for incrementColumnValues'd cells since" + +056 " timestamp is changed after appending to WAL."); 057 pgd.addClass(WALPlayer.NAME, WALPlayer.class, "Replay WAL files."); 058 pgd.addClass(ExportSnapshot.NAME, ExportSnapshot.class, "Export" + 059 " the specific snapshot to a given FileSystem."); http://git-wip-us.apache.org/repos/asf/hbase-site/blob/0d6dd914/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/Export.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/Export.html b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/Export.html index 97e553e..849e452 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/Export.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/Export.html @@ -28,181 +28,86 @@ 020 021import java.io.IOException; 022 -023import org.apache.commons.logging.Log; -024import org.apache.commons.logging.LogFactory; -025import org.apache.hadoop.hbase.classification.InterfaceAudience; -026import org.apache.hadoop.conf.Configuration; -027import org.apache.hadoop.conf.Configured; -028import org.apache.hadoop.fs.Path; -029import org.apache.hadoop.hbase.HBaseConfiguration; -030import org.apache.hadoop.hbase.client.Result; -031import org.apache.hadoop.hbase.client.Scan; -032import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -033import org.apache.hadoop.hbase.filter.Filter; -034import org.apache.hadoop.hbase.filter.IncompatibleFilterException; -035import org.apache.hadoop.hbase.filter.PrefixFilter; -036import org.apache.hadoop.hbase.filter.RegexStringComparator; -037import org.apache.hadoop.hbase.filter.RowFilter; -038import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -039import org.apache.hadoop.hbase.util.Bytes; -040import org.apache.hadoop.mapreduce.Job; -041import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -042import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -043import org.apache.hadoop.util.Tool; -044import org.apache.hadoop.util.ToolRunner; -045 -046/** -047 * Export an HBase table. -048 * Writes content to sequence files up in HDFS. Use {@link Import} to read it -049 * back in again. -050 */ -051@InterfaceAudience.Public -052public class Export extends Configured implements Tool { -053 private static final Log LOG = LogFactory.getLog(Export.class); -054 final static String NAME = "export"; -055 final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows"; -056 final static String EXPORT_BATCHING = "hbase.export.scanner.batch"; -057 -058 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; -059 -060 /** -061 * Sets up the actual job. -062 * -063 * @param conf The current configuration. -064 * @param args The command line parameters. -065 * @return The newly created job. -066 * @throws IOException When setting up the job fails. 
-067 */ -068 public static Job createSubmittableJob(Configuration conf, String[] args) -069 throws IOException { -070 String tableName = args[0]; -071 Path outputDir = new Path(args[1]); -072 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); -073 job.setJobName(NAME + "_" + tableName); -074 job.setJarByClass(Export.class); -075 // Set optional scan parameters -076 Scan s = getConfiguredScanForJob(conf, args); -077 IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job); -078 // No reducers. Just write straight to output files. -079 job.setNumReduceTasks(0); -080 job.setOutputFormatClass(SequenceFileOutputFormat.class); -081 job.setOutputKeyClass(ImmutableBytesWritable.class); -082 job.setOutputValueClass(Result.class); -083 FileOutputFormat.setOutputPath(job, outputDir); // job conf doesn't contain the conf so doesn't have a default fs. -084 return job; -085 } -086 -087 private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException { -088 Scan s = new Scan(); -089 // Optional arguments. -090 // Set Scan Versions -091 int versions = args.length > 2? Integer.parseInt(args[2]): 1; -092 s.setMaxVersions(versions); -093 // Set Scan Range -094 long startTime = args.length > 3? Long.parseLong(args[3]): 0L; -095 long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE; -096 s.setTimeRange(startTime, endTime); -097 // Set cache blocks -098 s.setCacheBlocks(false); -099 // set Start and Stop row -100 if (conf.get(TableInputFormat.SCAN_ROW_START) != null) { -101 s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START))); -102 } -103 if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) { -104 s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP))); -105 } -106 // Set Scan Column Family -107 boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN)); -108 if (raw) { -109 s.setRaw(raw); -110 } -111 for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) { -112 s.addFamily(Bytes.toBytes(columnFamily)); -113 } -114 // Set RowFilter or Prefix Filter if applicable. -115 Filter exportFilter = getExportFilter(args); -116 if (exportFilter!= null) { -117 LOG.info("Setting Scan Filter for Export."); -118 s.setFilter(exportFilter); -119 } -120 -121 int batching = conf.getInt(EXPORT_BATCHING, -1); -122 if (batching != -1){ -123 try { -124 s.setBatch(batching); -125 } catch (IncompatibleFilterException e) { -126 LOG.error("Batching could not be set", e); -127 } -128 } -129 LOG.info("versions=" + versions + ", starttime=" + startTime + -130 ", endtime=" + endTime + ", keepDeletedCells=" + raw); -131 return s; -132 } -133 -134 private static Filter getExportFilter(String[] args) { -135 Filter exportFilter = null; -136 String filterCriteria = (args.length > 5) ? args[5]: null; -137 if (filterCriteria == null) return null; -138 if (filterCriteria.startsWith("^")) { -139 String regexPattern = filterCriteria.substring(1, filterCriteria.length()); -140 exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern)); -141 } else { -142 exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria)); -143 } -144 return exportFilter; -145 } -146 -147 /* -148 * @param errorMsg Error message. Can be null. 
-149 */ -150 private static void usage(final String errorMsg) { -151 if (errorMsg != null && errorMsg.length() > 0) { -152 System.err.println("ERROR: " + errorMsg); -153 } -154 System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " + -155 "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n"); -156 System.err.println(" Note: -D properties will be applied to the conf used. "); -157 System.err.println(" For example: "); -158 System.err.println(" -D mapreduce.output.fileoutputformat.compress=true"); -159 System.err.println(" -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec"); -160 System.err.println(" -D mapreduce.output.fileoutputformat.compress.type=BLOCK"); -161 System.err.println(" Additionally, the following SCAN properties can be specified"); -162 System.err.println(" to control/limit what is exported.."); -163 System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ..."); -164 System.err.println(" -D " + RAW_SCAN + "=true"); -165 System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>"); -166 System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>"); -167 System.err.println(" -D " + JOB_NAME_CONF_KEY -168 + "=jobName - use the specified mapreduce job name for the export"); -169 System.err.println("For performance consider the following properties:\n" -170 + " -Dhbase.client.scanner.caching=100\n" -171 + " -Dmapreduce.map.speculative=false\n" -172 + " -Dmapreduce.reduce.speculative=false"); -173 System.err.println("For tables with very wide rows consider setting the batch size as below:\n" -174 + " -D" + EXPORT_BATCHING + "=10"); -175 } -176 -177 -178 @Override -179 public int run(String[] args) throws Exception { -180 if (args.length < 2) { -181 usage("Wrong number of arguments: " + args.length); -182 return -1; -183 } -184 Job job = createSubmittableJob(getConf(), args); -185 return (job.waitForCompletion(true) ? 0 : 1); -186 } -187 -188 /** -189 * Main entry point. -190 * @param args The command line parameters. -191 * @throws Exception When running the job fails. -192 */ -193 public static void main(String[] args) throws Exception { -194 int errCode = ToolRunner.run(HBaseConfiguration.create(), new Export(), args); -195 System.exit(errCode); -196 } -197} +023import org.apache.hadoop.conf.Configuration; +024import org.apache.hadoop.conf.Configured; +025import org.apache.hadoop.fs.Path; +026import org.apache.hadoop.hbase.TableName; +027import org.apache.hadoop.hbase.classification.InterfaceAudience; +028import org.apache.hadoop.hbase.client.Result; +029import org.apache.hadoop.hbase.client.Scan; +030import org.apache.hadoop.hbase.HBaseConfiguration; +031import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +032import org.apache.hadoop.hbase.util.ArrayUtils; +033import org.apache.hadoop.hbase.util.Triple; +034import org.apache.hadoop.mapreduce.Job; +035import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +036import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +037import org.apache.hadoop.util.Tool; +038import org.apache.hadoop.util.ToolRunner; +039 +040/** +041 * Export an HBase table. +042 * Writes content to sequence files up in HDFS. Use {@link Import} to read it +043 * back in again. 
+044 */ +045@InterfaceAudience.Public +046public class Export extends Configured implements Tool { +047 static final String NAME = "export"; +048 static final String JOB_NAME_CONF_KEY = "mapreduce.job.name"; +049 +050 /** +051 * Sets up the actual job. +052 * +053 * @param conf The current configuration. +054 * @param args The command line parameters. +055 * @return The newly created job. +056 * @throws IOException When setting up the job fails. +057 */ +058 public static Job createSubmittableJob(Configuration conf, String[] args) +059 throws IOException { +060 Triple<TableName, Scan, Path> arguments = ExportUtils.getArgumentsFromCommandLine(conf, args); +061 String tableName = arguments.getFirst().getNameAsString(); +062 Path outputDir = arguments.getThird(); +063 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); +064 job.setJobName(NAME + "_" + tableName); +065 job.setJarByClass(Export.class); +066 // Set optional scan parameters +067 Scan s = arguments.getSecond(); +068 IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job); +069 // No reducers. Just write straight to output files. +070 job.setNumReduceTasks(0); +071 job.setOutputFormatClass(SequenceFileOutputFormat.class); +072 job.setOutputKeyClass(ImmutableBytesWritable.class); +073 job.setOutputValueClass(Result.class); +074 FileOutputFormat.setOutputPath(job, outputDir); // job conf doesn't contain the conf so doesn't have a default fs. +075 return job; +076 } +077 +078 @Override +079 public int run(String[] args) throws Exception { +080 if (!ExportUtils.isValidArguements(args)) { +081 ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.length(args)); +082 System.err.println(" -D " + JOB_NAME_CONF_KEY +083 + "=jobName - use the specified mapreduce job name for the export"); +084 System.err.println("For MR performance consider the following properties:"); +085 System.err.println(" -D mapreduce.map.speculative=false"); +086 System.err.println(" -D mapreduce.reduce.speculative=false"); +087 return -1; +088 } +089 Job job = createSubmittableJob(getConf(), args); +090 return (job.waitForCompletion(true) ? 0 : 1); +091 } +092 +093 /** +094 * Main entry point. +095 * @param args The command line parameters. +096 * @throws Exception When running the job fails. +097 */ +098 public static void main(String[] args) throws Exception { +099 int errCode = ToolRunner.run(HBaseConfiguration.create(), new Export(), args); +100 System.exit(errCode); +101 } +102}
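
For comparison with the endpoint version, a minimal sketch of submitting the MapReduce-based org.apache.hadoop.hbase.mapreduce.Export shown in the last hunk, assuming the plain <tablename> <outputdir> arguments from its usage text; both values below are placeholders.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceExportExample {
  public static void main(String[] args) throws Exception {
    // Roughly equivalent to the command-line invocation
    //   hbase org.apache.hadoop.hbase.mapreduce.Export test_table /tmp/mr-export
    // with placeholder table name and output directory.
    int rc = ToolRunner.run(HBaseConfiguration.create(), new Export(),
        new String[] { "test_table", "/tmp/mr-export" });
    System.exit(rc);
  }
}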