Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 705DF10B78 for ; Tue, 25 Nov 2014 22:36:46 +0000 (UTC) Received: (qmail 50744 invoked by uid 500); 25 Nov 2014 22:36:46 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 50643 invoked by uid 500); 25 Nov 2014 22:36:46 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 50385 invoked by uid 99); 25 Nov 2014 22:36:46 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Nov 2014 22:36:46 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C26BFA1AAB6; Tue, 25 Nov 2014 22:36:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Tue, 25 Nov 2014 22:36:54 -0000 Message-Id: <365b755038b445aaab84a873513e3b4f@git.apache.org> In-Reply-To: <8e88d34caa1e4ba0a2674c2ab43d0f36@git.apache.org> References: <8e88d34caa1e4ba0a2674c2ab43d0f36@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/10] accumulo git commit: ACCUMULO-3199 Internal refactor to add ClientContext ACCUMULO-3199 Internal refactor to add ClientContext This patch introduces a new ClientContext object that contains Credentials, Instance, and Configuration provided from the client API. This new object is passed around internally in place of the previous three objects. An AccumuloServerContext is also introduced, which extends the ClientContext. Together, these objects ensure the proper configuration, credentials, and everything else needed to communicate with other system components are available to any RPC-related code. These new object types also reduce the need to create multiple references to commonly used internal objects (such as HdfsZooInstance and SystemCredentials), and avoids storing information in static fields. As a side-effect, this should allow for better testing with mocked components. This fixes ACCUMULO-3252, and may lay some groundwork for ACCUMULO-2589. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/42c25faa Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/42c25faa Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/42c25faa Branch: refs/heads/master Commit: 42c25faa7bdd5c64bcec7b6e9a23d1a9bece3bf9 Parents: bfcb0ed Author: Christopher Tubbs Authored: Thu Nov 20 20:50:55 2014 -0500 Committer: Christopher Tubbs Committed: Mon Nov 24 15:19:50 2014 -0500 ---------------------------------------------------------------------- .../apache/accumulo/core/cli/ClientOpts.java | 2 +- .../accumulo/core/client/ZooKeeperInstance.java | 13 +- .../core/client/impl/BatchWriterImpl.java | 10 +- .../client/impl/ClientConfigurationHelper.java | 143 ----------- .../core/client/impl/ClientContext.java | 229 ++++++++++++++++++ .../core/client/impl/ConditionalWriterImpl.java | 48 ++-- .../core/client/impl/ConnectorImpl.java | 73 +++--- .../client/impl/InstanceOperationsImpl.java | 65 ++--- .../accumulo/core/client/impl/MasterClient.java | 45 ++-- .../client/impl/MultiTableBatchWriterImpl.java | 20 +- .../client/impl/NamespaceOperationsImpl.java | 42 ++-- .../core/client/impl/ReplicationClient.java | 52 ++-- .../client/impl/ReplicationOperationsImpl.java | 39 +-- .../core/client/impl/RootTabletLocator.java | 29 ++- .../accumulo/core/client/impl/ScannerImpl.java | 16 +- .../core/client/impl/ScannerIterator.java | 20 +- .../client/impl/SecurityOperationsImpl.java | 54 ++--- .../accumulo/core/client/impl/ServerClient.java | 37 ++- .../core/client/impl/TableOperationsImpl.java | 169 ++++++------- .../core/client/impl/TabletLocator.java | 22 +- .../core/client/impl/TabletLocatorImpl.java | 61 +++-- .../client/impl/TabletServerBatchDeleter.java | 16 +- .../client/impl/TabletServerBatchReader.java | 16 +- .../impl/TabletServerBatchReaderIterator.java | 45 ++-- .../client/impl/TabletServerBatchWriter.java | 45 ++-- .../core/client/impl/ThriftScanner.java | 52 ++-- .../core/client/impl/ThriftTransportKey.java | 5 +- .../core/client/impl/ThriftTransportPool.java | 14 +- .../core/client/impl/TimeoutTabletLocator.java | 18 +- .../accumulo/core/client/impl/Writer.java | 38 ++- .../client/mock/impl/MockTabletLocator.java | 11 +- .../core/metadata/MetadataLocationObtainer.java | 32 +-- .../core/metadata/MetadataServicer.java | 18 +- .../core/metadata/ServicerForMetadataTable.java | 7 +- .../core/metadata/ServicerForRootTable.java | 8 +- .../core/metadata/ServicerForUserTables.java | 7 +- .../core/metadata/TableMetadataServicer.java | 13 +- .../AccumuloReplicationReplayer.java | 11 +- .../apache/accumulo/core/util/ThriftUtil.java | 42 ++-- .../impl/ClientConfigurationHelperTest.java | 110 --------- .../core/client/impl/ClientContextTest.java | 109 +++++++++ .../core/client/impl/RootTabletLocatorTest.java | 13 +- .../core/client/impl/ScannerImplTest.java | 7 +- .../client/impl/TableOperationsImplTest.java | 6 +- .../core/client/impl/TabletLocatorImplTest.java | 236 ++++++++++--------- .../core/metadata/MetadataServicerTest.java | 15 +- .../ReplicationOperationsImplTest.java | 14 +- .../core/client/mapred/AbstractInputFormat.java | 9 +- .../client/mapreduce/AbstractInputFormat.java | 9 +- .../mapreduce/lib/impl/InputConfigurator.java | 10 +- .../impl/MiniAccumuloClusterImpl.java | 8 +- .../MiniAccumuloClusterStartStopTest.java | 36 ++- .../accumulo/server/AccumuloServerContext.java | 67 ++++++ .../accumulo/server/client/BulkImporter.java | 97 +++----- .../server/client/ClientServiceHandler.java | 26 +- .../accumulo/server/client/HdfsZooInstance.java | 3 +- .../server/conf/ServerConfigurationFactory.java | 7 +- .../server/conf/TableConfiguration.java | 13 +- .../apache/accumulo/server/fs/VolumeUtil.java | 18 +- .../apache/accumulo/server/init/Initialize.java | 11 +- .../accumulo/server/master/LiveTServerSet.java | 70 +++--- .../master/balancer/TableLoadBalancer.java | 3 +- .../server/master/balancer/TabletBalancer.java | 13 +- .../server/master/state/MetaDataStateStore.java | 30 +-- .../master/state/MetaDataTableScanner.java | 26 +- .../master/state/RootTabletStateStore.java | 14 +- .../server/master/state/TabletStateStore.java | 14 +- .../accumulo/server/problems/ProblemReport.java | 14 +- .../problems/ProblemReportingIterator.java | 12 +- .../server/problems/ProblemReports.java | 27 ++- .../server/replication/ReplicationUtil.java | 65 ++--- .../security/AuditedSecurityOperation.java | 28 +-- .../server/security/SecurityOperation.java | 31 +-- .../server/security/SystemCredentials.java | 16 -- .../org/apache/accumulo/server/util/Admin.java | 72 +++--- .../server/util/FindOfflineTablets.java | 23 +- .../accumulo/server/util/ListVolumesUsed.java | 16 +- .../server/util/MasterMetadataUtil.java | 43 ++-- .../accumulo/server/util/MetadataTableUtil.java | 176 +++++++------- .../accumulo/server/util/RandomizeVolumes.java | 7 +- .../util/RemoveEntriesForMissingFiles.java | 25 +- .../server/util/ReplicationTableUtil.java | 22 +- .../accumulo/server/util/TServerUtils.java | 18 +- .../server/util/VerifyTabletAssignments.java | 31 +-- .../server/client/BulkImporterTest.java | 29 ++- .../server/conf/TableConfigurationTest.java | 13 +- .../master/balancer/TableLoadBalancerTest.java | 28 ++- .../problems/ProblemReportingIteratorTest.java | 6 +- .../server/security/SystemCredentialsTest.java | 17 +- .../server/util/ReplicationTableUtilTest.java | 9 +- .../gc/GarbageCollectWriteAheadLogs.java | 40 ++-- .../accumulo/gc/SimpleGarbageCollector.java | 113 +++------ .../CloseWriteAheadLogReferences.java | 46 ++-- .../gc/GarbageCollectWriteAheadLogsTest.java | 51 ++-- .../accumulo/gc/SimpleGarbageCollectorTest.java | 18 +- .../CloseWriteAheadLogReferencesTest.java | 38 ++- .../java/org/apache/accumulo/master/Master.java | 146 +++++++----- .../master/MasterClientServiceHandler.java | 46 +--- .../accumulo/master/TabletGroupWatcher.java | 9 +- .../master/metrics/ReplicationMetrics.java | 78 +----- .../MasterReplicationCoordinator.java | 5 +- .../master/replication/ReplicationDriver.java | 2 +- .../accumulo/master/replication/WorkDriver.java | 6 +- .../accumulo/master/replication/WorkMaker.java | 11 +- .../accumulo/master/tableOps/BulkImport.java | 22 +- .../master/tableOps/CancelCompactions.java | 11 +- .../master/tableOps/ChangeTableState.java | 9 +- .../accumulo/master/tableOps/CloneTable.java | 30 +-- .../accumulo/master/tableOps/CompactRange.java | 26 +- .../master/tableOps/CreateNamespace.java | 5 +- .../accumulo/master/tableOps/CreateTable.java | 13 +- .../master/tableOps/DeleteNamespace.java | 3 +- .../accumulo/master/tableOps/DeleteTable.java | 20 +- .../accumulo/master/tableOps/ExportTable.java | 34 ++- .../accumulo/master/tableOps/ImportTable.java | 16 +- .../accumulo/master/tableOps/RenameTable.java | 10 +- .../accumulo/master/tableOps/TableRangeOp.java | 13 +- .../apache/accumulo/master/TestMergeState.java | 10 +- .../MasterReplicationCoordinatorTest.java | 4 +- .../master/replication/WorkMakerTest.java | 17 +- .../accumulo/monitor/EmbeddedWebServer.java | 2 +- .../org/apache/accumulo/monitor/Monitor.java | 37 ++- .../accumulo/monitor/servlets/BasicServlet.java | 13 +- .../accumulo/monitor/servlets/LogServlet.java | 2 +- .../monitor/servlets/MasterServlet.java | 9 +- .../monitor/servlets/OperationServlet.java | 10 +- .../monitor/servlets/ProblemServlet.java | 6 +- .../monitor/servlets/ReplicationServlet.java | 24 +- .../monitor/servlets/TServersServlet.java | 9 +- .../monitor/servlets/TablesServlet.java | 12 +- .../accumulo/monitor/servlets/VisServlet.java | 2 +- .../accumulo/monitor/servlets/XMLServlet.java | 3 +- .../accumulo/monitor/servlets/trace/Basic.java | 2 +- .../apache/accumulo/tserver/FileManager.java | 44 ++-- .../apache/accumulo/tserver/TabletServer.java | 91 ++++--- .../tserver/TabletServerResourceManager.java | 22 +- .../tserver/log/TabletServerLogger.java | 11 +- .../replication/AccumuloReplicaSystem.java | 47 ++-- .../BatchWriterReplicationReplayer.java | 14 +- .../replication/ReplicationServicerHandler.java | 35 ++- .../tserver/replication/ReplicationWorker.java | 2 +- .../accumulo/tserver/tablet/Compactor.java | 19 +- .../tserver/tablet/DatafileManager.java | 24 +- .../accumulo/tserver/tablet/MinorCompactor.java | 18 +- .../apache/accumulo/tserver/tablet/Tablet.java | 57 +++-- .../LargestFirstMemoryManagerTest.java | 28 ++- .../BatchWriterReplicationReplayerTest.java | 41 +++- .../java/org/apache/accumulo/shell/Shell.java | 17 +- .../apache/accumulo/test/GetMasterStats.java | 10 +- .../apache/accumulo/test/WrongTabletTest.java | 18 +- .../continuous/ContinuousStatsCollector.java | 10 +- .../accumulo/test/functional/ZombieTServer.java | 14 +- .../metadata/MetadataBatchScanTest.java | 11 +- .../performance/scan/CollectTabletStats.java | 18 +- .../test/performance/thrift/NullTserver.java | 23 +- .../test/randomwalk/concurrent/Shutdown.java | 10 +- .../test/randomwalk/concurrent/StartAll.java | 10 +- .../randomwalk/security/WalkingSecurity.java | 9 +- .../server/security/SystemCredentialsIT.java | 2 +- .../test/MasterRepairsDualAssignmentIT.java | 7 +- .../accumulo/test/MetaConstraintRetryIT.java | 4 +- .../accumulo/test/MultiTableBatchWriterIT.java | 12 +- .../test/TableConfigurationUpdateIT.java | 2 +- .../org/apache/accumulo/test/TotalQueuedIT.java | 12 +- .../accumulo/test/functional/AbstractMacIT.java | 4 + .../functional/BalanceAfterCommsFailureIT.java | 6 +- .../test/functional/ConfigurableMacIT.java | 7 + .../test/functional/DynamicThreadPoolsIT.java | 3 +- .../test/functional/MasterAssignmentIT.java | 9 +- .../test/functional/MetadataMaxFilesIT.java | 6 +- .../functional/SimpleBalancerFairnessIT.java | 4 +- .../accumulo/test/functional/SimpleMacIT.java | 7 +- .../test/functional/SplitRecoveryIT.java | 76 +++--- ...bageCollectorCommunicatesWithTServersIT.java | 17 +- .../UnorderedWorkAssignerReplicationIT.java | 5 + 175 files changed, 2536 insertions(+), 2542 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java index a6a5e81..f6ea934 100644 --- a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java +++ b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java @@ -211,7 +211,7 @@ public class ClientOpts extends Help { return getInstance().getConnector(principal, getToken()); } - protected ClientConfiguration getClientConfiguration() throws IllegalArgumentException { + public ClientConfiguration getClientConfiguration() throws IllegalArgumentException { if (cachedClientConfig != null) return cachedClientConfig; http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java index 4e11a14..b12e189 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.impl.ClientContext; import org.apache.accumulo.core.client.impl.ConnectorImpl; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; @@ -135,6 +136,7 @@ public class ZooKeeperInstance implements Instance { public ZooKeeperInstance(Configuration config) { this(config, new ZooCacheFactory()); } + ZooKeeperInstance(Configuration config, ZooCacheFactory zcf) { checkArgument(config != null, "config is null"); if (config instanceof ClientConfiguration) { @@ -236,7 +238,7 @@ public class ZooKeeperInstance implements Instance { @Override public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException { - return new ConnectorImpl(this, new Credentials(principal, token)); + return new ConnectorImpl(new ClientContext(this, new Credentials(principal, token), clientConf)); } @Override @@ -246,15 +248,6 @@ public class ZooKeeperInstance implements Instance { } /** - * Used for retrieving the clientConfiguration which was provided (if any); should not be considered public API. - */ - @Deprecated - public final Configuration getClientConfiguration() { - // TODO ACCUMULO-3199 - return clientConf; - } - - /** * Given a zooCache and instanceId, look up the instance name. */ public static String lookupInstanceName(ZooCache zooCache, UUID instanceId) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java index bd76a50..08f1475 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java @@ -17,26 +17,24 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; + import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.security.Credentials; public class BatchWriterImpl implements BatchWriter { private final String table; private final TabletServerBatchWriter bw; - public BatchWriterImpl(Instance instance, Credentials credentials, String table, BatchWriterConfig config) { - checkArgument(instance != null, "instance is null"); - checkArgument(credentials != null, "credentials is null"); + public BatchWriterImpl(ClientContext context, String table, BatchWriterConfig config) { + checkArgument(context != null, "context is null"); checkArgument(table != null, "table is null"); if (config == null) config = new BatchWriterConfig(); this.table = table; - this.bw = new TabletServerBatchWriter(instance, credentials, config); + this.bw = new TabletServerBatchWriter(context, config); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/impl/ClientConfigurationHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientConfigurationHelper.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientConfigurationHelper.java deleted file mode 100644 index 603172f..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientConfigurationHelper.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.impl; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; - -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.CredentialProviderFactoryShim; -import org.apache.accumulo.core.conf.DefaultConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.commons.configuration.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class exists to get client RPC configuration available as an {@link AccumuloConfiguration} object, which is expected by the RPC layer. - *

- * This exists as a workaround for ACCUMULO-3199 (TODO). - */ -public class ClientConfigurationHelper { - private static final Logger log = LoggerFactory.getLogger(ClientConfigurationHelper.class); - - /** - * This retrieves a the RPC-related client-side configuration in the {@link ClientConfiguration} provided to - * {@link ZooKeeperInstance#ZooKeeperInstance(Configuration)}, if one was provided, and converts it to a generic {@link AccumuloConfiguration} instance, which - * is used by the RPC layer. - * - *

- * If any other {@link Instance} is provided, it will return only defaults. - * - *

- * Servers can, and should, retrieve their RPC configuration from their own configuration, which can be obtained from the ServerConfigurationFactory in the - * server module. It should NOT use this method, because it will probably not do what you expect. - * - */ - public static AccumuloConfiguration getClientRpcConfiguration(Instance instance) { - if (instance instanceof ZooKeeperInstance) { - @SuppressWarnings("deprecation") - Configuration clientConf = ((ZooKeeperInstance) instance).getClientConfiguration(); - return convertClientConfig(DefaultConfiguration.getInstance(), clientConf); - } - return DefaultConfiguration.getInstance(); - } - - public static AccumuloConfiguration convertClientConfig(final AccumuloConfiguration base, final Configuration config) { - - return new AccumuloConfiguration() { - @Override - public String get(Property property) { - final String key = property.getKey(); - - // Attempt to load sensitive properties from a CredentialProvider, if configured - if (property.isSensitive()) { - org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration(); - if (null != hadoopConf) { - try { - char[] value = CredentialProviderFactoryShim.getValueFromCredentialProvider(hadoopConf, key); - if (null != value) { - log.trace("Loaded sensitive value for {} from CredentialProvider", key); - return new String(value); - } else { - log.trace("Tried to load sensitive value for {} from CredentialProvider, but none was found", key); - } - } catch (IOException e) { - log.warn("Failed to extract sensitive property ({}) from Hadoop CredentialProvider, falling back to base AccumuloConfiguration", key, e); - } - } - } - if (config.containsKey(key)) - return config.getString(key); - else - return base.get(property); - } - - @Override - public void getProperties(Map props, PropertyFilter filter) { - base.getProperties(props, filter); - - @SuppressWarnings("unchecked") - Iterator keyIter = config.getKeys(); - while (keyIter.hasNext()) { - String key = keyIter.next(); - if (filter.accept(key)) - props.put(key, config.getString(key)); - } - - // Attempt to load sensitive properties from a CredentialProvider, if configured - org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration(); - if (null != hadoopConf) { - try { - for (String key : CredentialProviderFactoryShim.getKeys(hadoopConf)) { - if (!Property.isValidPropertyKey(key) || !Property.isSensitive(key)) { - continue; - } - - if (filter.accept(key)) { - char[] value = CredentialProviderFactoryShim.getValueFromCredentialProvider(hadoopConf, key); - if (null != value) { - props.put(key, new String(value)); - } - } - } - } catch (IOException e) { - log.warn("Failed to extract sensitive properties from Hadoop CredentialProvider, falling back to accumulo-site.xml", e); - } - } - } - - private org.apache.hadoop.conf.Configuration getHadoopConfiguration() { - String credProviderPaths = config.getString(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); - if (null != credProviderPaths && !credProviderPaths.isEmpty()) { - org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); - hadoopConf.set(CredentialProviderFactoryShim.CREDENTIAL_PROVIDER_PATH, credProviderPaths); - return hadoopConf; - } - - log.trace("Did not find credential provider configuration in ClientConfiguration"); - - return null; - } - }; - - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java new file mode 100644 index 0000000..8fd12f2 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.CredentialProviderFactoryShim; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.util.SslConnectionParams; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class represents any essential configuration and credentials needed to initiate RPC operations throughout the code. It is intended to represent a shared + * object that contains these things from when the client was first constructed. It is not public API, and is only an internal representation of the context in + * which a client is executing RPCs. If additional parameters are added to the public API that need to be used in the internals of Accumulo, they should be + * added to this object for later retrieval, rather than as a separate parameter. Any state in this object should be available at the time of its construction. + */ +public class ClientContext { + + private static final Logger log = LoggerFactory.getLogger(ClientContext.class); + + private final Instance inst; + private Credentials creds; + private final AccumuloConfiguration rpcConf; + private Connector conn; + + /** + * Instantiate a client context + */ + public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf) { + this(instance, credentials, convertClientConfig(checkNotNull(clientConf, "clientConf is null"))); + } + + /** + * Instantiate a client context from an existing {@link AccumuloConfiguration}. This is primarily intended for subclasses and testing. + */ + public ClientContext(Instance instance, Credentials credentials, AccumuloConfiguration serverConf) { + inst = checkNotNull(instance, "instance is null"); + creds = checkNotNull(credentials, "credentials is null"); + rpcConf = checkNotNull(serverConf, "serverConf is null"); + } + + /** + * Retrieve the instance used to construct this context + */ + public Instance getInstance() { + return inst; + } + + /** + * Retrieve the credentials used to construct this context + */ + public synchronized Credentials getCredentials() { + return creds; + } + + /** + * Update the credentials in the current context after changing the current user's password or other auth token + */ + public synchronized void setCredentials(Credentials newCredentials) { + checkArgument(newCredentials != null, "newCredentials is null"); + creds = newCredentials; + } + + /** + * Retrieve the configuration used to construct this context + */ + public AccumuloConfiguration getConfiguration() { + return rpcConf; + } + + /** + * Retrieve the universal RPC client timeout from the configuration + */ + public long getClientTimeoutInMillis() { + return getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT); + } + + /** + * Retrieve SSL/TLS configuration to initiate an RPC connection to a server + */ + public SslConnectionParams getClientSslParams() { + return SslConnectionParams.forClient(getConfiguration()); + } + + /** + * Retrieve a connector + */ + public Connector getConnector() throws AccumuloException, AccumuloSecurityException { + // avoid making more connectors than necessary + if (conn == null) { + if (getInstance() instanceof ZooKeeperInstance) { + // reuse existing context + conn = new ConnectorImpl(this); + } else { + Credentials c = getCredentials(); + conn = getInstance().getConnector(c.getPrincipal(), c.getToken()); + } + } + return conn; + } + + /** + * Serialize the credentials just before initiating the RPC call + */ + public TCredentials rpcCreds() { + return getCredentials().toThrift(getInstance()); + } + + /** + * A utility method for converting client configuration to a standard configuration object for use internally. + * + * @param config + * the original {@link ClientConfiguration} + * @return the client configuration presented in the form of an {@link AccumuloConfiguration} + */ + public static AccumuloConfiguration convertClientConfig(final Configuration config) { + + final AccumuloConfiguration defaults = DefaultConfiguration.getInstance(); + + return new AccumuloConfiguration() { + @Override + public String get(Property property) { + final String key = property.getKey(); + + // Attempt to load sensitive properties from a CredentialProvider, if configured + if (property.isSensitive()) { + org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration(); + if (null != hadoopConf) { + try { + char[] value = CredentialProviderFactoryShim.getValueFromCredentialProvider(hadoopConf, key); + if (null != value) { + log.trace("Loaded sensitive value for {} from CredentialProvider", key); + return new String(value); + } else { + log.trace("Tried to load sensitive value for {} from CredentialProvider, but none was found", key); + } + } catch (IOException e) { + log.warn("Failed to extract sensitive property ({}) from Hadoop CredentialProvider, falling back to base AccumuloConfiguration", key, e); + } + } + } + if (config.containsKey(key)) + return config.getString(key); + else + return defaults.get(property); + } + + @Override + public void getProperties(Map props, PropertyFilter filter) { + defaults.getProperties(props, filter); + + Iterator keyIter = config.getKeys(); + while (keyIter.hasNext()) { + String key = keyIter.next().toString(); + if (filter.accept(key)) + props.put(key, config.getString(key)); + } + + // Attempt to load sensitive properties from a CredentialProvider, if configured + org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration(); + if (null != hadoopConf) { + try { + for (String key : CredentialProviderFactoryShim.getKeys(hadoopConf)) { + if (!Property.isValidPropertyKey(key) || !Property.isSensitive(key)) { + continue; + } + + if (filter.accept(key)) { + char[] value = CredentialProviderFactoryShim.getValueFromCredentialProvider(hadoopConf, key); + if (null != value) { + props.put(key, new String(value)); + } + } + } + } catch (IOException e) { + log.warn("Failed to extract sensitive properties from Hadoop CredentialProvider, falling back to accumulo-site.xml", e); + } + } + } + + private org.apache.hadoop.conf.Configuration getHadoopConfiguration() { + String credProviderPaths = config.getString(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); + if (null != credProviderPaths && !credProviderPaths.isEmpty()) { + org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); + hadoopConf.set(CredentialProviderFactoryShim.CREDENTIAL_PROVIDER_PATH, credProviderPaths); + return hadoopConf; + } + + log.trace("Did not find credential provider configuration in ClientConfiguration"); + + return null; + } + }; + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index cb0b90e..c831915 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -48,8 +48,6 @@ import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Condition; import org.apache.accumulo.core.data.ConditionalMutation; @@ -64,7 +62,6 @@ import org.apache.accumulo.core.data.thrift.TMutation; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.security.VisibilityEvaluator; import org.apache.accumulo.core.security.VisibilityParseException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; @@ -106,8 +103,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private VisibilityEvaluator ve; @SuppressWarnings("unchecked") private Map cache = Collections.synchronizedMap(new LRUMap(1000)); - private Instance instance; - private Credentials credentials; + private final ClientContext context; private TabletLocator locator; private String tableId; private long timeout; @@ -284,13 +280,13 @@ class ConditionalWriterImpl implements ConditionalWriter { Map> binnedMutations = new HashMap>(); try { - locator.binMutations(credentials, mutations, binnedMutations, failures); + locator.binMutations(context, mutations, binnedMutations, failures); if (failures.size() == mutations.size()) - if (!Tables.exists(instance, tableId)) + if (!Tables.exists(context.getInstance(), tableId)) throw new TableDeletedException(tableId); - else if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId); + else if (Tables.getTableState(context.getInstance(), tableId) == TableState.OFFLINE) + throw new TableOfflineException(context.getInstance(), tableId); } catch (Exception e) { for (QCMutation qcm : mutations) @@ -373,13 +369,12 @@ class ConditionalWriterImpl implements ConditionalWriter { } } - ConditionalWriterImpl(Instance instance, Credentials credentials, String tableId, ConditionalWriterConfig config) { - this.instance = instance; - this.credentials = credentials; + ConditionalWriterImpl(ClientContext context, String tableId, ConditionalWriterConfig config) { + this.context = context; this.auths = config.getAuthorizations(); this.ve = new VisibilityEvaluator(config.getAuthorizations()); this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads()); - this.locator = TabletLocator.getLocator(instance, new Text(tableId)); + this.locator = TabletLocator.getLocator(context, new Text(tableId)); this.serverQueues = new HashMap(); this.tableId = tableId; this.timeout = config.getTimeout(TimeUnit.MILLISECONDS); @@ -500,8 +495,8 @@ class ConditionalWriterImpl implements ConditionalWriter { } } - TConditionalSession tcs = client.startConditionalUpdate(tinfo, credentials.toThrift(instance), ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), - tableId, DurabilityImpl.toThrift(durability)); + TConditionalSession tcs = client.startConditionalUpdate(tinfo, context.rpcCreds(), ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tableId, + DurabilityImpl.toThrift(durability)); synchronized (cachedSessionIDs) { SessionID sid = new SessionID(); @@ -547,11 +542,10 @@ class ConditionalWriterImpl implements ConditionalWriter { private TabletClientService.Iface getClient(String location) throws TTransportException { TabletClientService.Iface client; - AccumuloConfiguration rpcConfig = ClientConfigurationHelper.getClientRpcConfiguration(instance); - if (timeout < rpcConfig.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)) - client = ThriftUtil.getTServerClient(location, rpcConfig, timeout); + if (timeout < context.getClientTimeoutInMillis()) + client = ThriftUtil.getTServerClient(location, context, timeout); else - client = ThriftUtil.getTServerClient(location, rpcConfig); + client = ThriftUtil.getTServerClient(location, context); return client; } @@ -607,16 +601,16 @@ class ConditionalWriterImpl implements ConditionalWriter { queueRetry(ignored, location); } catch (ThriftSecurityException tse) { - AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId(instance, - tableId), tse); + AccumuloSecurityException ase = new AccumuloSecurityException(context.getCredentials().getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId( + context.getInstance(), tableId), tse); queueException(location, cmidToCm, ase); } catch (TTransportException e) { - locator.invalidateCache(location); + locator.invalidateCache(context.getInstance(), location); invalidateSession(location, mutations, cmidToCm, sessionId); } catch (TApplicationException tae) { queueException(location, cmidToCm, new AccumuloServerException(location, tae)); } catch (TException e) { - locator.invalidateCache(location); + locator.invalidateCache(context.getInstance(), location); invalidateSession(location, mutations, cmidToCm, sessionId); } catch (Exception e) { queueException(location, cmidToCm, e); @@ -659,13 +653,13 @@ class ConditionalWriterImpl implements ConditionalWriter { * * If a conditional mutation is taking a long time to process, then this method will wait for it to finish... unless this exceeds timeout. */ - private void invalidateSession(SessionID sessionId, String location) throws AccumuloException, - AccumuloSecurityException, TableNotFoundException { + private void invalidateSession(SessionID sessionId, String location) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { long sleepTime = 50; long startTime = System.currentTimeMillis(); + Instance instance = context.getInstance(); LockID lid = new LockID(ZooUtil.getRoot(instance) + Constants.ZTSERVERS, sessionId.lockId); ZooCacheFactory zcf = new ZooCacheFactory(); @@ -673,7 +667,7 @@ class ConditionalWriterImpl implements ConditionalWriter { if (!ZooLock.isLockHeld(zcf.getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), lid)) { // ACCUMULO-1152 added a tserver lock check to the tablet location cache, so this invalidation prevents future attempts to contact the // tserver even its gone zombie and is still running w/o a lock - locator.invalidateCache(location); + locator.invalidateCache(context.getInstance(), location); return; } @@ -685,7 +679,7 @@ class ConditionalWriterImpl implements ConditionalWriter { } catch (TApplicationException tae) { throw new AccumuloServerException(location, tae); } catch (TException e) { - locator.invalidateCache(location); + locator.invalidateCache(context.getInstance(), location); } if ((System.currentTimeMillis() - startTime) + sleepTime > timeout) http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java index 2d077d9..f481cc3 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java @@ -16,10 +16,10 @@ */ package org.apache.accumulo.core.client.impl; -import java.util.concurrent.TimeUnit; - import static com.google.common.base.Preconditions.checkArgument; +import java.util.concurrent.TimeUnit; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchDeleter; @@ -43,60 +43,55 @@ import org.apache.accumulo.core.client.impl.thrift.ClientService; import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.trace.Tracer; public class ConnectorImpl extends Connector { - private final Instance instance; - private final Credentials credentials; + private final ClientContext context; private SecurityOperations secops = null; - private TableOperations tableops = null; + private TableOperationsImpl tableops = null; private NamespaceOperations namespaceops = null; private InstanceOperations instanceops = null; private ReplicationOperations replicationops = null; - public ConnectorImpl(final Instance instance, Credentials cred) throws AccumuloException, AccumuloSecurityException { - checkArgument(instance != null, "instance is null"); - checkArgument(cred != null, "cred is null"); - if (cred.getToken().isDestroyed()) - throw new AccumuloSecurityException(cred.getPrincipal(), SecurityErrorCode.TOKEN_EXPIRED); + public ConnectorImpl(final ClientContext context) throws AccumuloException, AccumuloSecurityException { + checkArgument(context != null, "context is null"); + if (context.getCredentials().getToken().isDestroyed()) + throw new AccumuloSecurityException(context.getCredentials().getPrincipal(), SecurityErrorCode.TOKEN_EXPIRED); - this.instance = instance; - - this.credentials = cred; + this.context = context; // Skip fail fast for system services; string literal for class name, to avoid - if (!"org.apache.accumulo.server.security.SystemCredentials$SystemToken".equals(cred.getToken().getClass().getName())) { - ServerClient.execute(instance, new ClientExec() { + if (!"org.apache.accumulo.server.security.SystemCredentials$SystemToken".equals(context.getCredentials().getToken().getClass().getName())) { + ServerClient.execute(context, new ClientExec() { @Override public void execute(ClientService.Client iface) throws Exception { - if (!iface.authenticate(Tracer.traceInfo(), credentials.toThrift(instance))) + if (!iface.authenticate(Tracer.traceInfo(), context.rpcCreds())) throw new AccumuloSecurityException("Authentication failed, access denied", SecurityErrorCode.BAD_CREDENTIALS); } }); } - this.tableops = new TableOperationsImpl(instance, credentials); - this.namespaceops = new NamespaceOperationsImpl(instance, credentials, (TableOperationsImpl) tableops); + this.tableops = new TableOperationsImpl(context); + this.namespaceops = new NamespaceOperationsImpl(context, tableops); } private String getTableId(String tableName) throws TableNotFoundException { - String tableId = Tables.getTableId(instance, tableName); - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId); + String tableId = Tables.getTableId(context.getInstance(), tableName); + if (Tables.getTableState(context.getInstance(), tableId) == TableState.OFFLINE) + throw new TableOfflineException(context.getInstance(), tableId); return tableId; } @Override public Instance getInstance() { - return instance; + return context.getInstance(); } @Override public BatchScanner createBatchScanner(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException { checkArgument(tableName != null, "tableName is null"); checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchReader(instance, credentials, getTableId(tableName), authorizations, numQueryThreads); + return new TabletServerBatchReader(context, getTableId(tableName), authorizations, numQueryThreads); } @Deprecated @@ -105,8 +100,8 @@ public class ConnectorImpl extends Connector { int maxWriteThreads) throws TableNotFoundException { checkArgument(tableName != null, "tableName is null"); checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchDeleter(instance, credentials, getTableId(tableName), authorizations, numQueryThreads, new BatchWriterConfig() - .setMaxMemory(maxMemory).setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); + return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations, numQueryThreads, new BatchWriterConfig().setMaxMemory(maxMemory) + .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); } @Override @@ -114,50 +109,50 @@ public class ConnectorImpl extends Connector { throws TableNotFoundException { checkArgument(tableName != null, "tableName is null"); checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchDeleter(instance, credentials, getTableId(tableName), authorizations, numQueryThreads, config); + return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations, numQueryThreads, config); } @Deprecated @Override public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException { checkArgument(tableName != null, "tableName is null"); - return new BatchWriterImpl(instance, credentials, getTableId(tableName), new BatchWriterConfig().setMaxMemory(maxMemory) - .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); + return new BatchWriterImpl(context, getTableId(tableName), new BatchWriterConfig().setMaxMemory(maxMemory).setMaxLatency(maxLatency, TimeUnit.MILLISECONDS) + .setMaxWriteThreads(maxWriteThreads)); } @Override public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException { checkArgument(tableName != null, "tableName is null"); - return new BatchWriterImpl(instance, credentials, getTableId(tableName), config); + return new BatchWriterImpl(context, getTableId(tableName), config); } @Deprecated @Override public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) { - return new MultiTableBatchWriterImpl(instance, credentials, new BatchWriterConfig().setMaxMemory(maxMemory) - .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); + return new MultiTableBatchWriterImpl(context, new BatchWriterConfig().setMaxMemory(maxMemory).setMaxLatency(maxLatency, TimeUnit.MILLISECONDS) + .setMaxWriteThreads(maxWriteThreads)); } @Override public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) { - return new MultiTableBatchWriterImpl(instance, credentials, config); + return new MultiTableBatchWriterImpl(context, config); } @Override public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException { - return new ConditionalWriterImpl(instance, credentials, getTableId(tableName), config); + return new ConditionalWriterImpl(context, getTableId(tableName), config); } @Override public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException { checkArgument(tableName != null, "tableName is null"); checkArgument(authorizations != null, "authorizations is null"); - return new ScannerImpl(instance, credentials, getTableId(tableName), authorizations); + return new ScannerImpl(context, getTableId(tableName), authorizations); } @Override public String whoami() { - return credentials.getPrincipal(); + return context.getCredentials().getPrincipal(); } @Override @@ -173,7 +168,7 @@ public class ConnectorImpl extends Connector { @Override public synchronized SecurityOperations securityOperations() { if (secops == null) - secops = new SecurityOperationsImpl(instance, credentials); + secops = new SecurityOperationsImpl(context); return secops; } @@ -181,7 +176,7 @@ public class ConnectorImpl extends Connector { @Override public synchronized InstanceOperations instanceOperations() { if (instanceops == null) - instanceops = new InstanceOperationsImpl(instance, credentials); + instanceops = new InstanceOperationsImpl(context); return instanceops; } @@ -189,7 +184,7 @@ public class ConnectorImpl extends Connector { @Override public synchronized ReplicationOperations replicationOperations() { if (null == replicationops) { - replicationops = new ReplicationOperationsImpl(instance, credentials); + replicationops = new ReplicationOperationsImpl(context); } return replicationops; http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java index 9848612..4e74069 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java @@ -28,16 +28,14 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.ActiveCompaction; import org.apache.accumulo.core.client.admin.ActiveScan; import org.apache.accumulo.core.client.admin.InstanceOperations; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.impl.thrift.ClientService; import org.apache.accumulo.core.client.impl.thrift.ConfigurationType; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.master.thrift.MasterClientService; -import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; import org.apache.accumulo.core.trace.Tracer; @@ -54,32 +52,21 @@ import org.apache.thrift.transport.TTransportException; * Provides a class for administering the accumulo instance */ public class InstanceOperationsImpl implements InstanceOperations { - private Instance instance; - private Credentials credentials; - private AccumuloConfiguration rpcConfig; - - /** - * @param instance - * the connection information for this instance - * @param credentials - * the Credential, containing principal and Authentication Token - */ - public InstanceOperationsImpl(Instance instance, Credentials credentials) { - checkArgument(instance != null, "instance is null"); - checkArgument(credentials != null, "credentials is null"); - this.instance = instance; - this.rpcConfig = ClientConfigurationHelper.getClientRpcConfiguration(instance); - this.credentials = credentials; + private final ClientContext context; + + public InstanceOperationsImpl(ClientContext context) { + checkArgument(context != null, "context is null"); + this.context = context; } @Override public void setProperty(final String property, final String value) throws AccumuloException, AccumuloSecurityException { checkArgument(property != null, "property is null"); checkArgument(value != null, "value is null"); - MasterClient.execute(instance, new ClientExec() { + MasterClient.execute(context, new ClientExec() { @Override public void execute(MasterClientService.Client client) throws Exception { - client.setSystemProperty(Tracer.traceInfo(), credentials.toThrift(instance), property, value); + client.setSystemProperty(Tracer.traceInfo(), context.rpcCreds(), property, value); } }); } @@ -87,36 +74,37 @@ public class InstanceOperationsImpl implements InstanceOperations { @Override public void removeProperty(final String property) throws AccumuloException, AccumuloSecurityException { checkArgument(property != null, "property is null"); - MasterClient.execute(instance, new ClientExec() { + MasterClient.execute(context, new ClientExec() { @Override public void execute(MasterClientService.Client client) throws Exception { - client.removeSystemProperty(Tracer.traceInfo(), credentials.toThrift(instance), property); + client.removeSystemProperty(Tracer.traceInfo(), context.rpcCreds(), property); } }); } @Override public Map getSystemConfiguration() throws AccumuloException, AccumuloSecurityException { - return ServerClient.execute(instance, new ClientExecReturn,ClientService.Client>() { + return ServerClient.execute(context, new ClientExecReturn,ClientService.Client>() { @Override public Map execute(ClientService.Client client) throws Exception { - return client.getConfiguration(Tracer.traceInfo(), credentials.toThrift(instance), ConfigurationType.CURRENT); + return client.getConfiguration(Tracer.traceInfo(), context.rpcCreds(), ConfigurationType.CURRENT); } }); } @Override public Map getSiteConfiguration() throws AccumuloException, AccumuloSecurityException { - return ServerClient.execute(instance, new ClientExecReturn,ClientService.Client>() { + return ServerClient.execute(context, new ClientExecReturn,ClientService.Client>() { @Override public Map execute(ClientService.Client client) throws Exception { - return client.getConfiguration(Tracer.traceInfo(), credentials.toThrift(instance), ConfigurationType.SITE); + return client.getConfiguration(Tracer.traceInfo(), context.rpcCreds(), ConfigurationType.SITE); } }); } @Override public List getTabletServers() { + Instance instance = context.getInstance(); ZooCache cache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; List results = new ArrayList(); @@ -138,12 +126,12 @@ public class InstanceOperationsImpl implements InstanceOperations { public List getActiveScans(String tserver) throws AccumuloException, AccumuloSecurityException { Client client = null; try { - client = ThriftUtil.getTServerClient(tserver, rpcConfig); + client = ThriftUtil.getTServerClient(tserver, context); List as = new ArrayList(); - for (org.apache.accumulo.core.tabletserver.thrift.ActiveScan activeScan : client.getActiveScans(Tracer.traceInfo(), credentials.toThrift(instance))) { + for (org.apache.accumulo.core.tabletserver.thrift.ActiveScan activeScan : client.getActiveScans(Tracer.traceInfo(), context.rpcCreds())) { try { - as.add(new ActiveScanImpl(instance, activeScan)); + as.add(new ActiveScanImpl(context.getInstance(), activeScan)); } catch (TableNotFoundException e) { throw new AccumuloException(e); } @@ -163,10 +151,10 @@ public class InstanceOperationsImpl implements InstanceOperations { @Override public boolean testClassLoad(final String className, final String asTypeName) throws AccumuloException, AccumuloSecurityException { - return ServerClient.execute(instance, new ClientExecReturn() { + return ServerClient.execute(context, new ClientExecReturn() { @Override public Boolean execute(ClientService.Client client) throws Exception { - return client.checkClass(Tracer.traceInfo(), credentials.toThrift(instance), className, asTypeName); + return client.checkClass(Tracer.traceInfo(), context.rpcCreds(), className, asTypeName); } }); } @@ -175,12 +163,11 @@ public class InstanceOperationsImpl implements InstanceOperations { public List getActiveCompactions(String tserver) throws AccumuloException, AccumuloSecurityException { Client client = null; try { - client = ThriftUtil.getTServerClient(tserver, rpcConfig); + client = ThriftUtil.getTServerClient(tserver, context); List as = new ArrayList(); - for (org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction activeCompaction : client.getActiveCompactions(Tracer.traceInfo(), - credentials.toThrift(instance))) { - as.add(new ActiveCompactionImpl(instance, activeCompaction)); + for (org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction activeCompaction : client.getActiveCompactions(Tracer.traceInfo(), context.rpcCreds())) { + as.add(new ActiveCompactionImpl(context.getInstance(), activeCompaction)); } return as; } catch (TTransportException e) { @@ -199,9 +186,9 @@ public class InstanceOperationsImpl implements InstanceOperations { public void ping(String tserver) throws AccumuloException { TTransport transport = null; try { - transport = ThriftUtil.createTransport(AddressUtil.parseAddress(tserver, false), rpcConfig); + transport = ThriftUtil.createTransport(AddressUtil.parseAddress(tserver, false), context); TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport); - client.getTabletServerStatus(Tracer.traceInfo(), credentials.toThrift(instance)); + client.getTabletServerStatus(Tracer.traceInfo(), context.rpcCreds()); } catch (TTransportException e) { throw new AccumuloException(e); } catch (ThriftSecurityException e) { @@ -218,7 +205,7 @@ public class InstanceOperationsImpl implements InstanceOperations { @Override public void waitForBalance() throws AccumuloException { try { - MasterClient.execute(instance, new ClientExec() { + MasterClient.execute(context, new ClientExec() { @Override public void execute(MasterClientService.Client client) throws Exception { client.waitForBalance(Tracer.traceInfo()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java index 53f178a..092eec6 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.NamespaceNotFoundException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; @@ -38,21 +37,20 @@ import org.apache.thrift.transport.TTransportException; public class MasterClient { private static final Logger log = Logger.getLogger(MasterClient.class); - public static MasterClientService.Client getConnectionWithRetry(Instance instance) { - checkArgument(instance != null, "instance is null"); - + public static MasterClientService.Client getConnectionWithRetry(ClientContext context) { while (true) { - MasterClientService.Client result = getConnection(instance); + MasterClientService.Client result = getConnection(context); if (result != null) return result; UtilWaitThread.sleep(250); } - } - public static MasterClientService.Client getConnection(Instance instance) { - List locations = instance.getMasterLocations(); + public static MasterClientService.Client getConnection(ClientContext context) { + checkArgument(context != null, "context is null"); + + List locations = context.getInstance().getMasterLocations(); if (locations.size() == 0) { log.debug("No masters..."); @@ -65,8 +63,7 @@ public class MasterClient { try { // Master requests can take a long time: don't ever time out - MasterClientService.Client client = ThriftUtil.getClientNoTimeout(new MasterClientService.Client.Factory(), master, - ClientConfigurationHelper.getClientRpcConfiguration(instance)); + MasterClientService.Client client = ThriftUtil.getClientNoTimeout(new MasterClientService.Client.Factory(), master, context); return client; } catch (TTransportException tte) { if (tte.getCause().getClass().equals(UnknownHostException.class)) { @@ -87,12 +84,12 @@ public class MasterClient { } } - public static T execute(Instance instance, ClientExecReturn exec) throws AccumuloException, AccumuloSecurityException, - TableNotFoundException { + public static T execute(ClientContext context, ClientExecReturn exec) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException { MasterClientService.Client client = null; while (true) { try { - client = getConnectionWithRetry(instance); + client = getConnectionWithRetry(context); return exec.execute(client); } catch (TTransportException tte) { log.debug("MasterClient request failed, retrying ... ", tte); @@ -119,12 +116,12 @@ public class MasterClient { } } - public static void executeGeneric(Instance instance, ClientExec exec) throws AccumuloException, AccumuloSecurityException, - TableNotFoundException { + public static void executeGeneric(ClientContext context, ClientExec exec) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException { MasterClientService.Client client = null; while (true) { try { - client = getConnectionWithRetry(instance); + client = getConnectionWithRetry(context); exec.execute(client); break; } catch (TTransportException tte) { @@ -152,24 +149,24 @@ public class MasterClient { } } - public static void executeTable(Instance instance, ClientExec exec) throws AccumuloException, AccumuloSecurityException, - TableNotFoundException { - executeGeneric(instance, exec); + public static void executeTable(ClientContext context, ClientExec exec) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException { + executeGeneric(context, exec); } - public static void executeNamespace(Instance instance, ClientExec exec) throws AccumuloException, AccumuloSecurityException, - NamespaceNotFoundException { + public static void executeNamespace(ClientContext context, ClientExec exec) throws AccumuloException, + AccumuloSecurityException, NamespaceNotFoundException { try { - executeGeneric(instance, exec); + executeGeneric(context, exec); } catch (TableNotFoundException e) { if (e.getCause() instanceof NamespaceNotFoundException) throw (NamespaceNotFoundException) e.getCause(); } } - public static void execute(Instance instance, ClientExec exec) throws AccumuloException, AccumuloSecurityException { + public static void execute(ClientContext context, ClientExec exec) throws AccumuloException, AccumuloSecurityException { try { - executeGeneric(instance, exec); + executeGeneric(context, exec); } catch (TableNotFoundException e) { throw new AssertionError(e); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java index b09582a..ace8701 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java @@ -17,6 +17,7 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -34,7 +35,6 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.security.Credentials; import org.apache.log4j.Logger; import com.google.common.cache.CacheBuilder; @@ -88,6 +88,7 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { @Override public String load(String tableName) throws Exception { + Instance instance = context.getInstance(); String tableId = Tables.getNameToIdMap(instance).get(tableName); if (tableId == null) @@ -103,20 +104,19 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { private TabletServerBatchWriter bw; private ConcurrentHashMap tableWriters; - private Instance instance; + private final ClientContext context; private final LoadingCache nameToIdCache; - public MultiTableBatchWriterImpl(Instance instance, Credentials credentials, BatchWriterConfig config) { - this(instance, credentials, config, DEFAULT_CACHE_TIME, DEFAULT_CACHE_TIME_UNIT); + public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config) { + this(context, config, DEFAULT_CACHE_TIME, DEFAULT_CACHE_TIME_UNIT); } - public MultiTableBatchWriterImpl(Instance instance, Credentials credentials, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) { - checkArgument(instance != null, "instance is null"); - checkArgument(credentials != null, "credentials is null"); + public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) { + checkArgument(context != null, "context is null"); checkArgument(config != null, "config is null"); checkArgument(cacheTimeUnit != null, "cacheTimeUnit is null"); - this.instance = instance; - this.bw = new TabletServerBatchWriter(instance, credentials, config); + this.context = context; + this.bw = new TabletServerBatchWriter(context, config); tableWriters = new ConcurrentHashMap(); this.closed = new AtomicBoolean(false); this.cacheLastState = new AtomicLong(0); @@ -126,10 +126,12 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { .build(new TableNameToIdLoader()); } + @Override public boolean isClosed() { return this.closed.get(); } + @Override public void close() throws MutationsRejectedException { this.closed.set(true); bw.close(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/impl/NamespaceOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/NamespaceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/NamespaceOperationsImpl.java index f756ef1..0b6330d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/NamespaceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/NamespaceOperationsImpl.java @@ -17,6 +17,7 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; + import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -30,14 +31,13 @@ import java.util.TreeSet; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.NamespaceExistsException; import org.apache.accumulo.core.client.NamespaceNotEmptyException; import org.apache.accumulo.core.client.NamespaceNotFoundException; -import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.impl.thrift.ClientService; import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; @@ -54,24 +54,21 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; public class NamespaceOperationsImpl extends NamespaceOperationsHelper { - private Instance instance; - private Credentials credentials; + private final ClientContext context; private TableOperationsImpl tableOps; private static final Logger log = Logger.getLogger(TableOperations.class); - public NamespaceOperationsImpl(Instance instance, Credentials credentials, TableOperationsImpl tableOps) { - checkArgument(instance != null, "instance is null"); - checkArgument(credentials != null, "credentials is null"); - this.instance = instance; - this.credentials = credentials; + public NamespaceOperationsImpl(ClientContext context, TableOperationsImpl tableOps) { + checkArgument(context != null, "context is null"); + this.context = context; this.tableOps = tableOps; } @Override public SortedSet list() { OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Fetching list of namespaces..."); - TreeSet namespaces = new TreeSet(Namespaces.getNameToIdMap(instance).keySet()); + TreeSet namespaces = new TreeSet(Namespaces.getNameToIdMap(context.getInstance()).keySet()); opTimer.stop("Fetched " + namespaces.size() + " namespaces in %DURATION%"); return namespaces; } @@ -81,7 +78,7 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { checkArgument(namespace != null, "namespace is null"); OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Checking if namespace " + namespace + " exists..."); - boolean exists = Namespaces.getNameToIdMap(instance).containsKey(namespace); + boolean exists = Namespaces.getNameToIdMap(context.getInstance()).containsKey(namespace); opTimer.stop("Checked existance of " + exists + " in %DURATION%"); return exists; } @@ -101,14 +98,15 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { @Override public void delete(String namespace) throws AccumuloException, AccumuloSecurityException, NamespaceNotFoundException, NamespaceNotEmptyException { checkArgument(namespace != null, "namespace is null"); - String namespaceId = Namespaces.getNamespaceId(instance, namespace); + String namespaceId = Namespaces.getNamespaceId(context.getInstance(), namespace); if (namespaceId.equals(Namespaces.ACCUMULO_NAMESPACE_ID) || namespaceId.equals(Namespaces.DEFAULT_NAMESPACE_ID)) { + Credentials credentials = context.getCredentials(); log.debug(credentials.getPrincipal() + " attempted to delete the " + namespaceId + " namespace"); throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.UNSUPPORTED_OPERATION); } - if (Namespaces.getTableIds(instance, namespaceId).size() > 0) { + if (Namespaces.getTableIds(context.getInstance(), namespaceId).size() > 0) { throw new NamespaceNotEmptyException(namespaceId, namespace, null); } @@ -140,10 +138,10 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { checkArgument(property != null, "property is null"); checkArgument(value != null, "value is null"); - MasterClient.executeNamespace(instance, new ClientExec() { + MasterClient.executeNamespace(context, new ClientExec() { @Override public void execute(MasterClientService.Client client) throws Exception { - client.setNamespaceProperty(Tracer.traceInfo(), credentials.toThrift(instance), namespace, property, value); + client.setNamespaceProperty(Tracer.traceInfo(), context.rpcCreds(), namespace, property, value); } }); } @@ -153,10 +151,10 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { checkArgument(namespace != null, "namespace is null"); checkArgument(property != null, "property is null"); - MasterClient.executeNamespace(instance, new ClientExec() { + MasterClient.executeNamespace(context, new ClientExec() { @Override public void execute(MasterClientService.Client client) throws Exception { - client.removeNamespaceProperty(Tracer.traceInfo(), credentials.toThrift(instance), namespace, property); + client.removeNamespaceProperty(Tracer.traceInfo(), context.rpcCreds(), namespace, property); } }); } @@ -165,10 +163,10 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { public Iterable> getProperties(final String namespace) throws AccumuloException, NamespaceNotFoundException { checkArgument(namespace != null, "namespace is null"); try { - return ServerClient.executeRaw(instance, new ClientExecReturn,ClientService.Client>() { + return ServerClient.executeRaw(context, new ClientExecReturn,ClientService.Client>() { @Override public Map execute(ClientService.Client client) throws Exception { - return client.getNamespaceConfiguration(Tracer.traceInfo(), credentials.toThrift(instance), namespace); + return client.getNamespaceConfiguration(Tracer.traceInfo(), context.rpcCreds(), namespace); } }).entrySet(); } catch (ThriftTableOperationException e) { @@ -189,7 +187,7 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { @Override public Map namespaceIdMap() { - return Namespaces.getNameToIdMap(instance); + return Namespaces.getNameToIdMap(context.getInstance()); } @Override @@ -200,10 +198,10 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { checkArgument(asTypeName != null, "asTypeName is null"); try { - return ServerClient.executeRaw(instance, new ClientExecReturn() { + return ServerClient.executeRaw(context, new ClientExecReturn() { @Override public Boolean execute(ClientService.Client client) throws Exception { - return client.checkNamespaceClass(Tracer.traceInfo(), credentials.toThrift(instance), namespace, className, asTypeName); + return client.checkNamespaceClass(Tracer.traceInfo(), context.rpcCreds(), namespace, className, asTypeName); } }); } catch (ThriftTableOperationException e) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/42c25faa/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java index 901128b..f2007e9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java @@ -26,7 +26,6 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; import org.apache.accumulo.core.replication.thrift.ReplicationServicer; import org.apache.accumulo.core.util.ThriftUtil; @@ -45,16 +44,17 @@ public class ReplicationClient { private static final Logger log = LoggerFactory.getLogger(ReplicationClient.class); /** - * @param instance - * Instance for the peer replicant + * @param context + * the client session for the peer replicant * @return Client to the ReplicationCoordinator service */ - public static ReplicationCoordinator.Client getCoordinatorConnectionWithRetry(Instance instance) throws AccumuloException { - checkNotNull(instance); + public static ReplicationCoordinator.Client getCoordinatorConnectionWithRetry(ClientContext context) throws AccumuloException { + checkNotNull(context); + Instance instance = context.getInstance(); for (int attempts = 1; attempts <= 10; attempts++) { - ReplicationCoordinator.Client result = getCoordinatorConnection(instance); + ReplicationCoordinator.Client result = getCoordinatorConnection(context); if (result != null) return result; log.debug("Could not get ReplicationCoordinator connection to {}, will retry", instance.getInstanceName()); @@ -68,7 +68,8 @@ public class ReplicationClient { throw new AccumuloException("Timed out trying to communicate with master from " + instance.getInstanceName()); } - public static ReplicationCoordinator.Client getCoordinatorConnection(Instance instance) { + public static ReplicationCoordinator.Client getCoordinatorConnection(ClientContext context) { + Instance instance = context.getInstance(); List locations = instance.getMasterLocations(); if (locations.size() == 0) { @@ -83,9 +84,6 @@ public class ReplicationClient { return null; } - - AccumuloConfiguration conf = ClientConfigurationHelper.getClientRpcConfiguration(instance); - String zkPath = ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR; String replCoordinatorAddr; @@ -107,8 +105,7 @@ public class ReplicationClient { try { // Master requests can take a long time: don't ever time out - ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(), coordinatorAddr.toString(), - conf); + ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(), coordinatorAddr.toString(), context); return client; } catch (TTransportException tte) { log.debug("Failed to connect to master coordinator service ({})", coordinatorAddr, tte); @@ -119,18 +116,18 @@ public class ReplicationClient { /** * Attempt a single time to create a ReplicationServicer client to the given host * - * @param inst - * Instance to the peer replicant + * @param context + * The client session for the peer replicant * @param server * Server to connect to * @return A ReplicationServicer client to the given host in the given instance */ - public static ReplicationServicer.Client getServicerConnection(Instance inst, String server) throws TTransportException { - checkNotNull(inst); + public static ReplicationServicer.Client getServicerConnection(ClientContext context, String server) throws TTransportException { + checkNotNull(context); checkNotNull(server); try { - return ThriftUtil.getClientNoTimeout(new ReplicationServicer.Client.Factory(), server, ClientConfigurationHelper.getClientRpcConfiguration(inst)); + return ThriftUtil.getClientNoTimeout(new ReplicationServicer.Client.Factory(), server, context); } catch (TTransportException tte) { log.debug("Failed to connect to servicer ({}), will retry...", server, tte); throw tte; @@ -155,12 +152,12 @@ public class ReplicationClient { } } - public static T executeCoordinatorWithReturn(Instance instance, ClientExecReturn exec) throws AccumuloException, - AccumuloSecurityException { + public static T executeCoordinatorWithReturn(ClientContext context, ClientExecReturn exec) + throws AccumuloException, AccumuloSecurityException { ReplicationCoordinator.Client client = null; for (int i = 0; i < 10; i++) { try { - client = getCoordinatorConnectionWithRetry(instance); + client = getCoordinatorConnectionWithRetry(context); return exec.execute(client); } catch (TTransportException tte) { log.debug("ReplicationClient coordinator request failed, retrying ... ", tte); @@ -181,13 +178,14 @@ public class ReplicationClient { } } - throw new AccumuloException("Could not connect to ReplicationCoordinator at " + instance.getInstanceName()); + throw new AccumuloException("Could not connect to ReplicationCoordinator at " + context.getInstance().getInstanceName()); } - public static void executeCoordinator(Instance instance, ClientExec exec) throws AccumuloException, AccumuloSecurityException { + public static void executeCoordinator(ClientContext context, ClientExec exec) throws AccumuloException, + AccumuloSecurityException { ReplicationCoordinator.Client client = null; try { - client = getCoordinatorConnectionWithRetry(instance); + client = getCoordinatorConnectionWithRetry(context); exec.execute(client); } catch (TTransportException tte) { log.debug("ReplicationClient coordinator request failed, retrying ... ", tte); @@ -204,12 +202,12 @@ public class ReplicationClient { } } - public static T executeServicerWithReturn(Instance instance, String tserver, ClientExecReturn exec) + public static T executeServicerWithReturn(ClientContext context, String tserver, ClientExecReturn exec) throws AccumuloException, AccumuloSecurityException, TTransportException { ReplicationServicer.Client client = null; while (true) { try { - client = getServicerConnection(instance, tserver); + client = getServicerConnection(context, tserver); return exec.execute(client); } catch (ThriftSecurityException e) { throw new AccumuloSecurityException(e.user, e.code, e); @@ -224,11 +222,11 @@ public class ReplicationClient { } } - public static void executeServicer(Instance instance, String tserver, ClientExec exec) throws AccumuloException, + public static void executeServicer(ClientContext context, String tserver, ClientExec exec) throws AccumuloException, AccumuloSecurityException, TTransportException { ReplicationServicer.Client client = null; try { - client = getServicerConnection(instance, tserver); + client = getServicerConnection(context, tserver); exec.execute(client); return; } catch (ThriftSecurityException e) {