drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject drill git commit: DRILL-1947: Cache PStore/EStore instances rather than recreating on each need. As part of this, make sure that PStoreConfig doesn't use identity equality.
Date Fri, 09 Jan 2015 21:06:29 GMT
Repository: drill
Updated Branches:
  refs/heads/master 1552c96f4 -> 7638dbb82


DRILL-1947: Cache PStore/EStore instances rather than recreating on each need.  As part of
this, make sure that PStoreConfig doesn't use identity equality.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7638dbb8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7638dbb8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7638dbb8

Branch: refs/heads/master
Commit: 7638dbb82606c9644d6ad02210fbfd5d8f6ae090
Parents: 1552c96
Author: Jacques Nadeau <jacques@apache.org>
Authored: Tue Jan 6 21:52:40 2015 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Fri Jan 9 08:15:53 2015 -0800

----------------------------------------------------------------------
 .../exec/store/hbase/config/HBasePStore.java    |  4 ++
 .../exec/store/mongo/config/MongoPStore.java    |  4 ++
 .../org/apache/drill/exec/server/Drillbit.java  |  9 ++-
 .../exec/store/sys/CachingStoreProvider.java    | 70 ++++++++++++++++++++
 .../drill/exec/store/sys/EStoreProvider.java    |  4 +-
 .../org/apache/drill/exec/store/sys/PStore.java |  2 +
 .../drill/exec/store/sys/PStoreConfig.java      | 47 +++++++++++++
 .../drill/exec/store/sys/PStoreProvider.java    |  4 +-
 .../drill/exec/store/sys/PStoreRegistry.java    |  2 +-
 .../drill/exec/store/sys/local/FilePStore.java  |  4 ++
 .../store/sys/local/LocalEStoreProvider.java    | 23 +++----
 .../drill/exec/store/sys/local/MapEStore.java   |  6 +-
 .../store/sys/local/NoWriteLocalPStore.java     |  4 ++
 .../store/sys/serialize/JacksonSerializer.java  | 40 +++++++++++
 .../store/sys/serialize/ProtoSerializer.java    | 38 +++++++++++
 .../exec/store/sys/zk/ZkAbstractStore.java      | 17 ++++-
 .../exec/store/sys/zk/ZkEStoreProvider.java     |  8 +++
 .../exec/store/sys/zk/ZkPStoreProvider.java     | 14 ++--
 18 files changed, 265 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
index 594b9f8..17ddcb1 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
@@ -229,4 +229,8 @@ public class HBasePStore<V> implements PStore<V> {
 
   }
 
+  @Override
+  public void close() {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
index fc5c05b..ea3a5a2 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
@@ -177,4 +177,8 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants
{
       throw new UnsupportedOperationException();
     }
   }
+
+  @Override
+  public void close() {
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 4b9b20d..67342c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.rest.DrillRestServer;
 import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.store.sys.CachingStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreRegistry;
 import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
@@ -106,7 +107,7 @@ public class Drillbit implements Closeable{
 
     if(serviceSet != null) {
       this.coord = serviceSet.getCoordinator();
-      this.storeProvider = new LocalPStoreProvider(config);
+      this.storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config));
     } else {
       Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
       this.coord = new ZKClusterCoordinator(config);
@@ -175,7 +176,11 @@ public class Drillbit implements Closeable{
       logger.warn("Failure while shutting down embedded jetty server.");
     }
     Closeables.closeQuietly(engine);
-    Closeables.closeQuietly(storeProvider);
+    try{
+      storeProvider.close();
+    }catch(Exception e){
+      logger.warn("Failure while closing store provider.", e);
+    }
     Closeables.closeQuietly(coord);
     Closeables.closeQuietly(manager);
     Closeables.closeQuietly(context);

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
new file mode 100644
index 0000000..68440cb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class CachingStoreProvider implements PStoreProvider, AutoCloseable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingStoreProvider.class);
+
+  private final ConcurrentMap<PStoreConfig<?>, PStore<?>> storeCache =
Maps.newConcurrentMap();
+  private final PStoreProvider provider;
+
+  public CachingStoreProvider(PStoreProvider provider) {
+    super();
+    this.provider = provider;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException
{
+    PStore<?> s = storeCache.get(config);
+    if(s == null){
+      PStore<?> newStore = provider.getStore(config);
+      s = storeCache.putIfAbsent(config, newStore);
+      if(s == null){
+        s = newStore;
+      }else{
+        newStore.close();
+      }
+    }
+
+    return (PStore<V>) s;
+
+  }
+
+  @Override
+  public void start() throws IOException {
+    provider.start();
+  }
+
+  @Override
+  public void close() throws Exception {
+    for(PStore<?> store : storeCache.values()){
+      store.close();
+    }
+    storeCache.clear();
+    provider.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
index b09c5b4..4c79a28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
@@ -18,12 +18,10 @@
 
 package org.apache.drill.exec.store.sys;
 
-import java.io.IOException;
 
 /**
  * Interface to define the provider which return EStore.
  */
 
-public interface EStoreProvider {
-  public <V> PStore<V> getStore(PStoreConfig<V> table) throws IOException;
+public interface EStoreProvider extends PStoreProvider {
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
index 26c00ea..b629645 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.sys;
 
 import java.util.Map;
 
+
 /**
  * Interface for reading and writing values to a persistent storage provider.  Iterators
are guaranteed to be returned in key order.
  * @param <V>
@@ -28,4 +29,5 @@ public interface PStore<V> extends Iterable<Map.Entry<String,
V>> {
   public void put(String key, V value);
   public boolean putIfAbsent(String key, V value);
   public void delete(String key);
+  public void close();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
index 83c2243..bd9d977 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
@@ -116,4 +116,51 @@ public class PStoreConfig<V> {
 
   }
 
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + maxIteratorSize;
+    result = prime * result + ((mode == null) ? 0 : mode.hashCode());
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    result = prime * result + ((valueSerializer == null) ? 0 : valueSerializer.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    PStoreConfig other = (PStoreConfig) obj;
+    if (maxIteratorSize != other.maxIteratorSize) {
+      return false;
+    }
+    if (mode != other.mode) {
+      return false;
+    }
+    if (name == null) {
+      if (other.name != null) {
+        return false;
+      }
+    } else if (!name.equals(other.name)) {
+      return false;
+    }
+    if (valueSerializer == null) {
+      if (other.valueSerializer != null) {
+        return false;
+      }
+    } else if (!valueSerializer.equals(other.valueSerializer)) {
+      return false;
+    }
+    return true;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
index 6371dfa..efa223e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
@@ -17,11 +17,9 @@
  */
 package org.apache.drill.exec.store.sys;
 
-import java.io.Closeable;
 import java.io.IOException;
 
-public interface PStoreProvider extends AutoCloseable, Closeable{
-
+public interface PStoreProvider extends AutoCloseable {
   public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException;
   public void start() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
index 580d20a..532e6be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
@@ -53,7 +53,7 @@ public class PStoreRegistry {
       logger.info("Using the configured PStoreProvider class: '{}'.", storeProviderClassName);
       Class<? extends PStoreProvider> storeProviderClass = (Class<? extends PStoreProvider>)
Class.forName(storeProviderClassName);
       Constructor<? extends PStoreProvider> c = storeProviderClass.getConstructor(PStoreRegistry.class);
-      return c.newInstance(this);
+      return new CachingStoreProvider(c.newInstance(this));
     } catch (ConfigException.Missing | ClassNotFoundException | NoSuchMethodException | SecurityException
         | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
e) {
       logger.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
index 416a21a..40f25e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
@@ -228,4 +228,8 @@ public class FilePStore<V> implements PStore<V> {
     }
   }
 
+  @Override
+  public void close() {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
index 094d093..e7c2f94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
@@ -19,7 +19,6 @@
 package org.apache.drill.exec.store.sys.local;
 
 import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.EStoreProvider;
@@ -27,24 +26,22 @@ import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 
 public class LocalEStoreProvider implements EStoreProvider{
-  private ConcurrentMap<PStoreConfig<?>, EStore<?>> estores = Maps.newConcurrentMap();
 
   @Override
   public <V> EStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException
{
     Preconditions.checkArgument(storeConfig.getMode() == Mode.EPHEMERAL, "Estore configurations
must be set ephemeral.");
 
-    if (! (estores.containsKey(storeConfig)) ) {
-      EStore<V> p = new MapEStore<V>();
-      EStore<?> p2 = estores.putIfAbsent(storeConfig, p);
-      if(p2 != null) {
-        return (EStore<V>) p2;
-      }
-      return p;
-    } else {
-      return (EStore<V>) estores.get(storeConfig);
-    }
+    return new MapEStore<V>();
   }
+
+  @Override
+  public void start() throws IOException {
+  }
+
+  @Override
+  public void close() {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
index 2723916..96e51e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * Implementation of EStore using ConcurrentHashMap.
  * @param <V>
  */
-public class MapEStore <V> implements EStore<V> {
+public class MapEStore<V> implements EStore<V> {
   ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>();
 
   @Override
@@ -57,4 +57,8 @@ public class MapEStore <V> implements EStore<V> {
     V out = store.putIfAbsent(key, value);
     return out == null;
   }
+
+  @Override
+  public void close() {
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
index 71a41f0..c675618 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
@@ -62,4 +62,8 @@ public class NoWriteLocalPStore<V> implements PStore<V>{
     blobMap.remove(key);
   }
 
+  @Override
+  public void close() {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
index b8a5cdd..53452f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
@@ -43,4 +43,44 @@ public class JacksonSerializer<X> implements PClassSerializer<X>
{
   public X deserialize(byte[] bytes) throws IOException {
     return reader.readValue(bytes);
   }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((reader == null) ? 0 : reader.hashCode());
+    result = prime * result + ((writer == null) ? 0 : writer.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    JacksonSerializer other = (JacksonSerializer) obj;
+    if (reader == null) {
+      if (other.reader != null) {
+        return false;
+      }
+    } else if (!reader.equals(other.reader)) {
+      return false;
+    }
+    if (writer == null) {
+      if (other.writer != null) {
+        return false;
+      }
+    } else if (!writer.equals(other.writer)) {
+      return false;
+    }
+    return true;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
index 1ea714e..52df7a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
@@ -51,5 +51,43 @@ public class ProtoSerializer<X, B extends Message.Builder> implements
PClassSeri
     return (X) b.build();
   }
 
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((readSchema == null) ? 0 : readSchema.hashCode());
+    result = prime * result + ((writeSchema == null) ? 0 : writeSchema.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    ProtoSerializer other = (ProtoSerializer) obj;
+    if (readSchema == null) {
+      if (other.readSchema != null) {
+        return false;
+      }
+    } else if (!readSchema.equals(other.readSchema)) {
+      return false;
+    }
+    if (writeSchema == null) {
+      if (other.writeSchema != null) {
+        return false;
+      }
+    } else if (!writeSchema.equals(other.writeSchema)) {
+      return false;
+    }
+    return true;
+  }
+
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
index d61f3b4..01059a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
@@ -27,9 +27,8 @@ import java.util.Map.Entry;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.zookeeper.CreateMode;
 
@@ -40,7 +39,8 @@ import com.google.common.collect.Lists;
  * This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore
(Ephemeral Store)
  * @param <V>
  */
-public abstract class ZkAbstractStore<V> {
+public abstract class ZkAbstractStore<V> implements AutoCloseable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class);
 
   protected CuratorFramework framework;
   protected PStoreConfig<V> config;
@@ -206,4 +206,15 @@ public abstract class ZkAbstractStore<V> {
     }
 
   }
+
+  @Override
+  public void close() {
+    try{
+      childrenCache.close();
+    }catch(IOException e){
+      logger.warn("Failure while closing out abstract store.", e);
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
index 1c2c3fd..7d7d475 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
@@ -40,4 +40,12 @@ public class ZkEStoreProvider implements EStoreProvider{
     Preconditions.checkArgument(store.getMode() == Mode.EPHEMERAL);
     return new ZkEStore<V>(curator,store);
   }
+
+  @Override
+  public void start() throws IOException {
+  }
+
+  @Override
+  public void close() throws Exception {
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
index 03d2441..f8fa2bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.sys.zk;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -26,7 +25,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.sys.EStore;
+import org.apache.drill.exec.store.sys.EStoreProvider;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
@@ -45,7 +44,7 @@ public class ZkPStoreProvider implements PStoreProvider {
 
   private final DrillFileSystem fs;
   private final Path blobRoot;
-  private final ZkEStoreProvider zkEStoreProvider;
+  private final EStoreProvider zkEStoreProvider;
 
   public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
     ClusterCoordinator coord = registry.getClusterCoordinator();
@@ -66,7 +65,6 @@ public class ZkPStoreProvider implements PStoreProvider {
       throw new DrillbitStartupException("Failure while attempting to set up blob store.",
e);
     }
 
-
     this.zkEStoreProvider = new ZkEStoreProvider(curator);
   }
 
@@ -79,11 +77,6 @@ public class ZkPStoreProvider implements PStoreProvider {
   }
 
   @Override
-  public void close() {
-  }
-
-
-  @Override
   public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException
{
     switch(config.getMode()){
     case BLOB_PERSISTENT:
@@ -101,4 +94,7 @@ public class ZkPStoreProvider implements PStoreProvider {
   public void start() {
   }
 
+  @Override
+  public void close() {
+  }
 }


Mime
View raw message