drill-commits mailing list archives

From h.@apache.org
Subject [3/4] drill git commit: DRILL-4275: create TransientStore for short-lived objects; refactor PersistentStore to introduce pagination mechanism
Date Fri, 19 Feb 2016 05:29:05 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java
new file mode 100644
index 0000000..a019a77
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.serialization.InstanceSerializer;
+
+public class TransientStoreConfigBuilder<T> {
+  private String name;
+  private InstanceSerializer<T> serializer;
+
+  protected TransientStoreConfigBuilder() { }
+
+  public String name() {
+    return name;
+  }
+
+  public TransientStoreConfigBuilder<T> name(final String name) {
+    this.name = Preconditions.checkNotNull(name);
+    return this;
+  }
+
+  public InstanceSerializer<T> serializer() {
+    return serializer;
+  }
+
+  public TransientStoreConfigBuilder<T> serializer(final InstanceSerializer<T> serializer) {
+    this.serializer = Preconditions.checkNotNull(serializer);
+    return this;
+  }
+
+  public TransientStoreConfig<T> build() {
+    return new TransientStoreConfig<>(name, serializer);
+  }
+}
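
A minimal sketch of how this builder is meant to be chained, for readers following the new API. The static TransientStoreConfig.newBuilder() entry point is assumed here (the builder constructor above is protected, so the real factory method lives on TransientStoreConfig and is not part of this message); JacksonSerializer is added later in this commit.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.serialization.JacksonSerializer;

public class ConfigBuilderSketch {
  public static void main(String[] args) {
    // newBuilder() is hypothetical; the actual static factory is defined on TransientStoreConfig, not in this diff.
    final TransientStoreConfig<String> config = TransientStoreConfig.<String>newBuilder()
        .name("running")                                                        // store name, required
        .serializer(new JacksonSerializer<>(new ObjectMapper(), String.class))  // value serializer, required
        .build();
    System.out.println(config.getName());
  }
}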

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java
new file mode 100644
index 0000000..a0b5725
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents an event created as a result of an operation over a particular (key, value) entry in a
+ * {@link TransientStore store} instance.
+ *
+ * Types of operations are enumerated in {@link TransientStoreEventType}.
+ *
+ * @param <V>  value type
+ */
+public class TransientStoreEvent<V> {
+  private final TransientStoreEventType type;
+  private final String key;
+  private final V value;
+
+  public TransientStoreEvent(final TransientStoreEventType type, final String key, final V value) {
+    this.type = Preconditions.checkNotNull(type);
+    this.key = Preconditions.checkNotNull(key);
+    this.value = Preconditions.checkNotNull(value);
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public TransientStoreEventType getType() {
+    return type;
+  }
+
+  public V getValue() {
+    return value;
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (obj instanceof TransientStoreEvent && obj.getClass().equals(getClass())) {
+      final TransientStoreEvent<V> other = (TransientStoreEvent<V>)obj;
+      return Objects.equal(type, other.type) && Objects.equal(key, other.key) && Objects.equal(value, other.value);
+    }
+    return super.equals(obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(type, key, value);
+  }
+
+  public static <T> TransientStoreEvent<T> of(final TransientStoreEventType type, final String key, final T value) {
+    return new TransientStoreEvent<>(type, key, value);
+  }
+}
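
A short example using only what is defined above: the of(...) factory and the constructor produce events that compare equal, since equals and hashCode are based on (type, key, value).

import org.apache.drill.exec.coord.store.TransientStoreEvent;
import org.apache.drill.exec.coord.store.TransientStoreEventType;

public class EventEqualityExample {
  public static void main(String[] args) {
    final TransientStoreEvent<String> a = TransientStoreEvent.of(TransientStoreEventType.CREATE, "profiles/123", "payload");
    final TransientStoreEvent<String> b = new TransientStoreEvent<>(TransientStoreEventType.CREATE, "profiles/123", "payload");
    // Value-based equality makes events convenient to compare in tests and to use as map keys.
    System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // prints: true
  }
}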

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java
new file mode 100644
index 0000000..51ae2c7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+/**
+ * Types of store events.
+ */
+public enum TransientStoreEventType {
+  CREATE,
+  UPDATE,
+  DELETE
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
new file mode 100644
index 0000000..c3d351d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+import org.apache.drill.exec.store.sys.PersistentStore;
+
+/**
+ * Factory that is used to obtain a {@link TransientStore store} instance.
+ */
+public interface TransientStoreFactory extends AutoCloseable {
+
+  /**
+   * Returns a {@link TransientStore transient store} instance for the given configuration.
+   *
+   * Note that implementations are free to cache and reuse previously created {@link TransientStore store} instances.
+   *
+   * @param config  store configuration
+   * @param <V>  store value type
+   */
+  <V> TransientStore<V> getOrCreateStore(TransientStoreConfig<V> config);
+}
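
A hedged sketch of obtaining a store through this factory. It assumes a started CuratorFramework and an already built TransientStoreConfig (see the builder sketch above); ZkTransientStoreFactory appears later in this commit, and ZKClusterCoordinator wraps it in a CachingTransientStoreFactory so repeated requests for the same configuration can reuse a store.

import org.apache.curator.framework.CuratorFramework;
import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.coord.store.TransientStoreFactory;
import org.apache.drill.exec.coord.zk.ZkTransientStoreFactory;

public class TransientStoreFactorySketch {
  public static <V> TransientStore<V> open(final CuratorFramework curator, final TransientStoreConfig<V> config) {
    // The ZooKeeper-backed factory starts the underlying store (and its path cache) before returning it.
    final TransientStoreFactory factory = new ZkTransientStoreFactory(curator);
    return factory.getOrCreateStore(config);
  }
}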

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java
new file mode 100644
index 0000000..ca8fa9d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+/**
+ * A listener used for observing {@link TransientStore transient store} {@link TransientStoreEvent events}.
+ */
+public interface TransientStoreListener {
+
+  /**
+   * Invoked by the {@link TransientStore transient store} with event details whenever an observed change occurs.
+   *
+   * @param event  event details
+   */
+  void onChange(TransientStoreEvent event);
+
+}
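
A minimal listener implementation using only the types above; how a listener is registered with a store is handled by BaseTransientStore, which is part of this commit but not shown in this message.

import org.apache.drill.exec.coord.store.TransientStoreEvent;
import org.apache.drill.exec.coord.store.TransientStoreListener;

public class LoggingStoreListener implements TransientStoreListener {
  @Override
  public void onChange(final TransientStoreEvent event) {
    // Reacts to CREATE, UPDATE and DELETE events; here we simply print the details.
    System.out.println(event.getType() + " " + event.getKey() + " -> " + event.getValue());
  }
}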

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java
new file mode 100644
index 0000000..580cfcd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.drill.exec.coord.store.TransientStoreEvent;
+import org.apache.drill.exec.coord.store.TransientStoreEventType;
+
+/**
+ * An abstraction used for dispatching store {@link TransientStoreEvent events}.
+ *
+ * @param <V>  value type
+ */
+public class EventDispatcher<V> implements PathChildrenCacheListener {
+  public final static Map<PathChildrenCacheEvent.Type, TransientStoreEventType> MAPPINGS = ImmutableMap
+      .<PathChildrenCacheEvent.Type, TransientStoreEventType>builder()
+      .put(PathChildrenCacheEvent.Type.CHILD_ADDED, TransientStoreEventType.CREATE)
+      .put(PathChildrenCacheEvent.Type.CHILD_REMOVED, TransientStoreEventType.DELETE)
+      .put(PathChildrenCacheEvent.Type.CHILD_UPDATED, TransientStoreEventType.UPDATE)
+      .build();
+
+  private final ZkEphemeralStore<V> store;
+
+  protected EventDispatcher(final ZkEphemeralStore<V> store) {
+    this.store = Preconditions.checkNotNull(store, "store is required");
+  }
+
+  @Override
+  public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) throws Exception {
+    final PathChildrenCacheEvent.Type original = event.getType();
+    final TransientStoreEventType mapped = MAPPINGS.get(original);
+    if (mapped != null) { // dispatch the event to listeners only if it can be mapped
+      final String path = event.getData().getPath();
+      final byte[] bytes = event.getData().getData();
+      final V value = store.getConfig().getSerializer().deserialize(bytes);
+      store.fireListeners(TransientStoreEvent.of(mapped, path, value));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java
new file mode 100644
index 0000000..f01b989
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import com.google.common.base.Preconditions;
+import org.apache.parquet.Strings;
+
+/**
+ * A convenience class used to simplify Zookeeper path manipulation.
+ */
+public final class PathUtils {
+
+  /**
+   * Returns a normalized, combined path out of the given path segments.
+   *
+   * @param parts  path segments to combine
+   * @see #normalize(String)
+   */
+  public static final String join(final String... parts) {
+    final StringBuilder sb = new StringBuilder();
+    for (final String part:parts) {
+      Preconditions.checkNotNull(part, "parts cannot contain null");
+      if (!Strings.isNullOrEmpty(part)) {
+        sb.append(part).append("/");
+      }
+    }
+    if (sb.length() > 0) {
+      sb.deleteCharAt(sb.length() - 1);
+    }
+    final String path = sb.toString();
+    return normalize(path);
+  }
+
+  /**
+   * Normalizes the given path eliminating repeated forward slashes.
+   *
+   * @return  normalized path
+   */
+  public static final String normalize(final String path) {
+    if (Strings.isNullOrEmpty(Preconditions.checkNotNull(path))) {
+      return path;
+    }
+
+    final StringBuilder builder = new StringBuilder();
+    char last = path.charAt(0);
+    builder.append(last);
+    for (int i=1; i<path.length(); i++) {
+      char cur = path.charAt(i);
+      if (last == '/' && cur == last) {
+        continue;
+      }
+      builder.append(cur);
+      last = cur;
+    }
+    return builder.toString();
+  }
+
+}
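
Tracing the code above: join(...) concatenates non-empty segments with '/' and then normalizes the result, and normalize(...) collapses runs of forward slashes. A few sample inputs and their expected outputs:

import org.apache.drill.exec.coord.zk.PathUtils;

public class PathUtilsExamples {
  public static void main(String[] args) {
    System.out.println(PathUtils.join("/", "running"));        // prints: /running
    System.out.println(PathUtils.join("drill", "", "sys"));    // prints: drill/sys (empty segments are skipped)
    System.out.println(PathUtils.normalize("//drill///sys"));  // prints: /drill/sys
  }
}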

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
index b831852..4926f9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
@@ -24,13 +24,14 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.collect.Lists;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -42,15 +43,19 @@ import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.DistributedSemaphore;
 import org.apache.drill.exec.coord.DrillServiceInstanceHelper;
+import org.apache.drill.exec.coord.store.CachingTransientStoreFactory;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreFactory;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.base.Function;
-import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 
 /**
  * Manages cluster coordination utilizing zookeeper. *
@@ -60,15 +65,14 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
 
   private CuratorFramework curator;
   private ServiceDiscovery<DrillbitEndpoint> discovery;
-  private ServiceCache<DrillbitEndpoint> serviceCache;
   private volatile Collection<DrillbitEndpoint> endpoints = Collections.emptyList();
   private final String serviceName;
   private final CountDownLatch initialConnection = new CountDownLatch(1);
+  private final TransientStoreFactory factory;
+  private ServiceCache<DrillbitEndpoint> serviceCache;
 
   private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");
 
-
-
   public ZKClusterCoordinator(DrillConfig config) throws IOException{
     this(config, null);
   }
@@ -100,11 +104,8 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
       .build();
     curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
     curator.start();
-    discovery = getDiscovery();
-    serviceCache = discovery.
-      serviceCacheBuilder()
-      .name(serviceName)
-      .build();
+    discovery = newDiscovery();
+    factory = CachingTransientStoreFactory.of(new ZkTransientStoreFactory(curator));
   }
 
   public CuratorFramework getCurator() {
@@ -115,8 +116,6 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
   public void start(long millisToWait) throws Exception {
     logger.debug("Starting ZKClusterCoordination.");
     discovery.start();
-    serviceCache.start();
-    serviceCache.addListener(new ZKListener());
 
     if(millisToWait != 0) {
       boolean success = this.initialConnection.await(millisToWait, TimeUnit.MILLISECONDS);
@@ -127,6 +126,12 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
       this.initialConnection.await();
     }
 
+    serviceCache = discovery
+        .serviceCacheBuilder()
+        .name(serviceName)
+        .build();
+    serviceCache.addListener(new EndpointListener());
+    serviceCache.start();
     updateEndpoints();
   }
 
@@ -142,29 +147,28 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
 
   }
 
-  private class ZKListener implements ServiceCacheListener {
-
+  private class EndpointListener implements ServiceCacheListener {
     @Override
-    public void stateChanged(CuratorFramework client, ConnectionState newState) {
-    }
+    public void stateChanged(CuratorFramework client, ConnectionState newState) { }
 
     @Override
     public void cacheChanged() {
-      logger.debug("Cache changed, updating.");
+      logger.debug("Cache changed, updating endpoints.");
       updateEndpoints();
     }
   }
 
-  public void close() throws IOException {
-    serviceCache.close();
-    discovery.close();
-    curator.close();
+  public void close() throws Exception {
+    // discovery already attempts to close the caches it created (i.e. serviceCache). We still close serviceCache
+    // explicitly, and do so before discovery, so that the cache is not released twice and the JVM does not emit
+    // warnings about the double release.
+    AutoCloseables.close(serviceCache, discovery, curator, factory);
   }
 
   @Override
   public RegistrationHandle register(DrillbitEndpoint data) {
     try {
-      ServiceInstance<DrillbitEndpoint> serviceInstance = getServiceInstance(data);
+      ServiceInstance<DrillbitEndpoint> serviceInstance = newServiceInstance(data);
       discovery.registerService(serviceInstance);
       return new ZKRegistrationHandle(serviceInstance.getId());
     } catch (Exception e) {
@@ -206,6 +210,11 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
     return new ZkDistributedSemaphore(curator, "/semaphore/" + name, maximumLeases);
   }
 
+  @Override
+  public <V> TransientStore<V> getOrCreateTransientStore(final TransientStoreConfig<V> config) {
+    final ZkEphemeralStore<V> store = (ZkEphemeralStore<V>)factory.getOrCreateStore(config);
+    return store;
+  }
 
   private synchronized void updateEndpoints() {
     try {
@@ -253,7 +262,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
     }
   }
 
-  private ServiceInstance<DrillbitEndpoint> getServiceInstance(DrillbitEndpoint endpoint) throws Exception {
+  protected ServiceInstance<DrillbitEndpoint> newServiceInstance(DrillbitEndpoint endpoint) throws Exception {
     return ServiceInstance.<DrillbitEndpoint>builder()
       .name(serviceName)
       .payload(endpoint)
@@ -261,7 +270,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
   }
 
 
-  public ServiceDiscovery<DrillbitEndpoint> getDiscovery() {
+  protected ServiceDiscovery<DrillbitEndpoint> newDiscovery() {
     return ServiceDiscoveryBuilder
       .builder(DrillbitEndpoint.class)
       .basePath("/")
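
With the changes above, a transient store can be requested straight from the coordinator. A sketch, assuming a started coordinator and a prepared configuration; the @Override on getOrCreateTransientStore implies the method is declared on ClusterCoordinator in another part of this commit.

import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;

public class CoordinatorStoreSketch {
  public static <V> TransientStore<V> storeFor(final ClusterCoordinator coordinator, final TransientStoreConfig<V> config) {
    // The ZK implementation delegates to a CachingTransientStoreFactory, so repeated calls with the
    // same configuration are served by the same ZooKeeper-backed store.
    return coordinator.getOrCreateTransientStore(config);
  }
}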

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
new file mode 100644
index 0000000..94e03ad
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.drill.common.collections.ImmutableEntry;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.coord.store.BaseTransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreEvent;
+import org.apache.drill.exec.serialization.InstanceSerializer;
+import org.apache.zookeeper.CreateMode;
+
+public class ZkEphemeralStore<V> extends BaseTransientStore<V> {
+
+  @VisibleForTesting
+  protected final PathChildrenCacheListener dispatcher = new EventDispatcher<>(this);
+  private final ZookeeperClient client;
+
+  public ZkEphemeralStore(final TransientStoreConfig<V> config, final CuratorFramework curator) {
+    super(config);
+    this.client = new ZookeeperClient(curator, PathUtils.join("/", config.getName()), CreateMode.EPHEMERAL);
+  }
+
+  public void start() throws Exception {
+    getClient().getCache().getListenable().addListener(dispatcher);
+    getClient().start();
+  }
+
+  protected ZookeeperClient getClient() {
+    return client;
+  }
+
+  @Override
+  public V get(final String key) {
+    final byte[] bytes = getClient().get(key);
+    if (bytes == null) {
+      return null;
+    }
+    try {
+      return config.getSerializer().deserialize(bytes);
+    } catch (final IOException e) {
+      throw new DrillRuntimeException(String.format("unable to deserialize value at %s", key), e);
+    }
+  }
+
+  @Override
+  public V put(final String key, final V value) {
+    final InstanceSerializer<V> serializer = config.getSerializer();
+    try {
+      final byte[] old = getClient().get(key);
+      final byte[] bytes = serializer.serialize(value);
+      getClient().put(key, bytes);
+      if (old == null) {
+        return null;
+      }
+      return serializer.deserialize(old);
+    } catch (final IOException e) {
+      throw new DrillRuntimeException(String.format("unable to de/serialize value of type %s", value.getClass()), e);
+    }
+  }
+
+  @Override
+  public V putIfAbsent(final String key, final V value) {
+    final V old = get(key);
+    if (old == null) {
+      try {
+        final byte[] bytes = config.getSerializer().serialize(value);
+        getClient().put(key, bytes);
+      } catch (final IOException e) {
+        throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
+      }
+    }
+    return old;
+  }
+
+  @Override
+  public V remove(final String key) {
+    final V existing = get(key);
+    if (existing != null) {
+      getClient().delete(key);
+    }
+    return existing;
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> entries() {
+    return Iterators.transform(getClient().entries(), new Function<Map.Entry<String, byte[]>, Map.Entry<String, V>>() {
+      @Nullable
+      @Override
+      public Map.Entry<String, V> apply(@Nullable Map.Entry<String, byte[]> input) {
+        try {
+          final V value = config.getSerializer().deserialize(input.getValue());
+          return new ImmutableEntry<>(input.getKey(), value);
+        } catch (final IOException e) {
+          throw new DrillRuntimeException(String.format("unable to deserialize value at key %s", input.getKey()), e);
+        }
+      }
+    });
+  }
+
+  @Override
+  public int size() {
+    return getClient().getCache().getCurrentData().size();
+  }
+
+  @Override
+  public void close() throws Exception {
+    getClient().close();
+  }
+
+  /**
+   * This override exists to make the method visible within this package so that {@link EventDispatcher} can fire store events.
+   */
+  @Override
+  protected void fireListeners(TransientStoreEvent event) {
+    super.fireListeners(event);
+  }
+}
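
A short sketch of the store's read/write surface, using only the methods defined above; construction and start() are left to ZkTransientStoreFactory (next file), which is how the store is obtained in practice.

import org.apache.drill.exec.coord.zk.ZkEphemeralStore;

public class EphemeralStoreSketch {
  public static void demo(final ZkEphemeralStore<String> store) {
    store.put("drillbit-1", "RUNNING");                            // create or overwrite; returns the previous value, if any
    final String current = store.get("drillbit-1");                // "RUNNING" once the path cache has been rebuilt
    final String existing = store.putIfAbsent("drillbit-1", "X");  // returns "RUNNING"; the stored value is left untouched
    store.remove("drillbit-1");                                    // deletes the underlying ephemeral node
    System.out.println(current + " / " + existing);
  }
}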

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java
new file mode 100644
index 0000000..a58c376
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreFactory;
+
+public class ZkTransientStoreFactory implements TransientStoreFactory {
+
+  private final CuratorFramework curator;
+
+  public ZkTransientStoreFactory(final CuratorFramework curator) {
+    this.curator = Preconditions.checkNotNull(curator, "curator is required");
+  }
+
+  @Override
+  public <V> ZkEphemeralStore<V> getOrCreateStore(TransientStoreConfig<V> config) {
+    final ZkEphemeralStore<V> store = new ZkEphemeralStore<>(config, curator);
+    try {
+      store.start();
+    } catch (final Exception e) {
+      throw new DrillRuntimeException("unable to start zookeeper transient store", e);
+    }
+    return store;
+  }
+
+  @Override
+  public void close() throws Exception {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
new file mode 100644
index 0000000..1c33f71
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.drill.common.collections.ImmutableEntry;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.zookeeper.CreateMode;
+
+/**
+ * A namespace-aware Zookeeper client.
+ *
+ * The implementation operates only under the given namespace.
+ *
+ * Note that instances of this class hold onto resources that must be released via {@link #close()}.
+ */
+public class ZookeeperClient implements AutoCloseable {
+  private final CuratorFramework curator;
+  private final String root;
+  private final PathChildrenCache cache;
+  private final CreateMode mode;
+
+  public ZookeeperClient(final CuratorFramework curator, final String root, final CreateMode mode) {
+    this.curator = Preconditions.checkNotNull(curator, "curator is required");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(root), "root path is required");
+    Preconditions.checkArgument(root.charAt(0) == '/', "root path must be absolute");
+    this.root = root;
+    this.mode = Preconditions.checkNotNull(mode, "mode is required");
+    this.cache = new PathChildrenCache(curator, root, true);
+  }
+
+  /**
+   * Starts the client. This call ensures the creation of the root path.
+   *
+   * @throws Exception  if cache fails to start or root path creation fails.
+   * @see #close()
+   */
+  public void start() throws Exception {
+    curator.newNamespaceAwareEnsurePath(root).ensure(curator.getZookeeperClient()); // ensure root is created
+    getCache().start();
+  }
+
+  public PathChildrenCache getCache() {
+    return cache;
+  }
+
+  public String getRoot() {
+    return root;
+  }
+
+  public CreateMode getMode() {
+    return mode;
+  }
+
+  /**
+   * Returns true if path exists in the cache, false otherwise.
+   *
+   * Note that calls to this method are eventually consistent.
+   *
+   * @param path  path to check
+   */
+  public boolean hasPath(final String path) {
+    return hasPath(path, false);
+  }
+
+  /**
+   * Checks if the given path exists.
+   *
+   * If the consistent flag is set, the check is made directly against Zookeeper and is therefore consistent;
+   * otherwise, the check is made against the local cache and is eventually consistent.
+   *
+   * @param path  path to check
+   * @param consistent  whether the check should be consistent
+   * @return  true if the path exists, false otherwise
+   */
+  public boolean hasPath(final String path, final boolean consistent) {
+    Preconditions.checkNotNull(path, "path is required");
+
+    final String target = PathUtils.join(root, path);
+    try {
+      if (consistent) {
+        return curator.checkExists().forPath(target) != null;
+      } else {
+        return getCache().getCurrentData(target) != null;
+      }
+    } catch (final Exception e) {
+      throw new DrillRuntimeException("error while checking path on zookeeper", e);
+    }
+  }
+
+  /**
+   * Returns a value corresponding to the given path if path exists in the cache, null otherwise.
+   *
+   * Note that calls to this method are eventually consistent.
+   *
+   * @param path  target path
+   */
+  public byte[] get(final String path) {
+    return get(path, false);
+  }
+
+  /**
+   * Returns the value stored at the given path, or null if no such path exists.
+   *
+   * If the consistent flag is set, the value is read directly from Zookeeper and is therefore consistent;
+   * otherwise, the value is read from the local cache and is eventually consistent.
+   *
+   * @param path  target path
+   * @param consistent  whether the read should be consistent
+   */
+  public byte[] get(final String path, final boolean consistent) {
+    Preconditions.checkNotNull(path, "path is required");
+
+    final String target = PathUtils.join(root, path);
+    if (consistent) {
+      try {
+        return curator.getData().forPath(target);
+      } catch (final Exception ex) {
+        throw new DrillRuntimeException(String.format("error retrieving value for [%s]", path), ex);
+      }
+    } else {
+      final ChildData data = getCache().getCurrentData(target);
+      if (data != null) {
+        return data.getData();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates the given path without placing any data in it.
+   *
+   * @param path  target path
+   */
+  public void create(final String path) {
+    Preconditions.checkNotNull(path, "path is required");
+
+    final String target = PathUtils.join(root, path);
+    try {
+      curator.create().withMode(mode).forPath(target);
+      getCache().rebuildNode(target);
+    } catch (final Exception e) {
+      throw new DrillRuntimeException(String.format("unable to create node at %s", target), e);
+    }
+  }
+
+  /**
+   * Puts the given byte sequence into the given path.
+   *
+   * If the path does not exist, this call creates it.
+   *
+   * @param path  target path
+   * @param data  data to store
+   */
+  public void put(final String path, final byte[] data) {
+    Preconditions.checkNotNull(path, "path is required");
+    Preconditions.checkNotNull(data, "data is required");
+
+    final String target = PathUtils.join(root, path);
+    try {
+      // we make a consistent read to ensure this call won't fail upon consecutive calls on the same path
+      // before cache is updated
+      if (hasPath(path, true)) {
+        curator.setData().forPath(target, data);
+      } else {
+        curator.create().withMode(mode).forPath(target, data);
+      }
+      getCache().rebuildNode(target);
+
+    } catch (final Exception e) {
+      throw new DrillRuntimeException(String.format("unable to put data at %s", target), e);
+    }
+  }
+
+  /**
+   * Deletes the node residing at the given path.
+   *
+   * @param path  target path to delete
+   */
+  public void delete(final String path) {
+    Preconditions.checkNotNull(path, "path is required");
+
+    final String target = PathUtils.join(root, path);
+    try {
+      curator.delete().forPath(target);
+      getCache().rebuildNode(target);
+    } catch (final Exception e) {
+      throw new DrillRuntimeException(String.format("unable to delete node at %s", target), e);
+    }
+  }
+
+  /**
+   * Returns an iterator of (key, value) pairs residing under {@link #getRoot() root} path.
+   */
+  public Iterator<Map.Entry<String, byte[]>> entries() {
+    final String prefix = PathUtils.join(root, "/");
+    return Iterables.transform(getCache().getCurrentData(), new Function<ChildData, Map.Entry<String, byte[]>>() {
+      @Nullable
+      @Override
+      public Map.Entry<String, byte[]> apply(final ChildData data) {
+        // normalize key name removing the root prefix. resultant key must be a relative path, not beginning with a '/'.
+        final String key = data.getPath().replace(prefix, "");
+        return new ImmutableEntry<>(key, data.getData());
+      }
+    }).iterator();
+  }
+
+  @Override
+  public void close() throws Exception {
+    getCache().close();
+  }
+
+}
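
A sketch of the client's lifecycle and of the consistent flag, assuming a started CuratorFramework; note that paths passed to the client are relative to the root given at construction time.

import org.apache.curator.framework.CuratorFramework;
import org.apache.drill.exec.coord.zk.ZookeeperClient;
import org.apache.zookeeper.CreateMode;

public class ZookeeperClientSketch {
  public static void demo(final CuratorFramework curator) throws Exception {
    try (ZookeeperClient client = new ZookeeperClient(curator, "/drill/sketch", CreateMode.PERSISTENT)) {
      client.start();                                  // ensures the root path exists and starts the path cache
      client.put("key", "value".getBytes());           // creates or updates /drill/sketch/key
      final byte[] direct = client.get("key", true);   // consistent read, straight from ZooKeeper
      final byte[] cached = client.get("key");         // eventually consistent read, served from the cache
      client.delete("key");
      System.out.println(direct.length + " / " + (cached == null ? "miss" : cached.length));
    }
  }
}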

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java
new file mode 100644
index 0000000..506d485
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillException;
+
+public class StoreException extends DrillException {
+  public StoreException() {
+    super();
+  }
+
+  public StoreException(Throwable cause) {
+    super(cause);
+  }
+
+  public StoreException(String message) {
+    super(message);
+  }
+
+  public StoreException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public StoreException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java
new file mode 100644
index 0000000..f44d835
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.serialization;
+
+import java.io.IOException;
+
+public interface InstanceSerializer<T> {
+  byte[] serialize(T instance) throws IOException;
+  T deserialize(byte[] raw) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java
new file mode 100644
index 0000000..676929d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.serialization;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Objects;
+
+public class JacksonSerializer<T> implements InstanceSerializer<T> {
+  private final ObjectReader reader;
+  private final ObjectWriter writer;
+
+  public JacksonSerializer(final ObjectMapper mapper, final Class<T> klazz) {
+    this.reader = mapper.readerFor(klazz);
+    this.writer = mapper.writer();
+  }
+
+  @Override
+  public T deserialize(final byte[] raw) throws IOException {
+    return reader.readValue(raw);
+  }
+
+  @Override
+  public byte[] serialize(final T instance) throws IOException {
+    return writer.writeValueAsBytes(instance);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof JacksonSerializer && obj.getClass().equals(getClass())) {
+      final JacksonSerializer<T> other = (JacksonSerializer<T>)obj;
+      return Objects.equal(reader, other.reader) && Objects.equal(writer, other.writer);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(reader, writer);
+  }
+}
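
A round-trip example for the serializer above; any Jackson-serializable type works, a plain String is used for brevity.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.exec.serialization.InstanceSerializer;
import org.apache.drill.exec.serialization.JacksonSerializer;

public class SerializerRoundTrip {
  public static void main(String[] args) throws Exception {
    final InstanceSerializer<String> serializer = new JacksonSerializer<>(new ObjectMapper(), String.class);
    final byte[] raw = serializer.serialize("hello");   // JSON bytes: "hello"
    final String back = serializer.deserialize(raw);
    System.out.println(back);                           // prints: hello
  }
}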

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java
new file mode 100644
index 0000000..e3ee5f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.dyuproject.protostuff.JsonIOUtil;
+import com.dyuproject.protostuff.Schema;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+
+public class ProtoSerializer<T, B extends Message.Builder> implements InstanceSerializer<T> {
+  private final Schema<B> readSchema;
+  private final Schema<T> writeSchema;
+
+  public ProtoSerializer(final Schema<B> readSchema, final Schema<T> writeSchema) {
+    this.readSchema = Preconditions.checkNotNull(readSchema);
+    this.writeSchema = Preconditions.checkNotNull(writeSchema);
+  }
+
+  @Override
+  public T deserialize(final byte[] raw) throws IOException {
+    final B builder = readSchema.newMessage();
+    JsonIOUtil.mergeFrom(raw, builder, readSchema, false);
+    return (T)builder.build();
+  }
+
+  @Override
+  public byte[] serialize(final T instance) throws IOException {
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    JsonIOUtil.writeTo(out, instance, writeSchema, false);
+    return out.toByteArray();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(readSchema, writeSchema);
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (obj instanceof ProtoSerializer && obj.getClass().equals(getClass())) {
+      final ProtoSerializer<T, B> other = (ProtoSerializer<T, B>)obj;
+      return Objects.equal(readSchema, other.readSchema) && Objects.equal(writeSchema, other.writeSchema);
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index c781493..441fa91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -37,10 +37,10 @@ import org.apache.drill.exec.server.options.OptionValue.OptionType;
 import org.apache.drill.exec.server.rest.WebServer;
 import org.apache.drill.exec.service.ServiceEngine;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.CachingStoreProvider;
-import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.store.sys.PStoreRegistry;
-import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
 import org.apache.drill.exec.work.WorkManager;
 import org.apache.zookeeper.Environment;
 
@@ -63,7 +63,7 @@ public class Drillbit implements AutoCloseable {
 
   private final ClusterCoordinator coord;
   private final ServiceEngine engine;
-  private final PStoreProvider storeProvider;
+  private final PersistentStoreProvider storeProvider;
   private final WorkManager manager;
   private final BootStrapContext context;
   private final WebServer webServer;
@@ -93,10 +93,10 @@ public class Drillbit implements AutoCloseable {
 
     if (serviceSet != null) {
       coord = serviceSet.getCoordinator();
-      storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config));
+      storeProvider = new CachingPersistentStoreProvider(new LocalPersistentStoreProvider(config));
     } else {
       coord = new ZKClusterCoordinator(config);
-      storeProvider = new PStoreRegistry(this.coord, config).newPStoreProvider();
+      storeProvider = new PersistentStoreRegistry(this.coord, config).newPStoreProvider();
     }
     logger.info("Construction completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index aa6a0da..1af6d11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -39,11 +39,11 @@ import org.apache.drill.exec.rpc.data.DataConnectionCreator;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 
 import com.codahale.metrics.MetricRegistry;
 
-public class DrillbitContext {
+public class DrillbitContext implements AutoCloseable {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
 
   private final BootStrapContext context;
@@ -57,7 +57,7 @@ public class DrillbitContext {
   private final WorkEventBus workBus;
   private final FunctionImplementationRegistry functionRegistry;
   private final SystemOptionManager systemOptions;
-  private final PStoreProvider provider;
+  private final PersistentStoreProvider provider;
   private final CodeCompiler compiler;
   private final ScanResult classpathScan;
   private final LogicalPlanPersistence lpPersistence;
@@ -70,7 +70,7 @@ public class DrillbitContext {
       Controller controller,
       DataConnectionCreator connectionsPool,
       WorkEventBus workBus,
-      PStoreProvider provider) {
+      PersistentStoreProvider provider) {
     this.classpathScan = context.getClasspathScan();
     this.workBus = workBus;
     this.controller = checkNotNull(controller);
@@ -152,7 +152,7 @@ public class DrillbitContext {
     return reader;
   }
 
-  public PStoreProvider getPersistentStoreProvider() {
+  public PersistentStoreProvider getStoreProvider() {
     return provider;
   }
 
@@ -180,4 +180,8 @@ public class DrillbitContext {
     return classpathScan;
   }
 
+  @Override
+  public void close() throws Exception {
+    getOptionManager().close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index 72eb306..06bb686 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -17,15 +17,12 @@
  */
 package org.apache.drill.exec.server;
 
-import java.io.Closeable;
-import java.io.IOException;
-
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.local.LocalClusterCoordinator;
 import org.apache.drill.exec.memory.BufferAllocator;
 
-public class RemoteServiceSet implements Closeable {
+public class RemoteServiceSet implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
 
   private final ClusterCoordinator coordinator;
@@ -41,7 +38,7 @@ public class RemoteServiceSet implements Closeable {
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() throws Exception {
     coordinator.close();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
index a2b2e93..8753a51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -23,11 +23,12 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.drill.exec.store.sys.PersistentStore;
 
 /**
  * An {@link OptionValue option value} is used by an {@link OptionManager} to store a run-time setting. This setting,
  * for example, could affect a query in execution stage. Instances of this class are JSON serializable and can be stored
- * in a {@link org.apache.drill.exec.store.sys.PStore persistent store} (see {@link SystemOptionManager#options}), or
+ * in a {@link PersistentStore persistent store} (see {@link SystemOptionManager#options}), or
  * in memory (see {@link InMemoryOptionManager#options}).
  */
 @JsonInclude(Include.NON_NULL)

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index e54b914..8b14076 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.drill.common.config.LogicalPlanPersistence;
@@ -34,9 +35,9 @@ import org.apache.drill.exec.compile.ClassTransformer;
 import org.apache.drill.exec.compile.QueryClassLoader;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionValue.OptionType;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.util.AssertionUtil;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -46,7 +47,7 @@ import static com.google.common.base.Preconditions.checkArgument;
  * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
  * persist between restarts.
  */
-public class SystemOptionManager extends BaseOptionManager {
+public class SystemOptionManager extends BaseOptionManager implements AutoCloseable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemOptionManager.class);
 
   private static final CaseInsensitiveMap<OptionValidator> VALIDATORS;
@@ -143,19 +144,19 @@ public class SystemOptionManager extends BaseOptionManager {
     VALIDATORS = CaseInsensitiveMap.newImmutableMap(tmp);
   }
 
-  private final PStoreConfig<OptionValue> config;
+  private final PersistentStoreConfig<OptionValue> config;
 
-  private final PStoreProvider provider;
+  private final PersistentStoreProvider provider;
 
   /**
    * Persistent store for options that have been changed from default.
    * NOTE: CRUD operations must use lowercase keys.
    */
-  private PStore<OptionValue> options;
+  private PersistentStore<OptionValue> options;
 
-  public SystemOptionManager(LogicalPlanPersistence lpPersistence, final PStoreProvider provider) {
+  public SystemOptionManager(LogicalPlanPersistence lpPersistence, final PersistentStoreProvider provider) {
     this.provider = provider;
-    this.config =  PStoreConfig.newJacksonBuilder(lpPersistence.getMapper(), OptionValue.class)
+    this.config =  PersistentStoreConfig.newJacksonBuilder(lpPersistence.getMapper(), OptionValue.class)
         .name("sys.options")
         .build();
   }
@@ -166,10 +167,10 @@ public class SystemOptionManager extends BaseOptionManager {
    * @return this option manager
    * @throws IOException
    */
-  public SystemOptionManager init() throws IOException {
-    options = provider.getStore(config);
+  public SystemOptionManager init() throws Exception {
+    options = provider.getOrCreateStore(config);
     // if necessary, deprecate and replace options from persistent store
-    for (final Entry<String, OptionValue> option : options) {
+    for (final Entry<String, OptionValue> option : Lists.newArrayList(options.getAll())) {
       final String name = option.getKey();
       final OptionValidator validator = VALIDATORS.get(name);
       if (validator == null) {
@@ -215,7 +216,7 @@ public class SystemOptionManager extends BaseOptionManager {
       buildList.put(entry.getKey(), entry.getValue().getDefault());
     }
     // override if changed
-    for (final Map.Entry<String, OptionValue> entry : options) {
+    for (final Map.Entry<String, OptionValue> entry : Lists.newArrayList(options.getAll())) {
       buildList.put(entry.getKey(), entry.getValue());
     }
     return buildList.values().iterator();
@@ -260,7 +261,7 @@ public class SystemOptionManager extends BaseOptionManager {
   public void deleteAllOptions(OptionType type) {
     checkArgument(type == OptionType.SYSTEM, "OptionType must be SYSTEM.");
     final Set<String> names = Sets.newHashSet();
-    for (final Map.Entry<String, OptionValue> entry : options) {
+    for (final Map.Entry<String, OptionValue> entry : Lists.newArrayList(options.getAll())) {
       names.add(entry.getKey());
     }
     for (final String name : names) {
@@ -272,4 +273,9 @@ public class SystemOptionManager extends BaseOptionManager {
   public OptionList getOptionList() {
     return (OptionList) IteratorUtils.toList(iterator());
   }
+
+  @Override
+  public void close() throws Exception {
+    options.close();
+  }
 }

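SystemOptionManager now obtains its backing store through getOrCreateStore() and releases it via close(). A minimal lifecycle sketch (not part of this patch) follows; the provider is assumed to be an already-started PersistentStoreProvider, and getOptionList() is the snapshot accessor shown in this hunk.

import org.apache.drill.common.config.LogicalPlanPersistence;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;

public class SystemOptionManagerLifecycleSketch {
  public static void run(LogicalPlanPersistence lpPersistence,
                         PersistentStoreProvider provider) throws Exception {
    // init() now declares throws Exception rather than IOException, and close()
    // is required to release the PersistentStore it opened.
    SystemOptionManager options = new SystemOptionManager(lpPersistence, provider).init();
    try {
      System.out.println(options.getOptionList().size()); // effective option snapshot
    } finally {
      options.close();
    }
  }
}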
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 300c617..d8533b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -22,7 +22,7 @@ import org.apache.drill.exec.server.rest.auth.AuthDynamicFeature;
 import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
 import org.apache.drill.exec.server.rest.profile.ProfileResources;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.work.WorkManager;
 import org.glassfish.hk2.api.Factory;
 import org.glassfish.hk2.utilities.binding.AbstractBinder;
@@ -80,7 +80,7 @@ public class DrillRestServer extends ResourceConfig {
       protected void configure() {
         bind(workManager).to(WorkManager.class);
         bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class);
-        bind(workManager.getContext().getPersistentStoreProvider()).to(PStoreProvider.class);
+        bind(workManager.getContext().getStoreProvider()).to(PersistentStoreProvider.class);
         bind(workManager.getContext().getStorage()).to(StoragePluginRegistry.class);
         bindFactory(DrillUserPrincipalProvider.class).to(DrillUserPrincipal.class);
       }

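Because the binder above exposes the renamed provider under PersistentStoreProvider.class, REST resources can inject it directly, mirroring how WorkManager is injected elsewhere in this patch. The resource below is purely illustrative; its path and body are not part of Drill.

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.apache.drill.exec.store.sys.PersistentStoreProvider;

@Path("/store-provider-sketch")
public class StoreProviderResourceSketch {
  @Inject PersistentStoreProvider storeProvider;

  @GET
  @Produces(MediaType.TEXT_PLAIN)
  public String providerClass() {
    // Only proves the injection works; real resources would call getOrCreateStore(...).
    return storeProvider.getClass().getName();
  }
}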
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 2af9cac..3266eda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -73,7 +73,7 @@ public class StorageResources {
   public List<PluginConfigWrapper> getStoragePluginsJSON() {
 
     List<PluginConfigWrapper> list = Lists.newArrayList();
-    for (Map.Entry<String, StoragePluginConfig> entry : storage.getStore()) {
+    for (Map.Entry<String, StoragePluginConfig> entry : Lists.newArrayList(storage.getStore().getAll())) {
       PluginConfigWrapper plugin = new PluginConfigWrapper(entry.getKey(), entry.getValue());
       list.add(plugin);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
index 0c04c9e..ddc9da1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.server.rest.profile;
 
-import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,7 +35,10 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.SecurityContext;
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.store.TransientStore;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
@@ -44,8 +46,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.server.rest.ViewableWithPermissions;
 import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.foreman.QueryManager;
@@ -58,6 +60,8 @@ import com.google.common.collect.Lists;
 public class ProfileResources {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileResources.class);
 
+  public final static int MAX_PROFILES = 100;
+
   @Inject WorkManager work;
   @Inject DrillUserPrincipal principal;
   @Inject SecurityContext sc;
@@ -119,8 +123,12 @@ public class ProfileResources {
 
   }
 
-  private PStoreProvider provider(){
-    return work.getContext().getPersistentStoreProvider();
+  protected PersistentStoreProvider getProvider() {
+    return work.getContext().getStoreProvider();
+  }
+
+  protected ClusterCoordinator getCoordinator() {
+    return work.getContext().getClusterCoordinator();
   }
 
   @XmlRootElement
@@ -146,38 +154,37 @@ public class ProfileResources {
   @Path("/profiles.json")
   @Produces(MediaType.APPLICATION_JSON)
   public QProfiles getProfilesJSON() {
-    PStore<QueryProfile> completed = null;
-    PStore<QueryInfo> running = null;
     try {
-      completed = provider().getStore(QueryManager.QUERY_PROFILE);
-      running = provider().getStore(QueryManager.RUNNING_QUERY_INFO);
-    } catch (IOException e) {
-      logger.debug("Failed to get profiles from persistent or ephemeral store.");
-      return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>());
-    }
+      final PersistentStore<QueryProfile> completed = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE);
+      final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
 
-    List<ProfileInfo> runningQueries = Lists.newArrayList();
+      final List<ProfileInfo> runningQueries = Lists.newArrayList();
 
-    for (Map.Entry<String, QueryInfo> entry : running) {
-      QueryInfo profile = entry.getValue();
-      if (principal.canManageProfileOf(profile.getUser())) {
-        runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(),
-            profile.getQuery(), profile.getState().name(), profile.getUser()));
+      for (final Map.Entry<String, QueryInfo> entry: Lists.newArrayList(running.entries())) {
+        final QueryInfo profile = entry.getValue();
+        if (principal.canManageProfileOf(profile.getUser())) {
+          runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(),
+              profile.getQuery(), profile.getState().name(), profile.getUser()));
+        }
       }
-    }
 
-    Collections.sort(runningQueries, Collections.reverseOrder());
+      Collections.sort(runningQueries, Collections.reverseOrder());
 
-    List<ProfileInfo> finishedQueries = Lists.newArrayList();
-    for (Map.Entry<String, QueryProfile> entry : completed) {
-      QueryProfile profile = entry.getValue();
-      if (principal.canManageProfileOf(profile.getUser())) {
-        finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(),
-            profile.getQuery(), profile.getState().name(), profile.getUser()));
+      List<ProfileInfo> finishedQueries = Lists.newArrayList();
+      for (Map.Entry<String, QueryProfile> entry : Lists.newArrayList(completed.getRange(0, MAX_PROFILES))) {
+        QueryProfile profile = entry.getValue();
+        if (principal.canManageProfileOf(profile.getUser())) {
+          finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(),
+              profile.getQuery(), profile.getState().name(), profile.getUser()));
+        }
       }
+
+      return new QProfiles(runningQueries, finishedQueries);
+    } catch (Exception e) {
+      logger.debug("Failed to get profiles from persistent or ephemeral store.");
+      return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>());
     }
 
-    return new QProfiles(runningQueries, finishedQueries);
   }
 
   @GET
@@ -188,7 +195,7 @@ public class ProfileResources {
     return ViewableWithPermissions.create("/rest/profile/list.ftl", sc, profiles);
   }
 
-  private QueryProfile getQueryProfile(String queryId) throws IOException {
+  private QueryProfile getQueryProfile(String queryId) {
     QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
 
     // first check local running
@@ -200,9 +207,9 @@ public class ProfileResources {
     }
 
     // then check remote running
-    try{
-      PStore<QueryInfo> runningQueries = provider().getStore(QueryManager.RUNNING_QUERY_INFO);
-      QueryInfo info = runningQueries.get(queryId);
+    try {
+      final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
+      final QueryInfo info = running.get(queryId);
       if (info != null) {
         QueryProfile queryProfile = work.getContext()
             .getController()
@@ -217,11 +224,15 @@ public class ProfileResources {
     }
 
     // then check blob store
-    PStore<QueryProfile> profiles = provider().getStore(QueryManager.QUERY_PROFILE);
-    QueryProfile queryProfile = profiles.get(queryId);
-    if (queryProfile != null) {
-      checkOrThrowProfileViewAuthorization(queryProfile);
-      return queryProfile;
+    try {
+      final PersistentStore<QueryProfile> profiles = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE);
+      final QueryProfile queryProfile = profiles.get(queryId);
+      if (queryProfile != null) {
+        checkOrThrowProfileViewAuthorization(queryProfile);
+        return queryProfile;
+      }
+    } catch (final Exception e) {
+      throw new DrillRuntimeException("error while retrieving profile", e);
     }
 
     throw UserException.validationError()
@@ -236,7 +247,7 @@ public class ProfileResources {
   public String getProfileJSON(@PathParam("queryid") String queryId) {
     try {
       return new String(QueryManager.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId)));
-    } catch (IOException e) {
+    } catch (Exception e) {
       logger.debug("Failed to serialize profile for: " + queryId);
       return ("{ 'message' : 'error (unable to serialize profile)' }");
     }
@@ -245,7 +256,7 @@ public class ProfileResources {
   @GET
   @Path("/profiles/{queryid}")
   @Produces(MediaType.TEXT_HTML)
-  public Viewable getProfile(@PathParam("queryid") String queryId) throws IOException {
+  public Viewable getProfile(@PathParam("queryid") String queryId){
     ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId));
     return ViewableWithPermissions.create("/rest/profile/profile.ftl", sc, wrapper);
   }
@@ -254,7 +265,7 @@ public class ProfileResources {
   @GET
   @Path("/profiles/cancel/{queryid}")
   @Produces(MediaType.TEXT_PLAIN)
-  public String cancelQuery(@PathParam("queryid") String queryId) throws IOException {
+  public String cancelQuery(@PathParam("queryid") String queryId) {
 
     QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
 
@@ -267,9 +278,9 @@ public class ProfileResources {
     }
 
     // then check remote running
-    try{
-      PStore<QueryInfo> runningQueries = provider().getStore(QueryManager.RUNNING_QUERY_INFO);
-      QueryInfo info = runningQueries.get(queryId);
+    try {
+      final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
+      final QueryInfo info = running.get(queryId);
       checkOrThrowQueryCancelAuthorization(info.getUser(), queryId);
       Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);
       if(a.getOk()){

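The profile endpoints now read running queries from a coordinator-backed TransientStore and completed profiles from the PersistentStore through the new pagination API. A stand-alone sketch of that split follows; it relies only on the calls visible in this diff (getOrCreateTransientStore, entries(), getOrCreateStore, getRange), and the 100-entry limit mirrors MAX_PROFILES.

import java.util.Map;

import com.google.common.collect.Lists;

import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.foreman.QueryManager;

public class ProfileLookupSketch {
  public static void dump(ClusterCoordinator coordinator, PersistentStoreProvider provider)
      throws Exception {
    // Short-lived, running-query state now lives in a TransientStore.
    TransientStore<QueryInfo> running =
        coordinator.getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
    for (Map.Entry<String, QueryInfo> entry : Lists.newArrayList(running.entries())) {
      System.out.println("running: " + entry.getKey());
    }

    // Completed profiles stay persistent, but are read through a bounded range.
    PersistentStore<QueryProfile> completed =
        provider.getOrCreateStore(QueryManager.QUERY_PROFILE);
    for (Map.Entry<String, QueryProfile> entry : Lists.newArrayList(completed.getRange(0, 100))) {
      System.out.println("completed: " + entry.getKey());
    }
  }
}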
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index c7d364b..b6eed2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -26,7 +26,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PersistentStore;
 
 public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable {
   final String SYS_PLUGIN = "sys";
@@ -104,7 +104,7 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag
    * Get the PStore for this StoragePluginRegistry. (Used in the management layer.)
    * @return PStore for StoragePlugin configuration objects.
    */
-  PStore<StoragePluginConfig> getStore();
+  PersistentStore<StoragePluginConfig> getStore();
 
   /**
    * Return StoragePlugin rule sets.

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index fefa183..e680502 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -46,6 +46,7 @@ import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.exception.StoreException;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.planner.logical.DrillRuleSets;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
@@ -54,8 +55,8 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
 import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.SystemTablePlugin;
 import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
 
@@ -82,7 +83,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
 
   private DrillbitContext context;
   private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
-  private final PStore<StoragePluginConfig> pluginSystemTable;
+  private final PersistentStore<StoragePluginConfig> pluginSystemTable;
   private final LogicalPlanPersistence lpPersistence;
   private final ScanResult classpathScan;
   private final LoadingCache<StoragePluginConfig, StoragePlugin> ephemeralPlugins;
@@ -93,12 +94,12 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
     this.classpathScan = checkNotNull(context.getClasspathScan());
     try {
       this.pluginSystemTable = context //
-          .getPersistentStoreProvider() //
-          .getStore(PStoreConfig //
+          .getStoreProvider() //
+          .getOrCreateStore(PersistentStoreConfig //
               .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class) //
               .name(PSTORE_NAME) //
               .build());
-    } catch (IOException | RuntimeException e) {
+    } catch (StoreException | RuntimeException e) {
       logger.error("Failure while loading storage plugin registry.", e);
       throw new RuntimeException("Failure while reading and loading storage plugin configuration.", e);
     }
@@ -120,7 +121,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
         });
   }
 
-  public PStore<StoragePluginConfig> getStore() {
+  public PersistentStore<StoragePluginConfig> getStore() {
     return pluginSystemTable;
   }
 
@@ -137,7 +138,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
        * Check if the storage plugins system table has any entries. If not, load the boostrap-storage-plugin file into
        * the system table.
        */
-      if (!pluginSystemTable.iterator().hasNext()) {
+      if (!pluginSystemTable.getAll().hasNext()) {
         // bootstrap load the config since no plugins are stored.
         logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
         Collection<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
@@ -162,7 +163,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
       }
 
       Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
-      for (Map.Entry<String, StoragePluginConfig> entry : pluginSystemTable) {
+      for (Map.Entry<String, StoragePluginConfig> entry : Lists.newArrayList(pluginSystemTable.getAll())) {
         String name = entry.getKey();
         StoragePluginConfig config = entry.getValue();
         if (config.isEnabled()) {
@@ -385,7 +386,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
         Set<String> currentPluginNames = Sets.newHashSet(plugins.names());
         // iterate through the plugin instances in the persistence store adding
         // any new ones and refreshing those whose configuration has changed
-        for (Map.Entry<String, StoragePluginConfig> config : pluginSystemTable) {
+        for (Map.Entry<String, StoragePluginConfig> config : Lists.newArrayList(pluginSystemTable.getAll())) {
           if (config.getValue().isEnabled()) {
             getPlugin(config.getKey());
             currentPluginNames.remove(config.getKey());
@@ -460,6 +461,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
   public synchronized void close() throws Exception {
     ephemeralPlugins.invalidateAll();
     plugins.close();
+    pluginSystemTable.close();
   }
 
   /**

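The registry now builds its plugin-configuration store through PersistentStoreConfig.newJacksonBuilder(...) and getOrCreateStore(), and detects an empty store with getAll().hasNext(). A condensed sketch of that bootstrap check follows; the store name used here is illustrative (the registry uses its own PSTORE_NAME constant), and the broad throws clause stands in for StoreException.

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;

public class PluginStoreBootstrapSketch {
  public static boolean needsBootstrap(PersistentStoreProvider provider, ObjectMapper mapper)
      throws Exception {
    PersistentStore<StoragePluginConfig> store = provider.getOrCreateStore(
        PersistentStoreConfig.newJacksonBuilder(mapper, StoragePluginConfig.class)
            .name("storage-plugins-sketch")   // illustrative name only
            .build());
    // Same check the registry performs: no entries means the bootstrap file must be loaded.
    return !store.getAll().hasNext();
  }
}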
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
new file mode 100644
index 0000000..248c3cb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.drill.common.collections.ImmutableEntry;
+
+public abstract class BasePersistentStore<V> implements PersistentStore<V> {
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getAll() {
+    return getRange(0, Integer.MAX_VALUE);
+  }
+
+}

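BasePersistentStore defines getAll() as getRange(0, Integer.MAX_VALUE), so a concrete store only has to supply the ranged read. A consumer-side sketch of that contract follows; it assumes getRange(skip, take) semantics, which is what the getAll() default and the MAX_PROFILES call earlier in this commit imply, and the page size is arbitrary.

import java.util.Iterator;
import java.util.Map;

import org.apache.drill.exec.store.sys.PersistentStore;

public class PagedScanSketch {
  private static final int PAGE_SIZE = 20;

  public static <V> void scan(PersistentStore<V> store) {
    int skip = 0;
    while (true) {
      Iterator<Map.Entry<String, V>> page = store.getRange(skip, PAGE_SIZE);
      if (!page.hasNext()) {
        return;                                   // no more entries
      }
      while (page.hasNext()) {
        Map.Entry<String, V> entry = page.next();
        System.out.println(entry.getKey() + " = " + entry.getValue());
      }
      skip += PAGE_SIZE;
    }
  }
}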
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
deleted file mode 100644
index 68440cb..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-public class CachingStoreProvider implements PStoreProvider, AutoCloseable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingStoreProvider.class);
-
-  private final ConcurrentMap<PStoreConfig<?>, PStore<?>> storeCache = Maps.newConcurrentMap();
-  private final PStoreProvider provider;
-
-  public CachingStoreProvider(PStoreProvider provider) {
-    super();
-    this.provider = provider;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
-    PStore<?> s = storeCache.get(config);
-    if(s == null){
-      PStore<?> newStore = provider.getStore(config);
-      s = storeCache.putIfAbsent(config, newStore);
-      if(s == null){
-        s = newStore;
-      }else{
-        newStore.close();
-      }
-    }
-
-    return (PStore<V>) s;
-
-  }
-
-  @Override
-  public void start() throws IOException {
-    provider.start();
-  }
-
-  @Override
-  public void close() throws Exception {
-    for(PStore<?> store : storeCache.values()){
-      store.close();
-    }
-    storeCache.clear();
-    provider.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
deleted file mode 100644
index 2d04957..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.sys;
-
-
-/**
- * Interfaces to define EStore, which is keep track of status/information for running queries. The information
- * would be gone, if the query is completed, or the foreman drillbit is not responding.
- * @param <V>
- */
-public interface EStore <V> extends PStore<V> {
-}

