incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/5] git commit: Package change.
Date Wed, 31 Oct 2012 01:14:49 GMT
Updated Branches:
  refs/heads/0.2-dev e50119e41 -> 8d173335a


Package change.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/8d173335
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/8d173335
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/8d173335

Branch: refs/heads/0.2-dev
Commit: 8d173335adcaae7764ff1b0b04acc3dbf32a7ed1
Parents: 0e8a136
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Oct 30 21:14:19 2012 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Oct 30 21:14:19 2012 -0400

----------------------------------------------------------------------
 .../refcounter/DirectoryReferenceCounter.java      |  269 +++++++++++++++
 .../store/refcounter/DirectoryReferenceFileGC.java |   98 ++++++
 2 files changed, 367 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8d173335/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
new file mode 100644
index 0000000..baade2f
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
@@ -0,0 +1,269 @@
+package org.apache.blur.lucene.store.refcounter;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+public class DirectoryReferenceCounter extends Directory {
+
+  private final static Log LOG = LogFactory.getLog(DirectoryReferenceCounter.class);
+  private Directory directory;
+  private Map<String, AtomicInteger> refs = new ConcurrentHashMap<String, AtomicInteger>();
+  private DirectoryReferenceFileGC gc;
+
+  public DirectoryReferenceCounter(Directory directory, DirectoryReferenceFileGC gc) {
+    this.directory = directory;
+    this.gc = gc;
+  }
+
+  public void deleteFile(String name) throws IOException {
+    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
+      deleteFile(name);
+      return;
+    }
+    AtomicInteger counter = refs.get(name);
+    if (counter != null && counter.get() > 0) {
+      addToFileGC(name);
+    } else {
+      LOG.debug("Delete file [{0}]", name);
+      directory.deleteFile(name);
+    }
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
+      return directory.createOutput(name, context);
+    }
+    LOG.debug("Create file [{0}]", name);
+    AtomicInteger counter = refs.get(name);
+    if (counter != null) {
+      LOG.error("Unknown error while trying to create ref counter for [{0}] reference exists.",
name);
+      throw new IOException("Reference exists [" + name + "]");
+    }
+    counter = new AtomicInteger(0);
+    refs.put(name, counter);
+    return directory.createOutput(name, context);
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    IndexInput input = directory.openInput(name, context);
+    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
+      return input;
+    }
+    return wrap(name, input);
+  }
+
+  public static class RefIndexInput extends IndexInput {
+
+    private IndexInput input;
+    private AtomicInteger ref;
+    private boolean closed = false;
+
+    public RefIndexInput(String resourceDescription, IndexInput input, AtomicInteger ref)
{
+      super(resourceDescription);
+      this.input = input;
+      this.ref = ref;
+      ref.incrementAndGet();
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+      // Seems like not all the clones are being closed...
+      close();
+    }
+
+    @Override
+    public RefIndexInput clone() {
+      RefIndexInput ref = (RefIndexInput) super.clone();
+      ref.input = (IndexInput) input.clone();
+      ref.ref.incrementAndGet();
+      return ref;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        input.close();
+        ref.decrementAndGet();
+        closed = true;
+      }
+    }
+
+    @Override
+    public short readShort() throws IOException {
+      return input.readShort();
+    }
+
+    @Override
+    public int readInt() throws IOException {
+      return input.readInt();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      input.seek(pos);
+    }
+
+    @Override
+    public int readVInt() throws IOException {
+      return input.readVInt();
+    }
+
+    @Override
+    public String toString() {
+      return input.toString();
+    }
+
+    @Override
+    public long readLong() throws IOException {
+      return input.readLong();
+    }
+
+    @Override
+    public long readVLong() throws IOException {
+      return input.readVLong();
+    }
+
+    @Override
+    public String readString() throws IOException {
+      return input.readString();
+    }
+
+    @Override
+    public long getFilePointer() {
+      return input.getFilePointer();
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+      return input.readByte();
+    }
+
+    @Override
+    public void readBytes(byte[] b, int offset, int len) throws IOException {
+      input.readBytes(b, offset, len);
+    }
+
+    @Override
+    public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException
{
+      input.readBytes(b, offset, len, useBuffer);
+    }
+
+    @Override
+    public long length() {
+      return input.length();
+    }
+
+    @Override
+    public Map<String, String> readStringStringMap() throws IOException {
+      return input.readStringStringMap();
+    }
+
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    directory.sync(names);
+  }
+
+  @Override
+  public void clearLock(String name) throws IOException {
+    directory.clearLock(name);
+  }
+
+  @Override
+  public void close() throws IOException {
+    directory.close();
+  }
+
+  @Override
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    directory.setLockFactory(lockFactory);
+  }
+
+  @Override
+  public String getLockID() {
+    return directory.getLockID();
+  }
+
+  @Override
+  public void copy(Directory to, String src, String dest, IOContext context) throws IOException
{
+    directory.copy(to, src, dest, context);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    return directory.fileExists(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    return directory.fileLength(name);
+  }
+
+  @Override
+  public LockFactory getLockFactory() {
+    return directory.getLockFactory();
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    return directory.listAll();
+  }
+
+  @Override
+  public Lock makeLock(String name) {
+    return directory.makeLock(name);
+  }
+
+  @Override
+  public String toString() {
+    return directory.toString();
+  }
+
+  private void addToFileGC(String name) {
+    if (gc != null) {
+      LOG.debug("Add file [{0}] to be GCed once refs are closed.", name);
+      gc.add(directory, name, refs);
+    }
+  }
+
+  private IndexInput wrap(String name, IndexInput input) {
+    AtomicInteger counter = refs.get(name);
+    if (counter == null) {
+      counter = new AtomicInteger();
+      refs.put(name, counter);
+    }
+    return new RefIndexInput(input.toString(), input, counter);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8d173335/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
new file mode 100644
index 0000000..ab85f87
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
@@ -0,0 +1,98 @@
+package org.apache.blur.lucene.store.refcounter;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.lucene.store.Directory;
+
+
+public class DirectoryReferenceFileGC extends TimerTask {
+
+  private static final Log LOG = LogFactory.getLog(DirectoryReferenceFileGC.class);
+
+  private Timer _timer;
+  private long _delay = 5000;
+  private LinkedBlockingQueue<Value> _queue;
+
+  public static class Value {
+    public Value(Directory directory, String name, Map<String, AtomicInteger> refs)
{
+      this.directory = directory;
+      this.name = name;
+      this.refs = refs;
+    }
+
+    Directory directory;
+    String name;
+    Map<String, AtomicInteger> refs;
+
+    public boolean tryToDelete() throws IOException {
+      AtomicInteger counter = refs.get(name);
+      if (counter.get() <= 0) {
+        refs.remove(name);
+        LOG.debug("Removing file [{0}]", name);
+        directory.deleteFile(name);
+        return true;
+      } else {
+        LOG.debug("File [{0}] had too many refs [{1}]", name, counter.get());
+      }
+      return false;
+    }
+  }
+
+  public void init() {
+    _timer = new Timer("Blur-File-GC", true);
+    _timer.scheduleAtFixedRate(this, _delay, _delay);
+    _queue = new LinkedBlockingQueue<Value>();
+  }
+
+  public void add(Directory directory, String name, Map<String, AtomicInteger> refs)
{
+    try {
+      _queue.put(new Value(directory, name, refs));
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void close() {
+    _timer.purge();
+    _timer.cancel();
+  }
+
+  @Override
+  public void run() {
+    Iterator<Value> iterator = _queue.iterator();
+    while (iterator.hasNext()) {
+      Value value = iterator.next();
+      try {
+        if (value.tryToDelete()) {
+          iterator.remove();
+        }
+      } catch (IOException e) {
+        LOG.error("Unknown error", e);
+      }
+    }
+  }
+}


Mime
View raw message