incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Fixed BLUR-92
Date Sun, 19 May 2013 21:31:08 GMT
Fixed BLUR-92


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/7e31cd28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/7e31cd28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/7e31cd28

Branch: refs/heads/0.1.5
Commit: 7e31cd28363e19bcb521bccc4f1fd6931b388518
Parents: 5fd0a0f
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun May 19 17:30:35 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun May 19 17:30:35 2013 -0400

----------------------------------------------------------------------
 .../blur/manager/writer/TransactionRecorder.java   |  121 ++++++++++-----
 .../manager/writer/TransactionRecorderTest.java    |  106 +++++++------
 .../org/apache/blur/thrift/BlurClusterTest.java    |    4 +
 3 files changed, 139 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7e31cd28/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
index 68c565f..fbd8da6 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
@@ -24,6 +24,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -54,7 +57,7 @@ import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
 
-public class TransactionRecorder {
+public class TransactionRecorder extends TimerTask {
 
   enum TYPE {
     DELETE((byte) 0), ROW((byte) 1);
@@ -92,40 +95,44 @@ public class TransactionRecorder {
     ID_TYPE.freeze();
   }
 
-  private final AtomicBoolean running = new AtomicBoolean(true);
-  private final AtomicReference<FSDataOutputStream> outputStream = new AtomicReference<FSDataOutputStream>();
-  private final long timeBetweenSyncsNanos;
-  private final AtomicLong lastSync = new AtomicLong();
-  
-  private final Path walPath;
-  private final Configuration configuration;
-  private final BlurAnalyzer analyzer;
-  private final FileSystem fileSystem;
-
-  public  TransactionRecorder(ShardContext shardContext) throws IOException {
+  private final AtomicBoolean _running = new AtomicBoolean(true);
+  private final AtomicReference<FSDataOutputStream> _outputStream = new AtomicReference<FSDataOutputStream>();
+  private final long _timeBetweenSyncsNanos;
+  private final AtomicLong _lastSync = new AtomicLong();
+
+  private final Path _walPath;
+  private final Configuration _configuration;
+  private final BlurAnalyzer _analyzer;
+  private final FileSystem _fileSystem;
+  private final Timer _timer;
+
+  public TransactionRecorder(ShardContext shardContext) throws IOException {
     TableContext tableContext = shardContext.getTableContext();
-    configuration = tableContext.getConfiguration();
-    analyzer = tableContext.getAnalyzer();
-    walPath = shardContext.getWalShardPath();
-    fileSystem = walPath.getFileSystem(configuration);
-    timeBetweenSyncsNanos = tableContext.getTimeBetweenWALSyncsNanos();
+    _configuration = tableContext.getConfiguration();
+    _analyzer = tableContext.getAnalyzer();
+    _walPath = shardContext.getWalShardPath();
+    _fileSystem = _walPath.getFileSystem(_configuration);
+    _timeBetweenSyncsNanos = tableContext.getTimeBetweenWALSyncsNanos();
+    _timer = new Timer("wal-sync-[" + tableContext.getTable() + "/" + shardContext.getShard() + "]", true);
+    _timer.schedule(this, TimeUnit.NANOSECONDS.toMillis(_timeBetweenSyncsNanos),
+        TimeUnit.NANOSECONDS.toMillis(_timeBetweenSyncsNanos));
   }
 
   public void open() throws IOException {
-    if (fileSystem.exists(walPath)) {
-      throw new IOException("WAL path [" + walPath + "] still exists, replay must have not worked.");
+    if (_fileSystem.exists(_walPath)) {
+      throw new IOException("WAL path [" + _walPath + "] still exists, replay must have not worked.");
     } else {
-      outputStream.set(fileSystem.create(walPath));
+      _outputStream.set(_fileSystem.create(_walPath));
     }
-    if (outputStream == null) {
+    if (_outputStream == null) {
       throw new RuntimeException();
     }
-    lastSync.set(System.nanoTime());
+    _lastSync.set(System.nanoTime());
   }
 
   public void replay(IndexWriter writer) throws IOException {
-    if (fileSystem.exists(walPath)) {
-      FSDataInputStream inputStream = fileSystem.open(walPath);
+    if (_fileSystem.exists(_walPath)) {
+      FSDataInputStream inputStream = _fileSystem.open(_walPath);
       replay(writer, inputStream);
       inputStream.close();
       commit(writer);
@@ -144,7 +151,7 @@ public class TransactionRecorder {
       switch (lookup) {
       case ROW:
         Row row = readRow(dataInputStream);
-        writer.updateDocuments(createRowId(row.id), getDocs(row, analyzer));
+        writer.updateDocuments(createRowId(row.id), getDocs(row, _analyzer));
         updateCount++;
         continue;
       case DELETE:
@@ -177,20 +184,22 @@ public class TransactionRecorder {
   }
 
   private void rollLog() throws IOException {
-    LOG.info("Rolling WAL path [" + walPath + "]");
-    FSDataOutputStream os = outputStream.get();
+    LOG.info("Rolling WAL path [" + _walPath + "]");
+    FSDataOutputStream os = _outputStream.get();
     if (os != null) {
       os.close();
     }
-    fileSystem.delete(walPath, false);
+    _fileSystem.delete(_walPath, false);
     open();
   }
 
   public void close() throws IOException {
-    synchronized (running) {
-      running.set(false);
+    _timer.purge();
+    _timer.cancel();
+    synchronized (_running) {
+      _running.set(false);
     }
-    outputStream.get().close();
+    _outputStream.get().close();
   }
 
   private static void writeRow(DataOutputStream outputStream, Row row) throws IOException {
@@ -266,22 +275,37 @@ public class TransactionRecorder {
   }
 
   private void sync(byte[] bs) throws IOException {
-    if (bs == null || outputStream == null) {
-      throw new RuntimeException("bs [" + bs + "] outputStream [" + outputStream + "]");
+    if (bs == null || _outputStream == null) {
+      throw new RuntimeException("bs [" + bs + "] outputStream [" + _outputStream + "]");
+    }
+    synchronized (_outputStream) {
+      FSDataOutputStream os = _outputStream.get();
+      os.writeInt(bs.length);
+      os.write(bs);
+      tryToSync(os);
+    }
+  }
+
+  private void tryToSync() throws IOException {
+    synchronized (_outputStream) {
+      tryToSync(_outputStream.get());
+    }
+  }
+
+  private void tryToSync(FSDataOutputStream os) throws IOException {
+    if (os == null) {
+      return;
     }
-    FSDataOutputStream os = outputStream.get();
-    os.writeInt(bs.length);
-    os.write(bs);
     long now = System.nanoTime();
-    if (lastSync.get() + timeBetweenSyncsNanos < now) {
+    if (_lastSync.get() + _timeBetweenSyncsNanos < now) {
       os.sync();
-      lastSync.set(now);
+      _lastSync.set(now);
     }
   }
 
   public long replaceRow(boolean wal, Row row, TrackingIndexWriter writer) throws IOException {
     if (wal) {
-      synchronized (running) {
+      synchronized (_running) {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream outputStream = new DataOutputStream(baos);
         outputStream.writeByte(TYPE.ROW.value());
@@ -291,13 +315,13 @@ public class TransactionRecorder {
       }
     }
     Term term = createRowId(row.id);
-    List<Document> docs = getDocs(row, analyzer);
+    List<Document> docs = getDocs(row, _analyzer);
     return writer.updateDocuments(term, docs);
   }
 
   public long deleteRow(boolean wal, String rowId, TrackingIndexWriter writer) throws IOException {
     if (wal) {
-      synchronized (running) {
+      synchronized (_running) {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream outputStream = new DataOutputStream(baos);
         outputStream.writeByte(TYPE.DELETE.value());
@@ -310,7 +334,7 @@ public class TransactionRecorder {
   }
 
   public void commit(IndexWriter writer) throws CorruptIndexException, IOException {
-    synchronized (running) {
+    synchronized (_running) {
       long s = System.nanoTime();
       writer.commit();
       long m = System.nanoTime();
@@ -349,4 +373,17 @@ public class TransactionRecorder {
     return new Term(BlurConstants.ROW_ID, id);
   }
 
+  @Override
+  public void run() {
+    try {
+      if (_running.get()) {
+        tryToSync();
+      }
+    } catch (IOException e) {
+      if (_running.get()) {
+        LOG.error("Known error while trying to sync.", e);
+      }
+    }
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7e31cd28/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
index fbd21ed..3ded662 100644
--- a/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
@@ -21,75 +21,81 @@ import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.MiniCluster;
 import org.apache.blur.index.IndexWriter;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
 import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.RAMDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TransactionRecorderTest {
-  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "/tmp"));
+  
+  @BeforeClass
+  public static void setup() {
+    MiniCluster.startDfs("target/transaction-recorder-test");
+  }
+  
+  @AfterClass
+  public static void teardown() {
+    MiniCluster.shutdownDfs();
+  }
 
-//  @Test
-//  public void testReplay() throws IOException {
-//    File tmpWalFile = new File(TMPDIR, "transaction-recorder/wal");
-//    rm(tmpWalFile);
-//
-//    KeywordAnalyzer analyzer = new KeywordAnalyzer();
-//    Configuration configuration = new Configuration();
-//    BlurAnalyzer blurAnalyzer = new BlurAnalyzer(analyzer);
-//
-//    TransactionRecorder transactionRecorder = new TransactionRecorder();
-//    transactionRecorder.setAnalyzer(blurAnalyzer);
-//    transactionRecorder.setConfiguration(configuration);
-//
-//    transactionRecorder.setWalPath(new Path(tmpWalFile.getAbsolutePath()));
-//    transactionRecorder.init();
-//    transactionRecorder.open();
-//    try {
-//      transactionRecorder.replaceRow(true, genRow(), null);
-//      fail("Should NPE");
-//    } catch (NullPointerException e) {
-//    }
-//    transactionRecorder.close(); // this is done so that the rawfs will flush
-//                                 // the file to disk for reading
-//
-//    RAMDirectory directory = new RAMDirectory();
-//    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
-//    IndexWriter writer = new IndexWriter(directory, conf);
-//
-//    TransactionRecorder replayTransactionRecorder = new TransactionRecorder();
-//    replayTransactionRecorder.setAnalyzer(blurAnalyzer);
-//    replayTransactionRecorder.setConfiguration(configuration);
-//    replayTransactionRecorder.setWalPath(new Path(tmpWalFile.getAbsolutePath()));
-//    replayTransactionRecorder.init();
-//
-//    replayTransactionRecorder.replay(writer);
-//    IndexReader reader = DirectoryReader.open(directory);
-//    assertEquals(1, reader.numDocs());
-//  }
+  @Test
+  public void testReplaySimpleTest() throws IOException, InterruptedException {
+    Configuration configuration = new Configuration();
+    URI fileSystemUri = MiniCluster.getFileSystemUri();
+    Path path = new Path(fileSystemUri.toString() + "/transaction-recorder-test");
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    fileSystem.delete(path, true);
 
-  private void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
+    KeywordAnalyzer analyzer = new KeywordAnalyzer();
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName("table");
+    tableDescriptor.setTableUri(new Path(path, "tableuri").toUri().toString());
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+
+    TableContext tableContext = TableContext.create(tableDescriptor);
+    ShardContext shardContext = ShardContext.create(tableContext, "shard-1");
+    TransactionRecorder transactionRecorder = new TransactionRecorder(shardContext);
+    transactionRecorder.open();
+    try {
+      transactionRecorder.replaceRow(true, genRow(), null);
+      fail("Should NPE");
+    } catch (NullPointerException e) {
     }
-    file.delete();
+    
+    Thread.sleep(TimeUnit.NANOSECONDS.toMillis(tableContext.getTimeBetweenWALSyncsNanos()) * 2);
+
+    RAMDirectory directory = new RAMDirectory();
+    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
+    IndexWriter writer = new IndexWriter(directory, conf);
+
+    TransactionRecorder replayTransactionRecorder = new TransactionRecorder(shardContext);
+    replayTransactionRecorder.replay(writer);
+    IndexReader reader = DirectoryReader.open(directory);
+    assertEquals(1, reader.numDocs());
+    
+    // cleanup recorder
+    transactionRecorder.close();
   }
 
   private Row genRow() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7e31cd28/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index 6aaa236..5be43f3 100644
--- a/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.MiniCluster;
 import org.apache.blur.thrift.generated.Blur;
@@ -136,6 +137,9 @@ public class BlurClusterTest {
     
     MiniCluster.killShardServer(1);
     
+    //make sure the WAL syncs
+    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+    
     client.shardServerLayout("test");
     
     assertEquals(length, client.query("test", blurQuery).getTotalResults());


Mime
View raw message