hbase-commits mailing list archives

From jdcry...@apache.org
Subject svn commit: r911621 - in /hadoop/hbase/trunk: ./ src/contrib/mdc_replication/ src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/ src/java/org/apache/hadoop/hbase/mapreduce/
Date Thu, 18 Feb 2010 22:27:40 GMT
Author: jdcryans
Date: Thu Feb 18 22:27:40 2010
New Revision: 911621

URL: http://svn.apache.org/viewvc?rev=911621&view=rev
Log:
HBASE-2221  MR to copy a table

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/contrib/mdc_replication/ivy.xml
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=911621&r1=911620&r2=911621&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Thu Feb 18 22:27:40 2010
@@ -395,6 +395,7 @@
                repo, etc. (Kay Kay via Stack)
    HBASE-2129  Simple Master/Slave replication
    HBASE-2070  Collect HLogs and delete them after a period of time
+   HBASE-2221  MR to copy a table
 
   OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite

Modified: hadoop/hbase/trunk/src/contrib/mdc_replication/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/ivy.xml?rev=911621&r1=911620&r2=911621&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/ivy.xml (original)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/ivy.xml Thu Feb 18 22:27:40 2010
@@ -86,6 +86,10 @@
                rev="${hadoop-hdfs.version}" conf="common->default" changing="true" >
       <exclude conf="test"/>  
     </dependency>
+   <dependency org="org.apache.hadoop" name="hadoop-mapred"
+               rev="${hadoop-mapred.version}" conf="common->default" changing="true" >
+      <exclude conf="test"/>
+   </dependency>
     <dependency org="org.jruby" name="jruby-complete"
               rev="${jruby.version}" conf="common->default" />
     
@@ -99,6 +103,8 @@
                rev="${hadoop-core.version}" conf="test->default"  transitive="false" changing="true"
/>
     <dependency org="org.apache.hadoop" name="hadoop-hdfs-test" 
                rev="${hadoop-hdfs.version}" conf="test->default" transitive="false" changing="true"/>
+    <dependency org="org.apache.hadoop" name="hadoop-mapred-test"
+               rev="${hadoop-mapred.version}" conf="test->default" transitive="false"
changing="true"/>
     <dependency org="log4j" name="log4j" 
                rev="${log4j.version}" conf="test->master">
       <exclude conf="jmx,mail,jms"/>  

Modified: hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java?rev=911621&r1=911620&r2=911621&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java Thu Feb 18 22:27:40 2010
@@ -19,47 +19,131 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.EmptyWatcher;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.hbase.mapreduce.CopyTable;
+import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assert.assertArrayEquals;
-
-import org.junit.*;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 public class TestReplication implements HConstants{
 
   protected static final Log LOG = LogFactory.getLog(TestReplication.class);
 
-  private Configuration conf1;
-  private Configuration conf2;
-
-  private ZooKeeperWrapper zkw1;
-  private ZooKeeperWrapper zkw2;
+  private static Configuration conf1;
+  private static Configuration conf2;
 
-  private HBaseTestingUtility utility1;
-  private HBaseTestingUtility utility2;
-
-  private final int NB_ROWS_IN_BATCH = 100;
-  private final long SLEEP_TIME = 500;
-  private final int NB_RETRIES = 5;
+  private static ZooKeeperWrapper zkw1;
+  private static ZooKeeperWrapper zkw2;
 
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
+  private static final int NB_ROWS_IN_BATCH = 100;
+  private static final long SLEEP_TIME = 500; //ms
+  private static final int NB_RETRIES = 10;
+
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] famName = Bytes.toBytes("f");
+  private static final byte[] row = Bytes.toBytes("row");
+  private static final byte[] noRepfamName = Bytes.toBytes("norep");
 
   /**
    * @throws java.lang.Exception
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    try {
+      conf1 = HBaseConfiguration.create();
+      conf1.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+          .getName());
+      conf1.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+          .getName());
+      conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
+
+      utility1 = new HBaseTestingUtility(conf1);
+      utility1.startMiniZKCluster();
+      MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+      zkw1 = new ZooKeeperWrapper(conf1, EmptyWatcher.instance);
+      zkw1.writeZNode("/1", "replication", "");
+      zkw1.writeZNode("/1/replication", "master",
+          conf1.get(ZOOKEEPER_QUORUM)+":" +
+          conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+      setIsReplication("true");
+
+      LOG.info("Setup first Zk");
+
+      conf2 = HBaseConfiguration.create();
+      conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+          .getName());
+      conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+          .getName());
+      conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
+
+      utility2 = new HBaseTestingUtility(conf2);
+      utility2.setZkCluster(miniZK);
+      zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance);
+      zkw2.writeZNode("/2", "replication", "");
+      zkw2.writeZNode("/2/replication", "master",
+          conf1.get(ZOOKEEPER_QUORUM)+":" +
+          conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+
+      zkw1.writeZNode("/1/replication/peers", "test",
+          conf2.get(ZOOKEEPER_QUORUM)+":" +
+          conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+
+      LOG.info("Setup second Zk");
+
+      utility1.startMiniCluster();
+      utility2.startMiniCluster();
+
+      utility1.startMiniMapReduceCluster();
+
+      HTableDescriptor table = new HTableDescriptor(tableName);
+      HColumnDescriptor fam = new HColumnDescriptor(famName);
+      fam.setScope(REPLICATION_SCOPE_GLOBAL);
+      table.addFamily(fam);
+      fam = new HColumnDescriptor(noRepfamName);
+      table.addFamily(fam);
+
+      HBaseAdmin admin1 = new HBaseAdmin(conf1);
+      HBaseAdmin admin2 = new HBaseAdmin(conf2);
+      admin1.createTable(table);
+      admin2.createTable(table);
 
+    } catch (Exception ex) { ex.printStackTrace(); throw ex; }
+  }
+
+  private static void setIsReplication(String bool) throws Exception {
+    zkw1.writeZNode("/1/replication", "state", bool);
+    // Takes some ms for ZK to fire the watcher
+    Thread.sleep(100);
   }
 
   /**
@@ -67,88 +151,29 @@
    */
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    utility1.shutdownMiniCluster();
+    utility2.shutdownMiniCluster();
   }
 
   /**
    * @throws java.lang.Exception
    */
   @Before
-  public void setUp() throws Exception {
-    try {
-    conf1 = HBaseConfiguration.create();
-    conf1.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
-        .getName());
-    conf1.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
-        .getName());
-    conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
-
-    utility1 = new HBaseTestingUtility(conf1);
-    utility1.startMiniZKCluster();
-    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
-    zkw1 = new ZooKeeperWrapper(conf1, EmptyWatcher.instance);
-    zkw1.writeZNode("/1", "replication", "");
-    zkw1.writeZNode("/1/replication", "master",
-        conf1.get(ZOOKEEPER_QUORUM)+":" +
-        conf1.get("hbase.zookeeper.property.clientPort")+":/1");
-    setIsReplication("true");
-
-
-    LOG.info("Setup first Zk");
-
-    conf2 = HBaseConfiguration.create();
-    conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
-        .getName());
-    conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
-        .getName());
-    conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
-
-    utility2 = new HBaseTestingUtility(conf2);
-    utility2.setZkCluster(miniZK);
-    zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance);
-    zkw2.writeZNode("/2", "replication", "");
-    zkw2.writeZNode("/2/replication", "master",
-        conf1.get(ZOOKEEPER_QUORUM)+":" +
-        conf1.get("hbase.zookeeper.property.clientPort")+":/1");
-
-    zkw1.writeZNode("/1/replication/peers", "test",
-        conf2.get(ZOOKEEPER_QUORUM)+":" +
-        conf2.get("hbase.zookeeper.property.clientPort")+":/2");
-
-    LOG.info("Setup second Zk");
-    } catch (Exception ex) { ex.printStackTrace(); throw ex; }
-  }
+  public void setUp() throws Exception {}
 
   /**
    * @throws java.lang.Exception
    */
   @After
   public void tearDown() throws Exception {
-    utility1.shutdownMiniCluster();
-    utility2.shutdownMiniCluster();
+    setIsReplication("false");
+    utility1.truncateTable(tableName);
+    utility2.truncateTable(tableName);
+    setIsReplication("true");
   }
 
   @Test
   public void testReplication() throws Exception {
-    utility1.startMiniCluster();
-    utility2.startMiniCluster();
-
-    byte[] tableName = Bytes.toBytes("test");
-    byte[] famName = Bytes.toBytes("f");
-    byte[] noRepfamName = Bytes.toBytes("norep");
-    byte[] row = Bytes.toBytes("row");
-
-    HTableDescriptor table = new HTableDescriptor(tableName);
-    HColumnDescriptor fam = new HColumnDescriptor(famName);
-    fam.setScope(REPLICATION_SCOPE_GLOBAL);
-    table.addFamily(fam);
-    fam = new HColumnDescriptor(noRepfamName);
-    table.addFamily(fam);
-
-    HBaseAdmin admin1 = new HBaseAdmin(conf1);
-    HBaseAdmin admin2 = new HBaseAdmin(conf2);
-    admin1.createTable(table);
-    admin2.createTable(table);
-
     Put put = new Put(row);
     put.add(famName, row, row);
 
@@ -157,12 +182,12 @@
 
     HTable table2 = new HTable(conf2, tableName);
     Get get = new Get(row);
-    for(int i = 0; i < NB_RETRIES; i++) {
-      if(i==NB_RETRIES-1) {
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
         fail("Waited too much time for put replication");
       }
       Result res = table2.get(get);
-      if(res.size() == 0) {
+      if (res.size() == 0) {
         LOG.info("Row not available");
         Thread.sleep(SLEEP_TIME);
       } else {
@@ -176,12 +201,12 @@
 
     table2 = new HTable(conf2, tableName);  
     get = new Get(row);
-    for(int i = 0; i < NB_RETRIES; i++) {
-      if(i==NB_RETRIES-1) {
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
         fail("Waited too much time for del replication");
       }
       Result res = table2.get(get);
-      if(res.size() >= 1) {
+      if (res.size() >= 1) {
         LOG.info("Row not deleted");
         Thread.sleep(SLEEP_TIME);
       } else {
@@ -200,14 +225,14 @@
 
     Scan scan = new Scan();
 
-    for(int i = 0; i < NB_RETRIES; i++) {
-      if(i==NB_RETRIES-1) {
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
         fail("Waited too much time for normal batch replication");
       }
       ResultScanner scanner = table2.getScanner(scan);
       Result[] res = scanner.next(NB_ROWS_IN_BATCH);
       scanner.close();
-      if(res.length != NB_ROWS_IN_BATCH) {
+      if (res.length != NB_ROWS_IN_BATCH) {
         LOG.info("Only got " + res.length + " rows");
         Thread.sleep(SLEEP_TIME);
       } else {
@@ -220,17 +245,13 @@
     // Test stopping replication
     setIsReplication("false");
 
-    // Takes some ms for ZK to fire the watcher
-    Thread.sleep(100);
-
-
     put = new Put(Bytes.toBytes("stop start"));
     put.add(famName, row, row);
     table1.put(put);
 
     get = new Get(Bytes.toBytes("stop start"));
-    for(int i = 0; i < NB_RETRIES; i++) {
-      if(i==NB_RETRIES-1) {
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
         break;
       }
       Result res = table2.get(get);
@@ -244,15 +265,12 @@
     }
 
     // Test restart replication
-
     setIsReplication("true");
 
-    Thread.sleep(100);
-
     table1.put(put);
 
-    for(int i = 0; i < NB_RETRIES; i++) {
-      if(i==NB_RETRIES-1) {
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
         fail("Waited too much time for put replication");
       }
       Result res = table2.get(get);
@@ -270,12 +288,12 @@
     table1.put(put);
 
     get = new Get(Bytes.toBytes("do not rep"));
-    for(int i = 0; i < NB_RETRIES; i++) {
-      if(i == NB_RETRIES-1) {
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES-1) {
         break;
       }
       Result res = table2.get(get);
-      if(res.size() >= 1) {
+      if (res.size() >= 1) {
         fail("Not supposed to be replicated");
       } else {
         LOG.info("Row not replicated, let's wait a bit more...");
@@ -285,7 +303,29 @@
 
   }
 
-  private void setIsReplication(String bool) throws Exception{
-    zkw1.writeZNode("/1/replication", "state", bool);
+  @Test
+  public void testMRCopy() throws Exception {
+    setIsReplication("false");
+    HTable table1 = new HTable(conf1, tableName);
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.add(famName, row, row);
+      table1.put(put);
+    }
+    String[] args = new String[] {
+        "--rs.class="+ReplicationRegionInterface.class.getName(),
+        "--rs.impl="+ReplicationRegionServer.class.getName(),
+        "--peer.adr="+conf2.get(ZOOKEEPER_QUORUM)+":/2",
+        "--families=f", "test"};
+    Job job = CopyTable.createSubmittableJob(conf1, args);
+    assertTrue(job.waitForCompletion(true));
+
+    HTable table2 = new HTable(conf2, tableName);
+    Scan scan = new Scan();
+    ResultScanner scanner = table2.getScanner(scan);
+    Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+    scanner.close();
+    assertEquals(NB_ROWS_IN_BATCH, res.length);
+
   }
 }

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java?rev=911621&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java Thu Feb 18 22:27:40 2010
@@ -0,0 +1,207 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/**
+ * Tool used to copy a table to another one, which can be on a different cluster.
+ * It is also configurable with a start and end time, as well as a specification
+ * of the region server class and implementation if different from the local cluster.
+ */
+public class CopyTable {
+
+  final static String NAME = "Copy Table";
+  static String rsClass = null;
+  static String rsImpl = null;
+  static long startTime = 0;
+  static long endTime = 0;
+  static String tableName = null;
+  static String newTableName = null;
+  static String peerAddress = null;
+  static String families = null;
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    if (!doCommandLine(args)) {
+      return null;
+    }
+    Cluster mrCluster = new Cluster(conf);
+    Job job = Job.getInstance(mrCluster, conf);
+    job.setJobName(NAME + "_" + tableName);
+    job.setJarByClass(CopyTable.class);
+    Scan scan = new Scan();
+    if (startTime != 0) {
+      scan.setTimeRange(startTime,
+          endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+    }
+    if (families != null) {
+      String[] fams = families.split(",");
+      for (String fam : fams) {
+        scan.addFamily(Bytes.toBytes(fam));
+      }
+    }
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+        Import.Importer.class, null, null, job);
+    TableMapReduceUtil.initTableReducerJob(
+        newTableName == null ? tableName : newTableName, null, job,
+        null, peerAddress, rsClass, rsImpl);
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void printUsage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: CopyTable [--rs.class=CLASS] " +
+        "[--rs.impl=IMPL] [--starttime=X] [--endtime=Y] " +
+        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
+    System.err.println();
+    System.err.println("Options:");
+    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
+    System.err.println("              specify if different from current cluster");
+    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
+    System.err.println(" starttime    beginning of the time range");
+    System.err.println("              without endtime means from starttime to forever");
+    System.err.println(" endtime      end of the time range");
+    System.err.println(" new.name     new table's name");
+    System.err.println(" peer.adr     Address of the peer cluster given in the format");
+    System.err.println("              hbase.zookeeer.quorum:zookeeper.znode.parent");
+    System.err.println(" families     comma-seperated list of families to copy");
+    System.err.println();
+    System.err.println("Args:");
+    System.err.println(" tablename    Name of the table to copy");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour
window:");
+    System.err.println(" $ bin/hbase " +
+        "org.apache.hadoop.hbase.mapreduce.CopyTable --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface
" +
+        "--rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer
--starttime=1265875194289 --endtime=1265878794289 " +
+        "--peer.adr=server1,server2,server3:/hbase TestTable ");
+  }
+
+  private static boolean doCommandLine(final String[] args) {
+    // Process command-line args. TODO: Better cmd-line processing
+    // (but hopefully something not as painful as cli options).
+    if (args.length < 1) {
+      printUsage(null);
+      return false;
+    }
+    try {
+      for (int i = 0; i < args.length; i++) {
+        String cmd = args[i];
+        if (cmd.equals("-h") || cmd.startsWith("--h")) {
+          printUsage(null);
+          return false;
+        }
+
+        final String rsClassArgKey = "--rs.class=";
+        if (cmd.startsWith(rsClassArgKey)) {
+          rsClass = cmd.substring(rsClassArgKey.length());
+          continue;
+        }
+
+        final String rsImplArgKey = "--rs.impl=";
+        if (cmd.startsWith(rsImplArgKey)) {
+          rsImpl = cmd.substring(rsImplArgKey.length());
+          continue;
+        }
+
+        final String startTimeArgKey = "--starttime=";
+        if (cmd.startsWith(startTimeArgKey)) {
+          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+          continue;
+        }
+
+        final String endTimeArgKey = "--endtime=";
+        if (cmd.startsWith(endTimeArgKey)) {
+          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+          continue;
+        }
+
+        final String newNameArgKey = "--new.name=";
+        if (cmd.startsWith(newNameArgKey)) {
+          newTableName = cmd.substring(newNameArgKey.length());
+          continue;
+        }
+
+        final String peerAdrArgKey = "--peer.adr=";
+        if (cmd.startsWith(peerAdrArgKey)) {
+          peerAddress = cmd.substring(peerAdrArgKey.length());
+          continue;
+        }
+
+        final String familiesArgKey = "--families=";
+        if (cmd.startsWith(familiesArgKey)) {
+          families = cmd.substring(familiesArgKey.length());
+          continue;
+        }
+
+        if (i == args.length-1) {
+          tableName = cmd;
+        }
+      }
+      if (newTableName == null && peerAddress == null) {
+        printUsage("At least a new table name or a " +
+            "peer address must be specified");
+        return false;
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      printUsage("Can't start because " + e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    Job job = createSubmittableJob(conf, args);
+    if (job != null) {
+      System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+  }
+}
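
A minimal sketch of driving the new tool from Java rather than from the shell, mirroring what testMRCopy does above. The peer quorum "zk-peer" and the table name "TestTable" are illustrative placeholders, not values from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.CopyTable;
    import org.apache.hadoop.mapreduce.Job;

    public class CopyTableExample {
      public static void main(String[] argv) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Copy only the "f" family of TestTable to the peer cluster rooted at /hbase.
        String[] args = new String[] {
            "--peer.adr=zk-peer:/hbase",
            "--families=f",
            "TestTable"};
        // createSubmittableJob returns null when the arguments do not parse.
        Job job = CopyTable.createSubmittableJob(conf, args);
        if (job != null) {
          System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
      }
    }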

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java?rev=911621&r1=911620&r2=911621&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java Thu Feb 18 22:27:40 2010
@@ -36,6 +36,8 @@
       "Count rows in HBase table");
     pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
     pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
+    pgd.addClass(CopyTable.NAME, CopyTable.class,
+        "Export a table from the local cluster to a peer cluster");
     pgd.driver(args);
   }
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=911621&r1=911620&r2=911621&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Thu Feb 18 22:27:40 2010
@@ -26,6 +26,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -33,6 +34,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
@@ -112,26 +114,61 @@
   /**
    * Use this before submitting a TableReduce job. It will
    * appropriately set up the JobConf.
+   *
+   * @param table  The output table.
+   * @param reducer  The reducer class to use.
+   * @param job  The current job to adjust.
+   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
+   * default partitioner.
+   * @throws IOException When determining the region count fails.
+   */
+  public static void initTableReducerJob(String table,
+    Class<? extends TableReducer> reducer, Job job,
+    Class partitioner) throws IOException {
+    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
    * 
    * @param table  The output table.
    * @param reducer  The reducer class to use.
    * @param job  The current job to adjust.
    * @param partitioner  Partitioner to use. Pass <code>null</code> to use 
    * default partitioner.
+   * @param quorumAddress Remote cluster to write to, in the form
+   * hbase.zookeeper.quorum:zookeeper.znode.parent
+   * @param serverClass redefined hbase.regionserver.class
+   * @param serverImpl redefined hbase.regionserver.impl
    * @throws IOException When determining the region count fails. 
    */
   public static void initTableReducerJob(String table,
-    Class<? extends TableReducer> reducer, Job job, Class partitioner)
-  throws IOException {
+    Class<? extends TableReducer> reducer, Job job,
+    Class partitioner, String quorumAddress, String serverClass,
+    String serverImpl) throws IOException {
+
+    Configuration conf = job.getConfiguration();
     job.setOutputFormatClass(TableOutputFormat.class);
     if (reducer != null) job.setReducerClass(reducer);
-    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
+    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    if (quorumAddress != null) {
+      if (quorumAddress.split(":").length == 2) {
+        conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
+      } else {
+        throw new IOException("Please specify the peer cluster as " +
+            HConstants.ZOOKEEPER_QUORUM+":"+HConstants.ZOOKEEPER_ZNODE_PARENT);
+      }
+    }
+    if (serverClass != null && serverImpl != null) {
+      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
+      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
+    }
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(Writable.class);
     if (partitioner == HRegionPartitioner.class) {
-      HBaseConfiguration.addHbaseResources(job.getConfiguration());
+      HBaseConfiguration.addHbaseResources(conf);
       job.setPartitionerClass(HRegionPartitioner.class);
-      HTable outputTable = new HTable(job.getConfiguration(), table);
+      HTable outputTable = new HTable(conf, table);
       int regions = outputTable.getRegionsInfo().size();
       if (job.getNumReduceTasks() > regions) {
         job.setNumReduceTasks(outputTable.getRegionsInfo().size());
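
A hedged sketch of the new seven-argument initTableReducerJob overload: only the output table and, for a remote write, the quorum address need real values; the remaining parameters can stay null to keep local defaults. "peerTable", "zk1,zk2,zk3" and the surrounding job are placeholders:

    TableMapReduceUtil.initTableReducerJob(
        "peerTable",          // output table on the peer cluster
        null,                 // no reducer class; map output goes straight to the table
        job,                  // the Job being configured
        null,                 // default partitioner
        "zk1,zk2,zk3:/hbase", // peer quorum and znode parent, colon-separated
        null,                 // serverClass: only if the peer's rs.class differs
        null);                // serverImpl: only if the peer's rs.impl differs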

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java?rev=911621&r1=911620&r2=911621&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java Thu Feb 18 22:27:40 2010
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -33,6 +34,7 @@
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -46,6 +48,14 @@
   private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
   /** Job parameter that specifies the output table. */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+  /** Optional job parameter to specify a peer cluster */
+  public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
+  /** Optional specification of the rs class name of the peer cluster */
+  public static final String
+      REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
+  /** Optional specification of the rs impl name of the peer cluster */
+  public static final String
+      REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
 
   /**
    * Writes the reducer output to an HBase table.
@@ -111,12 +121,25 @@
     TaskAttemptContext context) 
   throws IOException, InterruptedException {
     // expecting exactly one path
-    String tableName = context.getConfiguration().get(OUTPUT_TABLE);
+    Configuration conf = new Configuration(context.getConfiguration());
+    String tableName = conf.get(OUTPUT_TABLE);
+    String address = conf.get(QUORUM_ADDRESS);
+    String serverClass = conf.get(REGION_SERVER_CLASS);
+    String serverImpl = conf.get(REGION_SERVER_IMPL);
     HTable table = null;
     try {
-      HBaseConfiguration.addHbaseResources(context.getConfiguration());
-      table = new HTable(context.getConfiguration(), 
-        tableName);
+      HBaseConfiguration.addHbaseResources(conf);
+      if (address != null) {
+        // Format was already validated in TableMapReduceUtil.initTableReducerJob
+        String[] parts = address.split(":");
+        conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
+      }
+      if (serverClass != null) {
+        conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
+        conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
+      }
+      table = new HTable(conf, tableName);
     } catch(IOException e) {
       LOG.error(e);
       throw e;
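
Equivalently, a job could point TableOutputFormat at a peer cluster by setting the new keys on its configuration directly. A sketch with placeholder values, assuming a Job named job is already set up:

    Configuration peerConf = job.getConfiguration();
    peerConf.set(TableOutputFormat.OUTPUT_TABLE, "peerTable");
    peerConf.set(TableOutputFormat.QUORUM_ADDRESS, "zk1,zk2,zk3:/hbase");
    // Only needed when the peer runs a different region server interface/impl:
    peerConf.set(TableOutputFormat.REGION_SERVER_CLASS,
        "org.apache.hadoop.hbase.ipc.ReplicationRegionInterface");
    peerConf.set(TableOutputFormat.REGION_SERVER_IMPL,
        "org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer");
    job.setOutputFormatClass(TableOutputFormat.class);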


