Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7C439CD15 for ; Tue, 9 Dec 2014 05:15:39 +0000 (UTC) Received: (qmail 64247 invoked by uid 500); 9 Dec 2014 05:15:39 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 64214 invoked by uid 500); 9 Dec 2014 05:15:39 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 64177 invoked by uid 99); 9 Dec 2014 05:15:39 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Dec 2014 05:15:39 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E424CA1F5AF; Tue, 9 Dec 2014 05:15:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Tue, 09 Dec 2014 05:15:40 -0000 Message-Id: <9e8ea11c8cc1433ab1c85f9f4ef19a27@git.apache.org> In-Reply-To: <376c163b4f704f2d8dbf3298b04b74d2@git.apache.org> References: <376c163b4f704f2d8dbf3298b04b74d2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] accumulo git commit: ACCUMULO-3394 Change thrift package to rpc and make an rpc package in core too ACCUMULO-3394 Change thrift package to rpc and make an rpc package in core too Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/de1d3ee3 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/de1d3ee3 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/de1d3ee3 Branch: refs/heads/master Commit: de1d3ee381d308c9e4d9d9f41532bde4cc1e6706 Parents: b792790 Author: Josh Elser Authored: Tue Dec 9 00:05:19 2014 -0500 Committer: Josh Elser Committed: Tue Dec 9 00:05:19 2014 -0500 ---------------------------------------------------------------------- .../core/client/impl/ClientContext.java | 2 +- .../core/client/impl/ConditionalWriterImpl.java | 2 +- .../client/impl/InstanceOperationsImpl.java | 2 +- .../accumulo/core/client/impl/MasterClient.java | 2 +- .../core/client/impl/ReplicationClient.java | 2 +- .../accumulo/core/client/impl/ServerClient.java | 2 +- .../core/client/impl/TableOperationsImpl.java | 4 +- .../impl/TabletServerBatchReaderIterator.java | 2 +- .../client/impl/TabletServerBatchWriter.java | 2 +- .../core/client/impl/ThriftScanner.java | 2 +- .../core/client/impl/ThriftTransportKey.java | 2 +- .../core/client/impl/ThriftTransportPool.java | 2 +- .../accumulo/core/client/impl/Writer.java | 2 +- .../accumulo/core/rpc/SslConnectionParams.java | 275 ++++++++++++ .../accumulo/core/rpc/TBufferedSocket.java | 39 ++ .../accumulo/core/rpc/TTimeoutTransport.java | 60 +++ .../apache/accumulo/core/rpc/ThriftUtil.java | 434 +++++++++++++++++++ .../accumulo/core/util/SslConnectionParams.java | 275 ------------ .../accumulo/core/util/TBufferedSocket.java | 39 -- .../accumulo/core/util/TTimeoutTransport.java | 60 --- .../apache/accumulo/core/util/ThriftUtil.java | 433 ------------------ .../java/org/apache/accumulo/proxy/Proxy.java | 2 +- .../accumulo/server/AccumuloServerContext.java | 2 +- .../accumulo/server/client/BulkImporter.java | 2 +- .../accumulo/server/master/LiveTServerSet.java | 2 +- .../server/master/balancer/TabletBalancer.java | 2 +- .../server/rpc/ClientInfoProcessorFactory.java | 53 +++ .../server/rpc/CustomNonBlockingServer.java | 268 ++++++++++++ .../apache/accumulo/server/rpc/RpcWrapper.java | 62 +++ .../accumulo/server/rpc/ServerAddress.java | 42 ++ .../server/rpc/TBufferedServerSocket.java | 71 +++ .../server/rpc/TNonblockingServerSocket.java | 157 +++++++ .../accumulo/server/rpc/TServerUtils.java | 255 +++++++++++ .../accumulo/server/rpc/TimedProcessor.java | 69 +++ .../security/AuditedSecurityOperation.java | 2 +- .../thrift/ClientInfoProcessorFactory.java | 53 --- .../server/thrift/CustomNonBlockingServer.java | 268 ------------ .../accumulo/server/thrift/RpcWrapper.java | 62 --- .../accumulo/server/thrift/ServerAddress.java | 42 -- .../server/thrift/TBufferedServerSocket.java | 71 --- .../server/thrift/TNonblockingServerSocket.java | 157 ------- .../accumulo/server/thrift/TServerUtils.java | 255 ----------- .../accumulo/server/thrift/TimedProcessor.java | 69 --- .../server/util/VerifyTabletAssignments.java | 2 +- .../accumulo/server/util/TServerUtilsTest.java | 2 +- .../gc/GarbageCollectWriteAheadLogs.java | 2 +- .../accumulo/gc/SimpleGarbageCollector.java | 4 +- .../CloseWriteAheadLogReferences.java | 2 +- .../java/org/apache/accumulo/master/Master.java | 6 +- .../org/apache/accumulo/monitor/Monitor.java | 2 +- .../accumulo/monitor/ZooKeeperStatus.java | 2 +- .../monitor/servlets/TServersServlet.java | 2 +- .../apache/accumulo/tserver/TabletServer.java | 8 +- .../accumulo/tserver/session/Session.java | 2 +- .../apache/accumulo/test/WrongTabletTest.java | 2 +- .../accumulo/test/functional/ZombieTServer.java | 4 +- .../test/performance/thrift/NullTserver.java | 2 +- .../org/apache/accumulo/test/TotalQueuedIT.java | 2 +- ...bageCollectorCommunicatesWithTServersIT.java | 2 +- 59 files changed, 1828 insertions(+), 1827 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java index 8fd12f2..e75bec6 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java @@ -33,9 +33,9 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.CredentialProviderFactoryShim; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.security.thrift.TCredentials; -import org.apache.accumulo.core.util.SslConnectionParams; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index c831915..ee56fff 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -60,6 +60,7 @@ import org.apache.accumulo.core.data.thrift.TConditionalSession; import org.apache.accumulo.core.data.thrift.TKeyExtent; import org.apache.accumulo.core.data.thrift.TMutation; import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.security.VisibilityEvaluator; @@ -72,7 +73,6 @@ import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.BadArgumentException; import org.apache.accumulo.core.util.ByteBufferUtil; import org.apache.accumulo.core.util.LoggingRunnable; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java index 4e74069..a62496b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java @@ -36,11 +36,11 @@ import org.apache.accumulo.core.client.impl.thrift.ClientService; import org.apache.accumulo.core.client.impl.thrift.ConfigurationType; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; import org.apache.accumulo.core.trace.Tracer; import org.apache.accumulo.core.util.AddressUtil; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java index 092eec6..74b8ea9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java @@ -28,7 +28,7 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.master.thrift.MasterClientService; -import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.log4j.Logger; import org.apache.thrift.TServiceClient; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java index f2007e9..edfba50 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java @@ -28,7 +28,7 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; import org.apache.accumulo.core.replication.thrift.ReplicationServicer; -import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooReader; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java index ebccdab..1e44727 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java @@ -28,10 +28,10 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.impl.thrift.ClientService; import org.apache.accumulo.core.client.impl.thrift.ClientService.Client; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java index 1def091..4843a9c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java @@ -49,7 +49,6 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import org.apache.accumulo.core.client.admin.CompactionConfig; - import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -92,6 +91,7 @@ import org.apache.accumulo.core.metadata.MetadataServicer; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; @@ -104,7 +104,6 @@ import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.OpTimer; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.hadoop.fs.FileStatus; @@ -116,6 +115,7 @@ import org.apache.log4j.Logger; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; + import com.google.common.base.Joiner; public class TableOperationsImpl extends TableOperationsHelper { http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java index cd3dfd0..fb5c20b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java @@ -54,6 +54,7 @@ import org.apache.accumulo.core.data.thrift.TKeyExtent; import org.apache.accumulo.core.data.thrift.TKeyValue; import org.apache.accumulo.core.data.thrift.TRange; import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; @@ -61,7 +62,6 @@ import org.apache.accumulo.core.trace.Tracer; import org.apache.accumulo.core.trace.wrappers.TraceRunnable; import org.apache.accumulo.core.util.ByteBufferUtil; import org.apache.accumulo.core.util.OpTimer; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.hadoop.io.Text; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java index a6112da..30a707e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java @@ -54,6 +54,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.thrift.TMutation; import org.apache.accumulo.core.data.thrift.UpdateErrors; import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; @@ -63,7 +64,6 @@ import org.apache.accumulo.core.trace.Trace; import org.apache.accumulo.core.trace.Tracer; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.SimpleThreadPool; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; import org.apache.thrift.TApplicationException; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java index 4b7d9ae..90e4421 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java @@ -48,6 +48,7 @@ import org.apache.accumulo.core.data.thrift.IterInfo; import org.apache.accumulo.core.data.thrift.ScanResult; import org.apache.accumulo.core.data.thrift.TKeyValue; import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; @@ -58,7 +59,6 @@ import org.apache.accumulo.core.trace.Trace; import org.apache.accumulo.core.trace.Tracer; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.OpTimer; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.hadoop.io.Text; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java index a3a4f25..176e947 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java @@ -18,7 +18,7 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; -import org.apache.accumulo.core.util.SslConnectionParams; +import org.apache.accumulo.core.rpc.SslConnectionParams; class ThriftTransportKey { private final String location; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java index 3380e13..bdb04d9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java @@ -31,9 +31,9 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.log4j.Logger; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java index df7074d..d7761e9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java @@ -26,12 +26,12 @@ import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.tabletserver.thrift.TDurability; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.Tracer; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/rpc/SslConnectionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/SslConnectionParams.java b/core/src/main/java/org/apache/accumulo/core/rpc/SslConnectionParams.java new file mode 100644 index 0000000..718bf85 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/rpc/SslConnectionParams.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.io.File; +import java.io.FileNotFoundException; +import java.net.URL; +import java.util.Arrays; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters; + +public class SslConnectionParams { + private static final Logger log = Logger.getLogger(SslConnectionParams.class); + + private boolean useJsse = false; + private boolean clientAuth = false; + + private boolean keyStoreSet; + private String keyStorePath; + private String keyStorePass; + private String keyStoreType; + + private boolean trustStoreSet; + private String trustStorePath; + private String trustStorePass; + private String trustStoreType; + + private String[] cipherSuites; + private String[] serverProtocols; + private String clientProtocol; + + // Use the static construction methods + private SslConnectionParams() {} + + public static SslConnectionParams forConfig(AccumuloConfiguration conf, boolean server) { + if (!conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED)) + return null; + + SslConnectionParams result = new SslConnectionParams(); + boolean requireClientAuth = conf.getBoolean(Property.INSTANCE_RPC_SSL_CLIENT_AUTH); + if (server) { + result.setClientAuth(requireClientAuth); + } + if (conf.getBoolean(Property.RPC_USE_JSSE)) { + result.setUseJsse(true); + return result; + } + + try { + if (!server || requireClientAuth) { + result.setTrustStoreFromConf(conf); + } + if (server || requireClientAuth) { + result.setKeyStoreFromConf(conf); + } + } catch (FileNotFoundException e) { + throw new IllegalArgumentException("Could not load configured keystore file", e); + } + + String ciphers = conf.get(Property.RPC_SSL_CIPHER_SUITES); + if (null != ciphers && !ciphers.isEmpty()) { + result.cipherSuites = StringUtils.split(ciphers, ','); + } + + String enabledProtocols = conf.get(Property.RPC_SSL_ENABLED_PROTOCOLS); + result.serverProtocols = StringUtils.split(enabledProtocols, ','); + + result.clientProtocol = conf.get(Property.RPC_SSL_CLIENT_PROTOCOL); + + return result; + } + + private static String passwordFromConf(AccumuloConfiguration conf, String defaultPassword, Property passwordOverrideProperty) { + String keystorePassword = conf.get(passwordOverrideProperty); + if (!keystorePassword.isEmpty()) { + if (log.isTraceEnabled()) + log.trace("Using explicit SSL private key password from " + passwordOverrideProperty.getKey()); + } else { + keystorePassword = defaultPassword; + } + return keystorePassword; + } + + private static String storePathFromConf(AccumuloConfiguration conf, Property pathProperty) throws FileNotFoundException { + return findKeystore(conf.getPath(pathProperty)); + } + + public void setKeyStoreFromConf(AccumuloConfiguration conf) throws FileNotFoundException { + keyStoreSet = true; + keyStorePath = storePathFromConf(conf, Property.RPC_SSL_KEYSTORE_PATH); + keyStorePass = passwordFromConf(conf, conf.get(Property.INSTANCE_SECRET), Property.RPC_SSL_KEYSTORE_PASSWORD); + keyStoreType = conf.get(Property.RPC_SSL_KEYSTORE_TYPE); + } + + public void setTrustStoreFromConf(AccumuloConfiguration conf) throws FileNotFoundException { + trustStoreSet = true; + trustStorePath = storePathFromConf(conf, Property.RPC_SSL_TRUSTSTORE_PATH); + trustStorePass = passwordFromConf(conf, "", Property.RPC_SSL_TRUSTSTORE_PASSWORD); + trustStoreType = conf.get(Property.RPC_SSL_TRUSTSTORE_TYPE); + } + + public static SslConnectionParams forServer(AccumuloConfiguration configuration) { + return forConfig(configuration, true); + } + + public static SslConnectionParams forClient(AccumuloConfiguration configuration) { + return forConfig(configuration, false); + } + + private static String findKeystore(String keystorePath) throws FileNotFoundException { + try { + // first just try the file + File file = new File(keystorePath); + if (file.exists()) + return file.getAbsolutePath(); + if (!file.isAbsolute()) { + // try classpath + URL url = SslConnectionParams.class.getClassLoader().getResource(keystorePath); + if (url != null) { + file = new File(url.toURI()); + if (file.exists()) + return file.getAbsolutePath(); + } + } + } catch (Exception e) { + log.warn("Exception finding keystore", e); + } + throw new FileNotFoundException("Failed to load SSL keystore from " + keystorePath); + } + + public void setUseJsse(boolean useJsse) { + this.useJsse = useJsse; + } + + public boolean useJsse() { + return useJsse; + } + + public void setClientAuth(boolean clientAuth) { + this.clientAuth = clientAuth; + } + + public boolean isClientAuth() { + return clientAuth; + } + + public String[] getServerProtocols() { + return serverProtocols; + } + + public String getClientProtocol() { + return clientProtocol; + } + + public boolean isKeyStoreSet() { + return keyStoreSet; + } + + public String getKeyStorePath() { + return keyStorePath; + } + + /** + * @return the keyStorePass + */ + public String getKeyStorePass() { + return keyStorePass; + } + + public String getKeyStoreType() { + return keyStoreType; + } + + public boolean isTrustStoreSet() { + return trustStoreSet; + } + + public String getTrustStorePath() { + return trustStorePath; + } + + public String getTrustStorePass() { + return trustStorePass; + } + + /** + * @return the trustStoreType + */ + public String getTrustStoreType() { + return trustStoreType; + } + + public TSSLTransportParameters getTTransportParams() { + if (useJsse) + throw new IllegalStateException("Cannot get TTransportParams for JSEE configuration."); + + // Null cipherSuites is implicitly handled + TSSLTransportParameters params = new TSSLTransportParameters(clientProtocol, cipherSuites); + + params.requireClientAuth(clientAuth); + if (keyStoreSet) { + params.setKeyStore(keyStorePath, keyStorePass, null, keyStoreType); + } + if (trustStoreSet) { + params.setTrustStore(trustStorePath, trustStorePass, null, trustStoreType); + } + return params; + } + + @Override + public int hashCode() { + int hash = 0; + hash = 31 * hash + (clientAuth ? 0 : 1); + hash = 31 * hash + (useJsse ? 0 : 1); + if (useJsse) + return hash; + hash = 31 * hash + (keyStoreSet ? 0 : 1); + hash = 31 * hash + (trustStoreSet ? 0 : 1); + if (keyStoreSet) { + hash = 31 * hash + keyStorePath.hashCode(); + } + if (trustStoreSet) { + hash = 31 * hash + trustStorePath.hashCode(); + } + hash = 31 * hash + clientProtocol.hashCode(); + hash = 31 * hash + Arrays.hashCode(serverProtocols); + return super.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof SslConnectionParams)) + return false; + + SslConnectionParams other = (SslConnectionParams) obj; + if (clientAuth != other.clientAuth) + return false; + if (useJsse) + return other.useJsse; + if (keyStoreSet) { + if (!other.keyStoreSet) + return false; + if (!keyStorePath.equals(other.keyStorePath) || !keyStorePass.equals(other.keyStorePass) || !keyStoreType.equals(other.keyStoreType)) + return false; + } + if (trustStoreSet) { + if (!other.trustStoreSet) + return false; + if (!trustStorePath.equals(other.trustStorePath) || !trustStorePass.equals(other.trustStorePass) || !trustStoreType.equals(other.trustStoreType)) + return false; + } + if (!Arrays.equals(serverProtocols, other.serverProtocols)) { + return false; + } + return clientProtocol.equals(other.clientProtocol); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java b/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java new file mode 100644 index 0000000..87b7c13 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; + +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TSocket; + +public class TBufferedSocket extends TIOStreamTransport { + + String client; + + public TBufferedSocket(TSocket sock, int bufferSize) throws IOException { + super(new BufferedInputStream(sock.getSocket().getInputStream(), bufferSize), new BufferedOutputStream(sock.getSocket().getOutputStream(), bufferSize)); + client = sock.getSocket().getInetAddress().getHostAddress() + ":" + sock.getSocket().getPort(); + } + + public String getClientString() { + return client; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java new file mode 100644 index 0000000..6eace77 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.spi.SelectorProvider; + +import org.apache.hadoop.net.NetUtils; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TTransport; + +import com.google.common.net.HostAndPort; + +public class TTimeoutTransport { + + private static InputStream getInputStream(Socket socket, long timeout) { + try { + Method m = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE); + return (InputStream)m.invoke(null, socket, timeout); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException { + return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis); + } + + public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException { + Socket socket = SelectorProvider.provider().openSocketChannel().socket(); + socket.setSoLinger(false, 0); + socket.setTcpNoDelay(true); + socket.connect(addr); + InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10); + OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10); + return new TIOStreamTransport(input, output); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java new file mode 100644 index 0000000..a3cb252 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.io.FileInputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.UnknownHostException; +import java.security.KeyStore; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.ClientExec; +import org.apache.accumulo.core.client.impl.ClientExecReturn; +import org.apache.accumulo.core.client.impl.ThriftTransportPool; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.trace.Span; +import org.apache.accumulo.core.trace.Trace; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TServiceClient; +import org.apache.thrift.TServiceClientFactory; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSSLTransportFactory; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + +import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; + +public class ThriftUtil { + private static final Logger log = Logger.getLogger(ThriftUtil.class); + + public static class TraceProtocol extends TCompactProtocol { + private Span span = null; + + @Override + public void writeMessageBegin(TMessage message) throws TException { + span = Trace.start("client:" + message.name); + super.writeMessageBegin(message); + } + + @Override + public void writeMessageEnd() throws TException { + super.writeMessageEnd(); + span.stop(); + } + + public TraceProtocol(TTransport transport) { + super(transport); + } + } + + public static class TraceProtocolFactory extends TCompactProtocol.Factory { + private static final long serialVersionUID = 1L; + + @Override + public TProtocol getProtocol(TTransport trans) { + return new TraceProtocol(trans); + } + } + + static private TProtocolFactory protocolFactory = new TraceProtocolFactory(); + static private TTransportFactory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE); + + static public T createClient(TServiceClientFactory factory, TTransport transport) { + return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport)); + } + + static public T getClient(TServiceClientFactory factory, HostAndPort address, ClientContext context) + throws TTransportException { + return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address, context)); + } + + static public T getClientNoTimeout(TServiceClientFactory factory, String address, ClientContext context) + throws TTransportException { + return getClient(factory, address, context, 0); + } + + static public T getClient(TServiceClientFactory factory, String address, ClientContext context) + throws TTransportException { + TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(), context); + return createClient(factory, transport); + } + + static private T getClient(TServiceClientFactory factory, String address, ClientContext context, long timeout) + throws TTransportException { + TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout, context); + return createClient(factory, transport); + } + + static public void returnClient(TServiceClient iface) { // Eew... the typing here is horrible + if (iface != null) { + ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport()); + } + } + + static public TabletClientService.Client getTServerClient(String address, ClientContext context) throws TTransportException { + return getClient(new TabletClientService.Client.Factory(), address, context); + } + + static public TabletClientService.Client getTServerClient(String address, ClientContext context, long timeout) throws TTransportException { + return getClient(new TabletClientService.Client.Factory(), address, context, timeout); + } + + public static void execute(String address, ClientContext context, ClientExec exec) throws AccumuloException, + AccumuloSecurityException { + while (true) { + TabletClientService.Client client = null; + try { + exec.execute(client = getTServerClient(address, context)); + break; + } catch (TTransportException tte) { + log.debug("getTServerClient request failed, retrying ... ", tte); + UtilWaitThread.sleep(100); + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code, e); + } catch (Exception e) { + throw new AccumuloException(e); + } finally { + if (client != null) + returnClient(client); + } + } + } + + public static T execute(String address, ClientContext context, ClientExecReturn exec) throws AccumuloException, + AccumuloSecurityException { + while (true) { + TabletClientService.Client client = null; + try { + return exec.execute(client = getTServerClient(address, context)); + } catch (TTransportException tte) { + log.debug("getTServerClient request failed, retrying ... ", tte); + UtilWaitThread.sleep(100); + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code, e); + } catch (Exception e) { + throw new AccumuloException(e); + } finally { + if (client != null) + returnClient(client); + } + } + } + + /** + * create a transport that is not pooled + */ + public static TTransport createTransport(HostAndPort address, ClientContext context) throws TException { + return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams()); + } + + public static TTransportFactory transportFactory() { + return transportFactory; + } + + private final static Map factoryCache = new HashMap(); + + synchronized public static TTransportFactory transportFactory(int maxFrameSize) { + TTransportFactory factory = factoryCache.get(maxFrameSize); + if (factory == null) { + factory = new TFramedTransport.Factory(maxFrameSize); + factoryCache.put(maxFrameSize, factory); + } + return factory; + } + + synchronized public static TTransportFactory transportFactory(long maxFrameSize) { + if (maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1) + throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE); + return transportFactory((int) maxFrameSize); + } + + public static TProtocolFactory protocolFactory() { + return protocolFactory; + } + + public static TServerSocket getServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException { + TServerSocket tServerSock; + if (params.useJsse()) { + tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address); + } else { + tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams()); + } + + ServerSocket serverSock = tServerSock.getServerSocket(); + if (serverSock instanceof SSLServerSocket) { + SSLServerSocket sslServerSock = (SSLServerSocket) serverSock; + String[] protocols = params.getServerProtocols(); + + // Be nice for the user and automatically remove protocols that might not exist in their JVM. Keeps us from forcing config alterations too + // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6 + Set socketEnabledProtocols = new HashSet(Arrays.asList(sslServerSock.getEnabledProtocols())); + // Keep only the enabled protocols that were specified by the configuration + socketEnabledProtocols.retainAll(Arrays.asList(protocols)); + if (socketEnabledProtocols.isEmpty()) { + // Bad configuration... + throw new RuntimeException("No available protocols available for secure socket. Availaable protocols: " + + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: " + Arrays.toString(protocols)); + } + + // Set the protocol(s) on the server socket + sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0])); + } + + return tServerSock; + } + + public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams) throws TTransportException { + boolean success = false; + TTransport transport = null; + try { + if (sslParams != null) { + // TSSLTransportFactory handles timeout 0 -> forever natively + if (sslParams.useJsse()) { + transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout); + } else { + // JDK6's factory doesn't appear to pass the protocol onto the Socket properly so we have + // to do some magic to make sure that happens. Not an issue in JDK7 + + // Taken from thrift-0.9.1 to make the SSLContext + SSLContext sslContext = createSSLContext(sslParams); + + // Create the factory from it + SSLSocketFactory sslSockFactory = sslContext.getSocketFactory(); + + // Wrap the real factory with our own that will set the protocol on the Socket before returning it + ProtocolOverridingSSLSocketFactory wrappingSslSockFactory = new ProtocolOverridingSSLSocketFactory(sslSockFactory, + new String[] {sslParams.getClientProtocol()}); + + // Create the TSocket from that + transport = createClient(wrappingSslSockFactory, address.getHostText(), address.getPort(), timeout); + } + // TSSLTransportFactory leaves transports open, so no need to open here + } else if (timeout == 0) { + transport = new TSocket(address.getHostText(), address.getPort()); + transport.open(); + } else { + try { + transport = TTimeoutTransport.create(address, timeout); + } catch (IOException ex) { + throw new TTransportException(ex); + } + transport.open(); + } + transport = ThriftUtil.transportFactory().getTransport(transport); + success = true; + } finally { + if (!success && transport != null) { + transport.close(); + } + } + return transport; + } + + /** + * Lifted from TSSLTransportFactory in Thrift-0.9.1. The method to create a client socket with an SSLContextFactory object is not visibile to us. Have to use + * SslConnectionParams instead of TSSLTransportParameters because no getters exist on TSSLTransportParameters. + * + * @param params + * Parameters to use to create the SSLContext + */ + private static SSLContext createSSLContext(SslConnectionParams params) throws TTransportException { + SSLContext ctx; + try { + ctx = SSLContext.getInstance(params.getClientProtocol()); + TrustManagerFactory tmf = null; + KeyManagerFactory kmf = null; + + if (params.isTrustStoreSet()) { + tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + KeyStore ts = KeyStore.getInstance(params.getTrustStoreType()); + ts.load(new FileInputStream(params.getTrustStorePath()), params.getTrustStorePass().toCharArray()); + tmf.init(ts); + } + + if (params.isKeyStoreSet()) { + kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + KeyStore ks = KeyStore.getInstance(params.getKeyStoreType()); + ks.load(new FileInputStream(params.getKeyStorePath()), params.getKeyStorePass().toCharArray()); + kmf.init(ks, params.getKeyStorePass().toCharArray()); + } + + if (params.isKeyStoreSet() && params.isTrustStoreSet()) { + ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + } else if (params.isKeyStoreSet()) { + ctx.init(kmf.getKeyManagers(), null, null); + } else { + ctx.init(null, tmf.getTrustManagers(), null); + } + + } catch (Exception e) { + throw new TTransportException("Error creating the transport", e); + } + return ctx; + } + + /** + * Lifted from Thrift-0.9.1 because it was private. Create an SSLSocket with the given factory, host:port, and timeout. + * + * @param factory + * Factory to create the socket from + * @param host + * Destination host + * @param port + * Destination port + * @param timeout + * Socket timeout + */ + private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException { + try { + SSLSocket socket = (SSLSocket) factory.createSocket(host, port); + socket.setSoTimeout(timeout); + return new TSocket(socket); + } catch (Exception e) { + throw new TTransportException("Could not connect to " + host + " on port " + port, e); + } + } + + /** + * JDK6's SSLSocketFactory doesn't seem to properly set the protocols on the Sockets that it creates which causes an SSLv2 client hello message during + * handshake, even when only TLSv1 is enabled. This only appears to be an issue on the client sockets, not the server sockets. + * + * This class wraps the SSLSocketFactory ensuring that the Socket is properly configured. + * http://www.coderanch.com/t/637177/Security/Disabling-handshake-message-Java + * + * This class can be removed when JDK6 support is officially unsupported by Accumulo + */ + private static class ProtocolOverridingSSLSocketFactory extends SSLSocketFactory { + + private final SSLSocketFactory delegate; + private final String[] enabledProtocols; + + public ProtocolOverridingSSLSocketFactory(final SSLSocketFactory delegate, final String[] enabledProtocols) { + Preconditions.checkNotNull(enabledProtocols); + Preconditions.checkArgument(0 != enabledProtocols.length, "Expected at least one protocol"); + this.delegate = delegate; + this.enabledProtocols = enabledProtocols; + } + + @Override + public String[] getDefaultCipherSuites() { + return delegate.getDefaultCipherSuites(); + } + + @Override + public String[] getSupportedCipherSuites() { + return delegate.getSupportedCipherSuites(); + } + + @Override + public Socket createSocket(final Socket socket, final String host, final int port, final boolean autoClose) throws IOException { + final Socket underlyingSocket = delegate.createSocket(socket, host, port, autoClose); + return overrideProtocol(underlyingSocket); + } + + @Override + public Socket createSocket(final String host, final int port) throws IOException, UnknownHostException { + final Socket underlyingSocket = delegate.createSocket(host, port); + return overrideProtocol(underlyingSocket); + } + + @Override + public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort) throws IOException, UnknownHostException { + final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort); + return overrideProtocol(underlyingSocket); + } + + @Override + public Socket createSocket(final InetAddress host, final int port) throws IOException { + final Socket underlyingSocket = delegate.createSocket(host, port); + return overrideProtocol(underlyingSocket); + } + + @Override + public Socket createSocket(final InetAddress host, final int port, final InetAddress localAddress, final int localPort) throws IOException { + final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort); + return overrideProtocol(underlyingSocket); + } + + /** + * Set the {@link javax.net.ssl.SSLSocket#getEnabledProtocols() enabled protocols} to {@link #enabledProtocols} if the socket is a + * {@link SSLSocket} + * + * @param socket + * The Socket + */ + private Socket overrideProtocol(final Socket socket) { + if (socket instanceof SSLSocket) { + ((SSLSocket) socket).setEnabledProtocols(enabledProtocols); + } + return socket; + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java b/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java deleted file mode 100644 index cd433c6..0000000 --- a/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.util; - -import java.io.File; -import java.io.FileNotFoundException; -import java.net.URL; -import java.util.Arrays; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.commons.lang.StringUtils; -import org.apache.log4j.Logger; -import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters; - -public class SslConnectionParams { - private static final Logger log = Logger.getLogger(SslConnectionParams.class); - - private boolean useJsse = false; - private boolean clientAuth = false; - - private boolean keyStoreSet; - private String keyStorePath; - private String keyStorePass; - private String keyStoreType; - - private boolean trustStoreSet; - private String trustStorePath; - private String trustStorePass; - private String trustStoreType; - - private String[] cipherSuites; - private String[] serverProtocols; - private String clientProtocol; - - // Use the static construction methods - private SslConnectionParams() {} - - public static SslConnectionParams forConfig(AccumuloConfiguration conf, boolean server) { - if (!conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED)) - return null; - - SslConnectionParams result = new SslConnectionParams(); - boolean requireClientAuth = conf.getBoolean(Property.INSTANCE_RPC_SSL_CLIENT_AUTH); - if (server) { - result.setClientAuth(requireClientAuth); - } - if (conf.getBoolean(Property.RPC_USE_JSSE)) { - result.setUseJsse(true); - return result; - } - - try { - if (!server || requireClientAuth) { - result.setTrustStoreFromConf(conf); - } - if (server || requireClientAuth) { - result.setKeyStoreFromConf(conf); - } - } catch (FileNotFoundException e) { - throw new IllegalArgumentException("Could not load configured keystore file", e); - } - - String ciphers = conf.get(Property.RPC_SSL_CIPHER_SUITES); - if (null != ciphers && !ciphers.isEmpty()) { - result.cipherSuites = StringUtils.split(ciphers, ','); - } - - String enabledProtocols = conf.get(Property.RPC_SSL_ENABLED_PROTOCOLS); - result.serverProtocols = StringUtils.split(enabledProtocols, ','); - - result.clientProtocol = conf.get(Property.RPC_SSL_CLIENT_PROTOCOL); - - return result; - } - - private static String passwordFromConf(AccumuloConfiguration conf, String defaultPassword, Property passwordOverrideProperty) { - String keystorePassword = conf.get(passwordOverrideProperty); - if (!keystorePassword.isEmpty()) { - if (log.isTraceEnabled()) - log.trace("Using explicit SSL private key password from " + passwordOverrideProperty.getKey()); - } else { - keystorePassword = defaultPassword; - } - return keystorePassword; - } - - private static String storePathFromConf(AccumuloConfiguration conf, Property pathProperty) throws FileNotFoundException { - return findKeystore(conf.getPath(pathProperty)); - } - - public void setKeyStoreFromConf(AccumuloConfiguration conf) throws FileNotFoundException { - keyStoreSet = true; - keyStorePath = storePathFromConf(conf, Property.RPC_SSL_KEYSTORE_PATH); - keyStorePass = passwordFromConf(conf, conf.get(Property.INSTANCE_SECRET), Property.RPC_SSL_KEYSTORE_PASSWORD); - keyStoreType = conf.get(Property.RPC_SSL_KEYSTORE_TYPE); - } - - public void setTrustStoreFromConf(AccumuloConfiguration conf) throws FileNotFoundException { - trustStoreSet = true; - trustStorePath = storePathFromConf(conf, Property.RPC_SSL_TRUSTSTORE_PATH); - trustStorePass = passwordFromConf(conf, "", Property.RPC_SSL_TRUSTSTORE_PASSWORD); - trustStoreType = conf.get(Property.RPC_SSL_TRUSTSTORE_TYPE); - } - - public static SslConnectionParams forServer(AccumuloConfiguration configuration) { - return forConfig(configuration, true); - } - - public static SslConnectionParams forClient(AccumuloConfiguration configuration) { - return forConfig(configuration, false); - } - - private static String findKeystore(String keystorePath) throws FileNotFoundException { - try { - // first just try the file - File file = new File(keystorePath); - if (file.exists()) - return file.getAbsolutePath(); - if (!file.isAbsolute()) { - // try classpath - URL url = SslConnectionParams.class.getClassLoader().getResource(keystorePath); - if (url != null) { - file = new File(url.toURI()); - if (file.exists()) - return file.getAbsolutePath(); - } - } - } catch (Exception e) { - log.warn("Exception finding keystore", e); - } - throw new FileNotFoundException("Failed to load SSL keystore from " + keystorePath); - } - - public void setUseJsse(boolean useJsse) { - this.useJsse = useJsse; - } - - public boolean useJsse() { - return useJsse; - } - - public void setClientAuth(boolean clientAuth) { - this.clientAuth = clientAuth; - } - - public boolean isClientAuth() { - return clientAuth; - } - - public String[] getServerProtocols() { - return serverProtocols; - } - - public String getClientProtocol() { - return clientProtocol; - } - - public boolean isKeyStoreSet() { - return keyStoreSet; - } - - public String getKeyStorePath() { - return keyStorePath; - } - - /** - * @return the keyStorePass - */ - public String getKeyStorePass() { - return keyStorePass; - } - - public String getKeyStoreType() { - return keyStoreType; - } - - public boolean isTrustStoreSet() { - return trustStoreSet; - } - - public String getTrustStorePath() { - return trustStorePath; - } - - public String getTrustStorePass() { - return trustStorePass; - } - - /** - * @return the trustStoreType - */ - public String getTrustStoreType() { - return trustStoreType; - } - - public TSSLTransportParameters getTTransportParams() { - if (useJsse) - throw new IllegalStateException("Cannot get TTransportParams for JSEE configuration."); - - // Null cipherSuites is implicitly handled - TSSLTransportParameters params = new TSSLTransportParameters(clientProtocol, cipherSuites); - - params.requireClientAuth(clientAuth); - if (keyStoreSet) { - params.setKeyStore(keyStorePath, keyStorePass, null, keyStoreType); - } - if (trustStoreSet) { - params.setTrustStore(trustStorePath, trustStorePass, null, trustStoreType); - } - return params; - } - - @Override - public int hashCode() { - int hash = 0; - hash = 31 * hash + (clientAuth ? 0 : 1); - hash = 31 * hash + (useJsse ? 0 : 1); - if (useJsse) - return hash; - hash = 31 * hash + (keyStoreSet ? 0 : 1); - hash = 31 * hash + (trustStoreSet ? 0 : 1); - if (keyStoreSet) { - hash = 31 * hash + keyStorePath.hashCode(); - } - if (trustStoreSet) { - hash = 31 * hash + trustStorePath.hashCode(); - } - hash = 31 * hash + clientProtocol.hashCode(); - hash = 31 * hash + Arrays.hashCode(serverProtocols); - return super.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof SslConnectionParams)) - return false; - - SslConnectionParams other = (SslConnectionParams) obj; - if (clientAuth != other.clientAuth) - return false; - if (useJsse) - return other.useJsse; - if (keyStoreSet) { - if (!other.keyStoreSet) - return false; - if (!keyStorePath.equals(other.keyStorePath) || !keyStorePass.equals(other.keyStorePass) || !keyStoreType.equals(other.keyStoreType)) - return false; - } - if (trustStoreSet) { - if (!other.trustStoreSet) - return false; - if (!trustStorePath.equals(other.trustStorePath) || !trustStorePass.equals(other.trustStorePass) || !trustStoreType.equals(other.trustStoreType)) - return false; - } - if (!Arrays.equals(serverProtocols, other.serverProtocols)) { - return false; - } - return clientProtocol.equals(other.clientProtocol); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java b/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java deleted file mode 100644 index c2d9400..0000000 --- a/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.util; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.IOException; - -import org.apache.thrift.transport.TIOStreamTransport; -import org.apache.thrift.transport.TSocket; - -public class TBufferedSocket extends TIOStreamTransport { - - String client; - - public TBufferedSocket(TSocket sock, int bufferSize) throws IOException { - super(new BufferedInputStream(sock.getSocket().getInputStream(), bufferSize), new BufferedOutputStream(sock.getSocket().getOutputStream(), bufferSize)); - client = sock.getSocket().getInetAddress().getHostAddress() + ":" + sock.getSocket().getPort(); - } - - public String getClientString() { - return client; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java deleted file mode 100644 index a66b268..0000000 --- a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.util; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.channels.spi.SelectorProvider; - -import org.apache.hadoop.net.NetUtils; -import org.apache.thrift.transport.TIOStreamTransport; -import org.apache.thrift.transport.TTransport; - -import com.google.common.net.HostAndPort; - -public class TTimeoutTransport { - - private static InputStream getInputStream(Socket socket, long timeout) { - try { - Method m = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE); - return (InputStream)m.invoke(null, socket, timeout); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException { - return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis); - } - - public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException { - Socket socket = SelectorProvider.provider().openSocketChannel().socket(); - socket.setSoLinger(false, 0); - socket.setTcpNoDelay(true); - socket.connect(addr); - InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10); - OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10); - return new TIOStreamTransport(input, output); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java deleted file mode 100644 index 619131a..0000000 --- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.util; - -import java.io.FileInputStream; -import java.io.IOException; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.UnknownHostException; -import java.security.KeyStore; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLServerSocket; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.TrustManagerFactory; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.impl.ClientContext; -import org.apache.accumulo.core.client.impl.ClientExec; -import org.apache.accumulo.core.client.impl.ClientExecReturn; -import org.apache.accumulo.core.client.impl.ThriftTransportPool; -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; -import org.apache.accumulo.core.trace.Span; -import org.apache.accumulo.core.trace.Trace; -import org.apache.log4j.Logger; -import org.apache.thrift.TException; -import org.apache.thrift.TServiceClient; -import org.apache.thrift.TServiceClientFactory; -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.protocol.TMessage; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSSLTransportFactory; -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; -import org.apache.thrift.transport.TTransportFactory; - -import com.google.common.base.Preconditions; -import com.google.common.net.HostAndPort; - -public class ThriftUtil { - private static final Logger log = Logger.getLogger(ThriftUtil.class); - - public static class TraceProtocol extends TCompactProtocol { - private Span span = null; - - @Override - public void writeMessageBegin(TMessage message) throws TException { - span = Trace.start("client:" + message.name); - super.writeMessageBegin(message); - } - - @Override - public void writeMessageEnd() throws TException { - super.writeMessageEnd(); - span.stop(); - } - - public TraceProtocol(TTransport transport) { - super(transport); - } - } - - public static class TraceProtocolFactory extends TCompactProtocol.Factory { - private static final long serialVersionUID = 1L; - - @Override - public TProtocol getProtocol(TTransport trans) { - return new TraceProtocol(trans); - } - } - - static private TProtocolFactory protocolFactory = new TraceProtocolFactory(); - static private TTransportFactory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE); - - static public T createClient(TServiceClientFactory factory, TTransport transport) { - return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport)); - } - - static public T getClient(TServiceClientFactory factory, HostAndPort address, ClientContext context) - throws TTransportException { - return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address, context)); - } - - static public T getClientNoTimeout(TServiceClientFactory factory, String address, ClientContext context) - throws TTransportException { - return getClient(factory, address, context, 0); - } - - static public T getClient(TServiceClientFactory factory, String address, ClientContext context) - throws TTransportException { - TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(), context); - return createClient(factory, transport); - } - - static private T getClient(TServiceClientFactory factory, String address, ClientContext context, long timeout) - throws TTransportException { - TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout, context); - return createClient(factory, transport); - } - - static public void returnClient(TServiceClient iface) { // Eew... the typing here is horrible - if (iface != null) { - ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport()); - } - } - - static public TabletClientService.Client getTServerClient(String address, ClientContext context) throws TTransportException { - return getClient(new TabletClientService.Client.Factory(), address, context); - } - - static public TabletClientService.Client getTServerClient(String address, ClientContext context, long timeout) throws TTransportException { - return getClient(new TabletClientService.Client.Factory(), address, context, timeout); - } - - public static void execute(String address, ClientContext context, ClientExec exec) throws AccumuloException, - AccumuloSecurityException { - while (true) { - TabletClientService.Client client = null; - try { - exec.execute(client = getTServerClient(address, context)); - break; - } catch (TTransportException tte) { - log.debug("getTServerClient request failed, retrying ... ", tte); - UtilWaitThread.sleep(100); - } catch (ThriftSecurityException e) { - throw new AccumuloSecurityException(e.user, e.code, e); - } catch (Exception e) { - throw new AccumuloException(e); - } finally { - if (client != null) - returnClient(client); - } - } - } - - public static T execute(String address, ClientContext context, ClientExecReturn exec) throws AccumuloException, - AccumuloSecurityException { - while (true) { - TabletClientService.Client client = null; - try { - return exec.execute(client = getTServerClient(address, context)); - } catch (TTransportException tte) { - log.debug("getTServerClient request failed, retrying ... ", tte); - UtilWaitThread.sleep(100); - } catch (ThriftSecurityException e) { - throw new AccumuloSecurityException(e.user, e.code, e); - } catch (Exception e) { - throw new AccumuloException(e); - } finally { - if (client != null) - returnClient(client); - } - } - } - - /** - * create a transport that is not pooled - */ - public static TTransport createTransport(HostAndPort address, ClientContext context) throws TException { - return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams()); - } - - public static TTransportFactory transportFactory() { - return transportFactory; - } - - private final static Map factoryCache = new HashMap(); - - synchronized public static TTransportFactory transportFactory(int maxFrameSize) { - TTransportFactory factory = factoryCache.get(maxFrameSize); - if (factory == null) { - factory = new TFramedTransport.Factory(maxFrameSize); - factoryCache.put(maxFrameSize, factory); - } - return factory; - } - - synchronized public static TTransportFactory transportFactory(long maxFrameSize) { - if (maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1) - throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE); - return transportFactory((int) maxFrameSize); - } - - public static TProtocolFactory protocolFactory() { - return protocolFactory; - } - - public static TServerSocket getServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException { - TServerSocket tServerSock; - if (params.useJsse()) { - tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address); - } else { - tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams()); - } - - ServerSocket serverSock = tServerSock.getServerSocket(); - if (serverSock instanceof SSLServerSocket) { - SSLServerSocket sslServerSock = (SSLServerSocket) serverSock; - String[] protocols = params.getServerProtocols(); - - // Be nice for the user and automatically remove protocols that might not exist in their JVM. Keeps us from forcing config alterations too - // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6 - Set socketEnabledProtocols = new HashSet(Arrays.asList(sslServerSock.getEnabledProtocols())); - // Keep only the enabled protocols that were specified by the configuration - socketEnabledProtocols.retainAll(Arrays.asList(protocols)); - if (socketEnabledProtocols.isEmpty()) { - // Bad configuration... - throw new RuntimeException("No available protocols available for secure socket. Availaable protocols: " - + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: " + Arrays.toString(protocols)); - } - - // Set the protocol(s) on the server socket - sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0])); - } - - return tServerSock; - } - - public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams) throws TTransportException { - boolean success = false; - TTransport transport = null; - try { - if (sslParams != null) { - // TSSLTransportFactory handles timeout 0 -> forever natively - if (sslParams.useJsse()) { - transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout); - } else { - // JDK6's factory doesn't appear to pass the protocol onto the Socket properly so we have - // to do some magic to make sure that happens. Not an issue in JDK7 - - // Taken from thrift-0.9.1 to make the SSLContext - SSLContext sslContext = createSSLContext(sslParams); - - // Create the factory from it - SSLSocketFactory sslSockFactory = sslContext.getSocketFactory(); - - // Wrap the real factory with our own that will set the protocol on the Socket before returning it - ProtocolOverridingSSLSocketFactory wrappingSslSockFactory = new ProtocolOverridingSSLSocketFactory(sslSockFactory, - new String[] {sslParams.getClientProtocol()}); - - // Create the TSocket from that - transport = createClient(wrappingSslSockFactory, address.getHostText(), address.getPort(), timeout); - } - // TSSLTransportFactory leaves transports open, so no need to open here - } else if (timeout == 0) { - transport = new TSocket(address.getHostText(), address.getPort()); - transport.open(); - } else { - try { - transport = TTimeoutTransport.create(address, timeout); - } catch (IOException ex) { - throw new TTransportException(ex); - } - transport.open(); - } - transport = ThriftUtil.transportFactory().getTransport(transport); - success = true; - } finally { - if (!success && transport != null) { - transport.close(); - } - } - return transport; - } - - /** - * Lifted from TSSLTransportFactory in Thrift-0.9.1. The method to create a client socket with an SSLContextFactory object is not visibile to us. Have to use - * SslConnectionParams instead of TSSLTransportParameters because no getters exist on TSSLTransportParameters. - * - * @param params - * Parameters to use to create the SSLContext - */ - private static SSLContext createSSLContext(SslConnectionParams params) throws TTransportException { - SSLContext ctx; - try { - ctx = SSLContext.getInstance(params.getClientProtocol()); - TrustManagerFactory tmf = null; - KeyManagerFactory kmf = null; - - if (params.isTrustStoreSet()) { - tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - KeyStore ts = KeyStore.getInstance(params.getTrustStoreType()); - ts.load(new FileInputStream(params.getTrustStorePath()), params.getTrustStorePass().toCharArray()); - tmf.init(ts); - } - - if (params.isKeyStoreSet()) { - kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - KeyStore ks = KeyStore.getInstance(params.getKeyStoreType()); - ks.load(new FileInputStream(params.getKeyStorePath()), params.getKeyStorePass().toCharArray()); - kmf.init(ks, params.getKeyStorePass().toCharArray()); - } - - if (params.isKeyStoreSet() && params.isTrustStoreSet()) { - ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); - } else if (params.isKeyStoreSet()) { - ctx.init(kmf.getKeyManagers(), null, null); - } else { - ctx.init(null, tmf.getTrustManagers(), null); - } - - } catch (Exception e) { - throw new TTransportException("Error creating the transport", e); - } - return ctx; - } - - /** - * Lifted from Thrift-0.9.1 because it was private. Create an SSLSocket with the given factory, host:port, and timeout. - * - * @param factory - * Factory to create the socket from - * @param host - * Destination host - * @param port - * Destination port - * @param timeout - * Socket timeout - */ - private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException { - try { - SSLSocket socket = (SSLSocket) factory.createSocket(host, port); - socket.setSoTimeout(timeout); - return new TSocket(socket); - } catch (Exception e) { - throw new TTransportException("Could not connect to " + host + " on port " + port, e); - } - } - - /** - * JDK6's SSLSocketFactory doesn't seem to properly set the protocols on the Sockets that it creates which causes an SSLv2 client hello message during - * handshake, even when only TLSv1 is enabled. This only appears to be an issue on the client sockets, not the server sockets. - * - * This class wraps the SSLSocketFactory ensuring that the Socket is properly configured. - * http://www.coderanch.com/t/637177/Security/Disabling-handshake-message-Java - * - * This class can be removed when JDK6 support is officially unsupported by Accumulo - */ - private static class ProtocolOverridingSSLSocketFactory extends SSLSocketFactory { - - private final SSLSocketFactory delegate; - private final String[] enabledProtocols; - - public ProtocolOverridingSSLSocketFactory(final SSLSocketFactory delegate, final String[] enabledProtocols) { - Preconditions.checkNotNull(enabledProtocols); - Preconditions.checkArgument(0 != enabledProtocols.length, "Expected at least one protocol"); - this.delegate = delegate; - this.enabledProtocols = enabledProtocols; - } - - @Override - public String[] getDefaultCipherSuites() { - return delegate.getDefaultCipherSuites(); - } - - @Override - public String[] getSupportedCipherSuites() { - return delegate.getSupportedCipherSuites(); - } - - @Override - public Socket createSocket(final Socket socket, final String host, final int port, final boolean autoClose) throws IOException { - final Socket underlyingSocket = delegate.createSocket(socket, host, port, autoClose); - return overrideProtocol(underlyingSocket); - } - - @Override - public Socket createSocket(final String host, final int port) throws IOException, UnknownHostException { - final Socket underlyingSocket = delegate.createSocket(host, port); - return overrideProtocol(underlyingSocket); - } - - @Override - public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort) throws IOException, UnknownHostException { - final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort); - return overrideProtocol(underlyingSocket); - } - - @Override - public Socket createSocket(final InetAddress host, final int port) throws IOException { - final Socket underlyingSocket = delegate.createSocket(host, port); - return overrideProtocol(underlyingSocket); - } - - @Override - public Socket createSocket(final InetAddress host, final int port, final InetAddress localAddress, final int localPort) throws IOException { - final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort); - return overrideProtocol(underlyingSocket); - } - - /** - * Set the {@link javax.net.ssl.SSLSocket#getEnabledProtocols() enabled protocols} to {@link #enabledProtocols} if the socket is a - * {@link SSLSocket} - * - * @param socket - * The Socket - */ - private Socket overrideProtocol(final Socket socket) { - if (socket instanceof SSLSocket) { - ((SSLSocket) socket).setEnabledProtocols(enabledProtocols); - } - return socket; - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java ---------------------------------------------------------------------- diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java index 0a7b301..4b048eb 100644 --- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java +++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java @@ -27,7 +27,7 @@ import org.apache.accumulo.core.cli.Help; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.accumulo.proxy.thrift.AccumuloProxy; -import org.apache.accumulo.server.thrift.RpcWrapper; +import org.apache.accumulo.server.rpc.RpcWrapper; import org.apache.log4j.Logger; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TCompactProtocol;