incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: BLUR-273: Implement SnapshotDeletionPolicy to load old snapshots
Date Tue, 10 Dec 2013 13:10:57 GMT
BLUR-273: Implement SnapshotDeletionPolicy to load old snapshots

Signed-off-by: Aaron McCurry <amccurry@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/5cd63388
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/5cd63388
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/5cd63388

Branch: refs/heads/0.3.0-lucene-upgrade
Commit: 5cd63388fa587b2df427b5714288721cca00a430
Parents: 3f6f7f3
Author: Saurabh Gupta <saurabh.b85@gmail.com>
Authored: Thu Dec 5 11:13:57 2013 +0530
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Dec 10 08:10:28 2013 -0500

----------------------------------------------------------------------
 .../blur/manager/writer/BlurNRTIndex.java       | 135 ++++++++-----------
 .../writer/BlurSnapshotDeletionPolicy.java      |  77 +++++++++++
 .../blur/manager/writer/BlurNRTIndexTest.java   |   8 +-
 3 files changed, 140 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5cd63388/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index fcdc414..58dd2ed 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -18,15 +18,13 @@ package org.apache.blur.manager.writer;
  */
 import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
-import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,12 +44,12 @@ import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.IndexSearcherClosableNRT;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
+import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.SimpleTimer;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.Analyzer;
@@ -63,7 +61,6 @@ import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.index.TrackingIndexWriter;
 import org.apache.lucene.search.ControlledRealTimeReopenThread;
 import org.apache.lucene.search.IndexSearcher;
@@ -109,13 +106,13 @@ public class BlurNRTIndex extends BlurIndex {
     conf.setCodec(new Blur021Codec());
     conf.setSimilarity(_tableContext.getSimilarity());
     conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext, _isClosed));
-
-    SnapshotDeletionPolicy sdp;
+    
+    BlurSnapshotDeletionPolicy sdp;
     if (snapshotsDirectoryExists()) {
       // load existing snapshots
-      sdp = new SnapshotDeletionPolicy(_tableContext.getIndexDeletionPolicy(), loadExistingSnapshots());
+      sdp = new BlurSnapshotDeletionPolicy(_tableContext.getIndexDeletionPolicy(), getSnapshotDirectory());
     } else {
-      sdp = new SnapshotDeletionPolicy(_tableContext.getIndexDeletionPolicy());
+      sdp = new BlurSnapshotDeletionPolicy(_tableContext.getIndexDeletionPolicy());
     }
     conf.setIndexDeletionPolicy(sdp);
     conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext, _isClosed));
@@ -164,37 +161,6 @@ public class BlurNRTIndex extends BlurIndex {
     simpleTimer.log(LOG);
   }
 
-  /**
-   * The snapshots directory contains a file per snapshot. Name of the file is
-   * the snapshot name and it stores the segments filename
-   * 
-   * @return Map<String, String>
-   * @throws IOException
-   */
-  private Map<String, String> loadExistingSnapshots() throws IOException {
-    Map<String, String> snapshots = new HashMap<String, String>();
-
-    FileSystem fileSystem = getFileSystem();
-    FileStatus[] status = fileSystem.listStatus(getSnapshotsDirectoryPath());
-
-    for (int i = 0; i < status.length; i++) {
-      FileStatus fileStatus = status[i];
-      String snapshotName = fileStatus.getPath().getName();
-      // cleanup all tmp files
-      if (snapshotName.endsWith(SNAPSHOTS_TMPFILE_EXTENSION)) {
-        fileSystem.delete(fileStatus.getPath(), true);
-        continue;
-      }
-      BufferedReader br = new BufferedReader(new InputStreamReader(fileSystem.open(fileStatus.getPath())));
-      String segmentsFilename = br.readLine();
-      if (segmentsFilename != null) {
-        snapshots.put(snapshotName, segmentsFilename);
-      }
-    }
-
-    return snapshots;
-  }
-
   private boolean snapshotsDirectoryExists() throws IOException {
     Path shardHdfsDirPath = _shardContext.getHdfsDirPath();
     FileSystem fileSystem = getFileSystem();
@@ -292,8 +258,13 @@ public class BlurNRTIndex extends BlurIndex {
     if (needsRefresh()) {
       refresh();
     }
-    if (waitToBeVisible && getNRTManager().getCurrentSearchingGen() < generation)
{
-      _refresher.waitForGeneration(generation);
+    if (waitToBeVisible && _trackingWriter.getGeneration() < generation) {
+      try {
+		_refresher.waitForGeneration(generation);
+	} catch (InterruptedException e) {
+		// TODO Auto-generated catch block
+		e.printStackTrace();
+	}
     }
   }
 
@@ -344,11 +315,11 @@ public class BlurNRTIndex extends BlurIndex {
 
   @Override
   public void createSnapshot(String name) throws IOException {
-    SnapshotDeletionPolicy snapshotter = getSnapshotter();
-    List<IndexCommit> snapshots = snapshotter.getSnapshots();
-
-    for (IndexCommit indexCommit : snapshots) {
-      if (indexCommit.getSegmentsFileName().equals(name)) {
+	BlurSnapshotDeletionPolicy snapshotter = getSnapshotter();
+	Map<String, Long> allSnapshots = snapshotter.getAllSnapshots();
+	
+    for (String snapshotFileName : allSnapshots.keySet()) {
+      if (snapshotFileName.equals(name)) {
         LOG.error("A Snapshot already exists with the same name [{0}] on [{1}/{2}].", name,
_tableContext.getTable(),
             _shardContext.getShard());
         throw new IOException("A Snapshot already exists with the same name [" + name + "]
on " + "["
@@ -356,7 +327,7 @@ public class BlurNRTIndex extends BlurIndex {
       }
     }
     _writer.commit();
-    IndexCommit indexCommit = snapshotter.snapshot();
+    IndexCommit indexCommit = snapshotter.createSnapshot(name);
 
     /*
      * Persist the snapshots info into a tmp file under the snapshots sub-folder
@@ -370,58 +341,64 @@ public class BlurNRTIndex extends BlurIndex {
     FileSystem fileSystem = getFileSystem();
     Path shardSnapshotsDirPath = getSnapshotsDirectoryPath();
     BlurUtil.createPath(fileSystem, shardSnapshotsDirPath);
-    Path newTmpSnapshotFile = new Path(shardSnapshotsDirPath, name + SNAPSHOTS_TMPFILE_EXTENSION);
+    String fileName = name +"_"+indexCommit.getGeneration();
+    Path newTmpSnapshotFile = new Path(shardSnapshotsDirPath, fileName + SNAPSHOTS_TMPFILE_EXTENSION);
     BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fileSystem.create(newTmpSnapshotFile,
true)));
     br.write(segmentsFilename);
     br.close();
 
     // now rename the tmp file
-    Path newSnapshotFile = new Path(shardSnapshotsDirPath, name);
+    Path newSnapshotFile = new Path(shardSnapshotsDirPath, fileName);
     fileSystem.rename(newTmpSnapshotFile, newSnapshotFile);
 
-    LOG.info("Snapshot [{0}] created successfully on [{1}/{2}].", name, _tableContext.getTable(),
+    LOG.info("Snapshot [{0}] created successfully on [{1}/{2}].", fileName, _tableContext.getTable(),
         _shardContext.getShard());
   }
 
   @Override
   public void removeSnapshot(String name) throws IOException {
-    SnapshotDeletionPolicy snapshotter = getSnapshotter();
-    Map<String, String> existingSnapshots = snapshotter.getSnapshots();
-    if (existingSnapshots.containsKey(name)) {
-      snapshotter.release(name);
-
-      // now delete the snapshot file stored in the snapshots directory under
-      // the shard
-      Path snapshotFilePath = new Path(getSnapshotsDirectoryPath(), name);
-      getFileSystem().delete(snapshotFilePath, true);
-
-      LOG.info("Snapshot [{0}] removed successfully from [{1}/{2}].", name, _tableContext.getTable(),
-          _shardContext.getShard());
-    } else {
-      LOG.error("No Snapshot exists with the name [{0}] on  [{1}/{2}].", name, _tableContext.getTable(),
-          _shardContext.getShard());
-      throw new IOException("No Snapshot exists with the name [" + name + "] on " + "[" +
_tableContext.getTable()
-          + "/" + _shardContext.getShard() + "].");
-    }
+    BlurSnapshotDeletionPolicy snapshotter = getSnapshotter();
+    Map<String, Long> allSnapshots = snapshotter.getAllSnapshots();
+    Set<String> keySet = allSnapshots.keySet();
+    
+		if(keySet.contains(name)){
+			String fileName = name + "_"+allSnapshots.get(name);
+			snapshotter.releaseGen(name);
+			
+			// now delete the snapshot file stored in the snapshots directory under
+		      // the shard
+		      Path snapshotFilePath = new Path(getSnapshotsDirectoryPath(), fileName);
+		      getFileSystem().delete(snapshotFilePath, true);
+
+		      LOG.info("Snapshot [{0}] removed successfully from [{1}/{2}].", name, _tableContext.getTable(),
+		          _shardContext.getShard());
+		} else {
+		      LOG.error("No Snapshot exists with the name [{0}] on  [{1}/{2}].", name, _tableContext.getTable(),
+		          _shardContext.getShard());
+		      throw new IOException("No Snapshot exists with the name [" + name + "] on " + "["
+ _tableContext.getTable()
+		          + "/" + _shardContext.getShard() + "].");
+	    }
   }
 
   @Override
   public List<String> getSnapshots() throws IOException {
-    SnapshotDeletionPolicy snapshotter = getSnapshotter();
-    Map<String, String> existingSnapshots = snapshotter.getSnapshots();
-    return new ArrayList<String>(existingSnapshots.keySet());
+    BlurSnapshotDeletionPolicy snapshotter = getSnapshotter();
+    Set<String> keySet = snapshotter.getAllSnapshots().keySet();
+    List<String> allSnapshotNames = new ArrayList<String>();
+    allSnapshotNames.addAll(keySet);
+    return allSnapshotNames;
   }
 
   /**
    * Fetches the snapshotter from the LiveIndexWriterConfig of IndexWriter
    * 
-   * @return SnapshotDeletionPolicy
+   * @return BlurSnapshotDeletionPolicy
    * @throws IOException
    */
-  private SnapshotDeletionPolicy getSnapshotter() throws IOException {
+  private BlurSnapshotDeletionPolicy getSnapshotter() throws IOException {
     IndexDeletionPolicy idp = _writer.getConfig().getIndexDeletionPolicy();
-    if (idp instanceof SnapshotDeletionPolicy) {
-      SnapshotDeletionPolicy snapshotter = (SnapshotDeletionPolicy) idp;
+    if (idp instanceof BlurSnapshotDeletionPolicy) {
+    	BlurSnapshotDeletionPolicy snapshotter = (BlurSnapshotDeletionPolicy) idp;
       return snapshotter;
     } else {
       LOG.error("The index deletion policy for [{0}/{1}] does not support snapshots.", _tableContext.getTable(),
@@ -441,4 +418,10 @@ public class BlurNRTIndex extends BlurIndex {
     Configuration configuration = _shardContext.getTableContext().getConfiguration();
     return shardHdfsDirPath.getFileSystem(configuration);
   }
+  
+  private Directory getSnapshotDirectory() throws IOException{
+	Configuration configuration = _shardContext.getTableContext().getConfiguration();
+	Directory dir = new HdfsDirectory(configuration, getSnapshotsDirectoryPath());
+	return dir;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5cd63388/blur-core/src/main/java/org/apache/blur/manager/writer/BlurSnapshotDeletionPolicy.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurSnapshotDeletionPolicy.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurSnapshotDeletionPolicy.java
new file mode 100644
index 0000000..f19262f
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurSnapshotDeletionPolicy.java
@@ -0,0 +1,168 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.store.Directory;
+
+/**
+ * A {@link SnapshotDeletionPolicy} that tracks snapshots by a user-supplied
+ * name and can reload the snapshots persisted by a previous process.
+ * <p>
+ * Each persisted snapshot is one file in the snapshots {@link Directory},
+ * named {@code <snapshotName>_<commitGeneration>}.
+ */
+public class BlurSnapshotDeletionPolicy extends SnapshotDeletionPolicy {
+
+  /**
+   * Kept for API compatibility; snapshot files are no longer required to
+   * contain this string in order to be reloaded.
+   */
+  public static final String SNAPSHOTS_PREFIX = "_snapshot";
+
+  private static final String SNAPSHOTS_TMPFILE_EXTENSION = ".tmp";
+
+  /** Snapshot name to the generation of the commit it holds. */
+  private final Map<String, Long> mapOfSnapshotNameGen = new HashMap<String, Long>();
+
+  /** Directory of persisted snapshot files; null when built without one. */
+  Directory dir;
+
+  /**
+   * Creates a policy and loads the snapshots persisted in {@code dir}.
+   *
+   * @param primary the wrapped deletion policy
+   * @param dir directory holding one file per persisted snapshot
+   * @throws IOException if the directory cannot be listed or a leftover
+   *           temporary file cannot be deleted
+   */
+  public BlurSnapshotDeletionPolicy(IndexDeletionPolicy primary, Directory dir) throws IOException {
+    super(primary);
+    this.dir = dir;
+    loadPriorSnapshots();
+  }
+
+  /**
+   * Creates a policy that starts with no snapshots.
+   *
+   * @param indexDeletionPolicy the wrapped deletion policy
+   */
+  public BlurSnapshotDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
+    super(indexDeletionPolicy);
+  }
+
+  /**
+   * Returns a read-only copy of the known snapshots, so callers can iterate
+   * it while other threads create or release snapshots.
+   *
+   * @return snapshot name to commit generation
+   */
+  public synchronized Map<String, Long> getAllSnapshots() {
+    return Collections.unmodifiableMap(new HashMap<String, Long>(mapOfSnapshotNameGen));
+  }
+
+  /**
+   * Returns the commit held by the named snapshot.
+   *
+   * @param snapshotFileName the snapshot name
+   * @return the commit from {@link #getIndexCommit(long)}, or null if no
+   *         snapshot has that name
+   */
+  public synchronized IndexCommit getIndexCommitForSnapshotName(String snapshotFileName) {
+    Long gen = mapOfSnapshotNameGen.get(snapshotFileName);
+    if (gen == null) {
+      return null;
+    }
+    return getIndexCommit(gen);
+  }
+
+  /**
+   * Snapshots the last commit and records it under {@code snapshotFileName}.
+   *
+   * @param snapshotFileName the snapshot name; must not already be in use
+   * @return the snapshotted commit
+   * @throws IOException if the name is already in use or snapshotting fails
+   */
+  public synchronized IndexCommit createSnapshot(String snapshotFileName) throws IOException {
+    if (mapOfSnapshotNameGen.containsKey(snapshotFileName)) {
+      // Overwriting the entry would leak a reference on the earlier commit,
+      // which could then never be released.
+      throw new IOException("A Snapshot already exists with the same name [" + snapshotFileName + "].");
+    }
+    IndexCommit snapshot = snapshot();
+    mapOfSnapshotNameGen.put(snapshotFileName, snapshot.getGeneration());
+    return snapshot;
+  }
+
+  /**
+   * Releases the commit held by the named snapshot and forgets the name.
+   *
+   * @param snapshotName the snapshot name
+   * @throws IOException if releasing the commit fails
+   * @throws IllegalArgumentException if no snapshot has that name
+   */
+  public synchronized void releaseGen(String snapshotName) throws IOException {
+    Long gen = mapOfSnapshotNameGen.get(snapshotName);
+    if (gen == null) {
+      throw new IllegalArgumentException("No Snapshot exists with the name [" + snapshotName + "].");
+    }
+    releaseGen(gen.longValue());
+    mapOfSnapshotNameGen.remove(snapshotName);
+  }
+
+  /**
+   * Loads the snapshots persisted in {@link #dir}: every file named
+   * {@code <name>_<generation>} is registered under {@code name}, and one
+   * reference is added to {@code refCounts} for its generation so the
+   * inherited policy treats that commit as snapshotted. Several snapshots may
+   * share a generation, so references are counted rather than overwritten.
+   * <p>
+   * Leftover {@code .tmp} files (an interrupted snapshot write) are deleted;
+   * any other file that does not match the pattern is ignored.
+   *
+   * @throws IOException if the directory cannot be listed or a file deleted
+   */
+  private synchronized void loadPriorSnapshots() throws IOException {
+    for (String file : dir.listAll()) {
+      if (file.endsWith(SNAPSHOTS_TMPFILE_EXTENSION)) {
+        dir.deleteFile(file);
+        continue;
+      }
+      int sep = file.lastIndexOf('_');
+      if (sep <= 0 || sep == file.length() - 1) {
+        continue;
+      }
+      long gen;
+      try {
+        gen = Long.parseLong(file.substring(sep + 1));
+      } catch (NumberFormatException e) {
+        // not a "<name>_<generation>" snapshot file
+        continue;
+      }
+      mapOfSnapshotNameGen.put(file.substring(0, sep), gen);
+      Integer refCount = refCounts.get(gen);
+      refCounts.put(gen, refCount == null ? 1 : refCount + 1);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5cd63388/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
index 9800eab..c62e10a 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
@@ -131,7 +131,7 @@ public class BlurNRTIndexTest {
       writer.replaceRow(true, true, genRow());
       IndexSearcherClosable searcher = writer.getIndexReader();
       IndexReader reader = searcher.getIndexReader();
-      assertEquals(i + 1, reader.numDocs());
+      assertEquals(i+1, reader.numDocs());
       searcher.close();
       total++;
     }
@@ -198,7 +198,7 @@ public class BlurNRTIndexTest {
     // check that the file is persisted
     Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
     FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
-    Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot");
+    Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot_1");
     assertTrue(fileSystem.exists(snapshotFilePath));
     
     // create a new writer instance and test whether the snapshots are loaded properly
@@ -216,8 +216,8 @@ public class BlurNRTIndexTest {
     fileSystem.mkdirs(snapshotsDirPath);
     
     // create 2 files in snapshots sub-dir
-    Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1");
-    Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2");
+    Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1_1");
+    Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2_2");
     
     BufferedWriter br1 = new BufferedWriter(new OutputStreamWriter(fileSystem.create(snapshotFile1,
true)));
     br1.write("segments_1");


Mime
View raw message