incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/4] git commit: Added unit tests and made snapshot file creation an atomic process and minor modifications to how the file system is fetched and used
Date Thu, 22 Aug 2013 21:54:45 GMT
Updated Branches:
  refs/heads/master b1f1832a6 -> ce0cfaea2


Added unit tests and made snapshot file creation an atomic process and minor modifications
to how the file system is fetched and used

Signed-off-by: Aaron McCurry <amccurry@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/121f1dd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/121f1dd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/121f1dd1

Branch: refs/heads/master
Commit: 121f1dd1bbd2c234301cea4298d7f0dbd4142b1e
Parents: b1f1832
Author: Rahul Challapalli <challapallirahul@gmail.com>
Authored: Wed Aug 21 21:31:24 2013 -0700
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Aug 22 16:02:45 2013 -0400

----------------------------------------------------------------------
 .../blur/manager/writer/BlurNRTIndex.java       | 157 ++++++++++++++++++-
 .../blur/manager/writer/BlurNRTIndexTest.java   |  73 ++++++++-
 2 files changed, 220 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/121f1dd1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index ed0569a..cb3ba19 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -18,8 +18,15 @@ package org.apache.blur.manager.writer;
  */
 import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,12 +48,20 @@ import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.BlurIndexWriter;
 import org.apache.lucene.index.BlurIndexWriter.LockOwnerException;
 import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.NRTManager;
@@ -59,6 +74,8 @@ public class BlurNRTIndex extends BlurIndex {
 
   private static final Log LOG = LogFactory.getLog(BlurNRTIndex.class);
   private static final boolean APPLY_ALL_DELETES = true;
+  private static final String SNAPSHOTS_FOLDER_NAME = "snapshots";
+  private static final String SNAPSHOTS_TMPFILE_EXTENSION = ".tmp";
 
   private final AtomicReference<NRTManager> _nrtManagerRef = new AtomicReference<NRTManager>();
   private final AtomicBoolean _isClosed = new AtomicBoolean();
@@ -82,13 +99,22 @@ public class BlurNRTIndex extends BlurIndex {
     _tableContext = shardContext.getTableContext();
     _directory = directory;
     _shardContext = shardContext;
-
+    
     FieldManager fieldManager = _tableContext.getFieldManager();
     Analyzer analyzer = fieldManager.getAnalyzerForIndex();
     IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
     conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
     conf.setSimilarity(_tableContext.getSimilarity());
-    conf.setIndexDeletionPolicy(_tableContext.getIndexDeletionPolicy());
+    
+    SnapshotDeletionPolicy sdp;
+    if (snapshotsDirectoryExists()) {
+      // load existing snapshots
+      sdp = new SnapshotDeletionPolicy(_tableContext.getIndexDeletionPolicy(), loadExistingSnapshots());
+      
+    } else {
+      sdp = new SnapshotDeletionPolicy(_tableContext.getIndexDeletionPolicy());
+    }
+    conf.setIndexDeletionPolicy(sdp);
     conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext, _isClosed));
 
     TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
@@ -127,6 +153,46 @@ public class BlurNRTIndex extends BlurIndex {
     _refresher.start();
   }
 
+  /**
+   * Loads the snapshots persisted in the shard's snapshots directory.
+   * Each file in that directory is one snapshot: the file name is the snapshot
+   * name and the first line of the file is the segments file name it refers to.
+   * Files ending in {@link #SNAPSHOTS_TMPFILE_EXTENSION} are left over from an
+   * unfinished snapshot creation and are deleted instead of being loaded.
+   * 
+   * @return map of snapshot name to segments file name (empty if nothing can be listed)
+   * @throws IOException if the file system cannot be accessed or a snapshot file cannot be read
+   */
+  private Map<String, String> loadExistingSnapshots() throws IOException {
+    Map<String, String> snapshots = new HashMap<String, String>();
+
+    FileSystem fileSystem = getFileSystem();
+    FileStatus[] status = fileSystem.listStatus(getSnapshotsDirectoryPath());
+    // NOTE(review): older Hadoop releases may return null for a missing path instead of throwing.
+    if (status == null) {
+      return snapshots;
+    }
+
+    for (FileStatus fileStatus : status) {
+      Path snapshotFilePath = fileStatus.getPath();
+      String snapshotName = snapshotFilePath.getName();
+      // cleanup all tmp files: they were never renamed into place, so they are not snapshots
+      if (snapshotName.endsWith(SNAPSHOTS_TMPFILE_EXTENSION)) {
+        fileSystem.delete(snapshotFilePath, true);
+        continue;
+      }
+      BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(snapshotFilePath), "UTF-8"));
+      try {
+        String segmentsFilename = reader.readLine();
+        if (segmentsFilename != null) {
+          snapshots.put(snapshotName, segmentsFilename);
+        }
+      } finally {
+        // always release the underlying file system stream
+        reader.close();
+      }
+    }
+    return snapshots;
+  }
+  
+  /**
+   * Checks whether the shard's snapshots directory exists on the shard's file system.
+   * 
+   * @return true if the snapshots directory exists
+   * @throws IOException if the file system cannot be accessed
+   */
+  private boolean snapshotsDirectoryExists() throws IOException {
+    return getFileSystem().exists(getSnapshotsDirectoryPath());
+  }
+  
   @Override
   public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException
{
     _lock.readLock().lock();
@@ -266,18 +332,97 @@ public class BlurNRTIndex extends BlurIndex {
 
   @Override
   public void createSnapshot(String name) throws IOException {
-    // TODO Auto-generated method stub
+    SnapshotDeletionPolicy snapshotter = getSnapshotter();
+    Map<String, String> existingSnapshots = snapshotter.getSnapshots();
+    if (existingSnapshots.containsKey(name)) {
+      LOG.error("A Snapshot already exists with the same name [{0}] on [{1}/{2}].", 
+          name, _tableContext.getTable(), _shardContext.getShard());
+      throw new IOException("A Snapshot already exists with the same name [" + name + "]
on " +
+      	"["+_tableContext.getTable()+"/"+_shardContext.getShard()+"].");
+    }
+    _writer.commit();
+    IndexCommit indexCommit = snapshotter.snapshot(name);
+    
+    /*
+     * Persist the snapshots info into a tmp file under the snapshots sub-folder
+     * and once writing is finished, close the writer.
+     * Now rename the tmp file to an actual snapshots file.
+     * This make the file write an atomic operation
+     * 
+     * The name of the file is the snapshot name and its contents specify the segments file
name
+     */
+    String segmentsFilename = indexCommit.getSegmentsFileName();
+    FileSystem fileSystem = getFileSystem();
+    Path shardSnapshotsDirPath = getSnapshotsDirectoryPath();
+    BlurUtil.createPath(fileSystem, shardSnapshotsDirPath);
+    Path newTmpSnapshotFile = new Path(shardSnapshotsDirPath, name + SNAPSHOTS_TMPFILE_EXTENSION);
+    BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fileSystem.create(newTmpSnapshotFile,true)));
+    br.write(segmentsFilename);
+    br.close();
+    
+    // now rename the tmp file
+    Path newSnapshotFile = new Path(shardSnapshotsDirPath, name);
+    fileSystem.rename(newTmpSnapshotFile, newSnapshotFile);
+    
+    LOG.info("Snapshot [{0}] created successfully on [{1}/{2}].",
+        name, _tableContext.getTable(), _shardContext.getShard());
   }
 
   @Override
   public void removeSnapshot(String name) throws IOException {
-    // TODO Auto-generated method stub
+    SnapshotDeletionPolicy snapshotter = getSnapshotter();
+    Map<String, String> existingSnapshots = snapshotter.getSnapshots();
+    if (existingSnapshots.containsKey(name)) {
+      snapshotter.release(name);
+      
+      // now delete the snapshot file stored in the snapshots directory under the shard
+      Path snapshotFilePath = new Path(getSnapshotsDirectoryPath(), name);
+      getFileSystem().delete(snapshotFilePath, true);
+       
+      LOG.info("Snapshot [{0}] removed successfully from [{1}/{2}].",
+          name, _tableContext.getTable(), _shardContext.getShard());
+    } else {
+      LOG.error("No Snapshot exists with the name [{0}] on  [{1}/{2}].", 
+          name, _tableContext.getTable(), _shardContext.getShard());
+      throw new IOException("No Snapshot exists with the name [" + name + "] on " +
+        "["+_tableContext.getTable()+"/"+_shardContext.getShard()+"].");
+    }
   }
 
   @Override
   public List<String> getSnapshots() throws IOException {
-    // TODO Auto-generated method stub
-    return null;
+    // snapshot names are the keys of the deletion policy's name -> segments file map
+    Map<String, String> snapshotsByName = getSnapshotter().getSnapshots();
+    List<String> names = new ArrayList<String>(snapshotsByName.keySet());
+    return names;
   }
 
+  /**
+   * Returns the index writer's deletion policy as a {@link SnapshotDeletionPolicy}.
+   * The policy is read from the writer's live config on every call.
+   * 
+   * @return the writer's snapshot-capable deletion policy
+   * @throws IOException if the writer's deletion policy is not a SnapshotDeletionPolicy
+   */
+  private SnapshotDeletionPolicy getSnapshotter() throws IOException {
+    IndexDeletionPolicy policy = _writer.getConfig().getIndexDeletionPolicy();
+    if (!(policy instanceof SnapshotDeletionPolicy)) {
+      LOG.error("The index deletion policy for [{0}/{1}] does not support snapshots.", 
+          _tableContext.getTable(), _shardContext.getShard());
+      throw new IOException("The index deletion policy for [" + _tableContext.getTable() + "/"
+          + _shardContext.getShard() + "]" + " does not support snapshots.");
+    }
+    return (SnapshotDeletionPolicy) policy;
+  }
+  
+  /**
+   * Returns the path of the "snapshots" sub-folder under this shard's HDFS directory.
+   * 
+   * @return the shard's snapshots directory path
+   * @throws IOException as declared; propagated from the shard context lookup, if any
+   */
+  public Path getSnapshotsDirectoryPath() throws IOException {
+    return new Path(_shardContext.getHdfsDirPath(), SNAPSHOTS_FOLDER_NAME);
+  }
+  
+  /**
+   * Resolves the file system that holds this shard's HDFS directory, using the
+   * table's Hadoop configuration.
+   * 
+   * @return the shard's file system
+   * @throws IOException if the file system cannot be obtained
+   */
+  private FileSystem getFileSystem() throws IOException {
+    return _shardContext.getHdfsDirPath().getFileSystem(_shardContext.getTableContext().getConfiguration());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/121f1dd1/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
index 8c6367a..21c531e 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
@@ -18,9 +18,12 @@ package org.apache.blur.manager.writer;
  */
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -36,6 +39,8 @@ import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.store.FSDirectory;
 import org.junit.After;
@@ -58,6 +63,7 @@ public class BlurNRTIndexTest {
   private DirectoryReferenceFileGC gc;
   private IndexInputCloser closer;
   private SharedMergeScheduler mergeScheduler;
+  private String uuid;
 
   @Before
   public void setup() throws IOException {
@@ -76,10 +82,19 @@ public class BlurNRTIndexTest {
     service = Executors.newThreadPool("test", 10);
   }
 
-  private void setupWriter(Configuration configuration, long refresh) throws IOException
{
+  private void setupWriter(Configuration configuration, long refresh, boolean reload) throws
IOException {
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setName("test-table");
-    String uuid = UUID.randomUUID().toString();
+    /*
+     * if reload is set to true...we create a new writer instance pointing
+     * to the same location as the old one.....
+     * so previous writer instances should be closed
+     */
+    
+    if (!reload && uuid == null) {
+      uuid = UUID.randomUUID().toString();
+    }
+    
     tableDescriptor.setTableUri(new File(base, "table-store-" + uuid).toURI().toString());
     tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", Long.toString(refresh));
 
@@ -115,7 +130,7 @@ public class BlurNRTIndexTest {
 
   @Test
   public void testBlurIndexWriter() throws IOException {
-    setupWriter(configuration, 5);
+    setupWriter(configuration, 5, false);
     long s = System.nanoTime();
     int total = 0;
     for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) {
@@ -138,7 +153,7 @@ public class BlurNRTIndexTest {
 
   @Test
   public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
-    setupWriter(configuration, 100);
+    setupWriter(configuration, 100, false);
     IndexSearcherClosable searcher1 = writer.getIndexReader();
     IndexReader reader1 = searcher1.getIndexReader();
     assertEquals(0, reader1.numDocs());
@@ -180,4 +195,54 @@ public class BlurNRTIndexTest {
     return row;
   }
 
+  @Test
+  public void testCreateSnapshot() throws IOException {
+    setupWriter(configuration, 5, false);
+    writer.createSnapshot("test_snapshot");
+    assertTrue(writer.getSnapshots().contains("test_snapshot"));
+    
+    // check that the file is persisted
+    Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
+    FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
+    Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot");
+    assertTrue(fileSystem.exists(snapshotFilePath));
+    // the temporary file must have been renamed into place, not left behind
+    assertTrue(!fileSystem.exists(new Path(snapshotsDirPath, "test_snapshot.tmp")));
+    
+    // create a new writer instance and test whether the snapshots are loaded properly
+    writer.close();
+    setupWriter(configuration, 5, true);
+    assertTrue(writer.getSnapshots().contains("test_snapshot"));
+  }
+  
+  
+  @Test
+  public void testRemoveSnapshots() throws IOException {
+    setupWriter(configuration, 5, false);
+    Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
+    FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
+    fileSystem.mkdirs(snapshotsDirPath);
+    
+    // create 2 files in snapshots sub-dir, both referring to segments file "segments_1"
+    Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1");
+    Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2");
+    
+    BufferedWriter br1 = new BufferedWriter(new OutputStreamWriter(fileSystem.create(snapshotFile1, true)));
+    br1.write("segments_1");
+    br1.close();
+    
+    BufferedWriter br2 = new BufferedWriter(new OutputStreamWriter(fileSystem.create(snapshotFile2, true)));
+    br2.write("segments_1");
+    br2.close();
+    
+    // re-load the writer to load the snapshots
+    writer.close();
+    setupWriter(configuration, 5, true);
+    assertEquals(2, writer.getSnapshots().size());
+    
+    writer.removeSnapshot("test_snapshot2");
+    assertEquals(1, writer.getSnapshots().size());
+    assertTrue(!writer.getSnapshots().contains("test_snapshot2"));
+    assertTrue(!fileSystem.exists(snapshotFile2));
+    // the other snapshot and its file must be untouched
+    assertTrue(writer.getSnapshots().contains("test_snapshot1"));
+    assertTrue(fileSystem.exists(snapshotFile1));
+  }
 }


Mime
View raw message