incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Adding the snapshots back into the blurindex.
Date Wed, 26 Feb 2014 03:51:30 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/apache-blur-0.2 4c47521ec -> c17439be0


Adding the snapshots back into the blurindex.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/56ce5d09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/56ce5d09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/56ce5d09

Branch: refs/heads/apache-blur-0.2
Commit: 56ce5d090eee7f764230eaa9a40fae4e0780f913
Parents: e8ca8ec
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Feb 25 22:51:07 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Feb 25 22:51:07 2014 -0500

----------------------------------------------------------------------
 .../manager/writer/BlurIndexSimpleWriter.java   |  30 ++-
 .../writer/SnapshotIndexDeletionPolicy.java     | 187 +++++++++++++++++++
 .../writer/BlurIndexSimpleWriterTest.java       |  98 +++++-----
 3 files changed, 254 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/56ce5d09/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 306479b..a644c8f 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -19,6 +19,7 @@ package org.apache.blur.manager.writer;
 import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -41,13 +42,13 @@ import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.BlurIndexWriter;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.store.Directory;
 
@@ -76,6 +77,8 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private Thread _optimizeThread;
   private Thread _writerOpener;
   private final IndexDeletionPolicyReader _policy;
+  private final SnapshotIndexDeletionPolicy _snapshotIndexDeletionPolicy;
+  private final String _context;
 
   public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler
mergeScheduler,
       final ExecutorService searchExecutor, BlurIndexCloser indexCloser, BlurIndexWarmup
indexWarmup)
@@ -84,6 +87,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _searchThreadPool = searchExecutor;
     _shardContext = shardContext;
     _tableContext = _shardContext.getTableContext();
+    _context = _tableContext.getTable() + "/" + shardContext.getShard();
     _fieldManager = _tableContext.getFieldManager();
     Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
     _conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
@@ -94,7 +98,9 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
     mergePolicy.setUseCompoundFile(false);
     _conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
-    _policy = new IndexDeletionPolicyReader(new KeepOnlyLastCommitDeletionPolicy());
+    _snapshotIndexDeletionPolicy = new SnapshotIndexDeletionPolicy(_tableContext.getConfiguration(),
new Path(
+        shardContext.getHdfsDirPath(), "generations"));
+    _policy = new IndexDeletionPolicyReader(_snapshotIndexDeletionPolicy);
     _conf.setIndexDeletionPolicy(_policy);
 
     if (!DirectoryReader.indexExists(directory)) {
@@ -255,17 +261,27 @@ public class BlurIndexSimpleWriter extends BlurIndex {
 
+  /**
+   * Creates a snapshot called {@code name} by passing the reader currently held in
+   * {@code _indexReader}, together with this shard's table/shard context string, to the snapshot
+   * deletion policy. Runs while holding {@code _writeLock}.
+   * NOTE(review): the policy pins the commit that reader was opened on; if that reader can lag
+   * behind the newest commit, its commit may already have been deleted — confirm.
+   *
+   * @throws IOException if the policy rejects the name or cannot persist the snapshot
+   */
   @Override
   public void createSnapshot(String name) throws IOException {
-    throw new RuntimeException("not impl");
+    _writeLock.lock();
+    try {
+      _snapshotIndexDeletionPolicy.createSnapshot(name, _indexReader.get(), _context);
+    } finally {
+      _writeLock.unlock();
+    }
   }
 
+  /**
+   * Removes the snapshot called {@code name} through the snapshot deletion policy while holding
+   * {@code _writeLock}, so it cannot interleave with {@link #createSnapshot(String)}.
+   *
+   * @throws IOException if the policy cannot persist the updated snapshot set
+   */
   @Override
   public void removeSnapshot(String name) throws IOException {
-    throw new RuntimeException("not impl");
+    _writeLock.lock();
+    try {
+      _snapshotIndexDeletionPolicy.removeSnapshot(name, _context);
+    } finally {
+      _writeLock.unlock();
+    }
   }
 
   @Override
   public List<String> getSnapshots() throws IOException {
-    throw new RuntimeException("not impl");
+    return new ArrayList<String>(_snapshotIndexDeletionPolicy.getSnapshots());
   }
 
   private void commit() throws IOException {
@@ -324,4 +340,8 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     }
   }
 
+  /** Returns the directory in which the snapshot deletion policy writes its generation files. */
+  public Path getSnapshotsDirectoryPath() {
+    Path snapshotsDirectory = _snapshotIndexDeletionPolicy.getSnapshotsDirectoryPath();
+    return snapshotsDirectory;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/56ce5d09/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java b/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
new file mode 100644
index 0000000..b3034c2
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexDeletionPolicy;
+
+/**
+ * Lucene deletion policy that keeps the newest commit plus every commit pinned by a named
+ * snapshot, and deletes all other commits.
+ *
+ * <p>Snapshot names are persisted in a directory as numbered SequenceFiles of
+ * (Text name, LongWritable generation) pairs. Every create/remove writes a new file numbered one
+ * past the newest existing file, and only the newest file is read back when the policy is
+ * constructed.
+ */
+public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy {
+
+  private static final Log LOG = LogFactory.getLog(SnapshotIndexDeletionPolicy.class);
+
+  private final Configuration _configuration;
+  // Directory holding the numbered generation files.
+  private final Path _path;
+  // Snapshot name -> commit generation it pins.
+  private final Map<String, Long> _namesToGenerations = new ConcurrentHashMap<String, Long>();
+  // Commit generation -> snapshot names pinning it; onCommit keeps any generation present here.
+  private final Map<Long, Set<String>> _generationsToNames =
+      new ConcurrentHashMap<Long, Set<String>>();
+
+  /**
+   * Creates the policy, making sure the generations directory exists and loading the snapshots
+   * recorded in its newest generation file.
+   *
+   * @param configuration Hadoop configuration used to access {@code path}
+   * @param path directory in which generation files are stored
+   * @throws IOException if the file system cannot be accessed or the newest file cannot be read
+   */
+  public SnapshotIndexDeletionPolicy(Configuration configuration, Path path) throws IOException {
+    _configuration = configuration;
+    _path = path;
+    FileSystem fileSystem = _path.getFileSystem(configuration);
+    fileSystem.mkdirs(path);
+    loadGenerations();
+  }
+
+  @Override
+  public void onInit(List<? extends IndexCommit> commits) throws IOException {
+    // Startup applies the same retention rule as a regular commit.
+    onCommit(commits);
+  }
+
+  /**
+   * Deletes every commit except the last one in {@code commits} and those whose generation is
+   * pinned by a snapshot.
+   */
+  @Override
+  public void onCommit(List<? extends IndexCommit> commits) throws IOException {
+    int size = commits.size();
+    // Lucene passes commits oldest first, so stopping before the last entry keeps the newest one.
+    for (int i = 0; i < size - 1; i++) {
+      IndexCommit indexCommit = commits.get(i);
+      long generation = indexCommit.getGeneration();
+      if (!_generationsToNames.containsKey(generation)) {
+        indexCommit.delete();
+      }
+    }
+  }
+
+  /**
+   * Persists {@code namesToGenerations} as a new generation file numbered one past the newest
+   * existing file. If the write fails, the incomplete file is deleted (best effort) so that
+   * {@link #loadGenerations()} never treats it as the newest file.
+   */
+  private synchronized void storeGenerations(Map<String, Long> namesToGenerations)
+      throws IOException {
+    FileSystem fileSystem = _path.getFileSystem(_configuration);
+    SortedSet<FileStatus> existing = listGenerationFiles(fileSystem);
+    long currentFile = existing.isEmpty() ? 0 : Long.parseLong(existing.last().getPath().getName());
+    Path path = new Path(_path, buffer(currentFile + 1));
+    LOG.info("Creating new snapshot file [{0}]", path);
+    FSDataOutputStream outputStream = fileSystem.create(path, false);
+    boolean written = false;
+    try {
+      writeGenerationFile(outputStream, namesToGenerations);
+      written = true;
+    } finally {
+      if (!written) {
+        deleteQuietly(fileSystem, path);
+      }
+    }
+    cleanupOldFiles();
+  }
+
+  /** Writes all name/generation pairs to {@code outputStream} and closes it, even on failure. */
+  private void writeGenerationFile(FSDataOutputStream outputStream,
+      Map<String, Long> namesToGenerations) throws IOException {
+    try {
+      Writer writer = SequenceFile.createWriter(_configuration, outputStream, Text.class,
+          LongWritable.class, CompressionType.NONE, null);
+      try {
+        for (Entry<String, Long> e : namesToGenerations.entrySet()) {
+          writer.append(new Text(e.getKey()), new LongWritable(e.getValue()));
+        }
+      } finally {
+        writer.close();
+      }
+    } finally {
+      // The stream was opened by the caller for this file, so it is closed here as well.
+      outputStream.close();
+    }
+  }
+
+  /** Best-effort delete used while another exception is already propagating. */
+  private static void deleteQuietly(FileSystem fileSystem, Path path) {
+    try {
+      fileSystem.delete(path, false);
+    } catch (IOException ignored) {
+      // Deliberately ignored: the write failure that led here is the error callers need to see.
+    }
+  }
+
+  /**
+   * Placeholder for pruning superseded generation files. It currently does nothing, so one file
+   * accumulates per snapshot create/remove; only the newest file is ever read back.
+   */
+  private void cleanupOldFiles() {
+  }
+
+  /** Left-pads {@code number} with zeros to at least 12 digits so file names sort numerically. */
+  private String buffer(long number) {
+    String s = Long.toString(number);
+    StringBuilder builder = new StringBuilder();
+    for (int i = s.length(); i < 12; i++) {
+      builder.append('0');
+    }
+    return builder.append(s).toString();
+  }
+
+  /**
+   * Lists the generation files (all-digit names) in the snapshots directory, ordered by path.
+   * Because of the zero padding applied by {@link #buffer(long)}, path order is numeric order.
+   * Other files in the directory are ignored instead of failing the number parse.
+   */
+  private SortedSet<FileStatus> listGenerationFiles(FileSystem fileSystem) throws IOException {
+    SortedSet<FileStatus> generationFiles = new TreeSet<FileStatus>();
+    FileStatus[] listStatus = fileSystem.listStatus(_path);
+    if (listStatus == null) {
+      return generationFiles;
+    }
+    for (FileStatus status : listStatus) {
+      if (isGenerationFileName(status.getPath().getName())) {
+        generationFiles.add(status);
+      }
+    }
+    return generationFiles;
+  }
+
+  /** Returns true when {@code name} is non-empty and made only of ASCII digits. */
+  private static boolean isGenerationFileName(String name) {
+    if (name.isEmpty()) {
+      return false;
+    }
+    for (int i = 0; i < name.length(); i++) {
+      char c = name.charAt(i);
+      if (c < '0' || c > '9') {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Loads the snapshots recorded in the newest generation file, if there is one. */
+  private void loadGenerations() throws IOException {
+    FileSystem fileSystem = _path.getFileSystem(_configuration);
+    SortedSet<FileStatus> existing = listGenerationFiles(fileSystem);
+    if (existing.isEmpty()) {
+      return;
+    }
+    FileStatus last = existing.last();
+    Reader reader = new SequenceFile.Reader(fileSystem, last.getPath(), _configuration);
+    try {
+      Text key = new Text();
+      LongWritable value = new LongWritable();
+      while (reader.next(key, value)) {
+        addName(key.toString(), value.get());
+      }
+    } finally {
+      reader.close();
+    }
+    cleanupOldFiles();
+  }
+
+  /** Records {@code name} in both in-memory indexes, pinning {@code generation}. */
+  private void addName(String name, long generation) {
+    _namesToGenerations.put(name, generation);
+    Set<String> names = _generationsToNames.get(generation);
+    if (names == null) {
+      names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+      _generationsToNames.put(generation, names);
+    }
+    names.add(name);
+  }
+
+  /** Drops {@code name} from both in-memory indexes, unpinning the generation once unused. */
+  private void removeName(String name, long generation) {
+    _namesToGenerations.remove(name);
+    Set<String> names = _generationsToNames.get(generation);
+    if (names != null) {
+      names.remove(name);
+      if (names.isEmpty()) {
+        _generationsToNames.remove(generation);
+      }
+    }
+  }
+
+  /**
+   * Pins the commit that {@code reader} was opened on under {@code name} and persists the updated
+   * snapshot set. If persisting fails, the in-memory entry is rolled back.
+   * NOTE(review): if {@code reader}'s commit is not the newest one and was never pinned, onCommit
+   * may already have deleted it — confirm callers always pass a current reader.
+   *
+   * @param name snapshot name; must not already be in use
+   * @param reader reader whose index commit generation is pinned
+   * @param context label used only in log messages
+   * @throws IOException if {@code name} already exists or the snapshot set cannot be persisted
+   */
+  public synchronized void createSnapshot(String name, DirectoryReader reader, String context)
+      throws IOException {
+    if (_namesToGenerations.containsKey(name)) {
+      throw new IOException("Snapshot [" + name + "] already exists.");
+    }
+    LOG.info("Creating snapshot [{0}] in [{1}].", name, context);
+    IndexCommit indexCommit = reader.getIndexCommit();
+    long generation = indexCommit.getGeneration();
+    // Pin before persisting so that onCommit already treats the generation as protected.
+    addName(name, generation);
+    boolean stored = false;
+    try {
+      storeGenerations(_namesToGenerations);
+      stored = true;
+    } finally {
+      if (!stored) {
+        removeName(name, generation);
+      }
+    }
+  }
+
+  /**
+   * Removes the snapshot called {@code name}. The reduced snapshot set is persisted before the
+   * commit is unpinned, so a failed write leaves the snapshot in place in memory and on disk.
+   * Unknown names are logged and ignored.
+   *
+   * @param name snapshot name to remove
+   * @param context label used only in log messages
+   * @throws IOException if the updated snapshot set cannot be persisted
+   */
+  public synchronized void removeSnapshot(String name, String context) throws IOException {
+    Long gen = _namesToGenerations.get(name);
+    if (gen == null) {
+      LOG.info("Snapshot [{0}] does not exist in [{1}].", name, context);
+      return;
+    }
+    LOG.info("Removing snapshot [{0}] from [{1}].", name, context);
+    Map<String, Long> remaining = new ConcurrentHashMap<String, Long>(_namesToGenerations);
+    remaining.remove(name);
+    storeGenerations(remaining);
+    removeName(name, gen);
+  }
+
+  /** Returns a copy of the current snapshot names. */
+  public Collection<String> getSnapshots() {
+    return new HashSet<String>(_namesToGenerations.keySet());
+  }
+
+  /** Returns the directory in which generation files are stored. */
+  public Path getSnapshotsDirectoryPath() {
+    return _path;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/56ce5d09/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
index 2ca0447..4a66853 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
@@ -18,6 +18,7 @@ package org.apache.blur.manager.writer;
  */
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
@@ -41,6 +42,8 @@ import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.TraceCollector;
 import org.apache.blur.trace.TraceStorage;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.store.FSDirectory;
 import org.junit.After;
@@ -50,7 +53,7 @@ import org.junit.Test;
 public class BlurIndexSimpleWriterTest {
 
   private static final String TEST_TABLE = "test-table";
-  private static final int TEST_NUMBER_WAIT_VISIBLE = 500;
+  private static final int TEST_NUMBER_WAIT_VISIBLE = 100;
   private static final int TEST_NUMBER = 50000;
 
   private static final File TMPDIR = new File("./target/tmp");
@@ -241,59 +244,42 @@ public class BlurIndexSimpleWriterTest {
     return row;
   }
 
-  // @Test
-  // public void testCreateSnapshot() throws IOException {
-  // setupWriter(configuration, 5, false);
-  // writer.createSnapshot("test_snapshot");
-  // assertTrue(writer.getSnapshots().contains("test_snapshot"));
-  //
-  // // check that the file is persisted
-  // Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
-  // FileSystem fileSystem = snapshotsDirPath.getFileSystem(new
-  // Configuration());
-  // Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot");
-  // assertTrue(fileSystem.exists(snapshotFilePath));
-  //
-  // // create a new writer instance and test whether the snapshots are loaded
-  // properly
-  // writer.close();
-  // setupWriter(configuration, 5, true);
-  // assertTrue(writer.getSnapshots().contains("test_snapshot"));
-  // }
-  //
-  //
-  // @Test
-  // public void testRemoveSnapshots() throws IOException {
-  // setupWriter(configuration, 5, false);
-  // Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
-  // FileSystem fileSystem = snapshotsDirPath.getFileSystem(new
-  // Configuration());
-  // fileSystem.mkdirs(snapshotsDirPath);
-  //
-  // // create 2 files in snapshots sub-dir
-  // Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1");
-  // Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2");
-  //
-  // BufferedWriter br1 = new BufferedWriter(new
-  // OutputStreamWriter(fileSystem.create(snapshotFile1, true)));
-  // br1.write("segments_1");
-  // br1.close();
-  //
-  // BufferedWriter br2 = new BufferedWriter(new
-  // OutputStreamWriter(fileSystem.create(snapshotFile2, true)));
-  // br2.write("segments_1");
-  // br2.close();
-  //
-  // // re-load the writer to load the snpshots
-  // writer.close();
-  // setupWriter(configuration, 5, true);
-  // assertEquals(writer.getSnapshots().size(), 2);
-  //
-  //
-  // writer.removeSnapshot("test_snapshot2");
-  // assertEquals(writer.getSnapshots().size(), 1);
-  // assertTrue(!writer.getSnapshots().contains("test_snapshot2"));
-  // assertTrue(!fileSystem.exists(snapshotFile2));
-  //
-  // }
+  @Test
+  public void testCreateSnapshot() throws IOException {
+    final String snapshotName = "test_snapshot";
+    setupWriter(_configuration, 5, false);
+
+    // A new snapshot is visible immediately...
+    _writer.createSnapshot(snapshotName);
+    assertTrue(_writer.getSnapshots().contains(snapshotName));
+
+    // ...and is written to the first generation file in the snapshots directory.
+    Path snapshotDir = _writer.getSnapshotsDirectoryPath();
+    FileSystem fs = snapshotDir.getFileSystem(_configuration);
+    Path firstGenerationFile = new Path(snapshotDir, "000000000001");
+    assertTrue(fs.exists(firstGenerationFile));
+
+    // A freshly opened writer must read the snapshot back from disk.
+    _writer.close();
+    setupWriter(_configuration, 5, true);
+    assertTrue(_writer.getSnapshots().contains(snapshotName));
+  }
+
+  @Test
+  public void testRemoveSnapshots() throws IOException {
+    setupWriter(_configuration, 5, false);
+    Path snapshotsDirPath = _writer.getSnapshotsDirectoryPath();
+    FileSystem fileSystem = snapshotsDirPath.getFileSystem(_configuration);
+    // The snapshot deletion policy creates its directory when the writer is constructed.
+    assertTrue(fileSystem.exists(snapshotsDirPath));
+
+    _writer.createSnapshot("test_snapshot1");
+    _writer.createSnapshot("test_snapshot2");
+
+    // re-load the writer to load the snapshots
+    _writer.close();
+    setupWriter(_configuration, 5, true);
+    assertEquals(2, _writer.getSnapshots().size());
+
+    _writer.removeSnapshot("test_snapshot2");
+    assertEquals(1, _writer.getSnapshots().size());
+    assertTrue(_writer.getSnapshots().contains("test_snapshot1"));
+    assertTrue(!_writer.getSnapshots().contains("test_snapshot2"));
+
+    // re-load again to check that the removal was persisted, not just applied in memory
+    _writer.close();
+    setupWriter(_configuration, 5, true);
+    assertEquals(1, _writer.getSnapshots().size());
+    assertTrue(_writer.getSnapshots().contains("test_snapshot1"));
+  }
 }


Mime
View raw message