accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmil...@apache.org
Subject [accumulo] 01/09: Created AccumuloClient from Connector #636
Date Thu, 20 Sep 2018 17:48:03 GMT
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 31a2de7f001ef3c5572fb17bf9f0db1c2802289f
Author: Mike Miller <mmiller@apache.org>
AuthorDate: Thu Sep 6 16:32:08 2018 -0400

    Created AccumuloClient from Connector #636
---
 .../accumulo/core/client/AccumuloClient.java       | 532 +++++++++++++++++++++
 .../core/client/impl/AccumuloClientImpl.java       | 449 +++++++++++++++++
 .../accumulo/test/functional/AccumuloClientIT.java |  95 ++++
 3 files changed, 1076 insertions(+)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
new file mode 100644
index 0000000..1fe865a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.admin.NamespaceOperations;
+import org.apache.accumulo.core.client.admin.ReplicationOperations;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.impl.AccumuloClientImpl;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.security.Authorizations;
+
+/**
+ * Client connection to an Accumulo instance. Allows the user to request a scanner, deleter or
+ * writer for the instance as well as various objects that permit administrative operations.
+ * Enforces security on the client side by requiring user credentials.
+ *
+ * Supports fluent API. Various options can be provided to {@link #builder()} and when finished a
+ * call to build() will return the AccumuloClient object. For example:
+ *
+ * {@code AccumuloClient.builder().forInstance(instanceName, zookeepers)
+ *         .usingPassword(user, password).withZkTimeout(1234).build();}
+ *
+ * @since 2.0.0
+ */
+public interface AccumuloClient {
+
+  /**
+   * Factory method to create a BatchScanner connected to Accumulo.
+   *
+   * @param tableName
+   *          the name of the table to query
+   * @param authorizations
+   *          A set of authorization labels that will be checked against the column visibility of
+   *          each key in order to filter data. The authorizations passed in must be a subset of the
+   *          accumulo user's set of authorizations. If the accumulo user has authorizations (A1,
+   *          A2) and authorizations (A2, A3) are passed, then an exception will be thrown.
+   * @param numQueryThreads
+   *          the number of concurrent threads to spawn for querying
+   *
+   * @return BatchScanner object for configuring and querying
+   * @throws TableNotFoundException
+   *           when the specified table doesn't exist
+   */
+  public BatchScanner createBatchScanner(String tableName, Authorizations authorizations,
+      int numQueryThreads) throws TableNotFoundException;
+
+  /**
+   * Factory method to create a BatchScanner connected to Accumulo. This method uses the number of
+   * query threads configured when AccumuloClient was created. If none were configured, defaults
+   * will be used.
+   *
+   * @param tableName
+   *          the name of the table to query
+   * @param authorizations
+   *          A set of authorization labels that will be checked against the column visibility of
+   *          each key in order to filter data. The authorizations passed in must be a subset of the
+   *          accumulo user's set of authorizations. If the accumulo user has authorizations (A1,
+   *          A2) and authorizations (A2, A3) are passed, then an exception will be thrown.
+   *
+   * @return BatchScanner object for configuring and querying
+   * @throws TableNotFoundException
+   *           when the specified table doesn't exist
+   */
+  public BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
+      throws TableNotFoundException;
+
+  /**
+   * Factory method to create BatchDeleter
+   *
+   * @param tableName
+   *          the name of the table to query and delete from
+   * @param authorizations
+   *          A set of authorization labels that will be checked against the column visibility of
+   *          each key in order to filter data. The authorizations passed in must be a subset of the
+   *          accumulo user's set of authorizations. If the accumulo user has authorizations (A1,
+   *          A2) and authorizations (A2, A3) are passed, then an exception will be thrown.
+   * @param numQueryThreads
+   *          the number of concurrent threads to spawn for querying
+   * @param config
+   *          configuration used to create batch writer. This config takes precedence. Any unset
+   *          values will be merged with config set when the AccumuloClient was created. If no
+   *          config was set during AccumuloClient creation, BatchWriterConfig defaults will be
+   *          used.
+   * @return BatchDeleter object for configuring and deleting
+   * @throws TableNotFoundException
+   *           when the specified table doesn't exist
+   */
+
+  public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
+      int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException;
+
+  /**
+   * Factory method to create BatchDeleter. This method uses BatchWriterConfig set when
+   * AccumuloClient was created. If none was set, BatchWriterConfig defaults will be used.
+   *
+   * @param tableName
+   *          the name of the table to query and delete from
+   * @param authorizations
+   *          A set of authorization labels that will be checked against the column visibility of
+   *          each key in order to filter data. The authorizations passed in must be a subset of the
+   *          accumulo user's set of authorizations. If the accumulo user has authorizations (A1,
+   *          A2) and authorizations (A2, A3) are passed, then an exception will be thrown.
+   * @param numQueryThreads
+   *          the number of concurrent threads to spawn for querying
+   * @return BatchDeleter object
+   * @throws TableNotFoundException
+   *           if table not found
+   */
+  public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
+      int numQueryThreads) throws TableNotFoundException;
+
+  /**
+   * Factory method to create a BatchWriter connected to Accumulo.
+   *
+   * @param tableName
+   *          the name of the table to insert data into
+   * @param config
+   *          configuration used to create batch writer. This config will take precedence. Any unset
+   *          values will be merged with config set when the AccumuloClient was created. If no config
+   *          was set during AccumuloClient creation, BatchWriterConfig defaults will be used.
+   * @return BatchWriter object for configuring and writing data to
+   * @throws TableNotFoundException
+   *           when the specified table doesn't exist
+   */
+  public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config)
+      throws TableNotFoundException;
+
+  /**
+   * Factory method to create a BatchWriter. This method uses BatchWriterConfig set when
+   * AccumuloClient was created. If none was set, BatchWriterConfig defaults will be used.
+   *
+   * @param tableName
+   *          the name of the table to insert data into
+   * @return BatchWriter object
+   * @throws TableNotFoundException
+   *           if table not found
+   */
+  public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException;
+
+  /**
+   * Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch
+   * writers can queue data for multiple tables. Also data for multiple tables can be sent to a
+   * server in a single batch. It's an efficient way to ingest data into multiple tables from a
+   * single process.
+   *
+   * @param config
+   *          configuration used to create multi-table batch writer. This config will take
+   *          precedence. Any unset values will be merged with config set when the AccumuloClient was
+   *          created. If no config was set during AccumuloClient creation, BatchWriterConfig
+   *          defaults will be used.
+   * @return MultiTableBatchWriter object for configuring and writing data to
+   */
+  public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config);
+
+  /**
+   * Factory method to create a Multi-Table BatchWriter. This method uses BatchWriterConfig set when
+   * AccumuloClient was created. If none was set, BatchWriterConfig defaults will be used.
+   *
+   * @return MultiTableBatchWriter object
+   */
+  public MultiTableBatchWriter createMultiTableBatchWriter();
+
+  /**
+   * Factory method to create a Scanner connected to Accumulo.
+   *
+   * @param tableName
+   *          the name of the table to query data from
+   * @param authorizations
+   *          A set of authorization labels that will be checked against the column visibility of
+   *          each key in order to filter data. The authorizations passed in must be a subset of the
+   *          accumulo user's set of authorizations. If the accumulo user has authorizations (A1,
+   *          A2) and authorizations (A2, A3) are passed, then an exception will be thrown.
+   *
+   * @return Scanner object for configuring and querying data with
+   * @throws TableNotFoundException
+   *           when the specified table doesn't exist
+   */
+  public Scanner createScanner(String tableName, Authorizations authorizations)
+      throws TableNotFoundException;
+
+  /**
+   * Factory method to create a ConditionalWriter connected to Accumulo.
+   *
+   * @param tableName
+   *          the name of the table to query data from
+   * @param config
+   *          configuration used to create conditional writer
+   *
+   * @return ConditionalWriter object for writing ConditionalMutations
+   * @throws TableNotFoundException
+   *           when the specified table doesn't exist
+   */
+  public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config)
+      throws TableNotFoundException;
+
+  /**
+   * Get the current user for this AccumuloClient
+   *
+   * @return the user name
+   */
+  public String whoami();
+
+  /**
+   * Returns a unique string that identifies this instance of accumulo.
+   *
+   * @return a UUID
+   */
+  public String getInstanceID();
+
+  /**
+   * Retrieves a TableOperations object to perform table functions, such as create and delete.
+   *
+   * @return an object to manipulate tables
+   */
+  public abstract TableOperations tableOperations();
+
+  /**
+   * Retrieves a NamespaceOperations object to perform namespace functions, such as create and
+   * delete.
+   *
+   * @return an object to manipulate namespaces
+   */
+  public NamespaceOperations namespaceOperations();
+
+  /**
+   * Retrieves a SecurityOperations object to perform user security operations, such as creating
+   * users.
+   *
+   * @return an object to modify users and permissions
+   */
+  public SecurityOperations securityOperations();
+
+  /**
+   * Retrieves an InstanceOperations object to modify instance configuration.
+   *
+   * @return an object to modify instance configuration
+   */
+  public InstanceOperations instanceOperations();
+
+  /**
+   * Retrieves a ReplicationOperations object to manage replication configuration.
+   *
+   * @return an object to modify replication configuration
+   */
+  public ReplicationOperations replicationOperations();
+
+  /**
+   * @return {@link ClientInfo} which contains information about client connection to Accumulo
+   */
+  public abstract ClientInfo info();
+
+  /**
+   * Change user
+   *
+   * @param principal
+   *          Principal/username
+   * @param token
+   *          Authentication token
+   * @return {@link AccumuloClient} for new user
+   */
+  public abstract AccumuloClient changeUser(String principal, AuthenticationToken token)
+      throws AccumuloSecurityException, AccumuloException;
+
+  /**
+   * Builds ClientInfo after all options have been specified
+   */
+  public interface ClientInfoFactory {
+
+    /**
+     * Builds ClientInfo after all options have been specified
+     *
+     * @return ClientInfo
+     */
+    ClientInfo info();
+  }
+
+  /**
+   * Builds AccumuloClient
+   */
+  public interface AccumuloClientFactory extends ClientInfoFactory {
+
+    /**
+     * Builds AccumuloClient after all options have been specified
+     *
+     * @return AccumuloClient
+     */
+    AccumuloClient build() throws AccumuloException, AccumuloSecurityException;
+
+  }
+
+  /**
+   * Builder method for setting Accumulo instance and zookeepers
+   */
+  public interface InstanceArgs {
+    AuthenticationArgs forInstance(String instanceName, String zookeepers);
+  }
+
+  /**
+   * Builder methods for creating AccumuloClient using properties
+   */
+  public interface PropertyOptions extends InstanceArgs {
+
+    /**
+     * Build using properties file. An example properties file can be found at
+     * conf/accumulo-client.properties in the Accumulo tarball distribution.
+     *
+     * @param propertiesFile
+     *          Path to properties file
+     * @return this builder
+     */
+    AccumuloClientFactory usingProperties(String propertiesFile);
+
+    /**
+     * Build using Java properties object. A list of available properties can be found in the
+     * documentation on the project website (http://accumulo.apache.org) under 'Development' -&gt;
+     * 'Client Properties'
+     *
+     * @param properties
+     *          Properties object
+     * @return this builder
+     */
+    AccumuloClientFactory usingProperties(Properties properties);
+  }
+
+  public interface ClientInfoOptions extends PropertyOptions {
+
+    /**
+     * Build using Accumulo client information
+     *
+     * @param clientInfo
+     *          ClientInfo object
+     * @return this builder
+     */
+    FromOptions usingClientInfo(ClientInfo clientInfo);
+  }
+
+  /**
+   * Build methods for authentication
+   */
+  public interface AuthenticationArgs {
+
+    /**
+     * Build using password-based credentials
+     *
+     * @param username
+     *          User name
+     * @param password
+     *          Password
+     * @return this builder
+     */
+    ConnectionOptions usingPassword(String username, CharSequence password);
+
+    /**
+     * Build using Kerberos credentials
+     *
+     * @param principal
+     *          Principal
+     * @param keyTabFile
+     *          Path to keytab file
+     * @return this builder
+     */
+    ConnectionOptions usingKerberos(String principal, String keyTabFile);
+
+    /**
+     * Build using specified credentials
+     *
+     * @param principal
+     *          Principal/username
+     * @param token
+     *          Authentication token
+     * @return this builder
+     */
+    ConnectionOptions usingToken(String principal, AuthenticationToken token);
+  }
+
+  /**
+   * Build methods for SSL/TLS
+   */
+  public interface SslOptions extends AccumuloClientFactory {
+
+    /**
+     * Build with SSL trust store
+     *
+     * @param path
+     *          Path to trust store
+     * @return this builder
+     */
+    SslOptions withTruststore(String path);
+
+    /**
+     * Build with SSL trust store
+     *
+     * @param path
+     *          Path to trust store
+     * @param password
+     *          Password used to encrypt trust store
+     * @param type
+     *          Trust store type
+     * @return this builder
+     */
+    SslOptions withTruststore(String path, String password, String type);
+
+    /**
+     * Build with SSL key store
+     *
+     * @param path
+     *          Path to SSL key store
+     * @return this builder
+     */
+    SslOptions withKeystore(String path);
+
+    /**
+     * Build with SSL key store
+     *
+     * @param path
+     *          Path to keystore
+     * @param password
+     *          Password used to encrypt key store
+     * @param type
+     *          Key store type
+     * @return this builder
+     */
+    SslOptions withKeystore(String path, String password, String type);
+
+    /**
+     * Use JSSE system properties to configure SSL
+     *
+     * @return this builder
+     */
+    SslOptions useJsse();
+  }
+
+  /**
+   * Build methods for SASL
+   */
+  public interface SaslOptions extends AccumuloClientFactory {
+
+    /**
+     * Build with Kerberos Server Primary
+     *
+     * @param kerberosServerPrimary
+     *          Kerberos server primary
+     * @return this builder
+     */
+    SaslOptions withPrimary(String kerberosServerPrimary);
+
+    /**
+     * Build with SASL quality of protection
+     *
+     * @param qualityOfProtection
+     *          Quality of protection
+     * @return this builder
+     */
+    SaslOptions withQop(String qualityOfProtection);
+  }
+
+  /**
+   * Build methods for connection options
+   */
+  public interface ConnectionOptions extends AccumuloClientFactory {
+
+    /**
+     * Build using Zookeeper timeout
+     *
+     * @param timeout
+     *          Zookeeper timeout (in milliseconds)
+     * @return this builder
+     */
+    ConnectionOptions withZkTimeout(int timeout);
+
+    /**
+     * Build with SSL/TLS options
+     *
+     * @return this builder
+     */
+    SslOptions withSsl();
+
+    /**
+     * Build with SASL options
+     *
+     * @return this builder
+     */
+    SaslOptions withSasl();
+
+    /**
+     * Build with BatchWriterConfig defaults for BatchWriter, MultiTableBatchWriter &amp;
+     * BatchDeleter
+     *
+     * @param batchWriterConfig
+     *          BatchWriterConfig
+     * @return this builder
+     */
+    ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig);
+
+    /**
+     * Build with default number of query threads for BatchScanner
+     */
+    ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads);
+
+    /**
+     * Build with default batch size for Scanner
+     */
+    ConnectionOptions withScannerBatchSize(int batchSize);
+  }
+
+  /**
+   * Builder stage that combines the connection, property and authentication options. This is the
+   * type returned by {@link ClientInfoOptions#usingClientInfo(ClientInfo)}.
+   */
+  public interface FromOptions extends ConnectionOptions, PropertyOptions, AuthenticationArgs {
+
+  }
+
+  /**
+   * Creates builder for AccumuloClient.
+   *
+   * @return a new builder, exposed through its first stage {@link ClientInfoOptions}
+   */
+  public static ClientInfoOptions builder() {
+    return new AccumuloClientImpl.AccumuloClientBuilderImpl();
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/AccumuloClientImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/AccumuloClientImpl.java
new file mode 100644
index 0000000..82ced55
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/AccumuloClientImpl.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.admin.NamespaceOperations;
+import org.apache.accumulo.core.client.admin.ReplicationOperations;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.ClientProperty;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.trace.Tracer;
+
+public class AccumuloClientImpl extends Connector implements AccumuloClient {
+  private static final String SYSTEM_TOKEN_NAME = "org.apache.accumulo.server.security."
+      + "SystemCredentials$SystemToken";
+  private final ClientContext context;
+  private final String instanceID;
+  private SecurityOperations secops = null;
+  private TableOperationsImpl tableops = null;
+  private NamespaceOperations namespaceops = null;
+  private InstanceOperations instanceops = null;
+  private ReplicationOperations replicationops = null;
+
+  /**
+   * Creates a client bound to the given context and, for non-system tokens, verifies the
+   * credentials against a server before returning.
+   *
+   * @param context
+   *          client context; it, its credentials and its authentication token must be non-null
+   *          (each is checked with checkArgument)
+   * @throws AccumuloSecurityException
+   *           with TOKEN_EXPIRED if the token is already destroyed, or with BAD_CREDENTIALS if the
+   *           server rejects the authentication request
+   * @throws AccumuloException
+   *           declared for failures of the authentication call
+   */
+  public AccumuloClientImpl(final ClientContext context)
+      throws AccumuloSecurityException, AccumuloException {
+    // Argument validation happens before any state is stored or any RPC is made
+    checkArgument(context != null, "Context is null");
+    checkArgument(context.getCredentials() != null, "Credentials are null");
+    checkArgument(context.getCredentials().getToken() != null, "Authentication token is null");
+    if (context.getCredentials().getToken().isDestroyed())
+      throw new AccumuloSecurityException(context.getCredentials().getPrincipal(),
+          SecurityErrorCode.TOKEN_EXPIRED);
+
+    this.context = context;
+    instanceID = context.getInstanceID();
+
+    // Skip fail fast for system services; string literal for class name, to avoid dependency on
+    // server jar
+    final String tokenClassName = context.getCredentials().getToken().getClass().getName();
+    if (!SYSTEM_TOKEN_NAME.equals(tokenClassName)) {
+      ServerClient.executeVoid(context, iface -> {
+        if (!iface.authenticate(Tracer.traceInfo(), context.rpcCreds()))
+          throw new AccumuloSecurityException("Authentication failed, access denied",
+              SecurityErrorCode.BAD_CREDENTIALS);
+      });
+    }
+
+    // Table and namespace operations are created eagerly; namespace operations reuse tableops
+    this.tableops = new TableOperationsImpl(context);
+    this.namespaceops = new NamespaceOperationsImpl(context, tableops);
+  }
+
+  private Table.ID getTableId(String tableName) throws TableNotFoundException {
+    Table.ID tableId = Tables.getTableId(context, tableName);
+    if (Tables.getTableState(context, tableId) == TableState.OFFLINE)
+      throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
+    return tableId;
+  }
+
+  /**
+   * Returns the deprecated Instance object held by the client context.
+   *
+   * @return the value of {@code context.getDeprecatedInstance()}
+   */
+  @Override
+  @Deprecated
+  public org.apache.accumulo.core.client.Instance getInstance() {
+    return context.getDeprecatedInstance();
+  }
+
+  /**
+   * Creates a batch scanner over the named table using an explicit number of query threads.
+   * Both tableName and authorizations must be non-null; the table must exist and be online.
+   */
+  @Override
+  public BatchScanner createBatchScanner(String tableName, Authorizations authorizations,
+      int numQueryThreads) throws TableNotFoundException {
+    checkArgument(tableName != null, "tableName is null");
+    checkArgument(authorizations != null, "authorizations is null");
+    final Table.ID tableId = getTableId(tableName);
+    return new TabletServerBatchReader(context, tableId, authorizations, numQueryThreads);
+  }
+
+  /**
+   * Creates a batch scanner whose thread count is read from the client properties
+   * (BATCH_SCANNER_NUM_QUERY_THREADS). Fails with a NullPointerException if that property
+   * yields no value.
+   */
+  @Override
+  public BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
+      throws TableNotFoundException {
+    final Integer configuredThreads = ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS
+        .getInteger(context.getClientInfo().getProperties());
+    return createBatchScanner(tableName, authorizations,
+        Objects.requireNonNull(configuredThreads));
+  }
+
+  @Deprecated
+  @Override
+  public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
+      int numQueryThreads, long maxMemory, long maxLatency, int maxWriteThreads)
+      throws TableNotFoundException {
+    checkArgument(tableName != null, "tableName is null");
+    checkArgument(authorizations != null, "authorizations is null");
+    return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations,
+        numQueryThreads, new BatchWriterConfig().setMaxMemory(maxMemory)
+            .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
+  }
+
+  /**
+   * Creates a batch deleter for the named table. The supplied config takes precedence and is
+   * merged with the batch writer config held by the client context.
+   *
+   * @param tableName
+   *          table to query and delete from; must be non-null
+   * @param authorizations
+   *          authorizations used for the query side; must be non-null
+   * @param numQueryThreads
+   *          number of concurrent query threads
+   * @param config
+   *          batch writer settings; {@code null} is treated as an empty config, matching
+   *          {@link #createBatchWriter(String, BatchWriterConfig)}
+   * @return a BatchDeleter for the table
+   * @throws TableNotFoundException
+   *           when the table lookup fails
+   */
+  @Override
+  public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
+      int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException {
+    checkArgument(tableName != null, "tableName is null");
+    checkArgument(authorizations != null, "authorizations is null");
+    // A null config previously failed with a NullPointerException on merge(); fall back to an
+    // empty config the same way createBatchWriter does
+    final BatchWriterConfig requested = (config == null) ? new BatchWriterConfig() : config;
+    return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations,
+        numQueryThreads, requested.merge(context.getBatchWriterConfig()));
+  }
+
+  /**
+   * Creates a batch deleter without caller-supplied writer settings by delegating to the
+   * four-argument overload with an empty BatchWriterConfig.
+   */
+  @Override
+  public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
+      int numQueryThreads) throws TableNotFoundException {
+    return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig());
+  }
+
+  @Deprecated
+  @Override
+  public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency,
+      int maxWriteThreads) throws TableNotFoundException {
+    checkArgument(tableName != null, "tableName is null");
+    return new BatchWriterImpl(context, getTableId(tableName),
+        new BatchWriterConfig().setMaxMemory(maxMemory)
+            .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
+  }
+
+  /**
+   * Creates a batch writer for the named table. The supplied config is merged with the batch
+   * writer config held by the client context; a null config is replaced by an empty one.
+   */
+  @Override
+  public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config)
+      throws TableNotFoundException {
+    checkArgument(tableName != null, "tableName is null");
+    // we used to allow null inputs for bw config
+    final BatchWriterConfig requested = (config != null) ? config : new BatchWriterConfig();
+    final Table.ID tableId = getTableId(tableName);
+    return new BatchWriterImpl(context, tableId, requested.merge(context.getBatchWriterConfig()));
+  }
+
+  /**
+   * Creates a batch writer without caller-supplied settings by delegating to
+   * {@link #createBatchWriter(String, BatchWriterConfig)} with an empty BatchWriterConfig.
+   */
+  @Override
+  public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException {
+    return createBatchWriter(tableName, new BatchWriterConfig());
+  }
+
+  @Deprecated
+  @Override
+  public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency,
+      int maxWriteThreads) {
+    return new MultiTableBatchWriterImpl(context, new BatchWriterConfig().setMaxMemory(maxMemory)
+        .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
+  }
+
+  /**
+   * Creates a multi-table batch writer from the supplied config merged with the batch writer
+   * config held by the client context.
+   *
+   * NOTE(review): config is dereferenced directly, so a null config fails here, whereas
+   * createBatchWriter(String, BatchWriterConfig) substitutes an empty config. Confirm whether the
+   * difference is intended.
+   */
+  @Override
+  public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) {
+    return new MultiTableBatchWriterImpl(context, config.merge(context.getBatchWriterConfig()));
+  }
+
+  /**
+   * Creates a multi-table batch writer without caller-supplied settings by delegating to
+   * {@link #createMultiTableBatchWriter(BatchWriterConfig)} with an empty BatchWriterConfig.
+   */
+  @Override
+  public MultiTableBatchWriter createMultiTableBatchWriter() {
+    return createMultiTableBatchWriter(new BatchWriterConfig());
+  }
+
+  /**
+   * Creates a conditional writer for the named table, passing the config through unchanged.
+   * Unlike the scanner and batch writer factories, this method performs no checkArgument
+   * validation of its own before resolving the table id.
+   */
+  @Override
+  public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config)
+      throws TableNotFoundException {
+    return new ConditionalWriterImpl(context, getTableId(tableName), config);
+  }
+
+  /**
+   * Creates a scanner over the named table. When the client properties provide a value for
+   * SCANNER_BATCH_SIZE it is applied to the new scanner; otherwise the scanner is returned
+   * untouched.
+   */
+  @Override
+  public Scanner createScanner(String tableName, Authorizations authorizations)
+      throws TableNotFoundException {
+    checkArgument(tableName != null, "tableName is null");
+    checkArgument(authorizations != null, "authorizations is null");
+    final Scanner result = new ScannerImpl(context, getTableId(tableName), authorizations);
+    final Integer configuredBatchSize = ClientProperty.SCANNER_BATCH_SIZE
+        .getInteger(context.getClientInfo().getProperties());
+    if (configuredBatchSize == null) {
+      return result;
+    }
+    result.setBatchSize(configuredBatchSize);
+    return result;
+  }
+
+  /**
+   * Returns the principal of the credentials held by the client context.
+   */
+  @Override
+  public String whoami() {
+    return context.getCredentials().getPrincipal();
+  }
+
+  /**
+   * Returns the instance id that was read from the client context in the constructor.
+   */
+  @Override
+  public String getInstanceID() {
+    return instanceID;
+  }
+
+  /**
+   * Returns the TableOperations object that was created in the constructor.
+   */
+  @Override
+  public synchronized TableOperations tableOperations() {
+    return tableops;
+  }
+
+  /**
+   * Returns the NamespaceOperations object that was created in the constructor.
+   */
+  @Override
+  public synchronized NamespaceOperations namespaceOperations() {
+    return namespaceops;
+  }
+
+  /**
+   * Returns the SecurityOperations object, creating it on first use. The method is synchronized,
+   * so at most one instance is created per client.
+   */
+  @Override
+  public synchronized SecurityOperations securityOperations() {
+    if (secops != null) {
+      return secops;
+    }
+    secops = new SecurityOperationsImpl(context);
+    return secops;
+  }
+
+  /**
+   * Returns the InstanceOperations object, creating it on first use. The method is synchronized,
+   * so at most one instance is created per client.
+   */
+  @Override
+  public synchronized InstanceOperations instanceOperations() {
+    if (instanceops != null) {
+      return instanceops;
+    }
+    instanceops = new InstanceOperationsImpl(context);
+    return instanceops;
+  }
+
+  /**
+   * Returns the ReplicationOperations object, creating it on first use. The method is
+   * synchronized, so at most one instance is created per client.
+   */
+  @Override
+  public synchronized ReplicationOperations replicationOperations() {
+    if (replicationops != null) {
+      return replicationops;
+    }
+    replicationops = new ReplicationOperationsImpl(context);
+    return replicationops;
+  }
+
+  /**
+   * Returns the ClientInfo held by the client context.
+   */
+  @Override
+  public ClientInfo info() {
+    return this.context.getClientInfo();
+  }
+
+  /**
+   * Builds a new client for a different user. The new client is created through
+   * {@link AccumuloClient#builder()}, seeded with this client's ClientInfo and then given the
+   * supplied principal and token; this client itself is not modified.
+   *
+   * @param principal
+   *          principal/username of the new user
+   * @param token
+   *          authentication token of the new user
+   * @return the newly built client
+   */
+  @Override
+  public AccumuloClient changeUser(String principal, AuthenticationToken token)
+      throws AccumuloSecurityException, AccumuloException {
+    return AccumuloClient.builder().usingClientInfo(info()).usingToken(principal, token).build();
+  }
+
+  public static class AccumuloClientBuilderImpl
+      implements InstanceArgs, PropertyOptions, ClientInfoOptions, AuthenticationArgs,
+      ConnectionOptions, SslOptions, SaslOptions, AccumuloClientFactory, FromOptions {
+
+    private Properties properties = new Properties();
+    private AuthenticationToken token = null;
+
+    private ClientInfo getClientInfo() {
+      if (token != null) {
+        return new ClientInfoImpl(properties, token);
+      }
+      return new ClientInfoImpl(properties);
+    }
+
+    @Override
+    public AccumuloClient build() throws AccumuloException, AccumuloSecurityException {
+      return org.apache.accumulo.core.client.impl.ClientInfoFactory.getConnector(getClientInfo());
+    }
+
+    @Override
+    public ClientInfo info() {
+      return getClientInfo();
+    }
+
+    @Override
+    public AuthenticationArgs forInstance(String instanceName, String zookeepers) {
+      setProperty(ClientProperty.INSTANCE_NAME, instanceName);
+      setProperty(ClientProperty.INSTANCE_ZOOKEEPERS, zookeepers);
+      return this;
+    }
+
+    @Override
+    public SslOptions withTruststore(String path) {
+      setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path);
+      return this;
+    }
+
+    @Override
+    public SslOptions withTruststore(String path, String password, String type) {
+      setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path);
+      setProperty(ClientProperty.SSL_TRUSTSTORE_PASSWORD, password);
+      setProperty(ClientProperty.SSL_TRUSTSTORE_TYPE, type);
+      return this;
+    }
+
+    @Override
+    public SslOptions withKeystore(String path) {
+      setProperty(ClientProperty.SSL_KEYSTORE_PATH, path);
+      return this;
+    }
+
+    @Override
+    public SslOptions withKeystore(String path, String password, String type) {
+      setProperty(ClientProperty.SSL_KEYSTORE_PATH, path);
+      setProperty(ClientProperty.SSL_KEYSTORE_PASSWORD, password);
+      setProperty(ClientProperty.SSL_KEYSTORE_TYPE, type);
+      return this;
+    }
+
+    @Override
+    public SslOptions useJsse() {
+      setProperty(ClientProperty.SSL_USE_JSSE, "true");
+      return this;
+    }
+
+    @Override
+    public ConnectionOptions withZkTimeout(int timeout) {
+      setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, Integer.toString(timeout) + "ms");
+      return this;
+    }
+
+    @Override
+    public SslOptions withSsl() {
+      setProperty(ClientProperty.SSL_ENABLED, "true");
+      return this;
+    }
+
+    @Override
+    public SaslOptions withSasl() {
+      setProperty(ClientProperty.SASL_ENABLED, "true");
+      return this;
+    }
+
+    @Override
+    public ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig) {
+      setProperty(ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES, batchWriterConfig.getMaxMemory());
+      setProperty(ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC,
+          batchWriterConfig.getMaxLatency(TimeUnit.SECONDS));
+      setProperty(ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC,
+          batchWriterConfig.getTimeout(TimeUnit.SECONDS));
+      setProperty(ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS,
+          batchWriterConfig.getMaxWriteThreads());
+      setProperty(ClientProperty.BATCH_WRITER_DURABILITY,
+          batchWriterConfig.getDurability().toString());
+      return this;
+    }
+
+    @Override
+    public ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads) {
+      setProperty(ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS, numQueryThreads);
+      return this;
+    }
+
+    @Override
+    public ConnectionOptions withScannerBatchSize(int batchSize) {
+      setProperty(ClientProperty.SCANNER_BATCH_SIZE, batchSize);
+      return this;
+    }
+
+    @Override
+    public SaslOptions withPrimary(String kerberosServerPrimary) {
+      setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary);
+      return this;
+    }
+
+    @Override
+    public SaslOptions withQop(String qualityOfProtection) {
+      setProperty(ClientProperty.SASL_QOP, qualityOfProtection);
+      return this;
+    }
+
+    @Override
+    public AccumuloClientFactory usingProperties(String configFile) {
+      Properties properties = new Properties();
+      try (InputStream is = new FileInputStream(configFile)) {
+        properties.load(is);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return usingProperties(properties);
+    }
+
+    @Override
+    public AccumuloClientFactory usingProperties(Properties properties) {
+      this.properties = properties;
+      return this;
+    }
+
+    @Override
+    public ConnectionOptions usingPassword(String principal, CharSequence password) {
+      setProperty(ClientProperty.AUTH_PRINCIPAL, principal);
+      ClientProperty.setPassword(properties, password.toString());
+      return this;
+    }
+
+    @Override
+    public ConnectionOptions usingKerberos(String principal, String keyTabFile) {
+      setProperty(ClientProperty.AUTH_PRINCIPAL, principal);
+      ClientProperty.setKerberosKeytab(properties, keyTabFile);
+      return this;
+    }
+
+    @Override
+    public ConnectionOptions usingToken(String principal, AuthenticationToken token) {
+      setProperty(ClientProperty.AUTH_PRINCIPAL, principal);
+      this.token = token;
+      return this;
+    }
+
+    @Override
+    public FromOptions usingClientInfo(ClientInfo clientInfo) {
+      this.properties = clientInfo.getProperties();
+      return this;
+    }
+
+    public void setProperty(ClientProperty property, String value) {
+      properties.setProperty(property.getKey(), value);
+    }
+
+    public void setProperty(ClientProperty property, Long value) {
+      setProperty(property, Long.toString(value));
+    }
+
+    public void setProperty(ClientProperty property, Integer value) {
+      setProperty(property, Integer.toString(value));
+    }
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java
new file mode 100644
index 0000000..cbaeaf7
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.ClientInfo;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.ClientProperty;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AccumuloClientIT extends AccumuloClusterHarness {
+
+  @Test
+  public void testConnectorBuilder() throws Exception {
+    AccumuloClient c = getAccumuloClient();
+    String instanceName = c.info().getInstanceName();
+    String zookeepers = c.info().getZooKeepers();
+    final String user = "testuser";
+    final String password = "testpassword";
+    c.securityOperations().createLocalUser(user, new PasswordToken(password));
+
+    AccumuloClient conn = AccumuloClient.builder().forInstance(instanceName, zookeepers)
+        .usingPassword(user, password).withZkTimeout(1234).build();
+
+    Assert.assertEquals(instanceName, conn.info().getInstanceName());
+    Assert.assertEquals(zookeepers, conn.info().getZooKeepers());
+    Assert.assertEquals(user, conn.whoami());
+    Assert.assertEquals(1234, conn.info().getZooKeepersSessionTimeOut());
+
+    ClientInfo info = AccumuloClient.builder().forInstance(instanceName, zookeepers)
+        .usingPassword(user, password).info();
+    Assert.assertEquals(instanceName, info.getInstanceName());
+    Assert.assertEquals(zookeepers, info.getZooKeepers());
+    Assert.assertEquals(user, info.getPrincipal());
+    Assert.assertTrue(info.getAuthenticationToken() instanceof PasswordToken);
+
+    Properties props = new Properties();
+    props.put(ClientProperty.INSTANCE_NAME.getKey(), instanceName);
+    props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), zookeepers);
+    props.put(ClientProperty.AUTH_PRINCIPAL.getKey(), user);
+    props.put(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), "22s");
+    ClientProperty.setPassword(props, password);
+    conn = AccumuloClient.builder().usingProperties(props).build();
+
+    Assert.assertEquals(instanceName, conn.info().getInstanceName());
+    Assert.assertEquals(zookeepers, conn.info().getZooKeepers());
+    Assert.assertEquals(user, conn.whoami());
+    Assert.assertEquals(22000, conn.info().getZooKeepersSessionTimeOut());
+
+    final String user2 = "testuser2";
+    final String password2 = "testpassword2";
+    c.securityOperations().createLocalUser(user2, new PasswordToken(password2));
+
+    AccumuloClient conn2 = AccumuloClient.builder().usingClientInfo(conn.info())
+        .usingToken(user2, new PasswordToken(password2)).build();
+    Assert.assertEquals(instanceName, conn2.info().getInstanceName());
+    Assert.assertEquals(zookeepers, conn2.info().getZooKeepers());
+    Assert.assertEquals(user2, conn2.whoami());
+    info = conn2.info();
+    Assert.assertEquals(instanceName, info.getInstanceName());
+    Assert.assertEquals(zookeepers, info.getZooKeepers());
+    Assert.assertEquals(user2, info.getPrincipal());
+
+    final String user3 = "testuser3";
+    final String password3 = "testpassword3";
+    c.securityOperations().createLocalUser(user3, new PasswordToken(password3));
+
+    AccumuloClient conn3 = conn.changeUser(user3, new PasswordToken(password3));
+    Assert.assertEquals(instanceName, conn3.info().getInstanceName());
+    Assert.assertEquals(zookeepers, conn3.info().getZooKeepers());
+    Assert.assertEquals(user3, conn3.whoami());
+    info = conn3.info();
+    Assert.assertEquals(instanceName, info.getInstanceName());
+    Assert.assertEquals(zookeepers, info.getZooKeepers());
+    Assert.assertEquals(user3, info.getPrincipal());
+  }
+}


Mime
View raw message