hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r686663 - in /hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional: DisabledTestHLogRecovery.java TestHLogRecovery.java
Date Mon, 18 Aug 2008 00:16:25 GMT
Author: stack
Date: Sun Aug 17 17:16:25 2008
New Revision: 686663

URL: http://svn.apache.org/viewvc?rev=686663&view=rev
Log:
HBASE-669  MultiRegion transactions with Optimistic Concurrency Control; disable test that
doesn't pass yet

Added:
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestHLogRecovery.java
Removed:
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java

Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestHLogRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestHLogRecovery.java?rev=686663&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestHLogRecovery.java
(added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestHLogRecovery.java
Sun Aug 17 17:16:25 2008
@@ -0,0 +1,285 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class DisabledTestHLogRecovery extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory.getLog(DisabledTestHLogRecovery.class);
+
+  private static final String TABLE_NAME = "table1";
+
+  private static final byte[] FAMILY = Bytes.toBytes("family:");
+  private static final byte[] COL_A = Bytes.toBytes("family:a");
+
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW3 = Bytes.toBytes("row3");
+  private static final int TOTAL_VALUE = 10;
+
+  private HBaseAdmin admin;
+  private TransactionManager transactionManager;
+  private TransactionalTable table;
+
+  /** constructor */
+  public DisabledTestHLogRecovery() {
+    super(2, false);
+
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+
+    // Set flush params so we don't get any
+    // FIXME (defaults are probably fine)
+
+    // Copied from TestRegionServerExit
+    conf.setInt("ipc.client.connect.max.retries", 5); // reduce ipc retries
+    conf.setInt("ipc.client.timeout", 10000); // and ipc timeout
+    conf.setInt("hbase.client.pause", 10000); // increase client timeout
+    conf.setInt("hbase.client.retries.number", 10); // increase HBase retries
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    FileSystem.getLocal(conf).delete(new Path(conf.get(HConstants.HBASE_DIR)), true);
+    super.setUp();
+
+    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    table = new TransactionalTable(conf, desc.getName());
+
+    transactionManager = new TransactionManager(conf);
+    writeInitalRows();
+  }
+
+  private void writeInitalRows() throws IOException {
+    BatchUpdate update = new BatchUpdate(ROW1);
+    update.put(COL_A, Bytes.toBytes(TOTAL_VALUE));
+    table.commit(update);
+    update = new BatchUpdate(ROW2);
+    update.put(COL_A, Bytes.toBytes(0));
+    table.commit(update);
+    update = new BatchUpdate(ROW3);
+    update.put(COL_A, Bytes.toBytes(0));
+    table.commit(update);
+  }
+
+  public void testWithoutFlush() throws IOException,
+      CommitUnsuccessfulException {
+    writeInitalRows();
+    TransactionState state1 = makeTransaction(false);
+    transactionManager.tryCommit(state1);
+    stopOrAbortRegionServer(true);
+
+    Thread t = startVerificationThread(1);
+    t.start();
+    threadDumpingJoin(t);
+  }
+
+  public void testWithFlushBeforeCommit() throws IOException,
+      CommitUnsuccessfulException {
+    writeInitalRows();
+    TransactionState state1 = makeTransaction(false);
+    flushRegionServer();
+    transactionManager.tryCommit(state1);
+    stopOrAbortRegionServer(true);
+
+    Thread t = startVerificationThread(1);
+    t.start();
+    threadDumpingJoin(t);
+  }
+
+  // FIXME, TODO
+  // public void testWithFlushBetweenTransactionWrites() {
+  // fail();
+  // }
+
+  private void flushRegionServer() {
+    List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
+        .getRegionThreads();
+
+    HRegion region = null;
+    int server = -1;
+    for (int i = 0; i < regionThreads.size() && server == -1; i++) {
+      HRegionServer s = regionThreads.get(i).getRegionServer();
+      Collection<HRegion> regions = s.getOnlineRegions();
+      for (HRegion r : regions) {
+        if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
+          server = i;
+          region = r;
+        }
+      }
+    }
+    if (server == -1) {
+      LOG.fatal("could not find region server serving table region");
+      fail();
+    }
+    ((TransactionalRegionServer) regionThreads.get(server).getRegionServer())
+        .getFlushRequester().request(region);
+  }
+
+  /**
+   * Stop the region server serving TABLE_NAME.
+   * 
+   * @param abort set to true if region server should be aborted, if false it is
+   * just shut down.
+   */
+  private void stopOrAbortRegionServer(final boolean abort) {
+    List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
+        .getRegionThreads();
+
+    int server = -1;
+    for (int i = 0; i < regionThreads.size(); i++) {
+      HRegionServer s = regionThreads.get(i).getRegionServer();
+      Collection<HRegion> regions = s.getOnlineRegions();
+      LOG.info("server: " + regionThreads.get(i).getName());
+      for (HRegion r : regions) {
+        LOG.info("region: " + r.getRegionInfo().getRegionNameAsString());
+        if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
+          server = i;
+        }
+      }
+    }
+    if (server == -1) {
+      LOG.fatal("could not find region server serving table region");
+      fail();
+    }
+    if (abort) {
+      this.cluster.abortRegionServer(server);
+
+    } else {
+      this.cluster.stopRegionServer(server);
+    }
+    LOG.info(this.cluster.waitOnRegionServer(server) + " has been "
+        + (abort ? "aborted" : "shut down"));
+  }
+
+  private void verify(final int numRuns) throws IOException {
+    // Reads
+    int row1 = Bytes.toInt(table.get(ROW1, COL_A).getValue());
+    int row2 = Bytes.toInt(table.get(ROW2, COL_A).getValue());
+    int row3 = Bytes.toInt(table.get(ROW3, COL_A).getValue());
+
+    assertEquals(TOTAL_VALUE - 2 * numRuns, row1);
+    assertEquals(numRuns, row2);
+    assertEquals(numRuns, row3);
+  }
+
+  // Move 2 out of ROW1 and 1 into ROW2 and 1 into ROW3
+  private TransactionState makeTransaction(final boolean flushMidWay)
+      throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    // Reads
+    int row1 = Bytes.toInt(table.get(transactionState, ROW1, COL_A).getValue());
+    int row2 = Bytes.toInt(table.get(transactionState, ROW2, COL_A).getValue());
+    int row3 = Bytes.toInt(table.get(transactionState, ROW3, COL_A).getValue());
+
+    row1 -= 2;
+    row2 += 1;
+    row3 += 1;
+
+    if (flushMidWay) {
+      flushRegionServer();
+    }
+
+    // Writes
+    BatchUpdate write = new BatchUpdate(ROW1);
+    write.put(COL_A, Bytes.toBytes(row1));
+    table.commit(transactionState, write);
+
+    write = new BatchUpdate(ROW2);
+    write.put(COL_A, Bytes.toBytes(row2));
+    table.commit(transactionState, write);
+
+    write = new BatchUpdate(ROW3);
+    write.put(COL_A, Bytes.toBytes(row3));
+    table.commit(transactionState, write);
+
+    return transactionState;
+  }
+
+  /*
+   * Run verification in a thread so I can concurrently run a thread-dumper
+   * while we're waiting (because in this test sometimes the meta scanner looks
+   * to be be stuck). @param tableName Name of table to find. @param row Row we
+   * expect to find. @return Verification thread. Caller needs to calls start on
+   * it.
+   */
+  private Thread startVerificationThread(final int numRuns) {
+    Runnable runnable = new Runnable() {
+      public void run() {
+        try {
+          // Now try to open a scanner on the meta table. Should stall until
+          // meta server comes back up.
+          HTable t = new HTable(conf, TABLE_NAME);
+          Scanner s = t.getScanner(new byte[][] { COL_A },
+              HConstants.EMPTY_START_ROW);
+          s.close();
+
+        } catch (IOException e) {
+          LOG.fatal("could not re-open meta table because", e);
+          fail();
+        }
+        Scanner scanner = null;
+        try {
+          verify(numRuns);
+          LOG.info("Success!");
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail();
+        } finally {
+          if (scanner != null) {
+            LOG.info("Closing scanner " + scanner);
+            scanner.close();
+          }
+        }
+      }
+    };
+    return new Thread(runnable);
+  }
+}



Mime
View raw message