incubator-bigtop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bm...@apache.org
Subject svn commit: r1210649 - in /incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system: Putter.java Scanner.java TestConcurrentScanAndPut.java
Date Mon, 05 Dec 2011 21:32:44 GMT
Author: bmahe
Date: Mon Dec  5 21:32:43 2011
New Revision: 1210649

URL: http://svn.apache.org/viewvc?rev=1210649&view=rev
Log:
BIGTOP-287. Integrating test for HBASE-4570 (contributed by Stephen Chu)

Added:
    incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Putter.java
    incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Scanner.java
    incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/TestConcurrentScanAndPut.java

Added: incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Putter.java
URL: http://svn.apache.org/viewvc/incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Putter.java?rev=1210649&view=auto
==============================================================================
--- incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Putter.java	(added)
+++ incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Putter.java	Mon Dec  5 21:32:43 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.system;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Scans a table for rows whose "f1:qual" column equals a given value and
+ * writes each matching row back with the same cell values.
+ * <p/>
+ * Intended to run concurrently with a scanning client to reproduce
+ * HBASE-4570, where a scan racing with writes could return a row that has
+ * multiple column families split into two rows.
+ */
+public class Putter {
+  /**
+   * Builds a {@link Put} that re-writes the latest version of every
+   * non-empty cell in {@code result}.
+   *
+   * @param result the row to convert; may be {@code null}
+   * @param versions unused; kept for API compatibility
+   * @return a Put for the row (possibly containing no cells), or
+   *         {@code null} if {@code result}, its row key, or its family map
+   *         is {@code null}
+   */
+  public static Put convert(Result result, int versions) {
+    if (result == null) {
+      return null;
+    }
+    NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfmap =
+      result.getNoVersionMap();
+    if (result.getRow() == null || cfmap == null) {
+      return null;
+    }
+
+    Put put = new Put(result.getRow());
+    for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> fam :
+           cfmap.entrySet()) {
+      NavigableMap<byte[], byte[]> qualifierMap = fam.getValue();
+      if (qualifierMap == null) {
+        continue;
+      }
+      for (Map.Entry<byte[], byte[]> cell : qualifierMap.entrySet()) {
+        byte[] value = cell.getValue();
+        // Empty values are skipped rather than re-written.
+        if (value != null && value.length > 0) {
+          put.add(fam.getKey(), cell.getKey(), value);
+        }
+      }
+    }
+    return put;
+  }
+
+  /**
+   * Re-writes every row whose "f1:qual" equals {@code val}, with autoflush
+   * enabled.
+   *
+   * @see #doScanAndPut(HTable, int, boolean)
+   */
+  public static int doScanAndPut(HTable table, int val) throws IOException {
+    return doScanAndPut(table, val, true);
+  }
+
+  /**
+   * Scans the whole table for rows whose "f1:qual" equals {@code val}
+   * (formatted as a 10-digit zero-padded string) and puts each one back.
+   *
+   * @param table the table to scan and write to
+   * @param val the value to match in "f1:qual"
+   * @param autoflush passed to {@link HTable#setAutoFlush(boolean)}; when
+   *        false, puts are buffered client-side and may not be sent until
+   *        the table is flushed or closed
+   * @return the number of puts issued
+   * @throws IOException if the scan or a put fails
+   */
+  public static int doScanAndPut(HTable table, int val, boolean autoflush)
+    throws IOException {
+    byte[] value = Bytes.toBytes(String.format("%010d", val));
+    // A default Scan covers the whole table (empty start and stop rows).
+    Scan s = new Scan();
+    s.setFilter(new SingleColumnValueFilter(
+      Bytes.toBytes("f1"), Bytes.toBytes("qual"), CompareOp.EQUAL, value));
+
+    table.setAutoFlush(autoflush);
+    ResultScanner sc = table.getScanner(s);
+    int cnt = 0;
+    try {
+      for (Result r : sc) {
+        Put p = convert(r, 0);
+        // table.put() fails on a null or empty Put; skip rows with nothing
+        // to write instead of aborting the whole pass.
+        if (p == null || p.isEmpty()) {
+          continue;
+        }
+        table.put(p);
+        cnt++;
+      }
+    } finally {
+      // Release the scanner even if a put fails part-way through.
+      sc.close();
+    }
+    return cnt;
+  }
+
+  /** Prints command-line usage to stderr and exits with status 1. */
+  private static void usage() {
+    System.err.println("usage: " + Putter.class.getSimpleName() +
+                       " <table> <value>");
+    System.err.println(" <value>: a numeric value [0,500)");
+    System.err.println(" options: -f (disable autoflush), -l <loops>");
+    System.exit(1);
+  }
+
+  /**
+   * Command-line entry point:
+   * {@code Putter <table> <value> [-f] [-l <loops>]}.
+   * {@code -f} disables autoflush; {@code -l} sets how many scan-and-put
+   * passes to run (default 1). An IOException from one pass is printed and
+   * the next pass is attempted.
+   */
+  public static void main(String[] argv) throws IOException {
+    if (argv.length < 2) {
+      usage();
+    }
+
+    boolean autoflush = true;
+    int loops = 1;
+    // argv[0] (table) and argv[1] (value) are positional; options follow.
+    for (int i = 2; i < argv.length; i++) {
+      if (argv[i].equals("-f")) {
+        autoflush = false;
+      } else if (argv[i].equals("-l")) {
+        if (++i >= argv.length) {
+          usage();
+        }
+        loops = Integer.parseInt(argv[i]);
+      }
+    }
+
+    String tableName = argv[0];
+    int val = Integer.parseInt(argv[1]);
+    HTable table = new HTable(tableName);
+    try {
+      for (int i = 0; i < loops; i++) {
+        try {
+          doScanAndPut(table, val, autoflush);
+        } catch (IOException e) {
+          // Best effort: report the failure and try the next pass.
+          e.printStackTrace();
+        }
+      }
+    } finally {
+      // Send any puts still buffered when autoflush is off (-f).
+      try {
+        table.flushCommits();
+      } finally {
+        table.close();
+      }
+    }
+  }
+}

Added: incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Scanner.java
URL: http://svn.apache.org/viewvc/incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Scanner.java?rev=1210649&view=auto
==============================================================================
--- incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Scanner.java	(added)
+++ incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Scanner.java	Mon Dec  5 21:32:43 2011
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.system;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Scans a table a configurable number of times with a client-side
+ * {@link ResultScanner}, looking for row keys that a single scan returns
+ * more than once (rows "split" in two, see HBASE-4570). The scan only
+ * selects rows whose "f1:qual" equals the requested value.
+ */
+public class Scanner {
+  public static final Log LOG = LogFactory.getLog(Scanner.class);
+
+  /** Largest exit status a parent process can observe (low 8 bits). */
+  private static final int MAX_EXIT_STATUS = 255;
+
+  /**
+   * Runs one full-table scan for rows whose "f1:qual" equals {@code val}
+   * (formatted as a 10-digit zero-padded string). Reports every row key
+   * seen more than once, and prints a summary to stdout.
+   *
+   * @param table the table to scan
+   * @param val the value to match in "f1:qual"
+   * @return the number of distinct row keys returned more than once
+   * @throws IOException if the scan fails
+   * @throws InterruptedException declared for API compatibility; not
+   *         thrown by this implementation
+   */
+  public static int doScan(HTable table, int val) throws IOException,
+    InterruptedException {
+    byte[] value = Bytes.toBytes(String.format("%010d", val));
+    // A default Scan covers the whole table (empty start and stop rows).
+    Scan s = new Scan();
+    s.setFilter(new SingleColumnValueFilter(
+      Bytes.toBytes("f1"), Bytes.toBytes("qual"), CompareOp.EQUAL, value));
+
+    // Row key -> "<position in scan>: <result>" for each time it was seen.
+    Multimap<String, String> mm = ArrayListMultimap.create();
+    int cnt = 0;
+    ResultScanner rs = table.getScanner(s);
+    try {
+      for (Result r : rs) {
+        if (r.getRow() == null) {
+          continue;
+        }
+        String k = Bytes.toStringBinary(r.getRow());
+        if (mm.containsKey(k)) {
+          System.out.println("Duplicate rowkey " + k);
+          LOG.error("Duplicate rowkey " + k);
+        }
+        mm.put(k, cnt + ": " + r);
+        cnt++;
+      }
+    } finally {
+      // Release the scanner so repeated passes do not accumulate open ones.
+      rs.close();
+    }
+
+    System.out.println("scan items counted: " + cnt + " for scan " +
+      s.toString() + " with filter f1:qual == " + Bytes.toString(value));
+
+    // Print out dupes.
+    int dupes = 0;
+    for (Entry<String, Collection<String>> e : mm.asMap().entrySet()) {
+      if (e.getValue().size() > 1) {
+        dupes++;
+        System.out.print("Row " + e.getKey() + " had time stamps: ");
+        String[] tss = e.getValue().toArray(new String[0]);
+        System.out.println(Arrays.toString(tss));
+      }
+    }
+    return dupes;
+  }
+
+  /** Prints command-line usage to stderr and exits with status 1. */
+  private static void usage() {
+    System.err.println("usage: " + Scanner.class.getSimpleName() +
+      " <table> <value>");
+    System.err.println(" <value>: a numeric value [0,500)");
+    System.err.println(" options: -l <loops>");
+    System.exit(1);
+  }
+
+  /**
+   * Command-line entry point: {@code Scanner <table> <value> [-l <loops>]}.
+   * Runs up to {@code loops} scans and stops at the first one that finds
+   * duplicates. The exit status is that scan's duplicate count, capped at
+   * 255, or 0 if no scan found any. Scan errors are printed and the next
+   * scan is attempted.
+   */
+  public static void main(String[] argv) throws IOException {
+    if (argv.length < 2) {
+      usage();
+    }
+
+    String tableName = argv[0];
+    int val = Integer.parseInt(argv[1]);
+    int loops = 1;
+    // argv[0] (table) and argv[1] (value) are positional; options follow.
+    for (int i = 2; i < argv.length; i++) {
+      if (argv[i].equals("-l")) {
+        if (++i >= argv.length) {
+          usage();
+        }
+        loops = Integer.parseInt(argv[i]);
+      }
+    }
+
+    HTable table = new HTable(tableName);
+    int exitVal = 0;
+    for (int i = 0; i < loops && exitVal == 0; i++) {
+      try {
+        exitVal = doScan(table, val);
+      } catch (IOException e) {
+        e.printStackTrace();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    try {
+      table.close();
+    } catch (IOException e) {
+      // Best effort: a close failure must not mask the scan result below.
+      e.printStackTrace();
+    }
+    // Exit statuses are truncated to 8 bits; clamp so that e.g. 256
+    // duplicates is not reported as 0 (success).
+    System.exit(Math.min(exitVal, MAX_EXIT_STATUS));
+  }
+}

Added: incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/TestConcurrentScanAndPut.java
URL: http://svn.apache.org/viewvc/incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/TestConcurrentScanAndPut.java?rev=1210649&view=auto
==============================================================================
--- incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/TestConcurrentScanAndPut.java	(added)
+++ incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/TestConcurrentScanAndPut.java	Mon Dec  5 21:32:43 2011
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.system;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.bigtop.itest.JarContent;
+import org.apache.bigtop.itest.shell.Shell;
+
+import org.apache.bigtop.itest.hbase.util.HBaseTestUtil;
+
+/**
+ * This program tests concurrent scans and writes. In HBASE-4570,
+ * when scanning a table during concurrent writes, rows that have
+ * multiple column families sometimes get split into two rows.
+ * <p/>
+ * The test first loads a table with 10 column families. It then starts a
+ * background {@link Putter} process that repeatedly re-writes matching
+ * rows. Finally it runs a {@link Scanner} process whose exit status
+ * reports the rows it saw split.
+ */
+public class TestConcurrentScanAndPut {
+  public static Shell scanSh = new Shell( "/bin/bash -s" );
+  public static Shell putSh = new Shell( "/bin/bash -s" );
+
+  public static HBaseAdmin admin;
+  public static String tableName;
+  public static String putter_pid;
+
+  public static int scannerLoops;
+  public static int putterLoops;
+
+  /** "f1:qual" value passed to both the putter and the scanner. */
+  private static final int VALUE = 13;
+  private static final int NUM_FAMILIES = 10;
+  private static final int NUM_ROWS = 25000;
+  private static final int BATCH_SIZE = 2000;
+
+  @BeforeClass
+  public static void setUp() throws ClassNotFoundException,
+                                    InterruptedException, IOException {
+    System.out.println("Unpacking resources");
+    JarContent.unpackJarContainer(Scanner.class, "." , null);
+    JarContent.unpackJarContainer(Putter.class, "." , null);
+
+    Configuration conf = HBaseConfiguration.create();
+    try {
+      HBaseAdmin.checkHBaseAvailable(conf);
+    } catch (Exception e) {
+      System.err.println("Hbase is not up. Bailing out.");
+      e.printStackTrace();
+      System.exit(1);
+    }
+
+    tableName =
+      new String(HBaseTestUtil.getTestTableName("concurrentScanAndPut"));
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (int i = 0; i < NUM_FAMILIES; i++) {
+      htd.addFamily(new HColumnDescriptor("f" + i));
+    }
+    admin = new HBaseAdmin(conf);
+    admin.createTable(htd);
+
+    loadTable();
+
+    scannerLoops = intProperty("concurrentScanAndPut.scanner.loops", 100);
+    putterLoops = intProperty("concurrentScanAndPut.putter.loops", 100);
+  }
+
+  /**
+   * Fills the test table with NUM_ROWS rows in batches of BATCH_SIZE. In
+   * every family, each row gets a "qual" cell with a random zero-padded
+   * value in [0,500) and a 1000-character "data" cell built from the row
+   * number.
+   */
+  private static void loadTable() throws IOException {
+    HTable table = new HTable(tableName);
+    try {
+      ArrayList<Put> puts = new ArrayList<Put>(BATCH_SIZE);
+      Random rnd = new Random();
+
+      System.out.println("Creating table with 10 column families and 25k rows");
+      for (int i = 0; i < NUM_ROWS; i++) {
+        Put p = new Put(Bytes.toBytes(String.format("row%010d", i)));
+        // The large cell depends only on the row, so build it once per row.
+        byte[] bigvalue = Bytes.toBytes(String.format(
+          "%0100d%0100d%0100d%0100d%0100d%0100d%0100d%0100d%0100d%0100d",
+          i, i, i, i, i, i, i, i, i, i));
+        for (int j = 0; j < NUM_FAMILIES; j++) {
+          byte[] family = Bytes.toBytes("f" + j);
+          String value = String.format("%010d", rnd.nextInt(500));
+          p.add(family, Bytes.toBytes("qual"), Bytes.toBytes(value));
+          p.add(family, Bytes.toBytes("data"), bigvalue);
+        }
+        puts.add(p);
+        if (i % BATCH_SIZE == (BATCH_SIZE - 1)) {
+          table.put(puts);
+          puts.clear();
+          System.out.println("put " + i);
+        }
+      }
+      table.put(puts);
+      table.flushCommits();
+    } finally {
+      table.close();
+    }
+  }
+
+  /**
+   * Reads an int system property. Returns {@code def} when the property is
+   * unset or is not a valid decimal integer.
+   */
+  private static int intProperty(String name, int def) {
+    try {
+      return Integer.parseInt(System.getProperty(name));
+    } catch (NumberFormatException e) {
+      return def;
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    // Stop the putter (if it was started) before dropping its table.
+    if (putter_pid != null) {
+      System.out.println("Killing putter process");
+      putSh.exec("kill -9 " + putter_pid);
+    }
+
+    // Setup may have failed before the table was created.
+    if (admin != null && admin.tableExists(tableName)) {
+      System.out.println("Removing test table " + tableName);
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+  }
+
+  @Test
+  public void testConcurrentScanAndPut() {
+    System.out.println("Starting puts to test table " + tableName);
+    // Launch the putter in the background and capture its pid via $!.
+    // Class names are derived from the classes themselves so they always
+    // match this package.
+    putSh.exec("(HBASE_CLASSPATH=. " +
+               "hbase " + Putter.class.getName() + " " +
+               tableName + " " + VALUE + " -l " + putterLoops +
+               " > /dev/null 2>&1 & echo $! ) 2> /dev/null");
+    assertTrue("Failed to start the Putter process",
+               putSh.getOut().size() > 0);
+    putter_pid = putSh.getOut().get(0);
+
+    System.out.println("Starting concurrent scans of test table " +
+                       tableName);
+    // The scanner reports split rows through its exit status.
+    scanSh.exec("HBASE_CLASSPATH=. hbase " +
+                Scanner.class.getName() + " " +
+                tableName + " " + VALUE + " -l " + scannerLoops +
+                " 2>/dev/null");
+
+    int splitRows = scanSh.getRet();
+    System.out.println("Split rows: " + splitRows);
+    assertTrue("Rows were split when scanning table with concurrent writes",
+               splitRows == 0);
+  }
+}



Mime
View raw message