Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 57A2418A77 for ; Fri, 19 Feb 2016 05:29:04 +0000 (UTC) Received: (qmail 74189 invoked by uid 500); 19 Feb 2016 05:29:04 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 74115 invoked by uid 500); 19 Feb 2016 05:29:04 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 74009 invoked by uid 99); 19 Feb 2016 05:29:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Feb 2016 05:29:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D8455E0092; Fri, 19 Feb 2016 05:29:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hg@apache.org To: commits@drill.apache.org Date: Fri, 19 Feb 2016 05:29:06 -0000 Message-Id: <8c698da02c9842fa9e66528ba0ca567b@git.apache.org> In-Reply-To: <67b18a6a7ed04d81a965274340401838@git.apache.org> References: <67b18a6a7ed04d81a965274340401838@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/4] drill git commit: DRILL-4275: create TransientStore for short-lived objects; refactor PersistentStore to introduce pagination mechanism DRILL-4275: create TransientStore for short-lived objects; refactor PersistentStore to introduce pagination mechanism Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8126927f Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8126927f Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8126927f Branch: refs/heads/master Commit: 8126927fd6f538000a28fb77e704a37e20abbe6c Parents: 9a3a5c4 Author: Hanifi Gunes Authored: Mon Feb 15 15:16:47 2016 -0800 Committer: Hanifi Gunes Committed: Thu Feb 18 16:47:02 2016 -0800 ---------------------------------------------------------------------- .../common/collections/ImmutableEntry.java | 62 ++++ .../exec/store/hbase/config/HBasePStore.java | 236 --------------- .../store/hbase/config/HBasePStoreProvider.java | 157 +--------- .../hbase/config/HBasePersistentStore.java | 238 +++++++++++++++ .../config/HBasePersistentStoreProvider.java | 135 +++++++++ .../drill/hbase/TestHBaseTableProvider.java | 23 +- .../exec/store/mongo/config/MongoPStore.java | 184 ------------ .../store/mongo/config/MongoPStoreProvider.java | 88 ------ .../mongo/config/MongoPersistentStore.java | 190 ++++++++++++ .../config/MongoPersistentStoreProvider.java | 84 ++++++ .../src/resources/drill-override-example.conf | 2 +- exec/java-exec/pom.xml | 6 + .../apache/drill/exec/client/DrillClient.java | 2 +- .../apache/drill/exec/compile/CodeCompiler.java | 3 - .../drill/exec/coord/ClusterCoordinator.java | 18 +- .../coord/local/LocalClusterCoordinator.java | 22 +- .../drill/exec/coord/local/MapBackedStore.java | 85 ++++++ .../exec/coord/store/BaseTransientStore.java | 83 ++++++ .../store/CachingTransientStoreFactory.java | 63 ++++ .../drill/exec/coord/store/TransientStore.java | 106 +++++++ .../exec/coord/store/TransientStoreConfig.java | 74 +++++ .../store/TransientStoreConfigBuilder.java | 50 ++++ .../exec/coord/store/TransientStoreEvent.java | 71 +++++ .../coord/store/TransientStoreEventType.java | 27 ++ .../exec/coord/store/TransientStoreFactory.java | 36 +++ .../coord/store/TransientStoreListener.java | 32 ++ .../drill/exec/coord/zk/EventDispatcher.java | 60 ++++ .../apache/drill/exec/coord/zk/PathUtils.java | 73 +++++ .../exec/coord/zk/ZKClusterCoordinator.java | 57 ++-- .../drill/exec/coord/zk/ZkEphemeralStore.java | 145 +++++++++ .../exec/coord/zk/ZkTransientStoreFactory.java | 50 ++++ .../drill/exec/coord/zk/ZookeeperClient.java | 238 +++++++++++++++ .../drill/exec/exception/StoreException.java | 42 +++ .../exec/serialization/InstanceSerializer.java | 25 ++ .../exec/serialization/JacksonSerializer.java | 59 ++++ .../exec/serialization/ProtoSerializer.java | 65 +++++ .../org/apache/drill/exec/server/Drillbit.java | 14 +- .../drill/exec/server/DrillbitContext.java | 14 +- .../drill/exec/server/RemoteServiceSet.java | 7 +- .../drill/exec/server/options/OptionValue.java | 3 +- .../server/options/SystemOptionManager.java | 34 ++- .../drill/exec/server/rest/DrillRestServer.java | 4 +- .../exec/server/rest/StorageResources.java | 2 +- .../server/rest/profile/ProfileResources.java | 97 ++++--- .../drill/exec/store/StoragePluginRegistry.java | 4 +- .../exec/store/StoragePluginRegistryImpl.java | 22 +- .../exec/store/sys/BasePersistentStore.java | 32 ++ .../exec/store/sys/CachingStoreProvider.java | 70 ----- .../org/apache/drill/exec/store/sys/EStore.java | 28 -- .../drill/exec/store/sys/EStoreProvider.java | 27 -- .../drill/exec/store/sys/OptionIterator.java | 5 +- .../org/apache/drill/exec/store/sys/PStore.java | 33 --- .../drill/exec/store/sys/PStoreConfig.java | 166 ----------- .../drill/exec/store/sys/PStoreProvider.java | 25 -- .../drill/exec/store/sys/PStoreRegistry.java | 64 ---- .../drill/exec/store/sys/PersistentStore.java | 77 +++++ .../exec/store/sys/PersistentStoreConfig.java | 115 ++++++++ .../exec/store/sys/PersistentStoreMode.java | 26 ++ .../exec/store/sys/PersistentStoreProvider.java | 42 +++ .../exec/store/sys/PersistentStoreRegistry.java | 65 +++++ .../drill/exec/store/sys/StaticDrillTable.java | 5 +- .../drill/exec/store/sys/SystemTablePlugin.java | 2 +- .../drill/exec/store/sys/SystemTableScan.java | 9 +- .../drill/exec/store/sys/ThreadsIterator.java | 6 +- .../drill/exec/store/sys/VersionIterator.java | 2 - .../drill/exec/store/sys/local/FilePStore.java | 234 --------------- .../store/sys/local/LocalEStoreProvider.java | 47 --- .../store/sys/local/LocalPStoreProvider.java | 83 +----- .../drill/exec/store/sys/local/MapEStore.java | 64 ---- .../store/sys/local/NoWriteLocalPStore.java | 69 ----- .../store/sys/serialize/JacksonSerializer.java | 86 ------ .../store/sys/serialize/PClassSerializer.java | 25 -- .../store/sys/serialize/ProtoSerializer.java | 93 ------ .../store/sys/store/LocalPersistentStore.java | 196 +++++++++++++ .../sys/store/ZookeeperPersistentStore.java | 135 +++++++++ .../provider/BasePersistentStoreProvider.java | 28 ++ .../CachingPersistentStoreProvider.java | 76 +++++ .../provider/LocalPersistentStoreProvider.java | 80 +++++ .../ZookeeperPersistentStoreProvider.java | 89 ++++++ .../exec/store/sys/zk/ZkAbstractStore.java | 291 ------------------- .../drill/exec/store/sys/zk/ZkEStore.java | 41 --- .../exec/store/sys/zk/ZkEStoreProvider.java | 50 ---- .../drill/exec/store/sys/zk/ZkPStore.java | 41 --- .../exec/store/sys/zk/ZkPStoreProvider.java | 92 +----- .../exec/testing/store/NoWriteLocalStore.java | 66 +++++ .../org/apache/drill/exec/work/WorkManager.java | 6 +- .../apache/drill/exec/work/foreman/Foreman.java | 10 +- .../drill/exec/work/foreman/QueryManager.java | 52 ++-- .../src/main/resources/drill-module.conf | 2 +- .../java/org/apache/drill/BaseTestQuery.java | 2 +- .../java/org/apache/drill/PlanningBase.java | 6 +- .../exec/compile/CodeCompilerTestFactory.java | 8 +- .../compile/bytecode/ReplaceMethodInvoke.java | 5 +- .../drill/exec/coord/zk/TestEphemeralStore.java | 144 +++++++++ .../exec/coord/zk/TestEventDispatcher.java | 94 ++++++ .../drill/exec/coord/zk/TestPathUtils.java | 92 ++++++ .../exec/coord/zk/TestZookeeperClient.java | 201 +++++++++++++ .../exec/physical/impl/TestOptiqPlans.java | 4 +- .../exec/physical/impl/join/TestHashJoin.java | 5 +- .../drill/exec/store/sys/PStoreTestUtil.java | 14 +- .../exec/store/sys/TestPStoreProviders.java | 10 +- 101 files changed, 3967 insertions(+), 2465 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/common/src/main/java/org/apache/drill/common/collections/ImmutableEntry.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/collections/ImmutableEntry.java b/common/src/main/java/org/apache/drill/common/collections/ImmutableEntry.java new file mode 100644 index 0000000..99ea544 --- /dev/null +++ b/common/src/main/java/org/apache/drill/common/collections/ImmutableEntry.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.common.collections; + +import java.util.Map; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +public class ImmutableEntry implements Map.Entry { + private final K key; + private final V value; + + public ImmutableEntry(final K key, final V value) { + this.key = Preconditions.checkNotNull(key, "key is required"); + this.value = Preconditions.checkNotNull(value, "value is required"); + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return value; + } + + @Override + public V setValue(final V value) { + throw new UnsupportedOperationException("entry is immutable"); + } + + @Override + public boolean equals(final Object other) { + if (other instanceof ImmutableEntry && other.getClass() == getClass()) { + final ImmutableEntry entry = (ImmutableEntry)other; + return Objects.equal(key, entry.key) && Objects.equal(value, entry.value); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(key, value); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java deleted file mode 100644 index 17ddcb1..0000000 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.hbase.config; - -import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY; -import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.QUALIFIER; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.NoSuchElementException; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreConfig.Mode; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; - -public class HBasePStore implements PStore { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePStore.class); - - private final PStoreConfig config; - - private final HTableInterface table; - - private final String tableName; - private final byte[] tableNameStartKey; - private final byte[] tableNameStopKey; - - public HBasePStore(PStoreConfig config, HTableInterface table) throws IOException { - this.tableName = config.getName() + '\0'; - this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00" - this.tableNameStopKey = this.tableNameStartKey.clone(); - this.tableNameStopKey[tableNameStartKey.length-1] = 1; - this.config = config; - this.table = table; - } - - @Override - public V get(String key) { - return get(key, FAMILY); - } - - protected synchronized V get(String key, byte[] family) { - try { - Get get = new Get(row(key)); - get.addColumn(family, QUALIFIER); - Result r = table.get(get); - if(r.isEmpty()){ - return null; - } - return value(r); - } catch (IOException e) { - throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); - } - } - - @Override - public void put(String key, V value) { - put(key, FAMILY, value); - } - - protected synchronized void put(String key, byte[] family, V value) { - try { - Put put = new Put(row(key)); - put.add(family, QUALIFIER, bytes(value)); - table.put(put); - } catch (IOException e) { - throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); - } - } - - @Override - public synchronized boolean putIfAbsent(String key, V value) { - try { - Put put = new Put(row(key)); - put.add(FAMILY, QUALIFIER, bytes(value)); - return table.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put); - } catch (IOException e) { - throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); - } - } - - @Override - public synchronized void delete(String key) { - delete(row(key)); - } - - @Override - public Iterator> iterator() { - return new Iter(); - } - - private byte[] row(String key) { - return Bytes.toBytes(this.tableName + key); - } - - private byte[] bytes(V value) { - try { - return config.getSerializer().serialize(value); - } catch (IOException e) { - throw new DrillRuntimeException(e); - } - } - - private V value(Result result) { - try { - return config.getSerializer().deserialize(result.value()); - } catch (IOException e) { - throw new DrillRuntimeException(e); - } - } - - private void delete(byte[] row) { - try { - Delete del = new Delete(row); - table.delete(del); - } catch (IOException e) { - throw new DrillRuntimeException("Caught error while deleting row '" + Bytes.toStringBinary(row) - + "' from for table:" + Bytes.toString(table.getTableName()), e); - } - } - - private class Iter implements Iterator> { - private ResultScanner scanner; - private Result current = null; - private Result last = null; - private boolean done = false; - private int rowsRead = 0; - - Iter() { - try { - Scan scan = new Scan(tableNameStartKey, tableNameStopKey); - scan.addColumn(FAMILY, QUALIFIER); - scan.setCaching(config.getMaxIteratorSize() > 100 ? 100 : config.getMaxIteratorSize()); - scanner = table.getScanner(scan); - } catch (IOException e) { - throw new DrillRuntimeException("Caught error while creating HBase scanner for table:" + Bytes.toString(table.getTableName()), e); - } - } - - @Override - public boolean hasNext() { - if (config.getMode() == Mode.BLOB_PERSISTENT - && rowsRead >= config.getMaxIteratorSize()) { - done = true; - } else if (!done && current == null) { - try { - if ((current = scanner.next()) == null) { - done = true; - } - } catch (IOException e) { - throw new DrillRuntimeException("Caught error while fetching rows from for table:" + Bytes.toString(table.getTableName()), e); - } - } - - if (done && scanner != null) { - scanner.close(); - } - return (current != null); - } - - @Override - public Entry next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - last = current; - current = null; - rowsRead++; - return new DeferredEntry(last); - } - - @Override - public void remove() { - if (last == null) { - throw new IllegalStateException("remove() called on HBase persistent store iterator, but there is no last row."); - } - delete(last.getRow()); - } - - } - - private class DeferredEntry implements Entry{ - - private Result result; - - public DeferredEntry(Result result) { - super(); - this.result = result; - } - - @Override - public String getKey() { - return Bytes.toString(result.getRow(), tableNameStartKey.length, result.getRow().length-tableNameStartKey.length); - } - - @Override - public V getValue() { - return value(result); - } - - @Override - public V setValue(V value) { - throw new UnsupportedOperationException(); - } - - } - - @Override - public void close() { - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java index e57f2b3..df32b87 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,149 +17,16 @@ */ package org.apache.drill.exec.store.hbase.config; -import java.io.IOException; -import java.util.Map; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.exec.coord.ClusterCoordinator; -import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; -import org.apache.drill.exec.store.hbase.DrillHBaseConstants; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreProvider; -import org.apache.drill.exec.store.sys.PStoreRegistry; -import org.apache.drill.exec.store.sys.local.LocalEStoreProvider; -import org.apache.drill.exec.store.sys.zk.ZkEStoreProvider; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.util.Bytes; - -import com.google.common.annotations.VisibleForTesting; - -public class HBasePStoreProvider implements PStoreProvider { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePStoreProvider.class); - - static final byte[] FAMILY = Bytes.toBytes("s"); - - static final byte[] QUALIFIER = Bytes.toBytes("d"); - - private final String storeTableName; - - private Configuration hbaseConf; +import org.apache.drill.exec.store.sys.PersistentStoreRegistry; - private HConnection connection; - - private HTableInterface table; - - private final boolean zkAvailable; - private final LocalEStoreProvider localEStoreProvider; - private final ZkEStoreProvider zkEStoreProvider; - - public HBasePStoreProvider(PStoreRegistry registry) { - @SuppressWarnings("unchecked") - Map config = (Map) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG); - this.hbaseConf = HBaseConfiguration.create(); - this.hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-persistent-store-client"); - if (config != null) { - for (Map.Entry entry : config.entrySet()) { - this.hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue())); - } - } - this.storeTableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE); - - ClusterCoordinator coord = registry.getClusterCoordinator(); - if ((coord instanceof ZKClusterCoordinator)) { - this.localEStoreProvider = null; - this.zkEStoreProvider = new ZkEStoreProvider(((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator()); - this.zkAvailable = true; - } else { - this.localEStoreProvider = new LocalEStoreProvider(); - this.zkEStoreProvider = null; - this.zkAvailable = false; - } - - } - - @VisibleForTesting - public HBasePStoreProvider(Configuration conf, String storeTableName) { - this.hbaseConf = conf; - this.storeTableName = storeTableName; - this.localEStoreProvider = new LocalEStoreProvider(); - this.zkEStoreProvider = null; - this.zkAvailable = false; - } - - - - @Override - public PStore getStore(PStoreConfig config) throws IOException { - switch(config.getMode()){ - case EPHEMERAL: - if (this.zkAvailable) { - return zkEStoreProvider.getStore(config); - } else { - return localEStoreProvider.getStore(config); - } - - case BLOB_PERSISTENT: - case PERSISTENT: - return new HBasePStore(config, this.table); - - default: - throw new IllegalStateException(); - } - } - - - @Override - @SuppressWarnings("deprecation") - public void start() throws IOException { - this.connection = HConnectionManager.createConnection(hbaseConf); - - try(HBaseAdmin admin = new HBaseAdmin(connection)) { - if (!admin.tableExists(storeTableName)) { - HTableDescriptor desc = new HTableDescriptor(storeTableName); - desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1)); - admin.createTable(desc); - } else { - HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(storeTableName)); - if (!desc.hasFamily(FAMILY)) { - throw new DrillRuntimeException("The HBase table " + storeTableName - + " specified as persistent store exists but does not contain column family: " - + (Bytes.toString(FAMILY))); - } - } - } - - this.table = connection.getTable(storeTableName); - this.table.setAutoFlush(true); - } - - @Override - public synchronized void close() { - if (this.table != null) { - try { - this.table.close(); - this.table = null; - } catch (IOException e) { - logger.warn("Caught exception while closing HBase table.", e); - } - } - if (this.connection != null && !this.connection.isClosed()) { - try { - this.connection.close(); - } catch (IOException e) { - logger.warn("Caught exception while closing HBase connection.", e); - } - this.connection = null; - } +/** + * Kept for possible references to old class name in configuration. + * + * @deprecated will be removed in 1.7 + * use {@link HBasePersistentStoreProvider} instead. + */ +public class HBasePStoreProvider extends HBasePersistentStoreProvider { + public HBasePStoreProvider(PersistentStoreRegistry registry) { + super(registry); } - } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java new file mode 100644 index 0000000..ac78eb0 --- /dev/null +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hbase.config; + +import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY; +import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.NoSuchElementException; + +import com.google.common.collect.Iterators; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.store.sys.BasePersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStoreMode; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; + +public class HBasePersistentStore extends BasePersistentStore { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStore.class); + + private final PersistentStoreConfig config; + private final HTableInterface table; + private final String tableName; + private final byte[] tableNameStartKey; + private final byte[] tableNameStopKey; + + public HBasePersistentStore(PersistentStoreConfig config, HTableInterface table) { + this.tableName = config.getName() + '\0'; + this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00" + this.tableNameStopKey = this.tableNameStartKey.clone(); + this.tableNameStopKey[tableNameStartKey.length-1] = 1; + this.config = config; + this.table = table; + } + + @Override + public PersistentStoreMode getMode() { + return PersistentStoreMode.PERSISTENT; + } + + @Override + public V get(String key) { + return get(key, FAMILY); + } + + protected synchronized V get(String key, byte[] family) { + try { + Get get = new Get(row(key)); + get.addColumn(family, QUALIFIER); + Result r = table.get(get); + if(r.isEmpty()){ + return null; + } + return value(r); + } catch (IOException e) { + throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); + } + } + + @Override + public void put(String key, V value) { + put(key, FAMILY, value); + } + + protected synchronized void put(String key, byte[] family, V value) { + try { + Put put = new Put(row(key)); + put.add(family, QUALIFIER, bytes(value)); + table.put(put); + } catch (IOException e) { + throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); + } + } + + @Override + public synchronized boolean putIfAbsent(String key, V value) { + try { + Put put = new Put(row(key)); + put.add(FAMILY, QUALIFIER, bytes(value)); + return table.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put); + } catch (IOException e) { + throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); + } + } + + @Override + public synchronized void delete(String key) { + delete(row(key)); + } + + @Override + public Iterator> getRange(int skip, int take) { + final Iterator> iter = new Iter(take); + Iterators.advance(iter, skip); + return Iterators.limit(iter, take); + } + + private byte[] row(String key) { + return Bytes.toBytes(this.tableName + key); + } + + private byte[] bytes(V value) { + try { + return config.getSerializer().serialize(value); + } catch (IOException e) { + throw new DrillRuntimeException(e); + } + } + + private V value(Result result) { + try { + return config.getSerializer().deserialize(result.value()); + } catch (IOException e) { + throw new DrillRuntimeException(e); + } + } + + private void delete(byte[] row) { + try { + Delete del = new Delete(row); + table.delete(del); + } catch (IOException e) { + throw new DrillRuntimeException("Caught error while deleting row '" + Bytes.toStringBinary(row) + + "' from for table:" + Bytes.toString(table.getTableName()), e); + } + } + + private class Iter implements Iterator> { + private ResultScanner scanner; + private Result current = null; + private Result last = null; + private boolean done = false; + + Iter(int take) { + try { + Scan scan = new Scan(tableNameStartKey, tableNameStopKey); + scan.addColumn(FAMILY, QUALIFIER); + scan.setCaching(Math.min(take, 100)); + scan.setBatch(take); // set batch size + scanner = table.getScanner(scan); + } catch (IOException e) { + throw new DrillRuntimeException("Caught error while creating HBase scanner for table:" + Bytes.toString(table.getTableName()), e); + } + } + + @Override + public boolean hasNext() { + if (!done && current == null) { + try { + if ((current = scanner.next()) == null) { + done = true; + } + } catch (IOException e) { + throw new DrillRuntimeException("Caught error while fetching rows from for table:" + Bytes.toString(table.getTableName()), e); + } + } + + if (done && scanner != null) { + scanner.close(); + } + return (current != null); + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + last = current; + current = null; + return new DeferredEntry(last); + } + + @Override + public void remove() { + if (last == null) { + throw new IllegalStateException("remove() called on HBase persistent store iterator, but there is no last row."); + } + delete(last.getRow()); + } + + } + + private class DeferredEntry implements Entry{ + + private Result result; + + public DeferredEntry(Result result) { + super(); + this.result = result; + } + + @Override + public String getKey() { + return Bytes.toString(result.getRow(), tableNameStartKey.length, result.getRow().length-tableNameStartKey.length); + } + + @Override + public V getValue() { + return value(result); + } + + @Override + public V setValue(V value) { + throw new UnsupportedOperationException(); + } + + } + + @Override + public void close() { + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java new file mode 100644 index 0000000..6e379c6 --- /dev/null +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hbase.config; + +import java.io.IOException; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.store.hbase.DrillHBaseConstants; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.store.provider.BasePersistentStoreProvider; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.util.Bytes; + +public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class); + + static final byte[] FAMILY = Bytes.toBytes("s"); + + static final byte[] QUALIFIER = Bytes.toBytes("d"); + + private final String storeTableName; + + private Configuration hbaseConf; + + private HConnection connection; + + private HTableInterface table; + + public HBasePersistentStoreProvider(PersistentStoreRegistry registry) { + @SuppressWarnings("unchecked") + final Map config = (Map) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG); + this.hbaseConf = HBaseConfiguration.create(); + this.hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-persistent-store-client"); + if (config != null) { + for (Map.Entry entry : config.entrySet()) { + this.hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue())); + } + } + this.storeTableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE); + } + + @VisibleForTesting + public HBasePersistentStoreProvider(Configuration conf, String storeTableName) { + this.hbaseConf = conf; + this.storeTableName = storeTableName; + } + + + + @Override + public PersistentStore getOrCreateStore(PersistentStoreConfig config) throws StoreException { + switch(config.getMode()){ + case BLOB_PERSISTENT: + case PERSISTENT: + return new HBasePersistentStore<>(config, this.table); + + default: + throw new IllegalStateException(); + } + } + + + @Override + @SuppressWarnings("deprecation") + public void start() throws IOException { + this.connection = HConnectionManager.createConnection(hbaseConf); + + try(HBaseAdmin admin = new HBaseAdmin(connection)) { + if (!admin.tableExists(storeTableName)) { + HTableDescriptor desc = new HTableDescriptor(storeTableName); + desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1)); + admin.createTable(desc); + } else { + HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(storeTableName)); + if (!desc.hasFamily(FAMILY)) { + throw new DrillRuntimeException("The HBase table " + storeTableName + + " specified as persistent store exists but does not contain column family: " + + (Bytes.toString(FAMILY))); + } + } + } + + this.table = connection.getTable(storeTableName); + this.table.setAutoFlush(true); + } + + @Override + public synchronized void close() { + if (this.table != null) { + try { + this.table.close(); + this.table = null; + } catch (IOException e) { + logger.warn("Caught exception while closing HBase table.", e); + } + } + if (this.connection != null && !this.connection.isClosed()) { + try { + this.connection.close(); + } catch (IOException e) { + logger.warn("Caught exception while closing HBase connection.", e); + } + this.connection = null; + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java index 9ff1b08..6b73283 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java @@ -19,32 +19,33 @@ package org.apache.drill.hbase; import static org.junit.Assert.assertEquals; -import java.io.IOException; import java.util.Map.Entry; +import com.google.common.collect.Lists; import org.apache.drill.common.config.LogicalPlanPersistence; +import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory; -import org.apache.drill.exec.store.hbase.config.HBasePStoreProvider; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; +import org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; public class TestHBaseTableProvider extends BaseHBaseTest { - private static HBasePStoreProvider provider; + private static HBasePersistentStoreProvider provider; @BeforeClass // mask HBase cluster start function public static void setUpBeforeTestHBaseTableProvider() throws Exception { - provider = new HBasePStoreProvider(storagePluginConfig.getHBaseConf(), "drill_store"); + provider = new HBasePersistentStoreProvider(storagePluginConfig.getHBaseConf(), "drill_store"); provider.start(); } @Test - public void testTableProvider() throws IOException { + public void testTableProvider() throws StoreException { LogicalPlanPersistence lp = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config); - PStore hbaseStore = provider.getStore(PStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build()); + PersistentStore hbaseStore = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build()); hbaseStore.put("", "v0"); hbaseStore.put("k1", "v1"); hbaseStore.put("k2", "v2"); @@ -57,13 +58,13 @@ public class TestHBaseTableProvider extends BaseHBaseTest { assertEquals("testValue", hbaseStore.get(".test")); int rowCount = 0; - for (Entry entry : hbaseStore) { + for (Entry entry : Lists.newArrayList(hbaseStore.getAll())) { rowCount++; System.out.println(entry.getKey() + "=" + entry.getValue()); } assertEquals(7, rowCount); - PStore hbaseTestStore = provider.getStore(PStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build()); + PersistentStore hbaseTestStore = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build()); hbaseTestStore.put("", "v0"); hbaseTestStore.put("k1", "v1"); hbaseTestStore.put("k2", "v2"); @@ -75,7 +76,7 @@ public class TestHBaseTableProvider extends BaseHBaseTest { assertEquals("testValue", hbaseStore.get(".test")); rowCount = 0; - for (Entry entry : hbaseTestStore) { + for (Entry entry : Lists.newArrayList(hbaseTestStore.getAll())) { rowCount++; System.out.println(entry.getKey() + "=" + entry.getValue()); } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java deleted file mode 100644 index 7883fc9..0000000 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.mongo.config; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.NoSuchElementException; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.exec.store.mongo.DrillMongoConstants; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.bson.Document; -import org.bson.conversions.Bson; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoCursor; -import com.mongodb.client.model.Filters; -import com.mongodb.client.model.UpdateOptions; -import com.mongodb.client.model.Updates; -import com.mongodb.client.result.UpdateResult; - -import static org.apache.drill.exec.store.mongo.config.MongoPStoreProvider.pKey; - -public class MongoPStore implements PStore, DrillMongoConstants { - - static final Logger logger = LoggerFactory.getLogger(MongoPStore.class); - - private final PStoreConfig config; - - private final MongoCollection collection; - - public MongoPStore(PStoreConfig config, MongoCollection collection) - throws IOException { -// this.config = config; -// this.collection = collection; - throw new UnsupportedOperationException("Mongo DB PStore not currently supported"); - } - - @Override - public V get(String key) { - try { - Bson query = Filters.eq(ID, key); - Document document = collection.find(query).first(); - if (document != null) { - return value((byte[]) document.get(pKey)); - } else { - return null; - } - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new DrillRuntimeException(e.getMessage(), e); - } - } - - @Override - public void put(String key, V value) { - try { - Document putObj = new Document(ID, key).append(pKey, bytes(value)); - collection.insertOne(putObj); - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new DrillRuntimeException(e.getMessage(), e); - } - } - - @Override - public boolean putIfAbsent(String key, V value) { - try { - Bson query = Filters.eq(ID, key); - Bson update = Updates.set(pKey, bytes(value)); - UpdateResult updateResult = collection.updateOne(query, update, new UpdateOptions().upsert(true)); - return updateResult.getModifiedCount() == 1; - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new DrillRuntimeException(e.getMessage(), e); - } - } - - @Override - public void delete(String key) { - try { - Bson query = Filters.eq(ID, key); - collection.deleteOne(query); - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new DrillRuntimeException(e.getMessage(), e); - } - } - - private byte[] bytes(V value) { - try { - return config.getSerializer().serialize(value); - } catch (IOException e) { - throw new DrillRuntimeException(e.getMessage(), e); - } - } - - private V value(byte[] serialize) { - try { - return config.getSerializer().deserialize(serialize); - } catch (IOException e) { - throw new DrillRuntimeException(e.getMessage(), e); - } - } - - @Override - public Iterator> iterator() { - return new MongoIterator(); - } - - private class MongoIterator implements Iterator> { - - private MongoCursor cursor; - - public MongoIterator() { - cursor = collection.find().iterator(); - } - - @Override - public boolean hasNext() { - return cursor.hasNext(); - } - - @Override - public Entry next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - return new DeferredEntry(cursor.next()); - } - - @Override - public void remove() { - cursor.remove(); - } - } - - private class DeferredEntry implements Entry { - - private Document result; - - public DeferredEntry(Document result) { - this.result = result; - } - - @Override - public String getKey() { - return result.get(ID).toString(); - } - - @Override - public V getValue() { - return get(result.get(ID).toString()); - } - - @Override - public V setValue(V value) { - throw new UnsupportedOperationException(); - } - } - - @Override - public void close() { - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java deleted file mode 100644 index ae9353a..0000000 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.mongo.config; - -import java.io.IOException; - -import org.apache.drill.exec.store.mongo.DrillMongoConstants; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreProvider; -import org.apache.drill.exec.store.sys.PStoreRegistry; -import org.apache.drill.exec.store.sys.local.LocalEStoreProvider; -import org.bson.Document; -import org.bson.conversions.Bson; - -import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; -import com.mongodb.WriteConcern; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; -import com.mongodb.client.model.Indexes; - -public class MongoPStoreProvider implements PStoreProvider, DrillMongoConstants { - - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory - .getLogger(MongoPStoreProvider.class); - - static final String pKey = "pKey"; - - private MongoClient client; - - private MongoCollection collection; - - private final String mongoURL; - private final LocalEStoreProvider localEStoreProvider; - - public MongoPStoreProvider(PStoreRegistry registry) { - mongoURL = registry.getConfig().getString(SYS_STORE_PROVIDER_MONGO_URL); - localEStoreProvider = new LocalEStoreProvider(); - } - - @Override - public void start() throws IOException { - MongoClientURI clientURI = new MongoClientURI(mongoURL); - client = new MongoClient(clientURI); - MongoDatabase db = client.getDatabase(clientURI.getDatabase()); - collection = db.getCollection(clientURI.getCollection()).withWriteConcern(WriteConcern.JOURNALED); - Bson index = Indexes.ascending(pKey); - collection.createIndex(index); - } - - @Override - public PStore getStore(PStoreConfig config) throws IOException { - switch(config.getMode()){ - case BLOB_PERSISTENT: - case PERSISTENT: - return new MongoPStore<>(config, collection); - case EPHEMERAL: - return localEStoreProvider.getStore(config); - default: - throw new IllegalStateException(); - - } - } - - @Override - public void close() { - if (client != null) { - client.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java new file mode 100644 index 0000000..b5cc3ee --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mongo.config; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.NoSuchElementException; + +import com.google.common.base.Preconditions; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.store.mongo.DrillMongoConstants; +import org.apache.drill.exec.store.sys.BasePersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStoreMode; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.Updates; +import com.mongodb.client.result.UpdateResult; + +import static org.apache.drill.exec.store.mongo.config.MongoPersistentStoreProvider.pKey; + +public class MongoPersistentStore extends BasePersistentStore { + + private static final Logger logger = LoggerFactory.getLogger(MongoPersistentStore.class); + + private final PersistentStoreConfig config; + private final MongoCollection collection; + + public MongoPersistentStore(PersistentStoreConfig config, MongoCollection collection) { +// this.config = config; +// this.collection = collection; + throw new UnsupportedOperationException("Mongo DB PStore not currently supported"); + } + + @Override + public PersistentStoreMode getMode() { + return PersistentStoreMode.PERSISTENT; + } + + @Override + public V get(String key) { + try { + Bson query = Filters.eq(DrillMongoConstants.ID, key); + Document document = collection.find(query).first(); + if (document != null) { + return value((byte[]) document.get(pKey)); + } else { + return null; + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DrillRuntimeException(e.getMessage(), e); + } + } + + @Override + public void put(String key, V value) { + try { + Document putObj = new Document(DrillMongoConstants.ID, key).append(pKey, bytes(value)); + collection.insertOne(putObj); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DrillRuntimeException(e.getMessage(), e); + } + } + + @Override + public boolean putIfAbsent(String key, V value) { + try { + Bson query = Filters.eq(DrillMongoConstants.ID, key); + Bson update = Updates.set(pKey, bytes(value)); + UpdateResult updateResult = collection.updateOne(query, update, new UpdateOptions().upsert(true)); + return updateResult.getModifiedCount() == 1; + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DrillRuntimeException(e.getMessage(), e); + } + } + + @Override + public void delete(String key) { + try { + Bson query = Filters.eq(DrillMongoConstants.ID, key); + collection.deleteOne(query); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DrillRuntimeException(e.getMessage(), e); + } + } + + private byte[] bytes(V value) { + try { + return config.getSerializer().serialize(value); + } catch (IOException e) { + throw new DrillRuntimeException(e.getMessage(), e); + } + } + + private V value(byte[] serialize) { + try { + return config.getSerializer().deserialize(serialize); + } catch (IOException e) { + throw new DrillRuntimeException(e.getMessage(), e); + } + } + + @Override + public Iterator> getRange(int skip, int take) { + final MongoCursor cursor = collection.find().skip(skip).limit(take).iterator(); + return new MongoIterator(cursor); + } + + private class MongoIterator implements Iterator> { + + private MongoCursor cursor; + + public MongoIterator(final MongoCursor cursor) { + this.cursor = Preconditions.checkNotNull(cursor); + } + + @Override + public boolean hasNext() { + return cursor.hasNext(); + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return new DeferredEntry(cursor.next()); + } + + @Override + public void remove() { + cursor.remove(); + } + } + + private class DeferredEntry implements Entry { + + private Document result; + + public DeferredEntry(Document result) { + this.result = result; + } + + @Override + public String getKey() { + return result.get(DrillMongoConstants.ID).toString(); + } + + @Override + public V getValue() { + return get(result.get(DrillMongoConstants.ID).toString()); + } + + @Override + public V setValue(V value) { + throw new UnsupportedOperationException(); + } + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java new file mode 100644 index 0000000..33127bd --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStoreProvider.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mongo.config; + +import java.io.IOException; + +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.store.mongo.DrillMongoConstants; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.store.provider.BasePersistentStoreProvider; +import org.bson.Document; +import org.bson.conversions.Bson; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.WriteConcern; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Indexes; + +public class MongoPersistentStoreProvider extends BasePersistentStoreProvider { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory + .getLogger(MongoPersistentStoreProvider.class); + + static final String pKey = "pKey"; + + private MongoClient client; + + private MongoCollection collection; + + private final String mongoURL; + + public MongoPersistentStoreProvider(PersistentStoreRegistry registry) throws StoreException { + mongoURL = registry.getConfig().getString(DrillMongoConstants.SYS_STORE_PROVIDER_MONGO_URL); + } + + @Override + public void start() throws IOException { + MongoClientURI clientURI = new MongoClientURI(mongoURL); + client = new MongoClient(clientURI); + MongoDatabase db = client.getDatabase(clientURI.getDatabase()); + collection = db.getCollection(clientURI.getCollection()).withWriteConcern(WriteConcern.JOURNALED); + Bson index = Indexes.ascending(pKey); + collection.createIndex(index); + } + + @Override + public PersistentStore getOrCreateStore(PersistentStoreConfig config) { + switch(config.getMode()){ + case BLOB_PERSISTENT: + case PERSISTENT: + return new MongoPersistentStore<>(config, collection); + default: + throw new IllegalStateException(); + + } + } + + @Override + public void close() { + if (client != null) { + client.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/distribution/src/resources/drill-override-example.conf ---------------------------------------------------------------------- diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf index bc77245..9e29a5f 100644 --- a/distribution/src/resources/drill-override-example.conf +++ b/distribution/src/resources/drill-override-example.conf @@ -97,7 +97,7 @@ drill.exec: { executor.threads: 4 }, sys.store.provider: { - class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider", + class: "org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider", # The following section is used by ZkPStoreProvider zk: { blobroot: "file:///var/log/drill" http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/pom.xml ---------------------------------------------------------------------- diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index 751c810..604bc8e 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -336,6 +336,12 @@ + org.apache.curator + curator-test + 2.7.1 + test + + org.xerial.snappy snappy-java 1.0.5-M3 http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 902b35b..f83285e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -288,7 +288,7 @@ public class DrillClient implements Closeable, ConnectionThrottle { try { clusterCoordinator.close(); clusterCoordinator = null; - } catch (IOException e) { + } catch (Exception e) { logger.warn("Error while closing Cluster Coordinator.", e); } } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java index 3f9da07..af328b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java @@ -25,10 +25,7 @@ import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.expr.CodeGenerator; -import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.server.options.SystemOptionManager; -import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java index be2f3b1..ea9593e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java @@ -22,6 +22,8 @@ import java.util.Collection; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.drill.exec.coord.store.TransientStore; +import org.apache.drill.exec.coord.store.TransientStoreConfig; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.work.foreman.DrillbitStatusListener; @@ -29,7 +31,7 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener; * Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities * as well as understand other node's existence and capabilities. **/ -public abstract class ClusterCoordinator implements Closeable { +public abstract class ClusterCoordinator implements AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClusterCoordinator.class); protected ConcurrentHashMap listeners = new ConcurrentHashMap<>( @@ -60,16 +62,26 @@ public abstract class ClusterCoordinator implements Closeable { public abstract DistributedSemaphore getSemaphore(String name, int maximumLeases); /** + * Returns a {@link TransientStore store} instance with the given {@link TransientStoreConfig configuration}. + * + * Note that implementor might cache the instance so new instance creation is not guaranteed. + * + * @param config store configuration + * @param value type for this store + */ + public abstract TransientStore getOrCreateTransientStore(TransientStoreConfig config); + + /** * Actions to take when there are a set of new de-active drillbits. * @param unregisteredBits */ - public void drillbitUnregistered(Set unregisteredBits) { + protected void drillbitUnregistered(Set unregisteredBits) { for (DrillbitStatusListener listener : listeners.keySet()) { listener.drillbitUnregistered(unregisteredBits); } } - public void drillbitRegistered(Set registeredBits) { + protected void drillbitRegistered(Set registeredBits) { for (DrillbitStatusListener listener : listeners.keySet()) { listener.drillbitRegistered(registeredBits); } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java index 27868f0..8c13c42 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java @@ -28,6 +28,10 @@ import java.util.concurrent.TimeUnit; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.store.CachingTransientStoreFactory; +import org.apache.drill.exec.coord.store.TransientStore; +import org.apache.drill.exec.coord.store.TransientStoreConfig; +import org.apache.drill.exec.coord.store.TransientStoreFactory; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.google.common.collect.Maps; @@ -43,8 +47,19 @@ public class LocalClusterCoordinator extends ClusterCoordinator { private final Map endpoints = new ConcurrentHashMap<>(); private final ConcurrentMap semaphores = Maps.newConcurrentMap(); + private final TransientStoreFactory factory = CachingTransientStoreFactory.of(new TransientStoreFactory() { + @Override + public TransientStore getOrCreateStore(TransientStoreConfig config) { + return new MapBackedStore<>(config); + } + + @Override + public void close() throws Exception { } + }); + @Override - public void close() throws IOException { + public void close() throws Exception { + factory.close(); endpoints.clear(); } @@ -125,6 +140,11 @@ public class LocalClusterCoordinator extends ClusterCoordinator { return semaphores.get(name); } + @Override + public TransientStore getOrCreateTransientStore(final TransientStoreConfig config) { + return factory.getOrCreateStore(config); + } + public class LocalSemaphore implements DistributedSemaphore { private final Semaphore semaphore; private final LocalLease localLease = new LocalLease(); http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/MapBackedStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/MapBackedStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/MapBackedStore.java new file mode 100644 index 0000000..c3c8f5a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/MapBackedStore.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.local; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +import com.google.common.collect.Maps; +import org.apache.drill.exec.coord.store.BaseTransientStore; +import org.apache.drill.exec.coord.store.TransientStoreConfig; +import org.apache.drill.exec.coord.store.TransientStoreEvent; +import org.apache.drill.exec.coord.store.TransientStoreEventType; + +public class MapBackedStore extends BaseTransientStore { + private final ConcurrentMap delegate = Maps.newConcurrentMap(); + + public MapBackedStore(final TransientStoreConfig config) { + super(config); + } + + @Override + public V get(final String key) { + return delegate.get(key); + } + + @Override + public V put(final String key, final V value) { + final boolean hasKey = delegate.containsKey(key); + final V old = delegate.put(key, value); + if (old != value) { + final TransientStoreEventType type = hasKey ? TransientStoreEventType.UPDATE : TransientStoreEventType.CREATE; + fireListeners(TransientStoreEvent.of(type, key, value)); + } + return old; + } + + @Override + public V putIfAbsent(final String key, final V value) { + final V existing = delegate.putIfAbsent(key, value); + if (existing == null) { + fireListeners(TransientStoreEvent.of(TransientStoreEventType.CREATE, key, value)); + } + return existing; + } + + @Override + public V remove(final String key) { + final V existing = delegate.remove(key); + if (existing != null) { + fireListeners(TransientStoreEvent.of(TransientStoreEventType.DELETE, key, existing)); + } + return existing; + } + + @Override + public Iterator> entries() { + return delegate.entrySet().iterator(); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public void close() throws Exception { + delegate.clear(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/BaseTransientStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/BaseTransientStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/BaseTransientStore.java new file mode 100644 index 0000000..463e1fb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/BaseTransientStore.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.store; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; + +public abstract class BaseTransientStore implements TransientStore { + private final Set listeners = Collections.newSetFromMap( + Maps.newConcurrentMap()); + + protected final TransientStoreConfig config; + + protected BaseTransientStore(final TransientStoreConfig config) { + this.config = Preconditions.checkNotNull(config, "config cannot be null"); + } + + public TransientStoreConfig getConfig() { + return config; + } + + @Override + public Iterator keys() { + return Iterators.transform(entries(), new Function, String>() { + @Nullable + @Override + public String apply(@Nullable Map.Entry input) { + return input.getKey(); + } + }); + } + + @Override + public Iterator values() { + return Iterators.transform(entries(), new Function, V>() { + @Nullable + @Override + public V apply(final Map.Entry entry) { + return entry.getValue(); + } + }); + } + + protected void fireListeners(final TransientStoreEvent event) { + for (final TransientStoreListener listener:listeners) { + listener.onChange(event); + } + } + + @Override + public void addListener(final TransientStoreListener listener) { + listeners.add(Preconditions.checkNotNull(listener, "listener cannot be null")); + } + + @Override + public void removeListener(final TransientStoreListener listener) { + listeners.remove(Preconditions.checkNotNull(listener, "listener cannot be null")); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/CachingTransientStoreFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/CachingTransientStoreFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/CachingTransientStoreFactory.java new file mode 100644 index 0000000..b500c5d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/CachingTransientStoreFactory.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.store; + +import java.util.List; +import java.util.Map; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.store.sys.PersistentStore; + +public class CachingTransientStoreFactory implements TransientStoreFactory { + private final TransientStoreFactory delegate; + private final Map cache = Maps.newHashMap(); + + public CachingTransientStoreFactory(final TransientStoreFactory delegate) { + this.delegate = Preconditions.checkNotNull(delegate, "delegate factory is required"); + } + + @Override + public TransientStore getOrCreateStore(final TransientStoreConfig config) { + final TransientStore store = cache.get(Preconditions.checkNotNull(config, "config is required")); + if (store != null) { + return store; + } + + final TransientStore newStore = delegate.getOrCreateStore(config); + cache.put(config, newStore); + return newStore; + } + + @Override + public void close() throws Exception { + final List closeables = Lists.newArrayList(); + for(final AutoCloseable store : cache.values()){ + closeables.add(store); + } + closeables.add(delegate); + cache.clear(); + AutoCloseables.close(closeables); + } + + public static TransientStoreFactory of(final TransientStoreFactory delegate) { + return new CachingTransientStoreFactory(delegate); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStore.java new file mode 100644 index 0000000..ca9b028 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStore.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.store; + +import java.util.Iterator; +import java.util.Map; + +/** + * An abstraction for storing, retrieving and observing transient (key, value) pairs in a distributed environment. + * + * This abstraction diverges from {@link org.apache.drill.exec.store.sys.PersistentStore} in that the lifetime of + * a (key, value) tuple is bound to the lifetime of the node originally creating it. In other words, entries are evicted + * as soon as node/s originally stored them leaves the cluster. That should explain the reason for relocating this + * abstraction under cluster coordination package. + * + * Consumers of this interface can observe changes made to the store via attaching a {@link TransientStoreListener listener}. + * + */ +public interface TransientStore extends AutoCloseable { + /** + * Returns a value corresponding to the given look-up key if exists, null otherwise. + * @param key look-up key + */ + V get(String key); + + /** + * Stores the given (key, value) in this store overriding the existing value. + * + * @param key look-up key + * @param value value to store + * @return the old value if the key exists, null otherwise + */ + V put(String key, V value); + + /** + * Stores the given (key, value) tuple in this store only if it does not exists. + * + * @param key look-up key + * @param value value to store + * @return the old value if the key exists, null otherwise + */ + V putIfAbsent(String key, V value); + + + /** + * Removes the (key, value) tuple from this store if the key exists. + * + * @param key look-up key + * @return the removed value if key exists, null otherwise + */ + V remove(String key); + + /** + * Returns an iterator of (key, value) tuples. + */ + Iterator> entries(); + + /** + * Returns an iterator of keys. + */ + Iterator keys(); + + + /** + * Returns an iterator of values. + */ + Iterator values(); + + /** + * Returns number of entries. + */ + int size(); + + /** + * Adds a listener that observes store {@link TransientStoreEvent events}. + * + * Note that + * i) Calling this method with the same listener instance more than once has no effect. + * ii) Listeners are not necessarily invoked from the calling thread. Consumer should consider thread safety issues. + * iii) Subclasses might hold a strong reference to the listener. It is important that consumer + * {@link #removeListener(TransientStoreListener) removes} its listener once it is done observing events. + * + * @see #removeListener(TransientStoreListener) + */ + void addListener(TransientStoreListener listener); + + /** + * Removes the given listener from this store if exists, has no effect otherwise. + */ + void removeListener(TransientStoreListener listener); +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java new file mode 100644 index 0000000..35c4a06 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.store; + +import com.dyuproject.protostuff.Schema; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.protobuf.Message; +import org.apache.drill.exec.serialization.JacksonSerializer; +import org.apache.drill.exec.serialization.ProtoSerializer; +import org.apache.drill.exec.serialization.InstanceSerializer; + +public class TransientStoreConfig { + private final String name; + private final InstanceSerializer serializer; + + protected TransientStoreConfig(final String name, final InstanceSerializer serializer) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "name is required"); + this.name = name; + this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null"); + } + + public String getName() { + return name; + } + + public InstanceSerializer getSerializer() { + return serializer; + } + + @Override + public int hashCode() { + return Objects.hashCode(name, serializer); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof TransientStoreConfig && obj.getClass().equals(getClass())) { + final TransientStoreConfig other = (TransientStoreConfig)obj; + return Objects.equal(name, other.name) && Objects.equal(serializer, other.serializer); + } + return false; + } + + public static TransientStoreConfigBuilder newBuilder() { + return new TransientStoreConfigBuilder<>(); + } + + public static TransientStoreConfigBuilder newProtoBuilder(final Schema writeSchema, final Schema readSchema) { + return TransientStoreConfig.newBuilder().serializer(new ProtoSerializer<>(readSchema, writeSchema)); + } + + public static TransientStoreConfigBuilder newJacksonBuilder(final ObjectMapper mapper, final Class klazz) { + return TransientStoreConfig.newBuilder().serializer(new JacksonSerializer<>(mapper, klazz)); + } + +}