Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9828F10042 for ; Fri, 5 Dec 2014 03:51:31 +0000 (UTC) Received: (qmail 35941 invoked by uid 500); 5 Dec 2014 03:51:31 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 35849 invoked by uid 500); 5 Dec 2014 03:51:31 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 35673 invoked by uid 99); 5 Dec 2014 03:51:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Dec 2014 03:51:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 27D06A1DCE1; Fri, 5 Dec 2014 03:51:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Fri, 05 Dec 2014 03:51:34 -0000 Message-Id: <351a2141cab643f9bc3fd96a1c4f720d@git.apache.org> In-Reply-To: <84c8f88c9504490d931870c9cb586f82@git.apache.org> References: <84c8f88c9504490d931870c9cb586f82@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/6] accumulo git commit: ACCUMULO-1798 Add ability to specify compaction strategy for user specificed compactions. http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/AccumuloProxy.h ---------------------------------------------------------------------- diff --git a/proxy/src/main/cpp/AccumuloProxy.h b/proxy/src/main/cpp/AccumuloProxy.h index e9b7769..269884f 100644 --- a/proxy/src/main/cpp/AccumuloProxy.h +++ b/proxy/src/main/cpp/AccumuloProxy.h @@ -38,7 +38,7 @@ class AccumuloProxyIf { virtual void checkIteratorConflicts(const std::string& login, const std::string& tableName, const IteratorSetting& setting, const std::set & scopes) = 0; virtual void clearLocatorCache(const std::string& login, const std::string& tableName) = 0; virtual void cloneTable(const std::string& login, const std::string& tableName, const std::string& newTableName, const bool flush, const std::map & propertiesToSet, const std::set & propertiesToExclude) = 0; - virtual void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector & iterators, const bool flush, const bool wait) = 0; + virtual void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy) = 0; virtual void cancelCompaction(const std::string& login, const std::string& tableName) = 0; virtual void createTable(const std::string& login, const std::string& tableName, const bool versioningIter, const TimeType::type type) = 0; virtual void deleteTable(const std::string& login, const std::string& tableName) = 0; @@ -159,7 +159,7 @@ class AccumuloProxyNull : virtual public AccumuloProxyIf { void cloneTable(const std::string& /* login */, const std::string& /* tableName */, const std::string& /* newTableName */, const bool /* flush */, const std::map & /* propertiesToSet */, const std::set & /* propertiesToExclude */) { return; } - void compactTable(const std::string& /* login */, const std::string& /* tableName */, const std::string& /* startRow */, const std::string& /* endRow */, const std::vector & /* iterators */, const bool /* flush */, const bool /* wait */) { + void compactTable(const std::string& /* login */, const std::string& /* tableName */, const std::string& /* startRow */, const std::string& /* endRow */, const std::vector & /* iterators */, const bool /* flush */, const bool /* wait */, const CompactionStrategyConfig& /* compactionStrategy */) { return; } void cancelCompaction(const std::string& /* login */, const std::string& /* tableName */) { @@ -1419,7 +1419,7 @@ class AccumuloProxy_cloneTable_presult { }; typedef struct _AccumuloProxy_compactTable_args__isset { - _AccumuloProxy_compactTable_args__isset() : login(false), tableName(false), startRow(false), endRow(false), iterators(false), flush(false), wait(false) {} + _AccumuloProxy_compactTable_args__isset() : login(false), tableName(false), startRow(false), endRow(false), iterators(false), flush(false), wait(false), compactionStrategy(false) {} bool login; bool tableName; bool startRow; @@ -1427,6 +1427,7 @@ typedef struct _AccumuloProxy_compactTable_args__isset { bool iterators; bool flush; bool wait; + bool compactionStrategy; } _AccumuloProxy_compactTable_args__isset; class AccumuloProxy_compactTable_args { @@ -1444,6 +1445,7 @@ class AccumuloProxy_compactTable_args { std::vector iterators; bool flush; bool wait; + CompactionStrategyConfig compactionStrategy; _AccumuloProxy_compactTable_args__isset __isset; @@ -1475,6 +1477,10 @@ class AccumuloProxy_compactTable_args { wait = val; } + void __set_compactionStrategy(const CompactionStrategyConfig& val) { + compactionStrategy = val; + } + bool operator == (const AccumuloProxy_compactTable_args & rhs) const { if (!(login == rhs.login)) @@ -1491,6 +1497,8 @@ class AccumuloProxy_compactTable_args { return false; if (!(wait == rhs.wait)) return false; + if (!(compactionStrategy == rhs.compactionStrategy)) + return false; return true; } bool operator != (const AccumuloProxy_compactTable_args &rhs) const { @@ -1518,6 +1526,7 @@ class AccumuloProxy_compactTable_pargs { const std::vector * iterators; const bool* flush; const bool* wait; + const CompactionStrategyConfig* compactionStrategy; uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; @@ -11342,8 +11351,8 @@ class AccumuloProxyClient : virtual public AccumuloProxyIf { void cloneTable(const std::string& login, const std::string& tableName, const std::string& newTableName, const bool flush, const std::map & propertiesToSet, const std::set & propertiesToExclude); void send_cloneTable(const std::string& login, const std::string& tableName, const std::string& newTableName, const bool flush, const std::map & propertiesToSet, const std::set & propertiesToExclude); void recv_cloneTable(); - void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector & iterators, const bool flush, const bool wait); - void send_compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector & iterators, const bool flush, const bool wait); + void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy); + void send_compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy); void recv_compactTable(); void cancelCompaction(const std::string& login, const std::string& tableName); void send_cancelCompaction(const std::string& login, const std::string& tableName); @@ -11815,13 +11824,13 @@ class AccumuloProxyMultiface : virtual public AccumuloProxyIf { ifaces_[i]->cloneTable(login, tableName, newTableName, flush, propertiesToSet, propertiesToExclude); } - void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector & iterators, const bool flush, const bool wait) { + void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy) { size_t sz = ifaces_.size(); size_t i = 0; for (; i < (sz - 1); ++i) { - ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait); + ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait, compactionStrategy); } - ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait); + ifaces_[i]->compactTable(login, tableName, startRow, endRow, iterators, flush, wait, compactionStrategy); } void cancelCompaction(const std::string& login, const std::string& tableName) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp ---------------------------------------------------------------------- diff --git a/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp b/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp index 2654c37..302aec2 100644 --- a/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp +++ b/proxy/src/main/cpp/AccumuloProxy_server.skeleton.cpp @@ -73,7 +73,7 @@ class AccumuloProxyHandler : virtual public AccumuloProxyIf { printf("cloneTable\n"); } - void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector & iterators, const bool flush, const bool wait) { + void compactTable(const std::string& login, const std::string& tableName, const std::string& startRow, const std::string& endRow, const std::vector & iterators, const bool flush, const bool wait, const CompactionStrategyConfig& compactionStrategy) { // Your implementation goes here printf("compactTable\n"); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/proxy_types.cpp ---------------------------------------------------------------------- diff --git a/proxy/src/main/cpp/proxy_types.cpp b/proxy/src/main/cpp/proxy_types.cpp index edb0978..a055b48 100644 --- a/proxy/src/main/cpp/proxy_types.cpp +++ b/proxy/src/main/cpp/proxy_types.cpp @@ -2611,6 +2611,105 @@ void swap(WriterOptions &a, WriterOptions &b) { swap(a.__isset, b.__isset); } +const char* CompactionStrategyConfig::ascii_fingerprint = "F7C641917C22B35AE581CCD54910B00D"; +const uint8_t CompactionStrategyConfig::binary_fingerprint[16] = {0xF7,0xC6,0x41,0x91,0x7C,0x22,0xB3,0x5A,0xE5,0x81,0xCC,0xD5,0x49,0x10,0xB0,0x0D}; + +uint32_t CompactionStrategyConfig::read(::apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->className); + this->__isset.className = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_MAP) { + { + this->options.clear(); + uint32_t _size125; + ::apache::thrift::protocol::TType _ktype126; + ::apache::thrift::protocol::TType _vtype127; + xfer += iprot->readMapBegin(_ktype126, _vtype127, _size125); + uint32_t _i129; + for (_i129 = 0; _i129 < _size125; ++_i129) + { + std::string _key130; + xfer += iprot->readString(_key130); + std::string& _val131 = this->options[_key130]; + xfer += iprot->readString(_val131); + } + xfer += iprot->readMapEnd(); + } + this->__isset.options = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t CompactionStrategyConfig::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("CompactionStrategyConfig"); + + xfer += oprot->writeFieldBegin("className", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(this->className); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("options", ::apache::thrift::protocol::T_MAP, 2); + { + xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRING, ::apache::thrift::protocol::T_STRING, static_cast(this->options.size())); + std::map ::const_iterator _iter132; + for (_iter132 = this->options.begin(); _iter132 != this->options.end(); ++_iter132) + { + xfer += oprot->writeString(_iter132->first); + xfer += oprot->writeString(_iter132->second); + } + xfer += oprot->writeMapEnd(); + } + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(CompactionStrategyConfig &a, CompactionStrategyConfig &b) { + using ::std::swap; + swap(a.className, b.className); + swap(a.options, b.options); + swap(a.__isset, b.__isset); +} + const char* UnknownScanner::ascii_fingerprint = "EFB929595D312AC8F305D5A794CFEDA1"; const uint8_t UnknownScanner::binary_fingerprint[16] = {0xEF,0xB9,0x29,0x59,0x5D,0x31,0x2A,0xC8,0xF3,0x05,0xD5,0xA7,0x94,0xCF,0xED,0xA1}; http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/cpp/proxy_types.h ---------------------------------------------------------------------- diff --git a/proxy/src/main/cpp/proxy_types.h b/proxy/src/main/cpp/proxy_types.h index 625586c..569de88 100644 --- a/proxy/src/main/cpp/proxy_types.h +++ b/proxy/src/main/cpp/proxy_types.h @@ -1555,6 +1555,57 @@ class WriterOptions { void swap(WriterOptions &a, WriterOptions &b); +typedef struct _CompactionStrategyConfig__isset { + _CompactionStrategyConfig__isset() : className(false), options(false) {} + bool className; + bool options; +} _CompactionStrategyConfig__isset; + +class CompactionStrategyConfig { + public: + + static const char* ascii_fingerprint; // = "F7C641917C22B35AE581CCD54910B00D"; + static const uint8_t binary_fingerprint[16]; // = {0xF7,0xC6,0x41,0x91,0x7C,0x22,0xB3,0x5A,0xE5,0x81,0xCC,0xD5,0x49,0x10,0xB0,0x0D}; + + CompactionStrategyConfig() : className() { + } + + virtual ~CompactionStrategyConfig() throw() {} + + std::string className; + std::map options; + + _CompactionStrategyConfig__isset __isset; + + void __set_className(const std::string& val) { + className = val; + } + + void __set_options(const std::map & val) { + options = val; + } + + bool operator == (const CompactionStrategyConfig & rhs) const + { + if (!(className == rhs.className)) + return false; + if (!(options == rhs.options)) + return false; + return true; + } + bool operator != (const CompactionStrategyConfig &rhs) const { + return !(*this == rhs); + } + + bool operator < (const CompactionStrategyConfig & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +void swap(CompactionStrategyConfig &a, CompactionStrategyConfig &b); + typedef struct _UnknownScanner__isset { _UnknownScanner__isset() : msg(false) {} bool msg; http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java ---------------------------------------------------------------------- diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java index bd0782d..b51d43d 100644 --- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java +++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java @@ -56,6 +56,7 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.admin.ActiveCompaction; import org.apache.accumulo.core.client.admin.ActiveScan; +import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; @@ -83,6 +84,7 @@ import org.apache.accumulo.proxy.thrift.AccumuloProxy; import org.apache.accumulo.proxy.thrift.BatchScanOptions; import org.apache.accumulo.proxy.thrift.ColumnUpdate; import org.apache.accumulo.proxy.thrift.CompactionReason; +import org.apache.accumulo.proxy.thrift.CompactionStrategyConfig; import org.apache.accumulo.proxy.thrift.CompactionType; import org.apache.accumulo.proxy.thrift.Condition; import org.apache.accumulo.proxy.thrift.ConditionalStatus; @@ -331,12 +333,22 @@ public class ProxyServer implements AccumuloProxy.Iface { @Override public void compactTable(ByteBuffer login, String tableName, ByteBuffer startRow, ByteBuffer endRow, - List iterators, boolean flush, boolean wait) + List iterators, boolean flush, boolean wait, CompactionStrategyConfig compactionStrategy) throws org.apache.accumulo.proxy.thrift.AccumuloSecurityException, org.apache.accumulo.proxy.thrift.TableNotFoundException, org.apache.accumulo.proxy.thrift.AccumuloException, TException { try { - getConnector(login).tableOperations().compact(tableName, ByteBufferUtil.toText(startRow), ByteBufferUtil.toText(endRow), getIteratorSettings(iterators), - flush, wait); + CompactionConfig compactionConfig = new CompactionConfig().setStartRow(ByteBufferUtil.toText(startRow)).setEndRow(ByteBufferUtil.toText(endRow)) + .setIterators(getIteratorSettings(iterators)).setFlush(flush).setWait(wait); + + if (compactionStrategy != null) { + org.apache.accumulo.core.client.admin.CompactionStrategyConfig ccc = new org.apache.accumulo.core.client.admin.CompactionStrategyConfig( + compactionStrategy.getClassName()); + if (compactionStrategy.options != null) + ccc.setOptions(compactionStrategy.options); + compactionConfig.setCompactionStrategy(ccc); + } + + getConnector(login).tableOperations().compact(tableName, compactionConfig); } catch (Exception e) { handleExceptionTNF(e); }