incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: More clean of kvstore.
Date Mon, 15 Jun 2015 13:18:10 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master f88a9ef5c -> 90625d023


More clean of kvstore.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/61070469
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/61070469
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/61070469

Branch: refs/heads/master
Commit: 61070469bd0f73f42c4b12770d8d3d4c88fd0e51
Parents: f88a9ef
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Jun 14 16:00:40 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Jun 14 16:00:40 2015 -0400

----------------------------------------------------------------------
 .../org/apache/blur/kvs/HdfsKeyValueStore.java  | 67 +++++++++++++++-----
 .../main/java/org/apache/blur/kvs/Store.java    |  2 +
 .../org/apache/blur/store/hdfs_v2/Store.java    | 39 ------------
 3 files changed, 52 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/61070469/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java b/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java
index da93044..227d620 100644
--- a/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java
+++ b/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.Timer;
@@ -141,12 +142,13 @@ public class HdfsKeyValueStore implements Store {
   private final TimerTask _oldFileCleanerTimerTask;
   private final AtomicLong _lastWrite = new AtomicLong();
   private final Timer _hdfsKeyValueTimer;
-  private final long _maxOpenForWriting;
+  private final long _maxTimeOpenForWriting;
   private final boolean _readOnly;
 
   private FSDataOutputStream _output;
   private Path _outputPath;
   private boolean _isClosed;
+  
 
   public HdfsKeyValueStore(boolean readOnly, Timer hdfsKeyValueTimer, Configuration configuration, Path path)
       throws IOException {
@@ -160,9 +162,9 @@ public class HdfsKeyValueStore implements Store {
   }
 
   public HdfsKeyValueStore(boolean readOnly, Timer hdfsKeyValueTimer, Configuration configuration, Path path,
-      long maxAmountAllowedPerFile, long maxOpenForWriting) throws IOException {
+      long maxAmountAllowedPerFile, long maxTimeOpenForWriting) throws IOException {
     _readOnly = readOnly;
-    _maxOpenForWriting = maxOpenForWriting;
+    _maxTimeOpenForWriting = maxTimeOpenForWriting;
     _maxAmountAllowedPerFile = maxAmountAllowedPerFile;
     _path = path;
     _fileSystem = _path.getFileSystem(configuration);
@@ -247,9 +249,9 @@ public class HdfsKeyValueStore implements Store {
     try {
       syncInternal();
     } catch (RemoteException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+      throw new IOException("Another HDFS KeyStore has taken ownership of this key value store.", e);
     } catch (LeaseExpiredException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+      throw new IOException("Another HDFS KeyStore has taken ownership of this key value store.", e);
     } finally {
       _writeLock.unlock();
     }
@@ -258,11 +260,29 @@ public class HdfsKeyValueStore implements Store {
   @Override
   public Iterable<Entry<BytesRef, BytesRef>> scan(BytesRef key) throws IOException {
     ensureOpen();
+    NavigableMap<BytesRef, Value> pointers = createSnapshot();
+    return getIterable(key, pointers);
+  }
+
+  private Iterable<Entry<BytesRef, BytesRef>> getIterable(BytesRef key, NavigableMap<BytesRef, Value> pointers) {
     if (key == null) {
-      key = _pointers.firstKey();
+      key = pointers.firstKey();
+    }
+    NavigableMap<BytesRef, Value> tailMap = pointers.tailMap(key, true);
+    return getIterable(tailMap);
+  }
+
+  private NavigableMap<BytesRef, Value> createSnapshot() {
+    _writeLock.lock();
+    try {
+      return new ConcurrentSkipListMap<BytesRef, Value>(_pointers);
+    } finally {
+      _writeLock.unlock();
     }
-    ConcurrentNavigableMap<BytesRef, Value> tailMap = _pointers.tailMap(key, true);
-    final Set<Entry<BytesRef, Value>> entrySet = tailMap.entrySet();
+  }
+
+  private Iterable<Entry<BytesRef, BytesRef>> getIterable(NavigableMap<BytesRef, Value> map) {
+    final Set<Entry<BytesRef, Value>> entrySet = map.entrySet();
     return new Iterable<Entry<BytesRef, BytesRef>>() {
       @Override
       public Iterator<Entry<BytesRef, BytesRef>> iterator() {
@@ -324,9 +344,9 @@ public class HdfsKeyValueStore implements Store {
         _size.addAndGet(-old._bytesRef.bytes.length);
       }
     } catch (RemoteException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+      throw new IOException("Another HDFS KeyStore has taken ownership of this key value store.", e);
     } catch (LeaseExpiredException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+      throw new IOException("Another HDFS KeyStore has taken ownership of this key value store.", e);
     } finally {
       _writeLock.unlock();
     }
@@ -390,7 +410,7 @@ public class HdfsKeyValueStore implements Store {
   private void closeLogFileIfIdle() throws IOException {
     _writeLock.lock();
     try {
-      if (_output != null && _lastWrite.get() + _maxOpenForWriting < System.currentTimeMillis()) {
+      if (_output != null && _lastWrite.get() + _maxTimeOpenForWriting < System.currentTimeMillis()) {
         // Close writer
         LOG.info("Closing KV log due to inactivity [{0}].", _path);
         try {
@@ -452,9 +472,9 @@ public class HdfsKeyValueStore implements Store {
         _size.addAndGet(-old._bytesRef.bytes.length);
       }
     } catch (RemoteException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+      throw new IOException("Another HDFS KeyStore has taken ownership of this key value store.", e);
     } catch (LeaseExpiredException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+      throw new IOException("Another HDFS KeyStore has taken ownership of this key value store.", e);
     } finally {
       _writeLock.unlock();
     }
@@ -534,9 +554,8 @@ public class HdfsKeyValueStore implements Store {
   }
 
   private void validateNextSegmentHasNotStarted() throws IOException {
-    Path p = getSegmentPath(_currentFileCounter.get() + 1);
-    if (_fileSystem.exists(p)) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.");
+    if (!isOwner()) {
+      throw new IOException("Another HDFS KeyStore has taken ownership of this key value store.");
     }
   }
 
@@ -596,10 +615,24 @@ public class HdfsKeyValueStore implements Store {
     if (_fileSystem.exists(p)) {
       FileStatus[] listStatus = _fileSystem.listStatus(p);
       if (listStatus != null) {
-        return new TreeSet<FileStatus>(Arrays.asList(listStatus));
+        TreeSet<FileStatus> result = new TreeSet<FileStatus>();
+        for (FileStatus fileStatus : listStatus) {
+          if (!fileStatus.isDir()) {
+            result.add(fileStatus);
+          }
+        }
+        return result;
       }
     }
     return new TreeSet<FileStatus>();
   }
 
+  @Override
+  public boolean isOwner() throws IOException {
+    Path p = getSegmentPath(_currentFileCounter.get() + 1);
+    if (_fileSystem.exists(p)) {
+      return false;
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/61070469/blur-kvs/src/main/java/org/apache/blur/kvs/Store.java
----------------------------------------------------------------------
diff --git a/blur-kvs/src/main/java/org/apache/blur/kvs/Store.java b/blur-kvs/src/main/java/org/apache/blur/kvs/Store.java
index 3bfc3ec..71aba0e 100644
--- a/blur-kvs/src/main/java/org/apache/blur/kvs/Store.java
+++ b/blur-kvs/src/main/java/org/apache/blur/kvs/Store.java
@@ -22,6 +22,8 @@ import java.util.Map.Entry;
 
 public interface Store extends Closeable {
 
+  boolean isOwner() throws IOException;
+  
   void sync() throws IOException;
 
   Iterable<Entry<BytesRef, BytesRef>> scan(BytesRef key) throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/61070469/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/Store.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/Store.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/Store.java
deleted file mode 100644
index 189585a..0000000
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/Store.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.store.hdfs_v2;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map.Entry;
-
-import org.apache.lucene.util.BytesRef;
-
-public interface Store extends Closeable {
-
-  void sync() throws IOException;
-
-  Iterable<Entry<BytesRef, BytesRef>> scan(BytesRef key) throws IOException;
-
-  void put(BytesRef key, BytesRef value) throws IOException;
-
-  boolean get(BytesRef key, BytesRef value) throws IOException;
-
-  void delete(BytesRef key) throws IOException;
-
-  void close() throws IOException;
-
-}
\ No newline at end of file


Mime
View raw message