incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/3] git commit: Updating some of the logic in the HDFSKeyValueStore also adding the scanning code.
Date Sun, 23 Feb 2014 02:47:34 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/apache-blur-0.2 dffa41120 -> 052c131e9


Updating some of the logic in the HDFSKeyValueStore also adding the scanning code.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/044923c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/044923c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/044923c2

Branch: refs/heads/apache-blur-0.2
Commit: 044923c2b1b4cb7db8d0d490b74ecf389d27c0d1
Parents: dffa411
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Feb 22 20:44:35 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Feb 22 20:44:35 2014 -0500

----------------------------------------------------------------------
 .../writer/DirectoryReferenceCounterTest.java   | 340 -------------------
 .../refcounter/DirectoryReferenceCounter.java   | 274 ---------------
 .../refcounter/DirectoryReferenceFileGC.java    | 119 -------
 .../store/refcounter/IndexInputReference.java   |  79 -----
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   |  62 +++-
 .../store/hdfs_v2/HdfsKeyValueStoreTest.java    |  73 +++-
 6 files changed, 131 insertions(+), 816 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/044923c2/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
deleted file mode 100644
index ac31918..0000000
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
+++ /dev/null
@@ -1,340 +0,0 @@
-package org.apache.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
-import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.lucene.analysis.core.KeywordAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FieldType;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.store.RAMDirectory;
-import org.junit.Test;
-
-public class DirectoryReferenceCounterTest {
-
-  @Test
-  public void testDirectoryReferenceCounterTestError() throws CorruptIndexException, IOException
{
-    Directory directory = wrap(new RAMDirectory());
-    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer());
-    IndexWriter writer = new IndexWriter(directory, conf);
-    int size = 100;
-    IndexReader[] readers = new IndexReader[size];
-    for (int i = 0; i < size; i++) {
-      writer.addDocument(getDoc());
-      readers[i] = DirectoryReader.open(writer, true);
-      writer.forceMerge(1);
-    }
-
-    try {
-      for (int i = 0; i < size; i++) {
-        checkReader(readers[i], i);
-      }
-      fail();
-    } catch (Exception e) {
-      // should error
-    }
-  }
-
-  @Test
-  public void testDirectoryReferenceCounter() throws CorruptIndexException, LockObtainFailedException,
IOException, InterruptedException {
-    Directory directory = wrap(new RAMDirectory());
-    DirectoryReferenceFileGC gc = new DirectoryReferenceFileGC();
-    DirectoryReferenceCounter counter = new DirectoryReferenceCounter(directory, gc);
-    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer());
-    IndexWriter writer = new IndexWriter(counter, conf);
-    int size = 100;
-    IndexReader[] readers = new IndexReader[size];
-    for (int i = 0; i < size; i++) {
-      writer.addDocument(getDoc());
-      writer.forceMerge(1);
-      readers[i] = DirectoryReader.open(writer, true);
-    }
-
-    for (int i = 0; i < size; i++) {
-      assertEquals(i + 1, readers[i].numDocs());
-      checkReader(readers[i], i);
-    }
-
-    String[] listAll = directory.listAll();
-
-    for (int i = 0; i < size - 1; i++) {
-      readers[i].close();
-    }
-
-    for (int i = 0; i < 1000; i++) {
-      gc.run();
-      Thread.sleep(1);
-    }
-
-    IndexReader last = readers[size - 1];
-
-    assertEquals(100, last.numDocs());
-
-    assertTrue(listAll.length > directory.listAll().length);
-
-    last.close();
-    writer.close();
-    gc.close();
-  }
-
-  private Document getDoc() {
-    Document document = new Document();
-    FieldType type = new FieldType();
-    type.setIndexed(true);
-    type.setOmitNorms(true);
-    type.setTokenized(false);
-    type.setStored(true);
-    document.add(new Field("id", "value", type));
-    return document;
-  }
-
-  private void checkReader(IndexReader indexReader, int size) throws CorruptIndexException,
IOException {
-    for (int i = 0; i < size; i++) {
-      Document document = indexReader.document(i);
-      String value = document.get("id");
-      assertEquals("value", value);
-    }
-  }
-
-  // This class is use simulate what would happen with a directory that will
-  // forcefully delete files even if they are still in use. e.g. HDFSDirectory
-  public static Directory wrap(final RAMDirectory ramDirectory) {
-    return new Directory() {
-      private Directory d = ramDirectory;
-      private Collection<String> deletedFiles = new LinkedBlockingQueue<String>();
-
-      @Override
-      public void deleteFile(String name) throws IOException {
-        deletedFiles.add(name);
-        d.deleteFile(name);
-      }
-
-      @Override
-      public IndexOutput createOutput(String name, IOContext context) throws IOException
{
-        return d.createOutput(name, context);
-      }
-
-      @Override
-      public void sync(Collection<String> names) throws IOException {
-        d.sync(names);
-      }
-
-      @Override
-      public IndexInput openInput(String name, IOContext context) throws IOException {
-        return wrap(d.openInput(name, context), deletedFiles, name);
-      }
-
-      @Override
-      public void clearLock(String name) throws IOException {
-        d.clearLock(name);
-      }
-
-      @Override
-      public void close() throws IOException {
-        d.close();
-      }
-
-      @Override
-      public void setLockFactory(LockFactory lockFactory) throws IOException {
-        d.setLockFactory(lockFactory);
-      }
-
-      @Override
-      public String getLockID() {
-        return d.getLockID();
-      }
-
-      @Override
-      public boolean equals(Object arg0) {
-        return d.equals(arg0);
-      }
-
-      @Override
-      public boolean fileExists(String name) throws IOException {
-        return d.fileExists(name);
-      }
-
-      @Override
-      public long fileLength(String name) throws IOException {
-        return d.fileLength(name);
-      }
-
-      @Override
-      public LockFactory getLockFactory() {
-        return d.getLockFactory();
-      }
-
-      @Override
-      public int hashCode() {
-        return d.hashCode();
-      }
-
-      @Override
-      public String[] listAll() throws IOException {
-        return d.listAll();
-      }
-
-      @Override
-      public Lock makeLock(String name) {
-        return d.makeLock(name);
-      }
-
-      @Override
-      public String toString() {
-        return d.toString();
-      }
-    };
-  }
-
-  public static IndexInput wrap(final IndexInput input, final Collection<String> deletedFiles,
final String name) {
-    return new IndexInput(input.toString()) {
-      private IndexInput in = input;
-
-      private void checkForDeleted() throws IOException {
-        if (deletedFiles.contains(name)) {
-          throw new IOException("File [" + name + "] does not exist");
-        }
-      }
-
-      @Override
-      public void close() throws IOException {
-        checkForDeleted();
-        in.close();
-      }
-
-      @Override
-      public short readShort() throws IOException {
-        checkForDeleted();
-        return in.readShort();
-      }
-
-      @Override
-      public void seek(long pos) throws IOException {
-        checkForDeleted();
-        in.seek(pos);
-      }
-
-      @Override
-      public int readInt() throws IOException {
-        checkForDeleted();
-        return in.readInt();
-      }
-
-      @Override
-      public int readVInt() throws IOException {
-        checkForDeleted();
-        return in.readVInt();
-      }
-
-      @Override
-      public String toString() {
-        return in.toString();
-      }
-
-      @Override
-      public long readLong() throws IOException {
-        checkForDeleted();
-        return in.readLong();
-      }
-
-      @Override
-      public long readVLong() throws IOException {
-        checkForDeleted();
-        return in.readVLong();
-      }
-
-      @Override
-      public String readString() throws IOException {
-        checkForDeleted();
-        return in.readString();
-      }
-
-      @Override
-      public IndexInput clone() {
-        return super.clone();
-      }
-
-      @Override
-      public boolean equals(Object obj) {
-        return in.equals(obj);
-      }
-
-      @Override
-      public long getFilePointer() {
-        return in.getFilePointer();
-      }
-
-      @Override
-      public int hashCode() {
-        return in.hashCode();
-      }
-
-      @Override
-      public byte readByte() throws IOException {
-        checkForDeleted();
-        return in.readByte();
-      }
-
-      @Override
-      public void readBytes(byte[] b, int offset, int len) throws IOException {
-        checkForDeleted();
-        in.readBytes(b, offset, len);
-      }
-
-      @Override
-      public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException
{
-        checkForDeleted();
-        in.readBytes(b, offset, len, useBuffer);
-      }
-
-      @Override
-      public long length() {
-        return in.length();
-      }
-
-      @Override
-      public Map<String, String> readStringStringMap() throws IOException {
-        checkForDeleted();
-        return in.readStringStringMap();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/044923c2/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
deleted file mode 100644
index cca20f3..0000000
--- a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
+++ /dev/null
@@ -1,274 +0,0 @@
-package org.apache.blur.lucene.store.refcounter;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.store.hdfs.DirectoryDecorator;
-import org.apache.lucene.index.IndexFileNames;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
-public class DirectoryReferenceCounter extends Directory implements DirectoryDecorator {
-
-  private final static Log LOG = LogFactory.getLog(DirectoryReferenceCounter.class);
-  private final Directory _directory;
-  private final Map<String, AtomicInteger> _refCounters = new ConcurrentHashMap<String,
AtomicInteger>();
-  private final DirectoryReferenceFileGC _gc;
-
-  public DirectoryReferenceCounter(Directory directory, DirectoryReferenceFileGC gc) {
-    _directory = directory;
-    _gc = gc;
-  }
-
-  private IndexInput wrap(String name, IndexInput input) {
-    AtomicInteger counter = _refCounters.get(name);
-    if (counter == null) {
-      counter = new AtomicInteger();
-      _refCounters.put(name, counter);
-    }
-    return new RefIndexInput(input, counter);
-  }
-
-  public void deleteFile(String name) throws IOException {
-    LOG.debug("deleteFile [{0}] being called", name);
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      _directory.deleteFile(name);
-      return;
-    }
-    AtomicInteger counter = _refCounters.get(name);
-    if (counter != null && counter.get() > 0) {
-      addToFileGC(name);
-    } else {
-      LOG.debug("Delete file [{0}]", name);
-      _directory.deleteFile(name);
-    }
-  }
-
-  @Override
-  public IndexOutput createOutput(String name, IOContext context) throws IOException {
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      return _directory.createOutput(name, context);
-    }
-    LOG.debug("Create file [{0}]", name);
-    AtomicInteger counter = _refCounters.get(name);
-    if (counter != null) {
-      LOG.error("Unknown error while trying to create ref counter for [{0}] reference exists.",
name);
-      throw new IOException("Reference exists [" + name + "]");
-    }
-    _refCounters.put(name, new AtomicInteger(0));
-    return _directory.createOutput(name, context);
-  }
-
-  @Override
-  public IndexInput openInput(String name, IOContext context) throws IOException {
-    IndexInput input = _directory.openInput(name, context);
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      return input;
-    }
-    return wrap(name, input);
-  }
-
-  public static class RefIndexInput extends IndexInputReference {
-
-    private IndexInput input;
-    private AtomicInteger ref;
-
-    public RefIndexInput(IndexInput input, AtomicInteger ref) {
-      super("RefIndexInput(" + input.toString() + ")");
-      this.input = input;
-      this.ref = ref;
-      ref.incrementAndGet();
-    }
-
-    @Override
-    public short readShort() throws IOException {
-      return input.readShort();
-    }
-
-    @Override
-    public int readInt() throws IOException {
-      return input.readInt();
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      input.seek(pos);
-    }
-
-    @Override
-    public int readVInt() throws IOException {
-      return input.readVInt();
-    }
-
-    @Override
-    public String toString() {
-      return input.toString();
-    }
-
-    @Override
-    public long readLong() throws IOException {
-      return input.readLong();
-    }
-
-    @Override
-    public long readVLong() throws IOException {
-      return input.readVLong();
-    }
-
-    @Override
-    public String readString() throws IOException {
-      return input.readString();
-    }
-
-    @Override
-    public long getFilePointer() {
-      return input.getFilePointer();
-    }
-
-    @Override
-    public byte readByte() throws IOException {
-      return input.readByte();
-    }
-
-    @Override
-    public void readBytes(byte[] b, int offset, int len) throws IOException {
-      input.readBytes(b, offset, len);
-    }
-
-    @Override
-    public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException
{
-      input.readBytes(b, offset, len, useBuffer);
-    }
-
-    @Override
-    public long length() {
-      return input.length();
-    }
-
-    @Override
-    public Map<String, String> readStringStringMap() throws IOException {
-      return input.readStringStringMap();
-    }
-
-    @Override
-    public RefIndexInput clone() {
-      RefIndexInput refIndexInput = (RefIndexInput) super.clone();
-      refIndexInput.input = (IndexInput) input.clone();
-      refIndexInput.ref.incrementAndGet();
-      return refIndexInput;
-    }
-
-    @Override
-    protected void closeBase() throws IOException {
-      input.close();
-      ref.decrementAndGet();
-      _isClosed = true;
-    }
-
-    @Override
-    protected void closeClone() throws IOException {
-      input.close();
-      ref.decrementAndGet();
-      _isClosed = true;
-    }
-
-  }
-
-  @Override
-  public void sync(Collection<String> names) throws IOException {
-    _directory.sync(names);
-  }
-
-  @Override
-  public void clearLock(String name) throws IOException {
-    _directory.clearLock(name);
-  }
-
-  @Override
-  public void close() throws IOException {
-    _directory.close();
-  }
-
-  @Override
-  public void setLockFactory(LockFactory lockFactory) throws IOException {
-    _directory.setLockFactory(lockFactory);
-  }
-
-  @Override
-  public String getLockID() {
-    return _directory.getLockID();
-  }
-
-  @Override
-  public void copy(Directory to, String src, String dest, IOContext context) throws IOException
{
-    _directory.copy(to, src, dest, context);
-  }
-
-  @Override
-  public boolean fileExists(String name) throws IOException {
-    return _directory.fileExists(name);
-  }
-
-  @Override
-  public long fileLength(String name) throws IOException {
-    return _directory.fileLength(name);
-  }
-
-  @Override
-  public LockFactory getLockFactory() {
-    return _directory.getLockFactory();
-  }
-
-  @Override
-  public String[] listAll() throws IOException {
-    return _directory.listAll();
-  }
-
-  @Override
-  public Lock makeLock(String name) {
-    return _directory.makeLock(name);
-  }
-
-  @Override
-  public String toString() {
-    return _directory.toString();
-  }
-
-  private void addToFileGC(String name) {
-    if (_gc != null) {
-      LOG.debug("Add file [{0}] to be GCed once refs are closed.", name);
-      _gc.add(_directory, name, _refCounters);
-    }
-  }
-
-  @Override
-  public Directory getOriginalDirectory() {
-    return _directory;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/044923c2/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
deleted file mode 100644
index 6522d6a..0000000
--- a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package org.apache.blur.lucene.store.refcounter;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.Closeable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.lucene.store.Directory;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.MetricName;
-
-import static org.apache.blur.metrics.MetricsConstants.*;
-
-public class DirectoryReferenceFileGC extends TimerTask implements Closeable {
-
-  private static final Log LOG = LogFactory.getLog(DirectoryReferenceFileGC.class);
-
-  private Timer _timer;
-  private long _delay = 5000;
-  private LinkedBlockingQueue<Value> _queue;
-  private volatile int numberOfFilesToBeDeleted = 0;
-
-  public static class Value {
-    public Value(Directory directory, String name, Map<String, AtomicInteger> refs)
{
-      this.directory = directory;
-      this.name = name;
-      this.refs = refs;
-    }
-
-    Directory directory;
-    String name;
-    Map<String, AtomicInteger> refs;
-
-    public boolean tryToDelete() throws IOException {
-      AtomicInteger counter = refs.get(name);
-      if (counter == null || counter.get() <= 0) {
-        refs.remove(name);
-        LOG.debug("Removing file [{0}]", name);
-        directory.deleteFile(name);
-        return true;
-      } else {
-        LOG.debug("File [{0}] had too many refs [{1}]", name, counter.get());
-        return false;
-      }
-    }
-  }
-
-  public DirectoryReferenceFileGC() {
-    _timer = new Timer("Blur-File-GC", true);
-    _timer.scheduleAtFixedRate(this, _delay, _delay);
-    _queue = new LinkedBlockingQueue<Value>();
-    Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, LUCENE, FILES_IN_QUEUE_TO_BE_DELETED),
new Gauge<Integer>() {
-      @Override
-      public Integer value() {
-        return numberOfFilesToBeDeleted;
-      }
-    });
-  }
-
-  public void add(Directory directory, String name, Map<String, AtomicInteger> refs)
{
-    try {
-      _queue.put(new Value(directory, name, refs));
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public void close() {
-    _timer.purge();
-    _timer.cancel();
-  }
-
-  @Override
-  public void run() {
-    Iterator<Value> iterator = _queue.iterator();
-    int count = 0;
-    while (iterator.hasNext()) {
-      Value value = iterator.next();
-      try {
-        if (value.tryToDelete()) {
-          iterator.remove();
-        } else {
-          count++;
-        }
-      } catch (FileNotFoundException e) {
-        LOG.error("File [{0}] already deleted.", value);
-        iterator.remove();
-      } catch (IOException e) {
-        LOG.error("Unknown error", e);
-      }
-    }
-    numberOfFilesToBeDeleted = count;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/044923c2/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputReference.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputReference.java
b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputReference.java
deleted file mode 100644
index 12fd562..0000000
--- a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputReference.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.lucene.store.refcounter;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.util.WeakIdentityMap;
-
-public abstract class IndexInputReference extends IndexInput {
-
-  protected boolean _isClosed;
-  protected boolean _isClone;
-  private final WeakIdentityMap<IndexInputReference, Boolean> _clones;
-
-  protected IndexInputReference(String resourceDescription) {
-    super(resourceDescription);
-    _isClone = false;
-    _isClosed = false;
-    _clones = WeakIdentityMap.<IndexInputReference, Boolean> newConcurrentHashMap();
-  }
-
-  @Override
-  public final void close() throws IOException {
-    _clones.remove(this);
-    if (_isClone) {
-      closeClone();
-      return;
-    }
-    for (Iterator<IndexInputReference> it = _clones.keyIterator(); it.hasNext();) {
-      closeCloneQuietly(it.next());
-    }
-    _clones.clear();
-    closeBase();
-  }
-
-  private static void closeCloneQuietly(IndexInputReference ref) {
-    try {
-      if (ref != null) {
-        ref.closeClone();
-      }
-    } catch (IOException ioe) {
-      // ignore
-    }
-  }
-
-  @Override
-  public IndexInput clone() {
-    IndexInputReference clone = (IndexInputReference) super.clone();
-    clone._isClone = true;
-    _clones.put(clone, Boolean.TRUE);
-    return clone;
-  }
-
-  @Override
-  protected final void finalize() throws Throwable {
-    close();
-  }
-
-  protected abstract void closeBase() throws IOException;
-
-  protected abstract void closeClone() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/044923c2/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index 51997c7..ec469f3 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
@@ -45,9 +46,11 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.lucene.util.BytesRef;
 
 public class HdfsKeyValueStore implements Store {
@@ -157,6 +160,10 @@ public class HdfsKeyValueStore implements Store {
     _writeLock.lock();
     try {
       syncInternal();
+    } catch (RemoteException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key
value store.", e);
+    } catch (LeaseExpiredException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key
value store.", e);
     } finally {
       _writeLock.unlock();
     }
@@ -165,8 +172,51 @@ public class HdfsKeyValueStore implements Store {
   @Override
   public Iterable<Entry<BytesRef, BytesRef>> scan(BytesRef key) throws IOException
{
     ensureOpen();
-    // to do
-    return null;
+    if (key == null) {
+      key = _pointers.firstKey();
+    }
+    ConcurrentNavigableMap<BytesRef, Value> tailMap = _pointers.tailMap(key, true);
+    final Set<Entry<BytesRef, Value>> entrySet = tailMap.entrySet();
+    return new Iterable<Entry<BytesRef, BytesRef>>() {
+      @Override
+      public Iterator<Entry<BytesRef, BytesRef>> iterator() {
+        final Iterator<Entry<BytesRef, Value>> iterator = entrySet.iterator();
+        return new Iterator<Entry<BytesRef, BytesRef>>() {
+
+          @Override
+          public boolean hasNext() {
+            return iterator.hasNext();
+          }
+
+          @Override
+          public Entry<BytesRef, BytesRef> next() {
+            final Entry<BytesRef, Value> e = iterator.next();
+            return new Entry<BytesRef, BytesRef>() {
+
+              @Override
+              public BytesRef setValue(BytesRef value) {
+                throw new RuntimeException("Read only.");
+              }
+
+              @Override
+              public BytesRef getValue() {
+                return e.getValue()._bytesRef;
+              }
+
+              @Override
+              public BytesRef getKey() {
+                return e.getKey();
+              }
+            };
+          }
+
+          @Override
+          public void remove() {
+            throw new RuntimeException("Read only.");
+          }
+        };
+      }
+    };
   }
 
   @Override
@@ -184,6 +234,10 @@ public class HdfsKeyValueStore implements Store {
       if (!path.equals(_outputPath)) {
         cleanupOldFiles();
       }
+    } catch (RemoteException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key
value store.", e);
+    } catch (LeaseExpiredException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key
value store.", e);
     } finally {
       _writeLock.unlock();
     }
@@ -262,6 +316,10 @@ public class HdfsKeyValueStore implements Store {
       Operation op = getDeleteOperation(OperationType.DELETE, key);
       write(op);
       _pointers.remove(key);
+    } catch (RemoteException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key
value store.", e);
+    } catch (LeaseExpiredException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key
value store.", e);
     } finally {
       _writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/044923c2/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
index 4c098f4..b9ca41b 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
@@ -16,12 +16,12 @@
  */
 package org.apache.blur.store.hdfs_v2;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.util.Map.Entry;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.blur.log.Log;
@@ -141,6 +141,75 @@ public class HdfsKeyValueStoreTest {
     store.close();
   }
 
+//  @Test
+  public void testTwoKeyStoreInstancesWritingAtTheSameTime() throws IOException {
+    HdfsKeyValueStore store1 = new HdfsKeyValueStore(_configuration, _path);
+    listFiles();
+    store1.put(new BytesRef("a1"), new BytesRef(new byte[2000]));
+    listFiles();
+    HdfsKeyValueStore store2 = new HdfsKeyValueStore(_configuration, _path);
+    listFiles();
+    store2.put(new BytesRef("a1"), new BytesRef(new byte[1000]));
+    listFiles();
+    store1.put(new BytesRef("a2"), new BytesRef(new byte[2000]));
+    listFiles();
+    store2.put(new BytesRef("a2"), new BytesRef(new byte[1000]));
+    listFiles();
+    store1.put(new BytesRef("a3"), new BytesRef(new byte[2000]));
+    listFiles();
+    store2.put(new BytesRef("a3"), new BytesRef(new byte[1000]));
+    listFiles();
+    try {
+      store1.sync();
+      fail();
+    } catch (Exception e) {
+
+    }
+    store2.sync();
+    store1.close();
+    store2.close();
+
+    HdfsKeyValueStore store3 = new HdfsKeyValueStore(_configuration, _path);
+    Iterable<Entry<BytesRef, BytesRef>> scan = store3.scan(null);
+    for (Entry<BytesRef, BytesRef> e : scan) {
+      System.out.println(e.getValue().length);
+    }
+    store3.close();
+  }
+
+  @Test
+  public void testTwoKeyStoreInstancesWritingAtTheSameTimeSmallFiles() throws IOException
{
+    HdfsKeyValueStore store1 = new HdfsKeyValueStore(_configuration, _path, 1000);
+    store1.put(new BytesRef("a1"), new BytesRef(new byte[2000]));
+    HdfsKeyValueStore store2 = new HdfsKeyValueStore(_configuration, _path, 1000);
+    store2.put(new BytesRef("a1"), new BytesRef(new byte[1000]));
+    try {
+      store1.put(new BytesRef("a2"), new BytesRef(new byte[2000]));
+      fail();
+    } catch (Exception e) {
+      // Should throw exception
+      store1.close();
+    }
+    store2.put(new BytesRef("a2"), new BytesRef(new byte[1000]));
+    store2.put(new BytesRef("a3"), new BytesRef(new byte[1000]));
+    store2.sync();
+    store2.close();
+
+    HdfsKeyValueStore store3 = new HdfsKeyValueStore(_configuration, _path);
+    Iterable<Entry<BytesRef, BytesRef>> scan = store3.scan(null);
+    for (Entry<BytesRef, BytesRef> e : scan) {
+      System.out.println(e.getValue().length);
+    }
+    store3.close();
+  }
+
+  private void listFiles() throws IOException {
+    FileSystem fileSystem = _path.getFileSystem(_configuration);
+    for (FileStatus status : fileSystem.listStatus(_path)) {
+      System.out.println(status.getPath() + " " + status.getLen());
+    }
+  }
+
   private BytesRef toBytesRef(String s) {
     return new BytesRef(s);
   }


Mime
View raw message