drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h.@apache.org
Subject [4/4] drill git commit: DRILL-4275: create TransientStore for short-lived objects; refactor PersistentStore to introduce pagination mechanism
Date Fri, 19 Feb 2016 05:29:06 GMT
DRILL-4275: create TransientStore for short-lived objects; refactor PersistentStore to introduce pagination mechanism


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8126927f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8126927f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8126927f

Branch: refs/heads/master
Commit: 8126927fd6f538000a28fb77e704a37e20abbe6c
Parents: 9a3a5c4
Author: Hanifi Gunes <hanifigunes@gmail.com>
Authored: Mon Feb 15 15:16:47 2016 -0800
Committer: Hanifi Gunes <hanifigunes@gmail.com>
Committed: Thu Feb 18 16:47:02 2016 -0800

----------------------------------------------------------------------
 .../common/collections/ImmutableEntry.java      |  62 ++++
 .../exec/store/hbase/config/HBasePStore.java    | 236 ---------------
 .../store/hbase/config/HBasePStoreProvider.java | 157 +---------
 .../hbase/config/HBasePersistentStore.java      | 238 +++++++++++++++
 .../config/HBasePersistentStoreProvider.java    | 135 +++++++++
 .../drill/hbase/TestHBaseTableProvider.java     |  23 +-
 .../exec/store/mongo/config/MongoPStore.java    | 184 ------------
 .../store/mongo/config/MongoPStoreProvider.java |  88 ------
 .../mongo/config/MongoPersistentStore.java      | 190 ++++++++++++
 .../config/MongoPersistentStoreProvider.java    |  84 ++++++
 .../src/resources/drill-override-example.conf   |   2 +-
 exec/java-exec/pom.xml                          |   6 +
 .../apache/drill/exec/client/DrillClient.java   |   2 +-
 .../apache/drill/exec/compile/CodeCompiler.java |   3 -
 .../drill/exec/coord/ClusterCoordinator.java    |  18 +-
 .../coord/local/LocalClusterCoordinator.java    |  22 +-
 .../drill/exec/coord/local/MapBackedStore.java  |  85 ++++++
 .../exec/coord/store/BaseTransientStore.java    |  83 ++++++
 .../store/CachingTransientStoreFactory.java     |  63 ++++
 .../drill/exec/coord/store/TransientStore.java  | 106 +++++++
 .../exec/coord/store/TransientStoreConfig.java  |  74 +++++
 .../store/TransientStoreConfigBuilder.java      |  50 ++++
 .../exec/coord/store/TransientStoreEvent.java   |  71 +++++
 .../coord/store/TransientStoreEventType.java    |  27 ++
 .../exec/coord/store/TransientStoreFactory.java |  36 +++
 .../coord/store/TransientStoreListener.java     |  32 ++
 .../drill/exec/coord/zk/EventDispatcher.java    |  60 ++++
 .../apache/drill/exec/coord/zk/PathUtils.java   |  73 +++++
 .../exec/coord/zk/ZKClusterCoordinator.java     |  57 ++--
 .../drill/exec/coord/zk/ZkEphemeralStore.java   | 145 +++++++++
 .../exec/coord/zk/ZkTransientStoreFactory.java  |  50 ++++
 .../drill/exec/coord/zk/ZookeeperClient.java    | 238 +++++++++++++++
 .../drill/exec/exception/StoreException.java    |  42 +++
 .../exec/serialization/InstanceSerializer.java  |  25 ++
 .../exec/serialization/JacksonSerializer.java   |  59 ++++
 .../exec/serialization/ProtoSerializer.java     |  65 +++++
 .../org/apache/drill/exec/server/Drillbit.java  |  14 +-
 .../drill/exec/server/DrillbitContext.java      |  14 +-
 .../drill/exec/server/RemoteServiceSet.java     |   7 +-
 .../drill/exec/server/options/OptionValue.java  |   3 +-
 .../server/options/SystemOptionManager.java     |  34 ++-
 .../drill/exec/server/rest/DrillRestServer.java |   4 +-
 .../exec/server/rest/StorageResources.java      |   2 +-
 .../server/rest/profile/ProfileResources.java   |  97 ++++---
 .../drill/exec/store/StoragePluginRegistry.java |   4 +-
 .../exec/store/StoragePluginRegistryImpl.java   |  22 +-
 .../exec/store/sys/BasePersistentStore.java     |  32 ++
 .../exec/store/sys/CachingStoreProvider.java    |  70 -----
 .../org/apache/drill/exec/store/sys/EStore.java |  28 --
 .../drill/exec/store/sys/EStoreProvider.java    |  27 --
 .../drill/exec/store/sys/OptionIterator.java    |   5 +-
 .../org/apache/drill/exec/store/sys/PStore.java |  33 ---
 .../drill/exec/store/sys/PStoreConfig.java      | 166 -----------
 .../drill/exec/store/sys/PStoreProvider.java    |  25 --
 .../drill/exec/store/sys/PStoreRegistry.java    |  64 ----
 .../drill/exec/store/sys/PersistentStore.java   |  77 +++++
 .../exec/store/sys/PersistentStoreConfig.java   | 115 ++++++++
 .../exec/store/sys/PersistentStoreMode.java     |  26 ++
 .../exec/store/sys/PersistentStoreProvider.java |  42 +++
 .../exec/store/sys/PersistentStoreRegistry.java |  65 +++++
 .../drill/exec/store/sys/StaticDrillTable.java  |   5 +-
 .../drill/exec/store/sys/SystemTablePlugin.java |   2 +-
 .../drill/exec/store/sys/SystemTableScan.java   |   9 +-
 .../drill/exec/store/sys/ThreadsIterator.java   |   6 +-
 .../drill/exec/store/sys/VersionIterator.java   |   2 -
 .../drill/exec/store/sys/local/FilePStore.java  | 234 ---------------
 .../store/sys/local/LocalEStoreProvider.java    |  47 ---
 .../store/sys/local/LocalPStoreProvider.java    |  83 +-----
 .../drill/exec/store/sys/local/MapEStore.java   |  64 ----
 .../store/sys/local/NoWriteLocalPStore.java     |  69 -----
 .../store/sys/serialize/JacksonSerializer.java  |  86 ------
 .../store/sys/serialize/PClassSerializer.java   |  25 --
 .../store/sys/serialize/ProtoSerializer.java    |  93 ------
 .../store/sys/store/LocalPersistentStore.java   | 196 +++++++++++++
 .../sys/store/ZookeeperPersistentStore.java     | 135 +++++++++
 .../provider/BasePersistentStoreProvider.java   |  28 ++
 .../CachingPersistentStoreProvider.java         |  76 +++++
 .../provider/LocalPersistentStoreProvider.java  |  80 +++++
 .../ZookeeperPersistentStoreProvider.java       |  89 ++++++
 .../exec/store/sys/zk/ZkAbstractStore.java      | 291 -------------------
 .../drill/exec/store/sys/zk/ZkEStore.java       |  41 ---
 .../exec/store/sys/zk/ZkEStoreProvider.java     |  50 ----
 .../drill/exec/store/sys/zk/ZkPStore.java       |  41 ---
 .../exec/store/sys/zk/ZkPStoreProvider.java     |  92 +-----
 .../exec/testing/store/NoWriteLocalStore.java   |  66 +++++
 .../org/apache/drill/exec/work/WorkManager.java |   6 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  10 +-
 .../drill/exec/work/foreman/QueryManager.java   |  52 ++--
 .../src/main/resources/drill-module.conf        |   2 +-
 .../java/org/apache/drill/BaseTestQuery.java    |   2 +-
 .../java/org/apache/drill/PlanningBase.java     |   6 +-
 .../exec/compile/CodeCompilerTestFactory.java   |   8 +-
 .../compile/bytecode/ReplaceMethodInvoke.java   |   5 +-
 .../drill/exec/coord/zk/TestEphemeralStore.java | 144 +++++++++
 .../exec/coord/zk/TestEventDispatcher.java      |  94 ++++++
 .../drill/exec/coord/zk/TestPathUtils.java      |  92 ++++++
 .../exec/coord/zk/TestZookeeperClient.java      | 201 +++++++++++++
 .../exec/physical/impl/TestOptiqPlans.java      |   4 +-
 .../exec/physical/impl/join/TestHashJoin.java   |   5 +-
 .../drill/exec/store/sys/PStoreTestUtil.java    |  14 +-
 .../exec/store/sys/TestPStoreProviders.java     |  10 +-
 101 files changed, 3967 insertions(+), 2465 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/common/src/main/java/org/apache/drill/common/collections/ImmutableEntry.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/collections/ImmutableEntry.java b/common/src/main/java/org/apache/drill/common/collections/ImmutableEntry.java
new file mode 100644
index 0000000..99ea544
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/collections/ImmutableEntry.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.collections;
+
+import java.util.Map;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+public class ImmutableEntry<K, V> implements Map.Entry<K, V>  { // read-only key/value pair; both parts must be non-null
+  private final K key; // assigned once in the constructor, never null
+  private final V value; // assigned once in the constructor, never null
+
+  public ImmutableEntry(final K key, final V value) { // checkNotNull rejects a null key or a null value
+    this.key = Preconditions.checkNotNull(key, "key is required");
+    this.value = Preconditions.checkNotNull(value, "value is required");
+  }
+
+  @Override
+  public K getKey() { // returns the key given at construction
+    return key;
+  }
+
+  @Override
+  public V getValue() { // returns the value given at construction
+    return value;
+  }
+
+  @Override
+  public V setValue(final V value) { // mutation is not supported: always throws, the argument is ignored
+    throw new UnsupportedOperationException("entry is immutable");
+  }
+
+  @Override
+  public boolean equals(final Object other) { // true only for an entry of the exact same runtime class with equal key and value
+    if (other instanceof ImmutableEntry && other.getClass() == getClass()) { // instanceof also screens out null before getClass() is called
+      final ImmutableEntry<K, V> entry = (ImmutableEntry<K, V>)other; // unchecked cast; only key/value equality is used below
+      return Objects.equal(key, entry.key) && Objects.equal(value, entry.value);
+    }
+    return false; // NOTE(review): Map.Entry documents equality across implementations; this never equals a foreign Map.Entry - confirm that is intended
+  }
+
+  @Override
+  public int hashCode() { // combines key and value, consistent with equals() above
+    return Objects.hashCode(key, value); // NOTE(review): Map.Entry specifies key.hashCode() ^ value.hashCode(); this value differs - confirm entry types are never mixed in hashed collections
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
deleted file mode 100644
index 17ddcb1..0000000
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hbase.config;
-
-import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY;
-import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.QUALIFIER;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-
-public class HBasePStore<V> implements PStore<V> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePStore.class);
-
-  private final PStoreConfig<V> config;
-
-  private final HTableInterface table;
-
-  private final String tableName;
-  private final byte[] tableNameStartKey;
-  private final byte[] tableNameStopKey;
-
-  public HBasePStore(PStoreConfig<V> config, HTableInterface table) throws IOException {
-    this.tableName = config.getName() + '\0';
-    this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00"
-    this.tableNameStopKey = this.tableNameStartKey.clone();
-    this.tableNameStopKey[tableNameStartKey.length-1] = 1;
-    this.config = config;
-    this.table = table;
-  }
-
-  @Override
-  public V get(String key) {
-    return get(key, FAMILY);
-  }
-
-  protected synchronized V get(String key, byte[] family) {
-    try {
-      Get get = new Get(row(key));
-      get.addColumn(family, QUALIFIER);
-      Result r = table.get(get);
-      if(r.isEmpty()){
-        return null;
-      }
-      return value(r);
-    } catch (IOException e) {
-      throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e);
-    }
-  }
-
-  @Override
-  public void put(String key, V value) {
-    put(key, FAMILY, value);
-  }
-
-  protected synchronized void put(String key, byte[] family, V value) {
-    try {
-      Put put = new Put(row(key));
-      put.add(family, QUALIFIER, bytes(value));
-      table.put(put);
-    } catch (IOException e) {
-      throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e);
-    }
-  }
-
-  @Override
-  public synchronized boolean putIfAbsent(String key, V value) {
-    try {
-      Put put = new Put(row(key));
-      put.add(FAMILY, QUALIFIER, bytes(value));
-      return table.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put);
-    } catch (IOException e) {
-      throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e);
-    }
-  }
-
-  @Override
-  public synchronized void delete(String key) {
-    delete(row(key));
-  }
-
-  @Override
-  public Iterator<Entry<String, V>> iterator() {
-    return new Iter();
-  }
-
-  private byte[] row(String key) {
-    return Bytes.toBytes(this.tableName + key);
-  }
-
-  private byte[] bytes(V value) {
-    try {
-      return config.getSerializer().serialize(value);
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e);
-    }
-  }
-
-  private V value(Result result) {
-    try {
-      return config.getSerializer().deserialize(result.value());
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e);
-    }
-  }
-
-  private void delete(byte[] row) {
-    try {
-      Delete del = new Delete(row);
-      table.delete(del);
-    } catch (IOException e) {
-      throw new DrillRuntimeException("Caught error while deleting row '" + Bytes.toStringBinary(row)
-          + "' from for table:" + Bytes.toString(table.getTableName()), e);
-    }
-  }
-
-  private class Iter implements Iterator<Entry<String, V>> {
-    private ResultScanner scanner;
-    private Result current = null;
-    private Result last = null;
-    private boolean done = false;
-    private int rowsRead = 0;
-
-    Iter() {
-      try {
-        Scan scan = new Scan(tableNameStartKey, tableNameStopKey);
-        scan.addColumn(FAMILY, QUALIFIER);
-        scan.setCaching(config.getMaxIteratorSize() > 100 ? 100 : config.getMaxIteratorSize());
-        scanner = table.getScanner(scan);
-      } catch (IOException e) {
-        throw new DrillRuntimeException("Caught error while creating HBase scanner for table:" + Bytes.toString(table.getTableName()), e);
-      }
-    }
-
-    @Override
-    public boolean hasNext()  {
-      if (config.getMode() == Mode.BLOB_PERSISTENT
-          && rowsRead >= config.getMaxIteratorSize()) {
-        done = true;
-      } else if (!done && current == null) {
-        try {
-          if ((current = scanner.next()) == null) {
-            done = true;
-          }
-        } catch (IOException e) {
-          throw new DrillRuntimeException("Caught error while fetching rows from for table:" + Bytes.toString(table.getTableName()), e);
-        }
-      }
-
-      if (done && scanner != null) {
-        scanner.close();
-      }
-      return (current != null);
-    }
-
-    @Override
-    public Entry<String, V> next() {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-      last = current;
-      current = null;
-      rowsRead++;
-      return new DeferredEntry(last);
-    }
-
-    @Override
-    public void remove() {
-      if (last == null) {
-        throw new IllegalStateException("remove() called on HBase persistent store iterator, but there is no last row.");
-      }
-      delete(last.getRow());
-    }
-
-  }
-
-  private class DeferredEntry implements Entry<String, V>{
-
-    private Result result;
-
-    public DeferredEntry(Result result) {
-      super();
-      this.result = result;
-    }
-
-    @Override
-    public String getKey() {
-      return Bytes.toString(result.getRow(), tableNameStartKey.length, result.getRow().length-tableNameStartKey.length);
-    }
-
-    @Override
-    public V getValue() {
-      return value(result);
-    }
-
-    @Override
-    public V setValue(V value) {
-      throw new UnsupportedOperationException();
-    }
-
-  }
-
-  @Override
-  public void close() {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
index e57f2b3..df32b87 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,149 +17,16 @@
  */
 package org.apache.drill.exec.store.hbase.config;
 
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.coord.ClusterCoordinator;
-import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.store.sys.PStoreRegistry;
-import org.apache.drill.exec.store.sys.local.LocalEStoreProvider;
-import org.apache.drill.exec.store.sys.zk.ZkEStoreProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public class HBasePStoreProvider implements PStoreProvider {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePStoreProvider.class);
-
-  static final byte[] FAMILY = Bytes.toBytes("s");
-
-  static final byte[] QUALIFIER = Bytes.toBytes("d");
-
-  private final String storeTableName;
-
-  private Configuration hbaseConf;
+import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
 
-  private HConnection connection;
-
-  private HTableInterface table;
-
-  private final boolean zkAvailable;
-  private final LocalEStoreProvider localEStoreProvider;
-  private final ZkEStoreProvider zkEStoreProvider;
-
-  public HBasePStoreProvider(PStoreRegistry registry) {
-    @SuppressWarnings("unchecked")
-    Map<String, Object> config = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG);
-    this.hbaseConf = HBaseConfiguration.create();
-    this.hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-persistent-store-client");
-    if (config != null) {
-      for (Map.Entry<String, Object> entry : config.entrySet()) {
-        this.hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue()));
-      }
-    }
-    this.storeTableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE);
-
-    ClusterCoordinator coord = registry.getClusterCoordinator();
-    if ((coord instanceof ZKClusterCoordinator)) {
-      this.localEStoreProvider = null;
-      this.zkEStoreProvider = new ZkEStoreProvider(((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator());
-      this.zkAvailable = true;
-    } else {
-      this.localEStoreProvider = new LocalEStoreProvider();
-      this.zkEStoreProvider = null;
-      this.zkAvailable = false;
-    }
-
-  }
-
-  @VisibleForTesting
-  public HBasePStoreProvider(Configuration conf, String storeTableName) {
-    this.hbaseConf = conf;
-    this.storeTableName = storeTableName;
-    this.localEStoreProvider = new LocalEStoreProvider();
-    this.zkEStoreProvider = null;
-    this.zkAvailable = false;
-  }
-
-
-
-  @Override
-  public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
-    switch(config.getMode()){
-    case EPHEMERAL:
-      if (this.zkAvailable) {
-        return zkEStoreProvider.getStore(config);
-      } else {
-        return localEStoreProvider.getStore(config);
-      }
-
-    case BLOB_PERSISTENT:
-    case PERSISTENT:
-      return new HBasePStore<V>(config, this.table);
-
-    default:
-      throw new IllegalStateException();
-    }
-  }
-
-
-  @Override
-  @SuppressWarnings("deprecation")
-  public void start() throws IOException {
-    this.connection = HConnectionManager.createConnection(hbaseConf);
-
-    try(HBaseAdmin admin = new HBaseAdmin(connection)) {
-      if (!admin.tableExists(storeTableName)) {
-        HTableDescriptor desc = new HTableDescriptor(storeTableName);
-        desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1));
-        admin.createTable(desc);
-      } else {
-        HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(storeTableName));
-        if (!desc.hasFamily(FAMILY)) {
-          throw new DrillRuntimeException("The HBase table " + storeTableName
-              + " specified as persistent store exists but does not contain column family: "
-              + (Bytes.toString(FAMILY)));
-        }
-      }
-    }
-
-    this.table = connection.getTable(storeTableName);
-    this.table.setAutoFlush(true);
-  }
-
-  @Override
-  public synchronized void close() {
-    if (this.table != null) {
-      try {
-        this.table.close();
-        this.table = null;
-      } catch (IOException e) {
-        logger.warn("Caught exception while closing HBase table.", e);
-      }
-    }
-    if (this.connection != null && !this.connection.isClosed()) {
-      try {
-        this.connection.close();
-      } catch (IOException e) {
-        logger.warn("Caught exception while closing HBase connection.", e);
-      }
-      this.connection = null;
-    }
+/**
+ * Compatibility shim kept so that configurations still referring to the old class name keep working.
+ *
+ * @deprecated will be removed in 1.7;
+ *    use {@link HBasePersistentStoreProvider} instead.
+ */
+public class HBasePStoreProvider extends HBasePersistentStoreProvider {
+  public HBasePStoreProvider(PersistentStoreRegistry registry) { // adds no behavior of its own
+    super(registry); // everything is delegated to the renamed provider
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
new file mode 100644
index 0000000..ac78eb0
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase.config;
+
+import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY;
+import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.sys.BasePersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class HBasePersistentStore<V> extends BasePersistentStore<V> { // persistent store backed by an HBase table; row keys are prefixed with the store name
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStore.class); // not referenced anywhere in this class
+
+  private final PersistentStoreConfig<V> config; // supplies the store name and the value serializer
+  private final HTableInterface table; // handle passed in by the caller; this class never closes it
+  private final String tableName; // row-key prefix: the config name followed by a NUL character (not the HBase table name)
+  private final byte[] tableNameStartKey; // scan start row: the encoded prefix
+  private final byte[] tableNameStopKey; // scan stop row: the prefix with its last byte raised from 0 to 1
+
+  public HBasePersistentStore(PersistentStoreConfig<V> config, HTableInterface table) { // derives the key prefix and scan bounds from the config name
+    this.tableName = config.getName() + '\0';
+    this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00"
+    this.tableNameStopKey = this.tableNameStartKey.clone();
+    this.tableNameStopKey[tableNameStartKey.length-1] = 1; // "tableName\x01": sorts after every row key that carries the prefix
+    this.config = config;
+    this.table = table;
+  }
+
+  @Override
+  public PersistentStoreMode getMode() { // this store always reports PERSISTENT
+    return PersistentStoreMode.PERSISTENT;
+  }
+
+  @Override
+  public V get(String key) { // reads the value stored under key from the default column family
+    return get(key, FAMILY);
+  }
+
+  protected synchronized V get(String key, byte[] family) { // returns null when the row has no cell in family:QUALIFIER
+    try {
+      Get get = new Get(row(key));
+      get.addColumn(family, QUALIFIER);
+      Result r = table.get(get);
+      if(r.isEmpty()){
+        return null;
+      }
+      return value(r);
+    } catch (IOException e) { // HBase I/O failures surface as unchecked DrillRuntimeException
+      throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e);
+    }
+  }
+
+  @Override
+  public void put(String key, V value) { // writes value under key in the default column family
+    put(key, FAMILY, value);
+  }
+
+  protected synchronized void put(String key, byte[] family, V value) { // serializes value and writes it to family:QUALIFIER of the key's row
+    try {
+      Put put = new Put(row(key));
+      put.add(family, QUALIFIER, bytes(value));
+      table.put(put);
+    } catch (IOException e) { // HBase I/O failures surface as unchecked DrillRuntimeException
+      throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e);
+    }
+  }
+
+  @Override
+  public synchronized boolean putIfAbsent(String key, V value) { // returns whatever checkAndPut reports
+    try {
+      Put put = new Put(row(key));
+      put.add(FAMILY, QUALIFIER, bytes(value));
+      return table.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put); // the put is conditional on the cell being absent
+    } catch (IOException e) {
+      throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e);
+    }
+  }
+
+  @Override
+  public synchronized void delete(String key) { // deletes the whole row stored under key
+    delete(row(key));
+  }
+
+  @Override
+  public Iterator<Entry<String, V>> getRange(int skip, int take) { // at most take entries of this store, after skipping the first skip rows of the scan
+    final Iterator<Entry<String, V>> iter = new Iter(take); // opens an HBase scanner right away
+    Iterators.advance(iter, skip); // skipped rows are still fetched from HBase and dropped on the client
+    return Iterators.limit(iter, take); // NOTE(review): Iter closes its scanner only once the scan is exhausted; a range that ends earlier looks like it leaves the scanner open - confirm
+  }
+
+  private byte[] row(String key) { // full row key: store prefix + key
+    return Bytes.toBytes(this.tableName + key);
+  }
+
+  private byte[] bytes(V value) { // serializes with the configured serializer
+    try {
+      return config.getSerializer().serialize(value);
+    } catch (IOException e) { // rethrown unchecked, cause preserved
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  private V value(Result result) { // deserializes result.value() with the configured serializer
+    try {
+      return config.getSerializer().deserialize(result.value());
+    } catch (IOException e) { // rethrown unchecked, cause preserved
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  private void delete(byte[] row) { // takes the already-prefixed row key; not synchronized itself
+    try {
+      Delete del = new Delete(row);
+      table.delete(del);
+    } catch (IOException e) {
+      throw new DrillRuntimeException("Caught error while deleting row '" + Bytes.toStringBinary(row)
+          + "' from for table:" + Bytes.toString(table.getTableName()), e);
+    }
+  }
+
+  private class Iter implements Iterator<Entry<String, V>> { // scanner-backed iterator over this store's key range
+    private ResultScanner scanner; // opened in the constructor and never reset to null
+    private Result current = null; // row fetched by hasNext() and not yet handed out by next()
+    private Result last = null; // row most recently returned by next(); the target of remove()
+    private boolean done = false; // set once scanner.next() has returned null
+
+    Iter(int take) { // take only tunes the scan settings below; it does not bound the iteration
+      try {
+        Scan scan = new Scan(tableNameStartKey, tableNameStopKey); // restricts the scan to rows carrying this store's prefix
+        scan.addColumn(FAMILY, QUALIFIER);
+        scan.setCaching(Math.min(take, 100)); // scanner caching hint, capped at 100
+        scan.setBatch(take);  // NOTE(review): in the HBase client setBatch caps the cells returned per Result, not the number of rows - confirm this call is intended
+        scanner = table.getScanner(scan);
+      } catch (IOException e) {
+        throw new DrillRuntimeException("Caught error while creating HBase scanner for table:" + Bytes.toString(table.getTableName()), e);
+      }
+    }
+
+    @Override
+    public boolean hasNext()  { // fetches one row ahead when none is buffered
+      if (!done && current == null) {
+        try {
+          if ((current = scanner.next()) == null) { // null from the scanner marks the end of the range
+            done = true;
+          }
+        } catch (IOException e) {
+          throw new DrillRuntimeException("Caught error while fetching rows from for table:" + Bytes.toString(table.getTableName()), e);
+        }
+      }
+
+      if (done && scanner != null) { // scanner is never nulled, so close() is called again on every hasNext() after exhaustion
+        scanner.close();
+      }
+      return (current != null);
+    }
+
+    @Override
+    public Entry<String, V> next() { // hands out the buffered row as a lazily decoded entry
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      last = current;
+      current = null; // forces the following hasNext() to fetch a fresh row
+      return new DeferredEntry(last);
+    }
+
+    @Override
+    public void remove() { // deletes from HBase the row last returned by next()
+      if (last == null) {
+        throw new IllegalStateException("remove() called on HBase persistent store iterator, but there is no last row.");
+      }
+      delete(last.getRow()); // last is not cleared, so a repeated remove() issues the same delete again
+    }
+
+  }
+
+  private class DeferredEntry implements Entry<String, V>{ // entry view over a scan Result; key and value are decoded on each call
+
+    private Result result; // the raw HBase row this entry wraps
+
+    public DeferredEntry(Result result) {
+      super();
+      this.result = result;
+    }
+
+    @Override
+    public String getKey() { // the row key with the store prefix stripped
+      return Bytes.toString(result.getRow(), tableNameStartKey.length, result.getRow().length-tableNameStartKey.length);
+    }
+
+    @Override
+    public V getValue() { // deserializes on every call; nothing is cached
+      return value(result);
+    }
+
+    @Override
+    public V setValue(V value) { // entries are read-only
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
+  @Override
+  public void close() { // no-op: the table handle given to the constructor is not closed here
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java
new file mode 100644
index 0000000..6e379c6
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase.config;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.store.provider.BasePersistentStoreProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class HBasePersistentStoreProvider extends BasePersistentStoreProvider {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class);
+
+  static final byte[] FAMILY = Bytes.toBytes("s");
+
+  static final byte[] QUALIFIER = Bytes.toBytes("d");
+
+  private final String storeTableName;
+
+  private Configuration hbaseConf;
+
+  private HConnection connection;
+
+  private HTableInterface table;
+
+  public HBasePersistentStoreProvider(PersistentStoreRegistry registry) {
+    @SuppressWarnings("unchecked")
+    final Map<String, Object> config = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG);
+    this.hbaseConf = HBaseConfiguration.create();
+    this.hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-persistent-store-client");
+    if (config != null) {
+      for (Map.Entry<String, Object> entry : config.entrySet()) {
+        this.hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue()));
+      }
+    }
+    this.storeTableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE);
+  }
+
+  @VisibleForTesting
+  public HBasePersistentStoreProvider(Configuration conf, String storeTableName) {
+    this.hbaseConf = conf;
+    this.storeTableName = storeTableName;
+  }
+
+
+
+  @Override
+  public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException {
+    switch(config.getMode()){
+    case BLOB_PERSISTENT:
+    case PERSISTENT:
+      return new HBasePersistentStore<>(config, this.table);
+
+    default:
+      throw new IllegalStateException();
+    }
+  }
+
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public void start() throws IOException {
+    this.connection = HConnectionManager.createConnection(hbaseConf);
+
+    try(HBaseAdmin admin = new HBaseAdmin(connection)) {
+      if (!admin.tableExists(storeTableName)) {
+        HTableDescriptor desc = new HTableDescriptor(storeTableName);
+        desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1));
+        admin.createTable(desc);
+      } else {
+        HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(storeTableName));
+        if (!desc.hasFamily(FAMILY)) {
+          throw new DrillRuntimeException("The HBase table " + storeTableName
+              + " specified as persistent store exists but does not contain column family: "
+              + (Bytes.toString(FAMILY)));
+        }
+      }
+    }
+
+    this.table = connection.getTable(storeTableName);
+    this.table.setAutoFlush(true);
+  }
+
+  @Override
+  public synchronized void close() {
+    if (this.table != null) {
+      try {
+        this.table.close();
+        this.table = null;
+      } catch (IOException e) {
+        logger.warn("Caught exception while closing HBase table.", e);
+      }
+    }
+    if (this.connection != null && !this.connection.isClosed()) {
+      try {
+        this.connection.close();
+      } catch (IOException e) {
+        logger.warn("Caught exception while closing HBase connection.", e);
+      }
+      this.connection = null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
index 9ff1b08..6b73283 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
@@ -19,32 +19,33 @@ package org.apache.drill.hbase;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.IOException;
 import java.util.Map.Entry;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.config.LogicalPlanPersistence;
+import org.apache.drill.exec.exception.StoreException;
 import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
-import org.apache.drill.exec.store.hbase.config.HBasePStoreProvider;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestHBaseTableProvider extends BaseHBaseTest {
 
-  private static HBasePStoreProvider provider;
+  private static HBasePersistentStoreProvider provider;
 
   @BeforeClass // mask HBase cluster start function
   public static void setUpBeforeTestHBaseTableProvider() throws Exception {
-    provider = new HBasePStoreProvider(storagePluginConfig.getHBaseConf(), "drill_store");
+    provider = new HBasePersistentStoreProvider(storagePluginConfig.getHBaseConf(), "drill_store");
     provider.start();
   }
 
   @Test
-  public void testTableProvider() throws IOException {
+  public void testTableProvider() throws StoreException {
     LogicalPlanPersistence lp = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config);
-    PStore<String> hbaseStore = provider.getStore(PStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build());
+    PersistentStore<String> hbaseStore = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build());
     hbaseStore.put("", "v0");
     hbaseStore.put("k1", "v1");
     hbaseStore.put("k2", "v2");
@@ -57,13 +58,13 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
     assertEquals("testValue", hbaseStore.get(".test"));
 
     int rowCount = 0;
-    for (Entry<String, String> entry : hbaseStore) {
+    for (Entry<String, String> entry : Lists.newArrayList(hbaseStore.getAll())) {
       rowCount++;
       System.out.println(entry.getKey() + "=" + entry.getValue());
     }
     assertEquals(7, rowCount);
 
-    PStore<String> hbaseTestStore = provider.getStore(PStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build());
+    PersistentStore<String> hbaseTestStore = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build());
     hbaseTestStore.put("", "v0");
     hbaseTestStore.put("k1", "v1");
     hbaseTestStore.put("k2", "v2");
@@ -75,7 +76,7 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
     assertEquals("testValue", hbaseStore.get(".test"));
 
     rowCount = 0;
-    for (Entry<String, String> entry : hbaseTestStore) {
+    for (Entry<String, String> entry : Lists.newArrayList(hbaseTestStore.getAll())) {
       rowCount++;
       System.out.println(entry.getKey() + "=" + entry.getValue());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
deleted file mode 100644
index 7883fc9..0000000
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.mongo.config;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.store.mongo.DrillMongoConstants;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Filters;
-import com.mongodb.client.model.UpdateOptions;
-import com.mongodb.client.model.Updates;
-import com.mongodb.client.result.UpdateResult;
-
-import static org.apache.drill.exec.store.mongo.config.MongoPStoreProvider.pKey;
-
-public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
-
-  static final Logger logger = LoggerFactory.getLogger(MongoPStore.class);
-
-  private final PStoreConfig<V> config;
-
-  private final MongoCollection<Document> collection;
-
-  public MongoPStore(PStoreConfig<V> config, MongoCollection<Document> collection)
-      throws IOException {
-//    this.config = config;
-//    this.collection = collection;
-    throw new UnsupportedOperationException("Mongo DB PStore not currently supported");
-  }
-
-  @Override
-  public V get(String key) {
-    try {
-      Bson query = Filters.eq(ID, key);
-      Document document = collection.find(query).first();
-      if (document != null) {
-        return value((byte[]) document.get(pKey));
-      } else {
-        return null;
-      }
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      throw new DrillRuntimeException(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void put(String key, V value) {
-    try {
-      Document putObj = new Document(ID, key).append(pKey, bytes(value));
-      collection.insertOne(putObj);
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      throw new DrillRuntimeException(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public boolean putIfAbsent(String key, V value) {
-    try {
-      Bson query = Filters.eq(ID, key);
-      Bson update = Updates.set(pKey, bytes(value));
-      UpdateResult updateResult = collection.updateOne(query, update, new UpdateOptions().upsert(true));
-      return updateResult.getModifiedCount() == 1;
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      throw new DrillRuntimeException(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void delete(String key) {
-    try {
-      Bson query = Filters.eq(ID, key);
-      collection.deleteOne(query);
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      throw new DrillRuntimeException(e.getMessage(), e);
-    }
-  }
-
-  private byte[] bytes(V value) {
-    try {
-      return config.getSerializer().serialize(value);
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
-    }
-  }
-
-  private V value(byte[] serialize) {
-    try {
-      return config.getSerializer().deserialize(serialize);
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public Iterator<Entry<String, V>> iterator() {
-    return new MongoIterator();
-  }
-
-  private class MongoIterator implements Iterator<Entry<String, V>> {
-
-    private MongoCursor<Document> cursor;
-
-    public MongoIterator() {
-      cursor = collection.find().iterator();
-    }
-
-    @Override
-    public boolean hasNext() {
-      return cursor.hasNext();
-    }
-
-    @Override
-    public Entry<String, V> next() {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-      return new DeferredEntry(cursor.next());
-    }
-
-    @Override
-    public void remove() {
-      cursor.remove();
-    }
-  }
-
-  private class DeferredEntry implements Entry<String, V> {
-
-    private Document result;
-
-    public DeferredEntry(Document result) {
-      this.result = result;
-    }
-
-    @Override
-    public String getKey() {
-      return result.get(ID).toString();
-    }
-
-    @Override
-    public V getValue() {
-      return get(result.get(ID).toString());
-    }
-
-    @Override
-    public V setValue(V value) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
deleted file mode 100644
index ae9353a..0000000
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.mongo.config;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.store.mongo.DrillMongoConstants;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.store.sys.PStoreRegistry;
-import org.apache.drill.exec.store.sys.local.LocalEStoreProvider;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.WriteConcern;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.Indexes;
-
-public class MongoPStoreProvider implements PStoreProvider, DrillMongoConstants {
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
-      .getLogger(MongoPStoreProvider.class);
-
-  static final String pKey = "pKey";
-
-  private MongoClient client;
-
-  private MongoCollection<Document> collection;
-
-  private final String mongoURL;
-  private final LocalEStoreProvider localEStoreProvider;
-
-  public MongoPStoreProvider(PStoreRegistry registry) {
-    mongoURL = registry.getConfig().getString(SYS_STORE_PROVIDER_MONGO_URL);
-    localEStoreProvider = new LocalEStoreProvider();
-  }
-
-  @Override
-  public void start() throws IOException {
-    MongoClientURI clientURI = new MongoClientURI(mongoURL);
-    client = new MongoClient(clientURI);
-    MongoDatabase db = client.getDatabase(clientURI.getDatabase());
-    collection = db.getCollection(clientURI.getCollection()).withWriteConcern(WriteConcern.JOURNALED);
-    Bson index = Indexes.ascending(pKey);
-    collection.createIndex(index);
-  }
-
-  @Override
-  public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
-    switch(config.getMode()){
-    case BLOB_PERSISTENT:
-    case PERSISTENT:
-      return new MongoPStore<>(config, collection);
-    case EPHEMERAL:
-      return localEStoreProvider.getStore(config);
-    default:
-      throw new IllegalStateException();
-
-    }
-  }
-
-  @Override
-  public void close() {
-    if (client != null) {
-      client.close();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
new file mode 100644
index 0000000..b5cc3ee
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.config;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.mongo.DrillMongoConstants;
+import org.apache.drill.exec.store.sys.BasePersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.Updates;
+import com.mongodb.client.result.UpdateResult;
+
+import static org.apache.drill.exec.store.mongo.config.MongoPersistentStoreProvider.pKey;
+
+public class MongoPersistentStore<V> extends BasePersistentStore<V> {
+
+  private static final Logger logger = LoggerFactory.getLogger(MongoPersistentStore.class);
+
+  private final PersistentStoreConfig<V> config;
+  private final MongoCollection<Document> collection;
+
+  public MongoPersistentStore(PersistentStoreConfig<V> config, MongoCollection<Document> collection) {
+//    this.config = config;
+//    this.collection = collection;
+    throw new UnsupportedOperationException("Mongo DB PStore not currently supported");
+  }
+
+  @Override
+  public PersistentStoreMode getMode() {
+    return PersistentStoreMode.PERSISTENT;
+  }
+
+  @Override
+  public V get(String key) {
+    try {
+      Bson query = Filters.eq(DrillMongoConstants.ID, key);
+      Document document = collection.find(query).first();
+      if (document != null) {
+        return value((byte[]) document.get(pKey));
+      } else {
+        return null;
+      }
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void put(String key, V value) {
+    try {
+      Document putObj = new Document(DrillMongoConstants.ID, key).append(pKey, bytes(value));
+      collection.insertOne(putObj);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public boolean putIfAbsent(String key, V value) {
+    try {
+      Bson query = Filters.eq(DrillMongoConstants.ID, key);
+      Bson update = Updates.set(pKey, bytes(value));
+      UpdateResult updateResult = collection.updateOne(query, update, new UpdateOptions().upsert(true));
+      return updateResult.getModifiedCount() == 1;
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void delete(String key) {
+    try {
+      Bson query = Filters.eq(DrillMongoConstants.ID, key);
+      collection.deleteOne(query);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  private byte[] bytes(V value) {
+    try {
+      return config.getSerializer().serialize(value);
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  private V value(byte[] serialize) {
+    try {
+      return config.getSerializer().deserialize(serialize);
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public Iterator<Entry<String, V>> getRange(int skip, int take) {
+    final MongoCursor<Document> cursor = collection.find().skip(skip).limit(take).iterator();
+    return new MongoIterator(cursor);
+  }
+
+  private class MongoIterator implements Iterator<Entry<String, V>> {
+
+    private MongoCursor<Document> cursor;
+
+    public MongoIterator(final MongoCursor<Document> cursor) {
+      this.cursor = Preconditions.checkNotNull(cursor);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return cursor.hasNext();
+    }
+
+    @Override
+    public Entry<String, V> next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      return new DeferredEntry(cursor.next());
+    }
+
+    @Override
+    public void remove() {
+      cursor.remove();
+    }
+  }
+
+  private class DeferredEntry implements Entry<String, V> {
+
+    private Document result;
+
+    public DeferredEntry(Document result) {
+      this.result = result;
+    }
+
+    @Override
+    public String getKey() {
+      return result.get(DrillMongoConstants.ID).toString();
+    }
+
+    @Override
+    public V getValue() {
+      return get(result.get(DrillMongoConstants.ID).toString());
+    }
+
+    @Override
+    public V setValue(V value) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java
new file mode 100644
index 0000000..33127bd
--- /dev/null
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mongo.config;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.mongo.DrillMongoConstants;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.store.provider.BasePersistentStoreProvider;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.WriteConcern;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Indexes;
+
+/**
+ * Persistent store provider backed by a MongoDB collection. The database and collection are
+ * taken from the Mongo connection URI read from Drill configuration.
+ */
+public class MongoPersistentStoreProvider extends BasePersistentStoreProvider {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+      .getLogger(MongoPersistentStoreProvider.class);
+
+  /** Name of the document field under which the serialized value is stored. */
+  static final String pKey = "pKey";
+
+  private MongoClient client;
+
+  private MongoCollection<Document> collection;
+
+  private final String mongoURL;
+
+  /**
+   * Reads the Mongo connection URI from
+   * {@code DrillMongoConstants.SYS_STORE_PROVIDER_MONGO_URL}; no connection is opened here.
+   *
+   * @param registry registry whose configuration supplies the connection URI
+   */
+  public MongoPersistentStoreProvider(PersistentStoreRegistry registry) throws StoreException {
+    mongoURL = registry.getConfig().getString(DrillMongoConstants.SYS_STORE_PROVIDER_MONGO_URL);
+  }
+
+  /**
+   * Connects to MongoDB, selects the database and collection named in the URI (with journaled
+   * write concern) and creates an ascending index on the {@code pKey} field.
+   */
+  @Override
+  public void start() throws IOException {
+    MongoClientURI clientURI = new MongoClientURI(mongoURL);
+    client = new MongoClient(clientURI);
+    MongoDatabase db = client.getDatabase(clientURI.getDatabase());
+    collection = db.getCollection(clientURI.getCollection()).withWriteConcern(WriteConcern.JOURNALED);
+    // NOTE(review): pKey is the field that holds the serialized value, not the lookup key -
+    // confirm that indexing it is intended.
+    Bson index = Indexes.ascending(pKey);
+    collection.createIndex(index);
+  }
+
+  /**
+   * Creates a store on top of the shared collection.
+   *
+   * @param config store configuration; only {@code PERSISTENT} and {@code BLOB_PERSISTENT}
+   *               modes are handled
+   * @return a new Mongo backed store
+   * @throws IllegalStateException if the configured mode is not one of the handled modes
+   */
+  @Override
+  public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) {
+    switch(config.getMode()){
+    case BLOB_PERSISTENT:
+    case PERSISTENT:
+      return new MongoPersistentStore<>(config, collection);
+    default:
+      // Name the offending mode so a misconfiguration can be diagnosed from the stack trace alone.
+      throw new IllegalStateException("Unsupported persistent store mode: " + config.getMode());
+
+    }
+  }
+
+  /** Closes the Mongo client if {@link #start()} created one. */
+  @Override
+  public void close() {
+    if (client != null) {
+      client.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index bc77245..9e29a5f 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -97,7 +97,7 @@ drill.exec: {
     executor.threads: 4
   },
   sys.store.provider: {
-    class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider",
+    class: "org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider",
     # The following section is used by ZkPStoreProvider
     zk: {
       blobroot: "file:///var/log/drill"

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 751c810..604bc8e 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -336,6 +336,12 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>2.7.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.xerial.snappy</groupId>
       <artifactId>snappy-java</artifactId>
       <version>1.0.5-M3</version>

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 902b35b..f83285e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -288,7 +288,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
         try {
           clusterCoordinator.close();
           clusterCoordinator = null;
-        } catch (IOException e) {
+        } catch (Exception e) {
           logger.warn("Error while closing Cluster Coordinator.", e);
         }
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
index 3f9da07..af328b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
@@ -25,10 +25,7 @@ import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.SystemOptionManager;
-import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
index be2f3b1..ea9593e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
@@ -22,6 +22,8 @@ import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 
@@ -29,7 +31,7 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
  * Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities
  * as well as understand other node's existence and capabilities.
  **/
-public abstract class ClusterCoordinator implements Closeable {
+public abstract class ClusterCoordinator implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClusterCoordinator.class);
 
   protected ConcurrentHashMap<DrillbitStatusListener, DrillbitStatusListener> listeners = new ConcurrentHashMap<>(
@@ -60,16 +62,26 @@ public abstract class ClusterCoordinator implements Closeable {
   public abstract DistributedSemaphore getSemaphore(String name, int maximumLeases);
 
   /**
+   * Returns a {@link TransientStore store} instance with the given {@link TransientStoreConfig configuration}.
+   *
+   * Note that an implementation might cache the instance, so creation of a new instance is not guaranteed.
+   *
+   * @param config  store configuration
+   * @param <V>  value type for this store
+   */
+  public abstract <V> TransientStore<V> getOrCreateTransientStore(TransientStoreConfig<V> config);
+
+  /**
    * Actions to take when there are a set of new de-active drillbits.
    * @param unregisteredBits
    */
-  public void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredBits) {
+  protected void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredBits) {
     for (DrillbitStatusListener listener : listeners.keySet()) {
       listener.drillbitUnregistered(unregisteredBits);
     }
   }
 
-  public void drillbitRegistered(Set<DrillbitEndpoint> registeredBits) {
+  protected void drillbitRegistered(Set<DrillbitEndpoint> registeredBits) {
     for (DrillbitStatusListener listener : listeners.keySet()) {
       listener.drillbitRegistered(registeredBits);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
index 27868f0..8c13c42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
@@ -28,6 +28,10 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.DistributedSemaphore;
+import org.apache.drill.exec.coord.store.CachingTransientStoreFactory;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreFactory;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.collect.Maps;
@@ -43,8 +47,19 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
   private final Map<RegistrationHandle, DrillbitEndpoint> endpoints = new ConcurrentHashMap<>();
   private final ConcurrentMap<String, DistributedSemaphore> semaphores = Maps.newConcurrentMap();
 
+  private final TransientStoreFactory factory = CachingTransientStoreFactory.of(new TransientStoreFactory() {
+    @Override
+    public <V> TransientStore<V> getOrCreateStore(TransientStoreConfig<V> config) {
+      return new MapBackedStore<>(config);
+    }
+
+    @Override
+    public void close() throws Exception { }
+  });
+
   @Override
-  public void close() throws IOException {
+  public void close() throws Exception {
+    factory.close();
     endpoints.clear();
   }
 
@@ -125,6 +140,11 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
     return semaphores.get(name);
   }
 
+  @Override
+  public <V> TransientStore<V> getOrCreateTransientStore(final TransientStoreConfig<V> config) {
+    return factory.getOrCreateStore(config);
+  }
+
   public class LocalSemaphore implements DistributedSemaphore {
     private final Semaphore semaphore;
     private final LocalLease localLease = new LocalLease();

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/MapBackedStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/MapBackedStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/MapBackedStore.java
new file mode 100644
index 0000000..c3c8f5a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/MapBackedStore.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.local;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.coord.store.BaseTransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreEvent;
+import org.apache.drill.exec.coord.store.TransientStoreEventType;
+
+public class MapBackedStore<V> extends BaseTransientStore<V> { // transient store kept entirely in a local map
+  private final ConcurrentMap<String, V> delegate = Maps.newConcurrentMap(); // holds every (key, value) pair
+
+  public MapBackedStore(final TransientStoreConfig<V> config) {
+    super(config);
+  }
+
+  @Override
+  public V get(final String key) {
+    return delegate.get(key);
+  }
+
+  @Override
+  public V put(final String key, final V value) {
+    // put() is atomic and the map holds no nulls: a null previous value means the key was absent.
+    final V old = delegate.put(key, value);
+    if (old != value) { // re-storing the very same instance is not reported as a change
+      final TransientStoreEventType type = old == null ? TransientStoreEventType.CREATE : TransientStoreEventType.UPDATE;
+      fireListeners(TransientStoreEvent.of(type, key, value));
+    }
+    return old;
+  }
+
+  @Override
+  public V putIfAbsent(final String key, final V value) {
+    final V existing = delegate.putIfAbsent(key, value);
+    if (existing == null) { // nothing was mapped before, so this call created the entry
+      fireListeners(TransientStoreEvent.of(TransientStoreEventType.CREATE, key, value));
+    }
+    return existing;
+  }
+
+  @Override
+  public V remove(final String key) {
+    final V existing = delegate.remove(key);
+    if (existing != null) { // only report a deletion when an entry was actually removed
+      fireListeners(TransientStoreEvent.of(TransientStoreEventType.DELETE, key, existing));
+    }
+    return existing;
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> entries() {
+    return delegate.entrySet().iterator();
+  }
+
+  @Override
+  public int size() {
+    return delegate.size();
+  }
+
+  @Override
+  public void close() throws Exception {
+    delegate.clear(); // drops all entries; no events are fired for them
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/BaseTransientStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/BaseTransientStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/BaseTransientStore.java
new file mode 100644
index 0000000..463e1fb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/BaseTransientStore.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+public abstract class BaseTransientStore<V> implements TransientStore<V> { // shared listener and iterator plumbing
+  private final Set<TransientStoreListener> listeners = Collections.newSetFromMap(
+      Maps.<TransientStoreListener, Boolean>newConcurrentMap());
+
+  protected final TransientStoreConfig<V> config; // never null, checked in the constructor
+
+  protected BaseTransientStore(final TransientStoreConfig<V> config) {
+    this.config = Preconditions.checkNotNull(config, "config cannot be null");
+  }
+
+  public TransientStoreConfig<V> getConfig() {
+    return config;
+  }
+
+  @Override
+  public Iterator<String> keys() {
+    return Iterators.transform(entries(), new Function<Map.Entry<String, V>, String>() {
+      @Nullable
+      @Override
+      public String apply(final Map.Entry<String, V> entry) { // entry is dereferenced, so it is not marked nullable
+        return entry.getKey();
+      }
+    });
+  }
+
+  @Override
+  public Iterator<V> values() {
+    return Iterators.transform(entries(), new Function<Map.Entry<String, V>, V>() {
+      @Nullable
+      @Override
+      public V apply(final Map.Entry<String, V> entry) {
+        return entry.getValue();
+      }
+    });
+  }
+
+  protected void fireListeners(final TransientStoreEvent event) { // notifies listeners on the calling thread
+    for (final TransientStoreListener listener : listeners) {
+      listener.onChange(event);
+    }
+  }
+
+  @Override
+  public void addListener(final TransientStoreListener listener) {
+    listeners.add(Preconditions.checkNotNull(listener, "listener cannot be null"));
+  }
+
+  @Override
+  public void removeListener(final TransientStoreListener listener) {
+    listeners.remove(Preconditions.checkNotNull(listener, "listener cannot be null"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/CachingTransientStoreFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/CachingTransientStoreFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/CachingTransientStoreFactory.java
new file mode 100644
index 0000000..b500c5d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/CachingTransientStoreFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.store.sys.PersistentStore;
+
+public class CachingTransientStoreFactory implements TransientStoreFactory { // memoizes stores per configuration
+  private final TransientStoreFactory delegate;
+  private final Map<TransientStoreConfig, TransientStore> cache = Maps.newHashMap(); // guarded by "this"
+
+  public CachingTransientStoreFactory(final TransientStoreFactory delegate) {
+    this.delegate = Preconditions.checkNotNull(delegate, "delegate factory is required");
+  }
+
+  @Override
+  public synchronized <V> TransientStore<V> getOrCreateStore(final TransientStoreConfig<V> config) {
+    final TransientStore<V> store = cache.get(Preconditions.checkNotNull(config, "config is required"));
+    if (store != null) {
+      return store;
+    }
+    // Look-up and insert happen under one lock so concurrent callers never create two stores for one config.
+    final TransientStore<V> newStore = delegate.getOrCreateStore(config);
+    cache.put(config, newStore);
+    return newStore;
+  }
+
+  @Override
+  public synchronized void close() throws Exception {
+    final List<AutoCloseable> closeables = Lists.newArrayList();
+    for (final AutoCloseable store : cache.values()) {
+      closeables.add(store);
+    }
+    closeables.add(delegate); // the delegate factory is closed after the cached stores
+    cache.clear();
+    AutoCloseables.close(closeables);
+  }
+
+  public static TransientStoreFactory of(final TransientStoreFactory delegate) {
+    return new CachingTransientStoreFactory(delegate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStore.java
new file mode 100644
index 0000000..ca9b028
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStore.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * An abstraction for storing, retrieving and observing transient (key, value) pairs in a distributed environment.
+ *
+ * This abstraction diverges from {@link org.apache.drill.exec.store.sys.PersistentStore} in that the lifetime of
+ * a (key, value) tuple is bound to the lifetime of the node originally creating it. In other words, entries are evicted
+ * as soon as the node(s) that originally stored them leave the cluster. This is the reason why this
+ * abstraction lives under the cluster coordination package.
+ *
+ * Consumers of this interface can observe changes made to the store by attaching a {@link TransientStoreListener listener}.
+ *
+ */
+public interface TransientStore<V> extends AutoCloseable {
+  /**
+   * Returns the value corresponding to the given look-up key if it exists, null otherwise.
+   * @param key  look-up key
+   */
+  V get(String key);
+
+  /**
+   * Stores the given (key, value) tuple in this store, overriding any existing value.
+   *
+   * @param key  look-up key
+   * @param value  value to store
+   * @return  the old value if the key exists, null otherwise
+   */
+  V put(String key, V value);
+
+  /**
+   * Stores the given (key, value) tuple in this store only if the key does not exist.
+   *
+   * @param key  look-up key
+   * @param value  value to store
+   * @return  the existing value if the key exists, null otherwise
+   */
+  V putIfAbsent(String key, V value);
+
+
+  /**
+   * Removes the (key, value) tuple from this store if the key exists.
+   *
+   * @param key  look-up key
+   * @return  the removed value if the key exists, null otherwise
+   */
+  V remove(String key);
+
+  /**
+   * Returns an iterator of (key, value) tuples.
+   */
+  Iterator<Map.Entry<String, V>> entries();
+
+  /**
+   * Returns an iterator of keys.
+   */
+  Iterator<String> keys();
+
+
+  /**
+   * Returns an iterator of values.
+   */
+  Iterator<V> values();
+
+  /**
+   * Returns the number of entries.
+   */
+  int size();
+
+  /**
+   * Adds a listener that observes store {@link TransientStoreEvent events}.
+   *
+   * Note that
+   * i) Calling this method with the same listener instance more than once has no effect.
+   * ii) Listeners are not necessarily invoked from the calling thread. Consumers should consider thread safety issues.
+   * iii) Subclasses might hold a strong reference to the listener. It is important that the consumer
+   * {@link #removeListener(TransientStoreListener) removes} its listener once it is done observing events.
+   *
+   * @see #removeListener(TransientStoreListener)
+   */
+  void addListener(TransientStoreListener listener);
+
+  /**
+   * Removes the given listener from this store if it exists, has no effect otherwise.
+   */
+  void removeListener(TransientStoreListener listener);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java
new file mode 100644
index 0000000..35c4a06
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+import com.dyuproject.protostuff.Schema;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.protobuf.Message;
+import org.apache.drill.exec.serialization.JacksonSerializer;
+import org.apache.drill.exec.serialization.ProtoSerializer;
+import org.apache.drill.exec.serialization.InstanceSerializer;
+
+public class TransientStoreConfig<V> { // immutable (name, serializer) pair identifying a transient store
+  private final String name;
+  private final InstanceSerializer<V> serializer;
+
+  protected TransientStoreConfig(final String name, final InstanceSerializer<V> serializer) {
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "name is required");
+    this.name = name;
+    this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public InstanceSerializer<V> getSerializer() {
+    return serializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(name, serializer);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj != null && obj.getClass().equals(getClass())) { // exact-class match already implies the instanceof
+      final TransientStoreConfig<?> other = (TransientStoreConfig<?>)obj; // wildcard avoids an unchecked cast
+      return Objects.equal(name, other.name) && Objects.equal(serializer, other.serializer);
+    }
+    return false;
+  }
+
+  public static <V> TransientStoreConfigBuilder<V> newBuilder() {
+    return new TransientStoreConfigBuilder<>();
+  }
+
+  public static <V extends Message, B extends Message.Builder> TransientStoreConfigBuilder<V> newProtoBuilder(final Schema<V> writeSchema, final Schema<B> readSchema) {
+    return TransientStoreConfig.<V>newBuilder().serializer(new ProtoSerializer<>(readSchema, writeSchema));
+  }
+
+  public static <V> TransientStoreConfigBuilder<V> newJacksonBuilder(final ObjectMapper mapper, final Class<V> klazz) {
+    return TransientStoreConfig.<V>newBuilder().serializer(new JacksonSerializer<>(mapper, klazz));
+  }
+
+}


Mime
View raw message