hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject svn commit: r1424810 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/replication/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/apache/hadoop/hbase/re...
Date Fri, 21 Dec 2012 05:59:06 GMT
Author: larsh
Date: Fri Dec 21 05:59:06 2012
New Revision: 1424810

URL: http://svn.apache.org/viewvc?rev=1424810&view=rev
Log:
HBASE-5778 Reapply, Test failures not caused by this. Sorry for the noise.

Added:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java
Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Fri Dec 21 05:59:06 2012
@@ -143,7 +143,12 @@ public class Compressor {
       // the status byte also acts as the higher order byte of the dictionary
       // entry
       short dictIdx = toShort(status, in.readByte());
-      byte[] entry = dict.getEntry(dictIdx);
+      byte[] entry;
+      try {
+        entry = dict.getEntry(dictIdx);
+      } catch (Exception ex) {
+        throw new IOException("Unable to uncompress the log entry", ex);
+      }
       if (entry == null) {
         throw new IOException("Missing dictionary entry for index "
             + dictIdx);

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Dec 21 05:59:06 2012
@@ -167,6 +167,7 @@ public class HLog implements Syncable {
     Entry next(Entry reuse) throws IOException;
     void seek(long pos) throws IOException;
     long getPosition() throws IOException;
+    void reset() throws IOException;
   }
 
   public interface Writer {
@@ -695,15 +696,18 @@ public class HLog implements Syncable {
 
   /**
    * Get a reader for the WAL.
+   * The proper way to tail a log that can be under construction is to first use this method
+   * to get a reader then call {@link HLog.Reader#reset()} to see the new data. It will also
+   * take care of keeping implementation-specific context (like compression).
    * @param fs
    * @param path
    * @param conf
    * @return A WAL reader.  Close when done with it.
    * @throws IOException
    */
-  public static Reader getReader(final FileSystem fs,
-    final Path path, Configuration conf)
-  throws IOException {
+  public static Reader getReader(final FileSystem fs, final Path path,
+                                 Configuration conf)
+      throws IOException {
     try {
 
       if (logReaderClass == null) {

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Fri Dec 21 05:59:06 2012
@@ -139,15 +139,17 @@ public class SequenceFileLogReader imple
 
   Configuration conf;
   WALReader reader;
+  FileSystem fs;
 
   // Needed logging exceptions
   Path path;
   int edit = 0;
   long entryStart = 0;
+  boolean emptyCompressionContext = true;
   /**
    * Compression context to use reading.  Can be null if no compression.
    */
-  private CompressionContext compressionContext = null;
+  protected CompressionContext compressionContext = null;
 
   protected Class<? extends HLogKey> keyClass;
 
@@ -173,6 +175,7 @@ public class SequenceFileLogReader imple
     this.conf = conf;
     this.path = path;
     reader = new WALReader(fs, path, conf);
+    this.fs = fs;
 
     // If compression is enabled, new dictionaries are created here.
     boolean compression = reader.isWALCompressionEnabled();
@@ -237,11 +240,22 @@ public class SequenceFileLogReader imple
       throw addFileInfoToException(ioe);
     }
     edit++;
+    if (compressionContext != null && emptyCompressionContext) {
+      emptyCompressionContext = false;
+    }
     return b? e: null;
   }
 
   @Override
   public void seek(long pos) throws IOException {
+    if (compressionContext != null && emptyCompressionContext) {
+      while (next() != null) {
+        if (getPosition() == pos) {
+          emptyCompressionContext = false;
+          break;
+        }
+      }
+    }
     try {
       reader.seek(pos);
     } catch (IOException ioe) {
@@ -286,4 +300,11 @@ public class SequenceFileLogReader imple
 
     return ioe;
   }
+
+  @Override
+  public void reset() throws IOException {
+    // Resetting the reader lets us see newly added data if the file is being written to
+    // We also keep the same compressionContext which was previously populated for this file
+    reader = new WALReader(fs, path, conf); // NOTE(review): the old reader is not closed here
+  }
 }
\ No newline at end of file

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java?rev=1424810&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java Fri Dec 21 05:59:06 2012
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+import java.io.IOException;
+
+/**
+ * Wrapper class around an HLog.Reader that remembers the last read position and the
+ * last opened path, to help manage implementation details such as compression.
+ */
+@InterfaceAudience.Private
+public class ReplicationHLogReaderManager {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationHLogReaderManager.class);
+  private final FileSystem fs;
+  private final Configuration conf;
+  private long position = 0;
+  private HLog.Reader reader;
+  private Path lastPath;
+
+  /**
+   * Creates the helper but doesn't open any file
+   * Use setPosition after using the constructor if some content needs to be skipped
+   * @param fs file system handed to HLog.getReader when a file is opened
+   * @param conf configuration handed to HLog.getReader when a file is opened
+   */
+  public ReplicationHLogReaderManager(FileSystem fs, Configuration conf) {
+    this.fs = fs;
+    this.conf = conf;
+  }
+
+  /**
+   * Opens a new reader for the file, or resets the current one when the path is unchanged
+   * @param path log file to read
+   * @return the reader now held by this manager
+   * @throws IOException if the reader cannot be created or reset
+   */
+  public HLog.Reader openReader(Path path) throws IOException {
+    // Detect if this is a new file, if so get a new reader else
+    // reset the current reader so that we see the new data
+    if (this.reader == null || !this.lastPath.equals(path)) {
+      this.reader = HLog.getReader(this.fs, path, this.conf);
+      this.lastPath = path;
+    } else {
+      this.reader.reset();
+    }
+    return this.reader;
+  }
+
+  /**
+   * Get the next entry, handing entriesArray[currentNbEntries] to the reader for reuse
+   * @param entriesArray array whose slot at currentNbEntries is passed to the reader
+   * @param currentNbEntries index into entriesArray
+   * @return a new entry or null
+   * @throws IOException if reading fails; the stored position is then left unchanged
+   */
+  public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray,
+                                           int currentNbEntries) throws IOException {
+    HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
+    // Store the position so that in the future the reader can start
+    // reading from here. If the above call to next() throws an
+    // exception, the position won't be changed and retry will happen
+    // from the last known good position
+    this.position = this.reader.getPosition();
+    // We need to set the CC to null else it will be compressed when sent to the sink
+    if (entry != null) {
+      entry.setCompressionContext(null);
+    }
+    return entry;
+  }
+
+  /**
+   * Advance the reader to the stored position; does nothing when the position is 0
+   * @throws IOException
+   */
+  public void seek() throws IOException {
+    if (this.position != 0) {
+      this.reader.seek(this.position);
+    }
+  }
+
+  /**
+   * Get the position that we stopped reading at
+   * @return current position; expected to be non-negative, but setPosition does not enforce it
+   */
+  public long getPosition() {
+    return this.position;
+  }
+
+  public void setPosition(long pos) {
+    this.position = pos;
+  }
+
+  /**
+   * Close the current reader, if any; the reader reference itself is kept
+   * @throws IOException
+   */
+  public void closeReader() throws IOException {
+    if (this.reader != null) {
+      this.reader.close();
+    }
+  }
+
+  /**
+   * Tell the helper to reset internal state: position back to 0 and no current reader
+   */
+  public void finishCurrentFile() {
+    this.position = 0;
+    this.reader = null;
+  }
+
+}

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Fri Dec 21 05:59:06 2012
@@ -105,8 +105,6 @@ public class ReplicationSource extends T
   private int replicationQueueNbCapacity;
   // Our reader for the current log
   private HLog.Reader reader;
-  // Current position in the log
-  private long position = 0;
   // Last position in the log that we sent to ZooKeeper
   private long lastLoggedPosition = -1;
   // Path of the current log
@@ -132,10 +130,15 @@ public class ReplicationSource extends T
   private int currentNbEntries = 0;
   // Current number of operations (Put/Delete) that we need to replicate
   private int currentNbOperations = 0;
+  // Current size of data we need to replicate
+  private int currentSize = 0;
   // Indicates if this particular source is running
   private volatile boolean running = true;
   // Metrics for this source
   private ReplicationSourceMetrics metrics;
+  // Handle on the log reader helper
+  private ReplicationHLogReaderManager repLogReader;
+
 
   /**
    * Instantiation method used by region servers
@@ -183,7 +186,7 @@ public class ReplicationSource extends T
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
     this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
-
+    this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
     try {
       this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
     } catch (KeeperException ke) {
@@ -263,8 +266,8 @@ public class ReplicationSource extends T
     // normally has a position (unless the RS failed between 2 logs)
     if (this.queueRecovered) {
       try {
-        this.position = this.zkHelper.getHLogRepPosition(
-            this.peerClusterZnode, this.queue.peek().getName());
+        this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
+            this.peerClusterZnode, this.queue.peek().getName()));
       } catch (KeeperException e) {
         this.terminate("Couldn't get the position of this recovered queue " +
             peerClusterZnode, e);
@@ -322,6 +325,7 @@ public class ReplicationSource extends T
 
       boolean gotIOE = false;
       currentNbEntries = 0;
+      currentSize = 0;
       try {
         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
           continue;
@@ -357,9 +361,7 @@ public class ReplicationSource extends T
         }
       } finally {
         try {
-          if (this.reader != null) {
-            this.reader.close();
-          }
+          this.repLogReader.closeReader();
         } catch (IOException e) {
           gotIOE = true;
           LOG.warn("Unable to finalize the tailing of a file", e);
@@ -370,10 +372,10 @@ public class ReplicationSource extends T
       // wait a bit and retry.
       // But if we need to stop, don't bother sleeping
       if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
-        if (this.lastLoggedPosition != this.position) {
+        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
-          this.lastLoggedPosition = this.position;
+              this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
+          this.lastLoggedPosition = this.repLogReader.getPosition();
         }
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
           sleepMultiplier++;
@@ -405,11 +407,9 @@ public class ReplicationSource extends T
   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
       throws IOException{
     long seenEntries = 0;
-    if (this.position != 0) {
-      this.reader.seek(this.position);
-    }
-    long startPosition = this.position;
-    HLog.Entry entry = readNextAndSetPosition();
+    this.repLogReader.seek();
+    HLog.Entry entry =
+        this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
     while (entry != null) {
       WALEdit edit = entry.getEdit();
       this.metrics.logEditsReadRate.inc(1);
@@ -433,18 +433,18 @@ public class ReplicationSource extends T
           }
           currentNbOperations += countDistinctRowKeys(edit);
           currentNbEntries++;
+          currentSize += entry.getEdit().heapSize();
         } else {
           this.metrics.logEditsFilteredRate.inc(1);
         }
       }
       // Stop if too many entries or too big
-      if ((this.reader.getPosition() - startPosition)
-          >= this.replicationQueueSizeCapacity ||
+      if (currentSize >= this.replicationQueueSizeCapacity ||
           currentNbEntries >= this.replicationQueueNbCapacity) {
         break;
       }
       try {
-        entry = readNextAndSetPosition();
+        entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
       } catch (IOException ie) {
         LOG.debug("Break on IOE: " + ie.getMessage());
         break;
@@ -452,7 +452,7 @@ public class ReplicationSource extends T
     }
     LOG.debug("currentNbOperations:" + currentNbOperations +
         " and seenEntries:" + seenEntries +
-        " and size: " + (this.reader.getPosition() - startPosition));
+        " and size: " + this.currentSize);
     if (currentWALisBeingWrittenTo) {
       return false;
     }
@@ -461,16 +461,6 @@ public class ReplicationSource extends T
     return seenEntries == 0 && processEndOfFile();
   }
 
-  private HLog.Entry readNextAndSetPosition() throws IOException {
-    HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
-    // Store the position so that in the future the reader can start
-    // reading from here. If the above call to next() throws an
-    // exception, the position won't be changed and retry will happen
-    // from the last known good position
-    this.position = this.reader.getPosition();
-    return entry;
-  } 
-
   private void connectToPeers() {
     // Connect to peer cluster first, unless we have to stop
     while (this.isActive() && this.currentPeers.size() == 0) {
@@ -509,10 +499,9 @@ public class ReplicationSource extends T
   protected boolean openReader(int sleepMultiplier) {
     try {
       LOG.debug("Opening log for replication " + this.currentPath.getName() +
-          " at " + this.position);
+          " at " + this.repLogReader.getPosition());
       try {
-       this.reader = null;
-       this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+        this.reader = repLogReader.openReader(this.currentPath);
       } catch (FileNotFoundException fnfe) {
         if (this.queueRecovered) {
           // We didn't find the log in the archive directory, look if it still
@@ -648,10 +637,10 @@ public class ReplicationSource extends T
         HRegionInterface rrs = getRS();
         LOG.debug("Replicating " + currentNbEntries);
         rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
-        if (this.lastLoggedPosition != this.position) {
+        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
-          this.lastLoggedPosition = this.position;
+              this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
+          this.lastLoggedPosition = this.repLogReader.getPosition();
         }
         this.totalReplicatedEdits += currentNbEntries;
         this.metrics.shippedBatchesRate.inc(1);
@@ -721,7 +710,8 @@ public class ReplicationSource extends T
   protected boolean processEndOfFile() {
     if (this.queue.size() != 0) {
       this.currentPath = null;
-      this.position = 0;
+      this.repLogReader.finishCurrentFile();
+      this.reader = null;
       return true;
     } else if (this.queueRecovered) {
       this.manager.closeRecoveredQueue(this);

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java Fri Dec 21 05:59:06 2012
@@ -49,6 +49,9 @@ public class FaultySequenceFileLogReader
         HLogKey key = HLog.newKey(conf);
         WALEdit val = new WALEdit();
         HLog.Entry e = new HLog.Entry(key, val);
+        if (compressionContext != null) {
+          e.setCompressionContext(compressionContext);
+        }
         b = this.reader.next(e.getKey(), e.getEdit());
         nextQueue.offer(e);
         numberOfFileEntries++;

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Fri Dec 21 05:59:06 2012
@@ -475,15 +475,13 @@ public class TestHLog  {
       throw t.exception;
 
     // Make sure you can read all the content
-    SequenceFile.Reader reader
-      = new SequenceFile.Reader(this.fs, walPath, this.conf);
+    HLog.Reader reader = HLog.getReader(this.fs, walPath, this.conf);
     int count = 0;
-    HLogKey key = HLog.newKey(conf);
-    WALEdit val = new WALEdit();
-    while (reader.next(key, val)) {
+    HLog.Entry entry = new HLog.Entry();
+    while (reader.next(entry) != null) {
       count++;
       assertTrue("Should be one KeyValue per WALEdit",
-                 val.getKeyValues().size() == 1);
+                 entry.getEdit().getKeyValues().size() == 1);
     }
     assertEquals(total, count);
     reader.close();

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Fri Dec 21 05:59:06 2012
@@ -86,7 +86,7 @@ public class TestHLogSplit {
   private Configuration conf;
   private FileSystem fs;
 
-  private final static HBaseTestingUtility
+  protected final static HBaseTestingUtility
           TEST_UTIL = new HBaseTestingUtility();
 
 

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java?rev=1424810&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java Fri Dec 21 05:59:06 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestHLogSplitCompressed extends TestHLogSplit { // TestHLogSplit + WAL compression
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestHLogSplit.setUpBeforeClass(); // parent setup runs first, then compression is enabled
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+  }
+}

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Fri Dec 21 05:59:06 2012
@@ -61,7 +61,7 @@ public class TestReplication {
 
   private static final Log LOG = LogFactory.getLog(TestReplication.class);
 
-  private static Configuration conf1;
+  protected static Configuration conf1 = HBaseConfiguration.create();
   private static Configuration conf2;
   private static Configuration CONF_WITH_LOCALFS;
 
@@ -91,7 +91,6 @@ public class TestReplication {
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1 = HBaseConfiguration.create();
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
     // smaller block size and capacity to trigger more operations
     // and test them
@@ -520,7 +519,7 @@ public class TestReplication {
 
     // disable and start the peer
     admin.disablePeer("2");
-    utility2.startMiniHBaseCluster(1, 1);
+    utility2.startMiniHBaseCluster(1, 2);
     Get get = new Get(rowkey);
     for (int i = 0; i < NB_RETRIES; i++) {
       Result res = htable2.get(get);
@@ -760,7 +759,8 @@ public class TestReplication {
     int lastCount = 0;
 
     final long start = System.currentTimeMillis();
-    for (int i = 0; i < NB_RETRIES; i++) {
+    int i = 0;
+    while (true) {
       if (i==NB_RETRIES-1) {
         fail("Waited too much time for queueFailover replication. " +
           "Waited "+(System.currentTimeMillis() - start)+"ms.");
@@ -772,6 +772,8 @@ public class TestReplication {
       if (res2.length < initialCount) {
         if (lastCount < res2.length) {
           i--; // Don't increment timeout if we make progress
+        } else {
+          i++;
         }
         lastCount = res2.length;
         LOG.info("Only got " + lastCount + " rows instead of " +
@@ -791,7 +793,7 @@ public class TestReplication {
           Thread.sleep(timeout);
           utility.expireRegionServerSession(rs);
         } catch (Exception e) {
-          LOG.error(e);
+          LOG.error("Couldn't kill a region server", e);
         }
       }
     };

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java?rev=1424810&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java Fri Dec 21 05:59:06 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the same tests as TestReplication but with HLog compression enabled in conf1
+ */
+@Category(LargeTests.class)
+public class TestReplicationWithCompression extends TestReplication {
+
+  /**
+   * @throws java.lang.Exception propagated from TestReplication.setUpBeforeClass()
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    TestReplication.setUpBeforeClass();
+  }
+}



Mime
View raw message