incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [2/2] git commit: Extracting blur keyvalue store into its own project.
Date Sat, 13 Jun 2015 00:57:55 GMT
Extracting blur keyvalue store into its own project.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/f88a9ef5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/f88a9ef5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/f88a9ef5

Branch: refs/heads/master
Commit: f88a9ef5cadeb88ca1a2472b237ed03ea8cdc299
Parents: 4a5f070
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Jun 12 20:57:55 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Jun 12 20:57:55 2015 -0400

----------------------------------------------------------------------
 blur-kvs/pom.xml                                | 191 ++++++
 .../main/java/org/apache/blur/kvs/BytesRef.java | 129 ++++
 .../org/apache/blur/kvs/HdfsKeyValueStore.java  | 605 ++++++++++++++++++
 .../java/org/apache/blur/kvs/HdfsUtils.java     |  52 ++
 .../main/java/org/apache/blur/kvs/Store.java    |  37 ++
 .../org/apache/blur/HdfsMiniClusterUtil.java    | 132 ++++
 .../apache/blur/kvs/HdfsKeyValueStoreTest.java  | 269 ++++++++
 blur-store/pom.xml                              |  12 +
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      |   3 +-
 .../hdfs_v2/FastHdfsKeyValueIndexInput.java     |   2 +-
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 615 -------------------
 .../org/apache/blur/HdfsMiniClusterUtil.java    | 132 ----
 .../hdfs_v2/FastHdfsKeyValueDirectoryTest.java  |   4 +-
 .../store/hdfs_v2/HdfsKeyValueStoreTest.java    | 270 --------
 pom.xml                                         |   3 +
 15 files changed, 1436 insertions(+), 1020 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-kvs/pom.xml
----------------------------------------------------------------------
diff --git a/blur-kvs/pom.xml b/blur-kvs/pom.xml
new file mode 100644
index 0000000..7fea5b8
--- /dev/null
+++ b/blur-kvs/pom.xml
@@ -0,0 +1,191 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	you under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.blur</groupId>
+		<artifactId>blur</artifactId>
+		<version>0.2.4-incubating-SNAPSHOT</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+	<groupId>org.apache.blur</groupId>
+	<artifactId>blur-kvs</artifactId>
+	<version>${projectVersion}</version>
+	<packaging>jar</packaging>
+	<name>Blur KeyValue Store</name>
+	<description>The Blur kvs module contains a transactional, single-writer, embedded key-value store.</description>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-util</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-cli</groupId>
+			<artifactId>commons-cli</artifactId>
+			<version>${commons-cli.version}</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
+			<artifactId>concurrentlinkedhashmap-lru</artifactId>
+			<version>${concurrentlinkedhashmap-lru.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>libdir</id>
+			<url>file://${basedir}/../lib</url>
+		</repository>
+	</repositories>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-source-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>attach-sources</id>
+						<goals>
+							<goal>jar</goal>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-help-plugin</artifactId>
+				<version>2.2</version>
+				<executions>
+					<execution>
+						<phase>generate-resources</phase>
+						<goals>
+							<goal>effective-pom</goal>
+						</goals>
+						<configuration>
+							<output>${project.build.directory}/effective-pom.xml</output>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-install-plugin</artifactId>
+				<version>2.3.1</version>
+				<executions>
+					<execution>
+						<phase>install</phase>
+						<goals>
+							<goal>install-file</goal>
+						</goals>
+						<configuration>
+							<file>${project.build.directory}/${artifactId}-${project.version}.jar</file>
+							<pomFile>${project.build.directory}/effective-pom.xml</pomFile>
+							<!-- sources></sources -->
+							<!-- javadoc></javadoc -->
+							<groupId>${project.groupId}</groupId>
+							<artifactId>${project.artifactId}</artifactId>
+							<version>${project.version}</version>
+							<packaging>jar</packaging>
+							<!--classifier></classifier -->
+							<generatePom>true</generatePom>
+							<createChecksum>true</createChecksum>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+	<profiles>
+		<profile>
+			<id>hadoop1</id>
+			<activation>
+				<property>
+					<name>hadoop1</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
+					<version>${hadoop.version}</version>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-test</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<id>hadoop2-mr1</id>
+			<activation>
+				<property>
+					<name>hadoop2-mr1</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-client</artifactId>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<id>hadoop2</id>
+			<activation>
+				<property>
+					<name>hadoop2</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-client</artifactId>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-kvs/src/main/java/org/apache/blur/kvs/BytesRef.java
----------------------------------------------------------------------
diff --git a/blur-kvs/src/main/java/org/apache/blur/kvs/BytesRef.java b/blur-kvs/src/main/java/org/apache/blur/kvs/BytesRef.java
new file mode 100644
index 0000000..c318aac
--- /dev/null
+++ b/blur-kvs/src/main/java/org/apache/blur/kvs/BytesRef.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.kvs;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.io.WritableComparator;
+
+public class BytesRef implements Comparable<BytesRef> {
+
+  private static final String UTF_8 = "UTF-8";
+
+  public static final byte[] EMPTY_BYTES = new byte[0];
+
+  public byte[] bytes;
+  public int offset;
+  public int length;
+
+  public BytesRef(byte[] bytes, int offset, int length) {
+    this.bytes = bytes;
+    this.offset = offset;
+    this.length = length;
+  }
+
+  public BytesRef() {
+    this(EMPTY_BYTES);
+  }
+
+  public BytesRef(byte[] b) {
+    this(b, 0, b.length);
+  }
+
+  public BytesRef(String s) {
+    this(toBytes(s));
+  }
+
+  public BytesRef(int capacity) {
+    this.bytes = new byte[capacity];
+  }
+
+  private static byte[] toBytes(String s) {
+    try {
+      return s.getBytes(UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static BytesRef deepCopyOf(BytesRef other) {
+    BytesRef copy = new BytesRef();
+    copy.copyBytes(other);
+    return copy;
+  }
+
+  public void copyBytes(BytesRef other) {
+    if (bytes.length - offset < other.length) {
+      bytes = new byte[other.length];
+      offset = 0;
+    }
+    System.arraycopy(other.bytes, other.offset, bytes, offset, other.length);
+    length = other.length;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 0;
+    final int end = offset + length;
+    for (int i = offset; i < end; i++) {
+      hash = 31 * hash + bytes[i];
+    }
+    return hash;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other instanceof BytesRef) {
+      return this.bytesEquals((BytesRef) other);
+    }
+    return false;
+  }
+
+  public boolean bytesEquals(BytesRef other) {
+    assert other != null;
+    if (length == other.length) {
+      int otherUpto = other.offset;
+      final byte[] otherBytes = other.bytes;
+      final int end = offset + length;
+      for (int upto = offset; upto < end; upto++, otherUpto++) {
+        if (bytes[upto] != otherBytes[otherUpto]) {
+          return false;
+        }
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int compareTo(BytesRef o) {
+    return WritableComparator.compareBytes(bytes, offset, length, o.bytes, o.offset, o.length);
+  }
+
+  public String utf8ToString() {
+    try {
+      return new String(bytes, offset, length, UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

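For orientation, a minimal usage sketch of the new org.apache.blur.kvs.BytesRef (the class and method names come from the file above; the example class and values are illustrative):

import org.apache.blur.kvs.BytesRef;

public class BytesRefExample {
  public static void main(String[] args) {
    // Wraps the UTF-8 bytes of a string; offset/length cover the whole array.
    BytesRef a = new BytesRef("key1");
    BytesRef b = new BytesRef("key2");

    // Ordering is raw byte order via WritableComparator.compareBytes.
    System.out.println(a.compareTo(b) < 0); // true

    // deepCopyOf allocates a fresh backing array, so the copy stays valid
    // even if the source buffer is later reused.
    BytesRef copy = BytesRef.deepCopyOf(a);
    System.out.println(copy.bytesEquals(a)); // true
    System.out.println(copy.utf8ToString()); // key1
  }
}
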
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java b/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java
new file mode 100644
index 0000000..da93044
--- /dev/null
+++ b/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.kvs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RemoteException;
+
+public class HdfsKeyValueStore implements Store {
+
+  public static final int DEFAULT_MAX_AMOUNT_ALLOWED_PER_FILE = 64 * 1024 * 1024;
+  public static final long DEFAULT_MAX_OPEN_FOR_WRITING = TimeUnit.MINUTES.toMillis(1);
+
+  private static final String UTF_8 = "UTF-8";
+  private static final String BLUR_KEY_VALUE = "blur_key_value";
+  private static final Log LOG = LogFactory.getLog(HdfsKeyValueStore.class);
+  private static final byte[] MAGIC;
+  private static final int VERSION = 1;
+  private static final long DAEMON_POLL_TIME = TimeUnit.SECONDS.toMillis(5);
+  private static final int VERSION_LENGTH = 4;
+
+  static {
+    try {
+      MAGIC = BLUR_KEY_VALUE.getBytes(UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static enum OperationType {
+    PUT, DELETE
+  }
+
+  static class Operation implements Writable {
+
+    OperationType type;
+    BytesWritable key = new BytesWritable();
+    BytesWritable value = new BytesWritable();
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      if (type == OperationType.DELETE) {
+        out.write(0);
+        key.write(out);
+      } else if (type == OperationType.PUT) {
+        out.write(1);
+        key.write(out);
+        value.write(out);
+      } else {
+        throw new RuntimeException("Not supported [" + type + "]");
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      byte b = in.readByte();
+      switch (b) {
+      case 0:
+        type = OperationType.DELETE;
+        key.readFields(in);
+        return;
+      case 1:
+        type = OperationType.PUT;
+        key.readFields(in);
+        value.readFields(in);
+        return;
+      default:
+        throw new RuntimeException("Not supported [" + b + "]");
+      }
+    }
+
+  }
+
+  static class Value {
+    Value(BytesRef bytesRef, Path path) {
+      _bytesRef = bytesRef;
+      _path = path;
+    }
+
+    BytesRef _bytesRef;
+    Path _path;
+  }
+
+  private final ConcurrentNavigableMap<BytesRef, Value> _pointers = new ConcurrentSkipListMap<BytesRef, Value>();
+  private final Path _path;
+  private final ReentrantReadWriteLock _readWriteLock;
+  private final AtomicReference<SortedSet<FileStatus>> _fileStatus = new AtomicReference<SortedSet<FileStatus>>();
+  private final FileSystem _fileSystem;
+  private final AtomicLong _currentFileCounter = new AtomicLong();
+  private final WriteLock _writeLock;
+  private final ReadLock _readLock;
+  private final AtomicLong _size = new AtomicLong();
+  private final long _maxAmountAllowedPerFile;
+  private final TimerTask _idleLogTimerTask;
+  private final TimerTask _oldFileCleanerTimerTask;
+  private final AtomicLong _lastWrite = new AtomicLong();
+  private final Timer _hdfsKeyValueTimer;
+  private final long _maxOpenForWriting;
+  private final boolean _readOnly;
+
+  private FSDataOutputStream _output;
+  private Path _outputPath;
+  private boolean _isClosed;
+
+  public HdfsKeyValueStore(boolean readOnly, Timer hdfsKeyValueTimer, Configuration configuration, Path path)
+      throws IOException {
+    this(readOnly, hdfsKeyValueTimer, configuration, path, DEFAULT_MAX_AMOUNT_ALLOWED_PER_FILE,
+        DEFAULT_MAX_OPEN_FOR_WRITING);
+  }
+
+  public HdfsKeyValueStore(boolean readOnly, Timer hdfsKeyValueTimer, Configuration configuration, Path path,
+      long maxAmountAllowedPerFile) throws IOException {
+    this(readOnly, hdfsKeyValueTimer, configuration, path, maxAmountAllowedPerFile, DEFAULT_MAX_OPEN_FOR_WRITING);
+  }
+
+  public HdfsKeyValueStore(boolean readOnly, Timer hdfsKeyValueTimer, Configuration configuration, Path path,
+      long maxAmountAllowedPerFile, long maxOpenForWriting) throws IOException {
+    _readOnly = readOnly;
+    _maxOpenForWriting = maxOpenForWriting;
+    _maxAmountAllowedPerFile = maxAmountAllowedPerFile;
+    _path = path;
+    _fileSystem = _path.getFileSystem(configuration);
+    _fileSystem.mkdirs(_path);
+    _readWriteLock = new ReentrantReadWriteLock();
+    _writeLock = _readWriteLock.writeLock();
+    _readLock = _readWriteLock.readLock();
+    _fileStatus.set(getSortedSet(_path));
+    if (!_fileStatus.get().isEmpty()) {
+      _currentFileCounter.set(Long.parseLong(_fileStatus.get().last().getPath().getName()));
+    }
+    removeAnyTruncatedFiles();
+    loadIndexes();
+    cleanupOldFiles();
+    if (!_readOnly) {
+      _idleLogTimerTask = getIdleLogTimer();
+      _oldFileCleanerTimerTask = getOldFileCleanerTimer();
+      _hdfsKeyValueTimer = hdfsKeyValueTimer;
+      _hdfsKeyValueTimer.schedule(_idleLogTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
+      _hdfsKeyValueTimer.schedule(_oldFileCleanerTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
+    } else {
+      _idleLogTimerTask = null;
+      _oldFileCleanerTimerTask = null;
+      _hdfsKeyValueTimer = null;
+    }
+    // Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, HDFS_KV, SIZE,
+    // path.getParent().toString()), new Gauge<Long>() {
+    // @Override
+    // public Long value() {
+    // return _size.get();
+    // }
+    // });
+  }
+
+  private void removeAnyTruncatedFiles() throws IOException {
+    for (FileStatus fileStatus : _fileStatus.get()) {
+      Path path = fileStatus.getPath();
+      FSDataInputStream inputStream = _fileSystem.open(path);
+      long len = HdfsUtils.getFileLength(_fileSystem, path, inputStream);
+      inputStream.close();
+      if (len < MAGIC.length + VERSION_LENGTH) {
+        // Remove invalid file
+        LOG.warn("Removing file [{0}] because length of [{1}] is less than MAGIC plus version length of [{2}]", path,
+            len, MAGIC.length + VERSION_LENGTH);
+        _fileSystem.delete(path, false);
+      }
+    }
+  }
+
+  private TimerTask getOldFileCleanerTimer() {
+    return new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          cleanupOldFiles();
+        } catch (IOException e) {
+          LOG.error("Unknown error while trying to clean up old files.", e);
+        }
+      }
+    };
+  }
+
+  private TimerTask getIdleLogTimer() {
+    return new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          closeLogFileIfIdle();
+        } catch (IOException e) {
+          LOG.error("Unknown error while trying to close output file.", e);
+        }
+      }
+
+    };
+  }
+
+  @Override
+  public void sync() throws IOException {
+    ensureOpen();
+    _writeLock.lock();
+    try {
+      ensureOpenForWriting();
+      syncInternal();
+    } catch (RemoteException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+    } catch (LeaseExpiredException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
+  @Override
+  public Iterable<Entry<BytesRef, BytesRef>> scan(BytesRef key) throws IOException {
+    ensureOpen();
+    if (key == null) {
+      key = new BytesRef(); // the empty ref sorts first, and avoids NoSuchElementException on an empty store
+    }
+    ConcurrentNavigableMap<BytesRef, Value> tailMap = _pointers.tailMap(key, true);
+    final Set<Entry<BytesRef, Value>> entrySet = tailMap.entrySet();
+    return new Iterable<Entry<BytesRef, BytesRef>>() {
+      @Override
+      public Iterator<Entry<BytesRef, BytesRef>> iterator() {
+        final Iterator<Entry<BytesRef, Value>> iterator = entrySet.iterator();
+        return new Iterator<Entry<BytesRef, BytesRef>>() {
+
+          @Override
+          public boolean hasNext() {
+            return iterator.hasNext();
+          }
+
+          @Override
+          public Entry<BytesRef, BytesRef> next() {
+            final Entry<BytesRef, Value> e = iterator.next();
+            return new Entry<BytesRef, BytesRef>() {
+
+              @Override
+              public BytesRef setValue(BytesRef value) {
+                throw new RuntimeException("Read only.");
+              }
+
+              @Override
+              public BytesRef getValue() {
+                return e.getValue()._bytesRef;
+              }
+
+              @Override
+              public BytesRef getKey() {
+                return e.getKey();
+              }
+            };
+          }
+
+          @Override
+          public void remove() {
+            throw new RuntimeException("Read only.");
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  public void put(BytesRef key, BytesRef value) throws IOException {
+    ensureOpen();
+    if (value == null) {
+      delete(key);
+      return;
+    }
+    _writeLock.lock();
+    try {
+      ensureOpenForWriting();
+      Operation op = getPutOperation(OperationType.PUT, key, value);
+      Path path = write(op);
+      BytesRef deepCopyOf = BytesRef.deepCopyOf(value);
+      _size.addAndGet(deepCopyOf.bytes.length);
+      Value old = _pointers.put(BytesRef.deepCopyOf(key), new Value(deepCopyOf, path));
+      if (old != null) {
+        _size.addAndGet(-old._bytesRef.bytes.length);
+      }
+    } catch (RemoteException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+    } catch (LeaseExpiredException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
+  private void ensureOpenForWriting() throws IOException {
+    if (_output == null) {
+      openWriter();
+    }
+  }
+
+  private Path write(Operation op) throws IOException {
+    op.write(_output);
+    Path p = _outputPath;
+    if (_output.getPos() >= _maxAmountAllowedPerFile) {
+      rollFile();
+    }
+    return p;
+  }
+
+  private void rollFile() throws IOException {
+    LOG.info("Rolling file [" + _outputPath + "]");
+    _output.close();
+    _output = null;
+    openWriter();
+  }
+
+  public void cleanupOldFiles() throws IOException {
+    _writeLock.lock();
+    try {
+      if (!isOpenForWriting()) {
+        return;
+      }
+      SortedSet<FileStatus> fileStatusSet = getSortedSet(_path);
+      if (fileStatusSet == null || fileStatusSet.size() < 1) {
+        return;
+      }
+      Path newestGen = fileStatusSet.last().getPath();
+      if (!newestGen.equals(_outputPath)) {
+        throw new IOException("No longer the owner of [" + _path + "]");
+      }
+      Set<Path> existingFiles = new HashSet<Path>();
+      for (FileStatus fileStatus : fileStatusSet) {
+        existingFiles.add(fileStatus.getPath());
+      }
+      Set<Entry<BytesRef, Value>> entrySet = _pointers.entrySet();
+      existingFiles.remove(_outputPath);
+      for (Entry<BytesRef, Value> e : entrySet) {
+        Path p = e.getValue()._path;
+        existingFiles.remove(p);
+      }
+      for (Path p : existingFiles) {
+        LOG.info("Removing file no longer referenced [{0}]", p);
+        _fileSystem.delete(p, false);
+      }
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
+  private void closeLogFileIfIdle() throws IOException {
+    _writeLock.lock();
+    try {
+      if (_output != null && _lastWrite.get() + _maxOpenForWriting < System.currentTimeMillis()) {
+        // Close writer
+        LOG.info("Closing KV log due to inactivity [{0}].", _path);
+        try {
+          _output.close();
+        } finally {
+          _output = null;
+        }
+      }
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
+  private boolean isOpenForWriting() {
+    return _output != null;
+  }
+
+  private Operation getPutOperation(OperationType put, BytesRef key, BytesRef value) {
+    Operation operation = new Operation();
+    operation.type = put;
+    operation.key.set(key.bytes, key.offset, key.length);
+    operation.value.set(value.bytes, value.offset, value.length);
+    return operation;
+  }
+
+  private Operation getDeleteOperation(OperationType delete, BytesRef key) {
+    Operation operation = new Operation();
+    operation.type = delete;
+    operation.key.set(key.bytes, key.offset, key.length);
+    return operation;
+  }
+
+  @Override
+  public boolean get(BytesRef key, BytesRef value) throws IOException {
+    ensureOpen();
+    _readLock.lock();
+    try {
+      Value internalValue = _pointers.get(key);
+      if (internalValue == null) {
+        return false;
+      }
+      value.copyBytes(internalValue._bytesRef);
+      return true;
+    } finally {
+      _readLock.unlock();
+    }
+  }
+
+  @Override
+  public void delete(BytesRef key) throws IOException {
+    ensureOpen();
+    _writeLock.lock();
+    try {
+      ensureOpenForWriting();
+      Operation op = getDeleteOperation(OperationType.DELETE, key);
+      write(op);
+      Value old = _pointers.remove(key);
+      if (old != null) {
+        _size.addAndGet(-old._bytesRef.bytes.length);
+      }
+    } catch (RemoteException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+    } catch (LeaseExpiredException e) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!_isClosed) {
+      _isClosed = true;
+      if (_idleLogTimerTask != null) {
+        _idleLogTimerTask.cancel();
+      }
+      if (_oldFileCleanerTimerTask != null) {
+        _oldFileCleanerTimerTask.cancel();
+      }
+      if (_hdfsKeyValueTimer != null) {
+        _hdfsKeyValueTimer.purge();
+      }
+      _writeLock.lock();
+      try {
+        if (isOpenForWriting()) {
+          try {
+            syncInternal();
+          } finally {
+            IOUtils.closeQuietly(_output);
+            _output = null;
+          }
+        }
+      } finally {
+        _writeLock.unlock();
+      }
+    }
+  }
+
+  private void openWriter() throws IOException {
+    if (_readOnly) {
+      throw new IOException("Key value store is set in read only mode.");
+    }
+    _outputPath = getSegmentPath(_currentFileCounter.incrementAndGet());
+    LOG.info("Opening for writing [{0}].", _outputPath);
+    _output = _fileSystem.create(_outputPath, false);
+    _output.write(MAGIC);
+    _output.writeInt(VERSION);
+    syncInternal();
+  }
+
+  private Path getSegmentPath(long segment) {
+    return new Path(_path, buffer(segment));
+  }
+
+  private static String buffer(long number) {
+    String s = Long.toString(number);
+    StringBuilder builder = new StringBuilder();
+    for (int i = s.length(); i < 12; i++) {
+      builder.append('0');
+    }
+    return builder.append(s).toString();
+  }
+
+  private void loadIndexes() throws IOException {
+    for (FileStatus fileStatus : _fileStatus.get()) {
+      loadIndex(fileStatus.getPath());
+    }
+  }
+
+  private void ensureOpen() throws IOException {
+    if (_isClosed) {
+      throw new IOException("Already closed.");
+    }
+  }
+
+  private void syncInternal() throws IOException {
+    validateNextSegmentHasNotStarted();
+    _output.flush();
+    _output.sync();
+    _lastWrite.set(System.currentTimeMillis());
+  }
+
+  private void validateNextSegmentHasNotStarted() throws IOException {
+    Path p = getSegmentPath(_currentFileCounter.get() + 1);
+    if (_fileSystem.exists(p)) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.");
+    }
+  }
+
+  private void loadIndex(Path path) throws IOException {
+    FSDataInputStream inputStream = _fileSystem.open(path);
+    try {
+      byte[] buf = new byte[MAGIC.length];
+      inputStream.readFully(buf);
+      if (!Arrays.equals(MAGIC, buf)) {
+        throw new IOException("File [" + path + "] not a " + BLUR_KEY_VALUE + " file.");
+      }
+      int version = inputStream.readInt();
+      if (version != 1) {
+        throw new IOException("Unknown version [" + version + "]");
+      }
+      long fileLength = HdfsUtils.getFileLength(_fileSystem, path, inputStream);
+      Operation operation = new Operation();
+      while (inputStream.getPos() < fileLength) {
+        try {
+          operation.readFields(inputStream);
+        } catch (IOException e) {
+          // End of sync point found
+          return;
+        }
+        loadIndex(path, operation);
+      }
+    } finally {
+      // Release the stream on every exit path, including header-check failures.
+      inputStream.close();
+    }
+  }
+
+  private void loadIndex(Path path, Operation operation) {
+    Value old;
+    switch (operation.type) {
+    case PUT:
+      BytesRef deepCopyOf = BytesRef.deepCopyOf(getKey(operation.value));
+      _size.addAndGet(deepCopyOf.bytes.length);
+      old = _pointers.put(BytesRef.deepCopyOf(getKey(operation.key)), new Value(deepCopyOf, path));
+      break;
+    case DELETE:
+      old = _pointers.remove(getKey(operation.key));
+      break;
+    default:
+      throw new RuntimeException("Not supported [" + operation.type + "]");
+    }
+    if (old != null) {
+      _size.addAndGet(-old._bytesRef.bytes.length);
+    }
+  }
+
+  private BytesRef getKey(BytesWritable key) {
+    return new BytesRef(key.getBytes(), 0, key.getLength());
+  }
+
+  private SortedSet<FileStatus> getSortedSet(Path p) throws IOException {
+    if (_fileSystem.exists(p)) {
+      FileStatus[] listStatus = _fileSystem.listStatus(p);
+      if (listStatus != null) {
+        return new TreeSet<FileStatus>(Arrays.asList(listStatus));
+      }
+    }
+    return new TreeSet<FileStatus>();
+  }
+
+}

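Putting the pieces together, a minimal usage sketch of the store (a sketch only: the example class is hypothetical, and the file:// path keeps it self-contained even though the store targets HDFS; the daemon Timer mirrors the test class later in this message):

import java.util.Timer;

import org.apache.blur.kvs.BytesRef;
import org.apache.blur.kvs.HdfsKeyValueStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class HdfsKeyValueStoreExample {
  public static void main(String[] args) throws Exception {
    // The timer drives the idle-log closer and old-file cleaner tasks.
    Timer timer = new Timer("HdfsKeyValueStore", true);
    Configuration configuration = new Configuration();
    Path path = new Path("file:///tmp/hdfs-kvs-example");

    HdfsKeyValueStore store = new HdfsKeyValueStore(false, timer, configuration, path);
    try {
      store.put(new BytesRef("hello"), new BytesRef("world"));
      store.sync(); // flush the current write-ahead log segment

      BytesRef value = new BytesRef();
      if (store.get(new BytesRef("hello"), value)) {
        System.out.println(value.utf8ToString()); // world
      }
    } finally {
      store.close(); // performs a final sync and releases the log
    }
    timer.cancel();
  }
}

The single-writer behavior falls out of openWriter and validateNextSegmentHasNotStarted: a second instance opening the same path rolls to the next numbered segment file, after which the first instance's sync and close fail with "Another HDFS KeyStore has likely taken ownership of this key value store."
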
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsUtils.java b/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsUtils.java
new file mode 100644
index 0000000..a2431a3
--- /dev/null
+++ b/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.kvs;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HdfsUtils {
+
+  private static final String IN = "in";
+  private static final String GET_FILE_LENGTH = "getFileLength";
+
+  public static long getFileLength(FileSystem fileSystem, Path path, FSDataInputStream inputStream) throws IOException {
+    FileStatus fileStatus = fileSystem.getFileStatus(path);
+    long dfsLength = getDFSLength(inputStream);
+    return Math.max(dfsLength, fileStatus.getLen());
+  }
+
+  public static long getDFSLength(FSDataInputStream inputStream) throws IOException {
+    try {
+      Field field = FilterInputStream.class.getDeclaredField(IN);
+      field.setAccessible(true);
+      Object dfs = field.get(inputStream);
+      Method method = dfs.getClass().getMethod(GET_FILE_LENGTH, new Class[] {});
+      Object length = method.invoke(dfs, new Object[] {});
+      return (Long) length;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}

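The reflection in getDFSLength exists because a file still open for writing can hold more bytes than the NameNode reports through getFileStatus; getFileLength takes the max of the two. A sketch of calling it (assumes an HDFS path, since the reflective call needs the wrapped stream to expose getFileLength the way DFSInputStream does):

import org.apache.blur.kvs.HdfsUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileLengthExample {
  public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    Path path = new Path(args[0]); // assumption: an existing file on HDFS
    FileSystem fileSystem = path.getFileSystem(configuration);
    FSDataInputStream inputStream = fileSystem.open(path);
    try {
      // max(NameNode-reported length, length seen by the open DFS stream)
      long length = HdfsUtils.getFileLength(fileSystem, path, inputStream);
      System.out.println(path + " is " + length + " bytes");
    } finally {
      inputStream.close();
    }
  }
}
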
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-kvs/src/main/java/org/apache/blur/kvs/Store.java
----------------------------------------------------------------------
diff --git a/blur-kvs/src/main/java/org/apache/blur/kvs/Store.java b/blur-kvs/src/main/java/org/apache/blur/kvs/Store.java
new file mode 100644
index 0000000..3bfc3ec
--- /dev/null
+++ b/blur-kvs/src/main/java/org/apache/blur/kvs/Store.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.kvs;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+public interface Store extends Closeable {
+
+  void sync() throws IOException;
+
+  Iterable<Entry<BytesRef, BytesRef>> scan(BytesRef key) throws IOException;
+
+  void put(BytesRef key, BytesRef value) throws IOException;
+
+  boolean get(BytesRef key, BytesRef value) throws IOException;
+
+  void delete(BytesRef key) throws IOException;
+
+  void close() throws IOException;
+
+}
\ No newline at end of file

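Store is the interface blur-store now codes against. Passing null to scan starts from the beginning of the key space; a small sketch (the helper class is illustrative):

import java.io.IOException;
import java.util.Map.Entry;

import org.apache.blur.kvs.BytesRef;
import org.apache.blur.kvs.Store;

public class ScanExample {
  // Prints every key/value at or after 'from'; pass null to scan from the first key.
  public static void dump(Store store, BytesRef from) throws IOException {
    for (Entry<BytesRef, BytesRef> entry : store.scan(from)) {
      System.out.println(entry.getKey().utf8ToString() + " = " + entry.getValue().utf8ToString());
    }
  }
}

The entries returned are read-only: setValue and Iterator.remove both throw.
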
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-kvs/src/test/java/org/apache/blur/HdfsMiniClusterUtil.java
----------------------------------------------------------------------
diff --git a/blur-kvs/src/test/java/org/apache/blur/HdfsMiniClusterUtil.java b/blur-kvs/src/test/java/org/apache/blur/HdfsMiniClusterUtil.java
new file mode 100644
index 0000000..c5e1c4e
--- /dev/null
+++ b/blur-kvs/src/test/java/org/apache/blur/HdfsMiniClusterUtil.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public class HdfsMiniClusterUtil {
+
+  private static final Log LOG = LogFactory.getLog(HdfsMiniClusterUtil.class);
+
+  public static MiniDFSCluster startDfs(Configuration conf, boolean format, String path) {
+    String perm;
+    Path p = new Path(new File("./target").getAbsolutePath());
+    try {
+      FileSystem fileSystem = p.getFileSystem(conf);
+      FileStatus fileStatus = fileSystem.getFileStatus(p);
+      FsPermission permission = fileStatus.getPermission();
+      perm = permission.getUserAction().ordinal() + "" + permission.getGroupAction().ordinal() + ""
+          + permission.getOtherAction().ordinal();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    LOG.info("dfs.datanode.data.dir.perm=" + perm);
+    conf.set("dfs.datanode.data.dir.perm", perm);
+    System.setProperty("test.build.data", path);
+    try {
+      MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, (String[]) null);
+      cluster.waitActive();
+      return cluster;
+    } catch (Exception e) {
+      LOG.error("error opening file system", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void shutdownDfs(MiniDFSCluster cluster) {
+    if (cluster != null) {
+      LOG.info("Shutting down Mini DFS ");
+      try {
+        cluster.shutdown();
+      } catch (Exception e) {
+        // Can get a java.lang.reflect.UndeclaredThrowableException thrown
+        // here because of an InterruptedException. Don't let exceptions in
+        // here be the cause of test failure.
+      }
+      try {
+        FileSystem fs = cluster.getFileSystem();
+        if (fs != null) {
+          LOG.info("Shutting down FileSystem");
+          fs.close();
+        }
+        FileSystem.closeAll();
+      } catch (IOException e) {
+        LOG.error("error closing file system", e);
+      }
+
+      // This has got to be one of the worst hacks I have ever had to do.
+      // This is needed to shutdown 2 thread pools that are not shutdown by
+      // themselves.
+      ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+      Thread[] threads = new Thread[100];
+      int enumerate = threadGroup.enumerate(threads);
+      for (int i = 0; i < enumerate; i++) {
+        Thread thread = threads[i];
+        if (thread.getName().startsWith("pool")) {
+          if (thread.isAlive()) {
+            thread.interrupt();
+            LOG.info("Stopping ThreadPoolExecutor [" + thread.getName() + "]");
+            Object target = getField(Thread.class, thread, "target");
+            if (target != null) {
+              ThreadPoolExecutor e = (ThreadPoolExecutor) getField(ThreadPoolExecutor.class, target, "this$0");
+              if (e != null) {
+                e.shutdownNow();
+              }
+            }
+            try {
+              LOG.info("Waiting for thread pool to exit [" + thread.getName() + "]");
+              thread.join();
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private static Object getField(Class<?> c, Object o, String fieldName) {
+    try {
+      Field field = c.getDeclaredField(fieldName);
+      field.setAccessible(true);
+      return field.get(o);
+    } catch (NoSuchFieldException e) {
+      try {
+        Field field = o.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.get(o);
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-kvs/src/test/java/org/apache/blur/kvs/HdfsKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/blur-kvs/src/test/java/org/apache/blur/kvs/HdfsKeyValueStoreTest.java b/blur-kvs/src/test/java/org/apache/blur/kvs/HdfsKeyValueStoreTest.java
new file mode 100644
index 0000000..56f8805
--- /dev/null
+++ b/blur-kvs/src/test/java/org/apache/blur/kvs/HdfsKeyValueStoreTest.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.kvs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Timer;
+
+import org.apache.blur.HdfsMiniClusterUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class HdfsKeyValueStoreTest {
+
+  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp_HdfsKeyValueStoreTest"));
+
+  private static Configuration _configuration = new Configuration();
+  private static MiniDFSCluster _cluster;
+
+  private static Timer _timer;
+  private Path _path;
+
+  @BeforeClass
+  public static void startCluster() {
+    _cluster = HdfsMiniClusterUtil.startDfs(_configuration, true, TMPDIR.getAbsolutePath());
+    _timer = new Timer("IndexImporter", true);
+  }
+
+  @AfterClass
+  public static void stopCluster() {
+    _timer.cancel();
+    _timer.purge();
+    HdfsMiniClusterUtil.shutdownDfs(_cluster);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    FileSystem fileSystem = _cluster.getFileSystem();
+    _path = new Path("/test").makeQualified(fileSystem);
+    fileSystem.delete(_path, true);
+  }
+
+  @Test
+  public void testPutGet() throws IOException {
+    HdfsKeyValueStore store = new HdfsKeyValueStore(false, _timer, _configuration, _path);
+    store.put(toBytesRef("a"), toBytesRef("value1"));
+    store.put(toBytesRef("b"), toBytesRef("value2"));
+    store.sync();
+    BytesRef value = new BytesRef();
+    store.get(toBytesRef("a"), value);
+    assertEquals(new BytesRef("value1"), value);
+    store.get(toBytesRef("b"), value);
+    assertEquals(new BytesRef("value2"), value);
+    store.close();
+  }
+
+  @Test
+  public void testPutGetDelete() throws IOException {
+    HdfsKeyValueStore store = new HdfsKeyValueStore(false, _timer, _configuration, _path);
+    store.put(toBytesRef("a"), toBytesRef("value1"));
+    store.put(toBytesRef("b"), toBytesRef("value2"));
+    store.sync();
+    BytesRef value = new BytesRef();
+    store.get(toBytesRef("a"), value);
+    assertEquals(new BytesRef("value1"), value);
+    store.get(toBytesRef("b"), value);
+    assertEquals(new BytesRef("value2"), value);
+
+    store.delete(toBytesRef("b"));
+    store.sync();
+    assertFalse(store.get(toBytesRef("b"), value));
+    store.close();
+  }
+
+  @Test
+  public void testPutGetReopen() throws IOException {
+    HdfsKeyValueStore store1 = new HdfsKeyValueStore(false, _timer, _configuration, _path);
+    store1.put(toBytesRef("a"), toBytesRef("value1"));
+    store1.put(toBytesRef("b"), toBytesRef("value2"));
+    store1.sync();
+    BytesRef value1 = new BytesRef();
+    store1.get(toBytesRef("a"), value1);
+    assertEquals(new BytesRef("value1"), value1);
+    store1.get(toBytesRef("b"), value1);
+    assertEquals(new BytesRef("value2"), value1);
+    store1.close();
+
+    HdfsKeyValueStore store2 = new HdfsKeyValueStore(false, _timer, _configuration, _path);
+    BytesRef value2 = new BytesRef();
+    store2.get(toBytesRef("a"), value2);
+    assertEquals(new BytesRef("value1"), value2);
+    store2.get(toBytesRef("b"), value2);
+    assertEquals(new BytesRef("value2"), value2);
+    store2.close();
+  }
+
+  @Test
+  public void testFileRolling() throws IOException {
+    HdfsKeyValueStore store = new HdfsKeyValueStore(false, _timer, _configuration, _path, 1000);
+    FileSystem fileSystem = _path.getFileSystem(_configuration);
+    store.put(new BytesRef("a"), new BytesRef(""));
+    assertEquals(1, fileSystem.listStatus(_path).length);
+    store.put(new BytesRef("a"), new BytesRef(new byte[2000]));
+    assertEquals(2, fileSystem.listStatus(_path).length);
+    store.close();
+  }
+
+  @Test
+  public void testFileGC() throws IOException {
+    HdfsKeyValueStore store = new HdfsKeyValueStore(false, _timer, _configuration, _path, 1000);
+    store.put(new BytesRef("a"), new BytesRef(""));
+    FileSystem fileSystem = _path.getFileSystem(_configuration);
+    assertEquals(1, fileSystem.listStatus(_path).length);
+    store.put(new BytesRef("a"), new BytesRef(new byte[2000]));
+    assertEquals(2, fileSystem.listStatus(_path).length);
+    store.put(new BytesRef("a"), new BytesRef(new byte[2000]));
+    store.cleanupOldFiles();
+    assertEquals(2, fileSystem.listStatus(_path).length);
+    store.close();
+  }
+
+  @Test
+  public void testTwoKeyStoreInstancesWritingAtTheSameTime() throws IOException {
+    HdfsKeyValueStore store1 = new HdfsKeyValueStore(false, _timer, _configuration, _path);
+    listFiles();
+    store1.put(new BytesRef("a1"), new BytesRef(new byte[2000]));
+    listFiles();
+    HdfsKeyValueStore store2 = new HdfsKeyValueStore(false, _timer, _configuration, _path);
+    listFiles();
+    store2.put(new BytesRef("a1"), new BytesRef(new byte[1000]));
+    listFiles();
+    store1.put(new BytesRef("a2"), new BytesRef(new byte[2000]));
+    listFiles();
+    store2.put(new BytesRef("a2"), new BytesRef(new byte[1000]));
+    listFiles();
+    store1.put(new BytesRef("a3"), new BytesRef(new byte[2000]));
+    listFiles();
+    store2.put(new BytesRef("a3"), new BytesRef(new byte[1000]));
+    listFiles();
+    try {
+      store1.sync();
+      fail();
+    } catch (Exception e) {
+      // Expected: store2 rolled the log forward and took ownership, so store1's sync fails.
+    }
+    store2.sync();
+
+    try {
+      store1.close();
+      fail();
+    } catch (Exception e) {
+      // Expected: close runs a final sync, which fails for the same reason.
+    }
+
+    store2.close();
+
+    HdfsKeyValueStore store3 = new HdfsKeyValueStore(false, _timer, _configuration, _path);
+    Iterable<Entry<BytesRef, BytesRef>> scan = store3.scan(null);
+    for (Entry<BytesRef, BytesRef> e : scan) {
+      System.out.println(e.getValue().length);
+    }
+    store3.close();
+  }
+
+  @Test
+  public void testTwoKeyStoreInstancesWritingAtTheSameTimeSmallFiles() throws IOException {
+    HdfsKeyValueStore store1 = new HdfsKeyValueStore(false, _timer, _configuration, _path, 1000);
+    store1.put(new BytesRef("a1"), new BytesRef(new byte[2000]));
+    HdfsKeyValueStore store2 = new HdfsKeyValueStore(false, _timer, _configuration, _path, 1000);
+    store2.put(new BytesRef("a1"), new BytesRef(new byte[1000]));
+    try {
+      store1.put(new BytesRef("a2"), new BytesRef(new byte[2000]));
+      fail();
+    } catch (Exception e) {
+      // Should throw exception
+      store1.close();
+    }
+    store2.put(new BytesRef("a2"), new BytesRef(new byte[1000]));
+    store2.put(new BytesRef("a3"), new BytesRef(new byte[1000]));
+    store2.sync();
+    store2.close();
+
+    HdfsKeyValueStore store3 = new HdfsKeyValueStore(false, _timer, _configuration, _path);
+    Iterable<Entry<BytesRef, BytesRef>> scan = store3.scan(null);
+    for (Entry<BytesRef, BytesRef> e : scan) {
+      System.out.println(e.getValue().length);
+    }
+    store3.close();
+  }
+
+  @Test
+  public void testReadonlyPut() throws IOException {
+    HdfsKeyValueStore store1 = new HdfsKeyValueStore(false, _timer, _configuration, _path, 1000);
+    store1.put(new BytesRef("a1"), new BytesRef(new byte[2000]));
+
+    HdfsKeyValueStore store2 = new HdfsKeyValueStore(true, _timer, _configuration, _path, 1000);
+    assertTrue(store2.get(new BytesRef("a1"), new BytesRef(new byte[2000])));
+
+    try {
+      store2.put(new BytesRef("a1"), new BytesRef(new byte[2000]));
+      fail();
+    } catch (IOException e) {
+      // Expected: the store was opened read-only.
+    }
+
+    try {
+      store2.delete(new BytesRef("a1"));
+      fail();
+    } catch (IOException e) {
+      // Expected: deletes also require the write log.
+    }
+
+    try {
+      store2.sync();
+      fail();
+    } catch (IOException e) {
+      // Expected: sync would have to open the log for writing.
+    }
+
+    // Store 1 should still be able to write.
+    store1.put(new BytesRef("a2"), new BytesRef(new byte[2000]));
+
+    // Store 2 should not be able to find.
+    assertFalse(store2.get(new BytesRef("a2"), new BytesRef(new byte[2000])));
+
+    store2.close();
+    store1.close();
+
+  }
+
+  private void listFiles() throws IOException {
+    FileSystem fileSystem = _path.getFileSystem(_configuration);
+    for (FileStatus status : fileSystem.listStatus(_path)) {
+      System.out.println(status.getPath() + " " + status.getLen());
+    }
+  }
+
+  private BytesRef toBytesRef(String s) {
+    return new BytesRef(s);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-store/pom.xml
----------------------------------------------------------------------
diff --git a/blur-store/pom.xml b/blur-store/pom.xml
index 9262088..d4f4ecd 100644
--- a/blur-store/pom.xml
+++ b/blur-store/pom.xml
@@ -32,6 +32,18 @@
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-kvs</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-kvs</artifactId>
+			<type>test-jar</type>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.lucene</groupId>
 			<artifactId>lucene-core</artifactId>
 			<version>${lucene.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
index 6193266..e6d5a64 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -30,6 +30,8 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.blur.kvs.BytesRef;
+import org.apache.blur.kvs.HdfsKeyValueStore;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.memory.MemoryLeakDetector;
@@ -41,7 +43,6 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.util.BytesRef;
 
 public class FastHdfsKeyValueDirectory extends Directory implements LastModified {
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexInput.java
index f37cea4..971d9d1 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexInput.java
@@ -18,8 +18,8 @@ package org.apache.blur.store.hdfs_v2;
 
 import java.io.IOException;
 
+import org.apache.blur.kvs.BytesRef;
 import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.util.BytesRef;
 
 public class FastHdfsKeyValueIndexInput extends IndexInput {
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
deleted file mode 100644
index 09be70c..0000000
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ /dev/null
@@ -1,615 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.store.hdfs_v2;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.lucene.util.BytesRef;
-
-public class HdfsKeyValueStore implements Store {
-
-  public static final int DEFAULT_MAX_AMOUNT_ALLOWED_PER_FILE = 64 * 1024 * 1024;
-  public static final long DEFAULT_MAX_OPEN_FOR_WRITING = TimeUnit.MINUTES.toMillis(1);
-
-  private static final String UTF_8 = "UTF-8";
-  private static final String BLUR_KEY_VALUE = "blur_key_value";
-  private static final Log LOG = LogFactory.getLog(HdfsKeyValueStore.class);
-  private static final byte[] MAGIC;
-  private static final int VERSION = 1;
-  private static final long DAEMON_POLL_TIME = TimeUnit.SECONDS.toMillis(5);
-  private static final int VERSION_LENGTH = 4;
-
-  static {
-    try {
-      MAGIC = BLUR_KEY_VALUE.getBytes(UTF_8);
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  static enum OperationType {
-    PUT, DELETE
-  }
-
-  static class Operation implements Writable {
-
-    OperationType type;
-    BytesWritable key = new BytesWritable();
-    BytesWritable value = new BytesWritable();
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      if (type == OperationType.DELETE) {
-        out.write(0);
-        key.write(out);
-      } else if (type == OperationType.PUT) {
-        out.write(1);
-        key.write(out);
-        value.write(out);
-      } else {
-        throw new RuntimeException("Not supported [" + type + "]");
-      }
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      byte b = in.readByte();
-      switch (b) {
-      case 0:
-        type = OperationType.DELETE;
-        key.readFields(in);
-        return;
-      case 1:
-        type = OperationType.PUT;
-        key.readFields(in);
-        value.readFields(in);
-        return;
-      default:
-        throw new RuntimeException("Not supported [" + b + "]");
-      }
-    }
-
-  }
-
-  private static final Comparator<BytesRef> COMP = new Comparator<BytesRef>() {
-    @Override
-    public int compare(BytesRef b1, BytesRef b2) {
-      return WritableComparator.compareBytes(b1.bytes, b1.offset, b1.length, b2.bytes, b2.offset, b2.length);
-    }
-  };
-
-  static class Value {
-    Value(BytesRef bytesRef, Path path) {
-      _bytesRef = bytesRef;
-      _path = path;
-    }
-
-    BytesRef _bytesRef;
-    Path _path;
-  }
-
-  private final ConcurrentNavigableMap<BytesRef, Value> _pointers = new ConcurrentSkipListMap<BytesRef, Value>(COMP);
-  private final Path _path;
-  private final ReentrantReadWriteLock _readWriteLock;
-  private final AtomicReference<SortedSet<FileStatus>> _fileStatus = new AtomicReference<SortedSet<FileStatus>>();
-  private final FileSystem _fileSystem;
-  private final AtomicLong _currentFileCounter = new AtomicLong();
-  private final WriteLock _writeLock;
-  private final ReadLock _readLock;
-  private final AtomicLong _size = new AtomicLong();
-  private final long _maxAmountAllowedPerFile;
-  private final TimerTask _idleLogTimerTask;
-  private final TimerTask _oldFileCleanerTimerTask;
-  private final AtomicLong _lastWrite = new AtomicLong();
-  private final Timer _hdfsKeyValueTimer;
-  private final long _maxOpenForWriting;
-  private final boolean _readOnly;
-
-  private FSDataOutputStream _output;
-  private Path _outputPath;
-  private boolean _isClosed;
-
-  public HdfsKeyValueStore(boolean readOnly, Timer hdfsKeyValueTimer, Configuration configuration, Path path)
-      throws IOException {
-    this(readOnly, hdfsKeyValueTimer, configuration, path, DEFAULT_MAX_AMOUNT_ALLOWED_PER_FILE,
-        DEFAULT_MAX_OPEN_FOR_WRITING);
-  }
-
-  public HdfsKeyValueStore(boolean readOnly, Timer hdfsKeyValueTimer, Configuration configuration, Path path,
-      long maxAmountAllowedPerFile) throws IOException {
-    this(readOnly, hdfsKeyValueTimer, configuration, path, maxAmountAllowedPerFile, DEFAULT_MAX_OPEN_FOR_WRITING);
-  }
-
-  public HdfsKeyValueStore(boolean readOnly, Timer hdfsKeyValueTimer, Configuration configuration, Path path,
-      long maxAmountAllowedPerFile, long maxOpenForWriting) throws IOException {
-    _readOnly = readOnly;
-    _maxOpenForWriting = maxOpenForWriting;
-    _maxAmountAllowedPerFile = maxAmountAllowedPerFile;
-    _path = path;
-    _fileSystem = _path.getFileSystem(configuration);
-    _fileSystem.mkdirs(_path);
-    _readWriteLock = new ReentrantReadWriteLock();
-    _writeLock = _readWriteLock.writeLock();
-    _readLock = _readWriteLock.readLock();
-    _fileStatus.set(getSortedSet(_path));
-    if (!_fileStatus.get().isEmpty()) {
-      _currentFileCounter.set(Long.parseLong(_fileStatus.get().last().getPath().getName()));
-    }
-    removeAnyTruncatedFiles();
-    loadIndexes();
-    cleanupOldFiles();
-    if (!_readOnly) {
-      _idleLogTimerTask = getIdleLogTimer();
-      _oldFileCleanerTimerTask = getOldFileCleanerTimer();
-      _hdfsKeyValueTimer = hdfsKeyValueTimer;
-      _hdfsKeyValueTimer.schedule(_idleLogTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
-      _hdfsKeyValueTimer.schedule(_oldFileCleanerTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
-    } else {
-      _idleLogTimerTask = null;
-      _oldFileCleanerTimerTask = null;
-      _hdfsKeyValueTimer = null;
-    }
-    // Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, HDFS_KV, SIZE,
-    // path.getParent().toString()), new Gauge<Long>() {
-    // @Override
-    // public Long value() {
-    // return _size.get();
-    // }
-    // });
-  }
-
-  private void removeAnyTruncatedFiles() throws IOException {
-    for (FileStatus fileStatus : _fileStatus.get()) {
-      Path path = fileStatus.getPath();
-      FSDataInputStream inputStream = _fileSystem.open(path);
-      long len = HdfsUtils.getFileLength(_fileSystem, path, inputStream);
-      inputStream.close();
-      if (len < MAGIC.length + VERSION_LENGTH) {
-        // Remove invalid file
-        LOG.warn("Removing file [{0}] because length of [{1}] is less than MAGIC plus version length of [{2}]", path,
-            len, MAGIC.length + VERSION_LENGTH);
-        _fileSystem.delete(path, false);
-      }
-    }
-  }
-
-  private TimerTask getOldFileCleanerTimer() {
-    return new TimerTask() {
-      @Override
-      public void run() {
-        try {
-          cleanupOldFiles();
-        } catch (IOException e) {
-          LOG.error("Unknown error while trying to clean up old files.", e);
-        }
-      }
-    };
-  }
-
-  private TimerTask getIdleLogTimer() {
-    return new TimerTask() {
-      @Override
-      public void run() {
-        try {
-          closeLogFileIfIdle();
-        } catch (IOException e) {
-          LOG.error("Unknown error while trying to close output file.", e);
-        }
-      }
-
-    };
-  }
-
-  @Override
-  public void sync() throws IOException {
-    ensureOpen();
-    _writeLock.lock();
-    ensureOpenForWriting();
-    try {
-      syncInternal();
-    } catch (RemoteException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
-    } catch (LeaseExpiredException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
-    } finally {
-      _writeLock.unlock();
-    }
-  }
-
-  @Override
-  public Iterable<Entry<BytesRef, BytesRef>> scan(BytesRef key) throws IOException {
-    ensureOpen();
-    if (key == null) {
-      key = _pointers.firstKey();
-    }
-    ConcurrentNavigableMap<BytesRef, Value> tailMap = _pointers.tailMap(key, true);
-    final Set<Entry<BytesRef, Value>> entrySet = tailMap.entrySet();
-    return new Iterable<Entry<BytesRef, BytesRef>>() {
-      @Override
-      public Iterator<Entry<BytesRef, BytesRef>> iterator() {
-        final Iterator<Entry<BytesRef, Value>> iterator = entrySet.iterator();
-        return new Iterator<Entry<BytesRef, BytesRef>>() {
-
-          @Override
-          public boolean hasNext() {
-            return iterator.hasNext();
-          }
-
-          @Override
-          public Entry<BytesRef, BytesRef> next() {
-            final Entry<BytesRef, Value> e = iterator.next();
-            return new Entry<BytesRef, BytesRef>() {
-
-              @Override
-              public BytesRef setValue(BytesRef value) {
-                throw new RuntimeException("Read only.");
-              }
-
-              @Override
-              public BytesRef getValue() {
-                return e.getValue()._bytesRef;
-              }
-
-              @Override
-              public BytesRef getKey() {
-                return e.getKey();
-              }
-            };
-          }
-
-          @Override
-          public void remove() {
-            throw new RuntimeException("Read only.");
-          }
-        };
-      }
-    };
-  }
-
-  @Override
-  public void put(BytesRef key, BytesRef value) throws IOException {
-    ensureOpen();
-    if (value == null) {
-      delete(key);
-      return;
-    }
-    _writeLock.lock();
-    ensureOpenForWriting();
-    try {
-      Operation op = getPutOperation(OperationType.PUT, key, value);
-      Path path = write(op);
-      BytesRef deepCopyOf = BytesRef.deepCopyOf(value);
-      _size.addAndGet(deepCopyOf.bytes.length);
-      Value old = _pointers.put(BytesRef.deepCopyOf(key), new Value(deepCopyOf, path));
-      if (old != null) {
-        _size.addAndGet(-old._bytesRef.bytes.length);
-      }
-    } catch (RemoteException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
-    } catch (LeaseExpiredException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
-    } finally {
-      _writeLock.unlock();
-    }
-  }
-
-  private void ensureOpenForWriting() throws IOException {
-    if (_output == null) {
-      openWriter();
-    }
-  }
-
-  private Path write(Operation op) throws IOException {
-    op.write(_output);
-    Path p = _outputPath;
-    if (_output.getPos() >= _maxAmountAllowedPerFile) {
-      rollFile();
-    }
-    return p;
-  }
-
-  private void rollFile() throws IOException {
-    LOG.info("Rolling file [" + _outputPath + "]");
-    _output.close();
-    _output = null;
-    openWriter();
-  }
-
-  public void cleanupOldFiles() throws IOException {
-    _writeLock.lock();
-    try {
-      if (!isOpenForWriting()) {
-        return;
-      }
-      SortedSet<FileStatus> fileStatusSet = getSortedSet(_path);
-      if (fileStatusSet == null || fileStatusSet.size() < 1) {
-        return;
-      }
-      Path newestGen = fileStatusSet.last().getPath();
-      if (!newestGen.equals(_outputPath)) {
-        throw new IOException("No longer the owner of [" + _path + "]");
-      }
-      Set<Path> existingFiles = new HashSet<Path>();
-      for (FileStatus fileStatus : fileStatusSet) {
-        existingFiles.add(fileStatus.getPath());
-      }
-      Set<Entry<BytesRef, Value>> entrySet = _pointers.entrySet();
-      existingFiles.remove(_outputPath);
-      for (Entry<BytesRef, Value> e : entrySet) {
-        Path p = e.getValue()._path;
-        existingFiles.remove(p);
-      }
-      for (Path p : existingFiles) {
-        LOG.info("Removing file no longer referenced [{0}]", p);
-        _fileSystem.delete(p, false);
-      }
-    } finally {
-      _writeLock.unlock();
-    }
-  }
-
-  private void closeLogFileIfIdle() throws IOException {
-    _writeLock.lock();
-    try {
-      if (_output != null && _lastWrite.get() + _maxOpenForWriting < System.currentTimeMillis()) {
-        // Close writer
-        LOG.info("Closing KV log due to inactivity [{0}].", _path);
-        try {
-          _output.close();
-        } finally {
-          _output = null;
-        }
-      }
-    } finally {
-      _writeLock.unlock();
-    }
-  }
-
-  private boolean isOpenForWriting() {
-    return _output != null;
-  }
-
-  private Operation getPutOperation(OperationType put, BytesRef key, BytesRef value) {
-    Operation operation = new Operation();
-    operation.type = put;
-    operation.key.set(key.bytes, key.offset, key.length);
-    operation.value.set(value.bytes, value.offset, value.length);
-    return operation;
-  }
-
-  private Operation getDeleteOperation(OperationType delete, BytesRef key) {
-    Operation operation = new Operation();
-    operation.type = delete;
-    operation.key.set(key.bytes, key.offset, key.length);
-    return operation;
-  }
-
-  @Override
-  public boolean get(BytesRef key, BytesRef value) throws IOException {
-    ensureOpen();
-    _readLock.lock();
-    try {
-      Value internalValue = _pointers.get(key);
-      if (internalValue == null) {
-        return false;
-      }
-      value.copyBytes(internalValue._bytesRef);
-      return true;
-    } finally {
-      _readLock.unlock();
-    }
-  }
-
-  @Override
-  public void delete(BytesRef key) throws IOException {
-    ensureOpen();
-    _writeLock.lock();
-    ensureOpenForWriting();
-    try {
-      Operation op = getDeleteOperation(OperationType.DELETE, key);
-      write(op);
-      Value old = _pointers.remove(key);
-      if (old != null) {
-        _size.addAndGet(-old._bytesRef.bytes.length);
-      }
-    } catch (RemoteException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
-    } catch (LeaseExpiredException e) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
-    } finally {
-      _writeLock.unlock();
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (!_isClosed) {
-      _isClosed = true;
-      if (_idleLogTimerTask != null) {
-        _idleLogTimerTask.cancel();
-      }
-      if (_oldFileCleanerTimerTask != null) {
-        _oldFileCleanerTimerTask.cancel();
-      }
-      if (_hdfsKeyValueTimer != null) {
-        _hdfsKeyValueTimer.purge();
-      }
-      _writeLock.lock();
-      try {
-        if (isOpenForWriting()) {
-          try {
-            syncInternal();
-          } finally {
-            IOUtils.closeQuietly(_output);
-            _output = null;
-          }
-        }
-      } finally {
-        _writeLock.unlock();
-      }
-    }
-  }
-
-  private void openWriter() throws IOException {
-    if (_readOnly) {
-      throw new IOException("Key value store is set in read only mode.");
-    }
-    _outputPath = getSegmentPath(_currentFileCounter.incrementAndGet());
-    LOG.info("Opening for writing [{0}].", _outputPath);
-    _output = _fileSystem.create(_outputPath, false);
-    _output.write(MAGIC);
-    _output.writeInt(VERSION);
-    syncInternal();
-  }
-
-  private Path getSegmentPath(long segment) {
-    return new Path(_path, buffer(segment));
-  }
-
-  private static String buffer(long number) {
-    String s = Long.toString(number);
-    StringBuilder builder = new StringBuilder();
-    for (int i = s.length(); i < 12; i++) {
-      builder.append('0');
-    }
-    return builder.append(s).toString();
-  }
-
-  private void loadIndexes() throws IOException {
-    for (FileStatus fileStatus : _fileStatus.get()) {
-      loadIndex(fileStatus.getPath());
-    }
-  }
-
-  private void ensureOpen() throws IOException {
-    if (_isClosed) {
-      throw new IOException("Already closed.");
-    }
-  }
-
-  private void syncInternal() throws IOException {
-    validateNextSegmentHasNotStarted();
-    _output.flush();
-    _output.sync();
-    _lastWrite.set(System.currentTimeMillis());
-  }
-
-  private void validateNextSegmentHasNotStarted() throws IOException {
-    Path p = getSegmentPath(_currentFileCounter.get() + 1);
-    if (_fileSystem.exists(p)) {
-      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.");
-    }
-  }
-
-  private void loadIndex(Path path) throws IOException {
-    FSDataInputStream inputStream = _fileSystem.open(path);
-    byte[] buf = new byte[MAGIC.length];
-    inputStream.readFully(buf);
-    if (!Arrays.equals(MAGIC, buf)) {
-      throw new IOException("File [" + path + "] not a " + BLUR_KEY_VALUE + " file.");
-    }
-    int version = inputStream.readInt();
-    if (version == 1) {
-      long fileLength = HdfsUtils.getFileLength(_fileSystem, path, inputStream);
-      Operation operation = new Operation();
-      try {
-        while (inputStream.getPos() < fileLength) {
-          try {
-            operation.readFields(inputStream);
-          } catch (IOException e) {
-            // End of sync point found
-            return;
-          }
-          loadIndex(path, operation);
-        }
-      } finally {
-        inputStream.close();
-      }
-    } else {
-      throw new IOException("Unknown version [" + version + "]");
-    }
-  }
-
-  private void loadIndex(Path path, Operation operation) {
-    Value old;
-    switch (operation.type) {
-    case PUT:
-      BytesRef deepCopyOf = BytesRef.deepCopyOf(getKey(operation.value));
-      _size.addAndGet(deepCopyOf.bytes.length);
-      old = _pointers.put(BytesRef.deepCopyOf(getKey(operation.key)), new Value(deepCopyOf, path));
-      break;
-    case DELETE:
-      old = _pointers.remove(getKey(operation.key));
-      break;
-    default:
-      throw new RuntimeException("Not supported [" + operation.type + "]");
-    }
-    if (old != null) {
-      _size.addAndGet(-old._bytesRef.bytes.length);
-    }
-  }
-
-  private BytesRef getKey(BytesWritable key) {
-    return new BytesRef(key.getBytes(), 0, key.getLength());
-  }
-
-  private SortedSet<FileStatus> getSortedSet(Path p) throws IOException {
-    if (_fileSystem.exists(p)) {
-      FileStatus[] listStatus = _fileSystem.listStatus(p);
-      if (listStatus != null) {
-        return new TreeSet<FileStatus>(Arrays.asList(listStatus));
-      }
-    }
-    return new TreeSet<FileStatus>();
-  }
-
-}
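The 615 lines deleted above are the store that lives on in the new module as org.apache.blur.kvs.HdfsKeyValueStore (see the imports added in the hunks at the top of this message). Below is a minimal usage sketch of the relocated class, pieced together from the constructor overloads and Store methods visible in the deletion; the namenode URI is illustrative, and the kvs BytesRef is assumed to mirror the Lucene semantics the old code relied on (deepCopyOf, a growable copyBytes).

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Timer;

import org.apache.blur.kvs.BytesRef;
import org.apache.blur.kvs.HdfsKeyValueStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class HdfsKeyValueStoreSketch {

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Daemon timer that the store uses for its idle-log and old-file
    // cleanup TimerTasks (only scheduled when not read only).
    Timer timer = new Timer("hdfs-kv-store", true);
    Path path = new Path("hdfs://namenode:8020/blur/kvs"); // illustrative
    HdfsKeyValueStore store = new HdfsKeyValueStore(false, timer, conf, path);
    try {
      store.put(ref("k1"), ref("v1"));
      store.sync(); // flush the current write-ahead segment

      BytesRef value = new BytesRef(new byte[0], 0, 0);
      if (store.get(ref("k1"), value)) {
        System.out.println(new String(value.bytes, value.offset, value.length, "UTF-8"));
      }

      // scan(null) starts at the first key; entries come back in the
      // unsigned byte order of WritableComparator.compareBytes.
      for (Entry<BytesRef, BytesRef> e : store.scan(null)) {
        // process e.getKey() / e.getValue()
      }

      store.delete(ref("k1"));
    } finally {
      store.close(); // syncs and closes any open segment
      timer.cancel();
    }
  }

  private static BytesRef ref(String s) throws IOException {
    byte[] bytes = s.getBytes("UTF-8");
    return new BytesRef(bytes, 0, bytes.length);
  }
}

Note that put(key, null) routes to delete(key), and that a second writer on the same path trips the ownership check in validateNextSegmentHasNotStarted(), surfacing as the "Another HDFS KeyStore has likely taken ownership" IOException.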

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-store/src/test/java/org/apache/blur/HdfsMiniClusterUtil.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/HdfsMiniClusterUtil.java b/blur-store/src/test/java/org/apache/blur/HdfsMiniClusterUtil.java
deleted file mode 100644
index c5e1c4e..0000000
--- a/blur-store/src/test/java/org/apache/blur/HdfsMiniClusterUtil.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-
-public class HdfsMiniClusterUtil {
-
-  private static final Log LOG = LogFactory.getLog(HdfsMiniClusterUtil.class);
-
-  public static MiniDFSCluster startDfs(Configuration conf, boolean format, String path) {
-    String perm;
-    Path p = new Path(new File("./target").getAbsolutePath());
-    try {
-      FileSystem fileSystem = p.getFileSystem(conf);
-      FileStatus fileStatus = fileSystem.getFileStatus(p);
-      FsPermission permission = fileStatus.getPermission();
-      perm = permission.getUserAction().ordinal() + "" + permission.getGroupAction().ordinal() + ""
-          + permission.getOtherAction().ordinal();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    LOG.info("dfs.datanode.data.dir.perm=" + perm);
-    conf.set("dfs.datanode.data.dir.perm", perm);
-    System.setProperty("test.build.data", path);
-    try {
-      MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, (String[]) null);
-      cluster.waitActive();
-      return cluster;
-    } catch (Exception e) {
-      LOG.error("error opening file system", e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static void shutdownDfs(MiniDFSCluster cluster) {
-    if (cluster != null) {
-      LOG.info("Shutting down Mini DFS ");
-      try {
-        cluster.shutdown();
-      } catch (Exception e) {
-        // / Can get a java.lang.reflect.UndeclaredThrowableException thrown
-        // here because of an InterruptedException. Don't let exceptions in
-        // here be cause of test failure.
-      }
-      try {
-        FileSystem fs = cluster.getFileSystem();
-        if (fs != null) {
-          LOG.info("Shutting down FileSystem");
-          fs.close();
-        }
-        FileSystem.closeAll();
-      } catch (IOException e) {
-        LOG.error("error closing file system", e);
-      }
-
-      // This has got to be one of the worst hacks I have ever had to do.
-      // This is needed to shutdown 2 thread pools that are not shutdown by
-      // themselves.
-      ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
-      Thread[] threads = new Thread[100];
-      int enumerate = threadGroup.enumerate(threads);
-      for (int i = 0; i < enumerate; i++) {
-        Thread thread = threads[i];
-        if (thread.getName().startsWith("pool")) {
-          if (thread.isAlive()) {
-            thread.interrupt();
-            LOG.info("Stopping ThreadPoolExecutor [" + thread.getName() + "]");
-            Object target = getField(Thread.class, thread, "target");
-            if (target != null) {
-              ThreadPoolExecutor e = (ThreadPoolExecutor) getField(ThreadPoolExecutor.class, target, "this$0");
-              if (e != null) {
-                e.shutdownNow();
-              }
-            }
-            try {
-              LOG.info("Waiting for thread pool to exit [" + thread.getName() + "]");
-              thread.join();
-            } catch (InterruptedException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private static Object getField(Class<?> c, Object o, String fieldName) {
-    try {
-      Field field = c.getDeclaredField(fieldName);
-      field.setAccessible(true);
-      return field.get(o);
-    } catch (NoSuchFieldException e) {
-      try {
-        Field field = o.getClass().getDeclaredField(fieldName);
-        field.setAccessible(true);
-        return field.get(o);
-      } catch (Exception ex) {
-        throw new RuntimeException(ex);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-}
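HdfsMiniClusterUtil moves to the blur-kvs test tree without changing its org.apache.blur package (the test diff below still imports it from there). Here is a hypothetical harness sketch under that assumption; the data directory is illustrative. Note that the format argument is effectively ignored: the implementation above always passes true to the MiniDFSCluster constructor.

import java.io.File;
import java.io.IOException;

import org.apache.blur.HdfsMiniClusterUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class MiniClusterSketch {

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    String dir = new File("./target/tmp/minicluster").getAbsolutePath();
    MiniDFSCluster cluster = HdfsMiniClusterUtil.startDfs(conf, true, dir);
    try {
      FileSystem fs = cluster.getFileSystem();
      fs.mkdirs(new Path("/test")); // exercise the live cluster
    } finally {
      // Also interrupts the leaked "pool*" threads, per the comments in
      // shutdownDfs above.
      HdfsMiniClusterUtil.shutdownDfs(cluster);
    }
  }
}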

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f88a9ef5/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
index 21b3ace..216fa8d 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
@@ -16,7 +16,8 @@
  */
 package org.apache.blur.store.hdfs_v2;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -28,6 +29,7 @@ import java.util.Timer;
 import java.util.TreeSet;
 
 import org.apache.blur.HdfsMiniClusterUtil;
+import org.apache.blur.kvs.HdfsKeyValueStore;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
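With HdfsKeyValueStore now importable from org.apache.blur.kvs (per the added import above) and the wildcard Assert import replaced with explicit ones, here is a JUnit 4 sketch of one contract the deleted source makes easy to verify: a store constructed with readOnly = true rejects writes, because openWriter() throws when the read-only flag is set. This is a hypothetical test, not part of the commit, and it uses a local file:// path for brevity.

import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Timer;

import org.apache.blur.kvs.BytesRef;
import org.apache.blur.kvs.HdfsKeyValueStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class ReadOnlyStoreSketchTest {

  @Test
  public void putShouldFailWhenReadOnly() throws IOException {
    Configuration conf = new Configuration();
    Timer timer = new Timer("kvs-readonly-test", true);
    HdfsKeyValueStore store = new HdfsKeyValueStore(true, timer, conf,
        new Path("file:///tmp/kvs-readonly")); // illustrative local path
    try {
      byte[] b = new byte[] { 1 };
      store.put(new BytesRef(b, 0, 1), new BytesRef(b, 0, 1));
      fail("Expected IOException: store was opened read only");
    } catch (IOException e) {
      // Expected: openWriter() reports the store is in read only mode.
    } finally {
      store.close();
      timer.cancel();
    }
  }
}

One caveat worth noting while the code is fresh: put() takes the write lock before ensureOpenForWriting(), outside the try/finally that releases it, so this failure path leaves the lock held. The sketch still completes only because close() reacquires the ReentrantReadWriteLock reentrantly on the same thread.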

