accumulo-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Keith Turner <ke...@deenlo.com>
Subject Re: [1/2] git commit: ACCUMULO-1833 Rework the getBatchWriter method on MTBW to remove zookeeper lock contention and get better concurrent throughput.
Date Fri, 08 Nov 2013 02:28:17 GMT
On Thu, Nov 7, 2013 at 8:39 PM, <elserj@apache.org> wrote:

> Updated Branches:
>   refs/heads/ACCUMULO-1833-caching [created] 3b6eade61
>
>
> ACCUMULO-1833 Rework the getBatchWriter method on MTBW to remove
> zookeeper lock contention and get better concurrent throughput.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
> Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cba87980
> Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cba87980
> Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cba87980
>
> Branch: refs/heads/ACCUMULO-1833-caching
> Commit: cba87980cbd731338c58f05734ebb3d3e683b440
> Parents: 060188a
> Author: Josh Elser <josh.elser@gmail.com>
> Authored: Thu Nov 7 16:49:41 2013 -0500
> Committer: Josh Elser <josh.elser@gmail.com>
> Committed: Thu Nov 7 16:49:41 2013 -0500
>
> ----------------------------------------------------------------------
>  core/pom.xml                                    |   4 +
>  .../apache/accumulo/core/client/Connector.java  |  44 ++++++-
>  .../core/client/impl/ConnectorImpl.java         |  12 ++
>  .../client/impl/MultiTableBatchWriterImpl.java  | 116 ++++++++++++++-----
>  .../core/client/mock/MockConnector.java         |  11 ++
>  5 files changed, 159 insertions(+), 28 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/pom.xml
> ----------------------------------------------------------------------
> diff --git a/core/pom.xml b/core/pom.xml
> index f7539f5..d02a3cd 100644
> --- a/core/pom.xml
> +++ b/core/pom.xml
> @@ -30,6 +30,10 @@
>        <artifactId>jcommander</artifactId>
>      </dependency>
>      <dependency>
> +      <groupId>com.google.guava</groupId>
> +      <artifactId>guava</artifactId>
> +    </dependency>
> +    <dependency>
>        <groupId>jline</groupId>
>        <artifactId>jline</artifactId>
>      </dependency>
>
>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/Connector.java
> ----------------------------------------------------------------------
> diff --git
> a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
> b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
> index d2e7321..68dc881 100644
> --- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
> +++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
> @@ -16,6 +16,8 @@
>   */
>  package org.apache.accumulo.core.client;
>
> +import java.util.concurrent.TimeUnit;
> +
>  import org.apache.accumulo.core.client.admin.InstanceOperations;
>  import org.apache.accumulo.core.client.admin.SecurityOperations;
>  import org.apache.accumulo.core.client.admin.TableOperations;
> @@ -146,8 +148,32 @@ public abstract class Connector {
>    public abstract MultiTableBatchWriter createMultiTableBatchWriter(long
> maxMemory, long maxLatency, int maxWriteThreads);
>
>    /**
> +   * Factory method to create a Multi-Table BatchWriter connected to
> Accumulo. Multi-table batch writers can queue data for multiple tables,
> which is good for
> +   * ingesting data into multiple tables from the same source. Caching of
> ZooKeeper table information defaults to {@link
> MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME}
> +   * and {@link MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME_UNIT}
> +   *
> +   * @param maxMemory
> +   *          size in bytes of the maximum memory to batch before writing
> +   * @param maxLatency
> +   *          size in milliseconds; set to 0 or Long.MAX_VALUE to allow
> the maximum time to hold a batch before writing
> +   * @param maxWriteThreads
> +   *          the maximum number of threads to use for writing data to
> the tablet servers
> +   * @param cacheTime
> +   *          Duration of time to cache ZooKeeper table information
> +   * @param cacheTimeUnit
> +   *          Unit of time to apply to {@link cacheTime}
> +   *
> +   * @return MultiTableBatchWriter object for configuring and writing
> data to
> +   * @deprecated since 1.5.0; Use {@link
> #createMultiTableBatchWriter(BatchWriterConfig)} instead.
> +   * @since 1.5.1
> +   */
> +  @Deprecated
> +  public abstract MultiTableBatchWriter createMultiTableBatchWriter(long
> maxMemory, long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit
> cacheTimeUnit);
>

I don't think this change needs to impact the public Connector API.  I suspect that just adding
a cache with a really short timeout (like 50ms to 100ms) will give the
performance benefit we are looking for.  Also, adding API methods in a bug-fix
release means it's possible to write code against 1.5.1 that will not
work with 1.5.0.



> +
> +  /**
>     * Factory method to create a Multi-Table BatchWriter connected to
> Accumulo. Multi-table batch writers can queue data for multiple tables.
> Also data for
> -   * multiple tables can be sent to a server in a single batch. Its an
> efficient way to ingest data into multiple tables from a single process.
> +   * multiple tables can be sent to a server in a single batch. Its an
> efficient way to ingest data into multiple tables from a single process.
> Caching
> +   * of ZooKeeper table information defaults to {@link
> MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME} and {@link
> MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME_UNIT}
>     *
>     * @param config
>     *          configuration used to create multi-table batch writer
> @@ -158,6 +184,22 @@ public abstract class Connector {
>    public abstract MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config);
>
>    /**
> +   * Factory method to create a Multi-Table BatchWriter connected to
> Accumulo. Multi-table batch writers can queue data for multiple tables.
> Also data for
> +   * multiple tables can be sent to a server in a single batch. Its an
> efficient way to ingest data into multiple tables from a single process.
> This method
> +   * also allows the user to provide parameters as to how long table
> information from ZooKeeper is cached.
> +   * @param config
> +   *          configuration used to create the multi-table batch writer
> +   * @param cacheTime
> +   *          Duration of time to cache ZooKeeper table information
> +   * @param cacheTimeUnit
> +   *          Unit of time to apply to {@link cacheTime}
> +   * @return
> +   * @since 1.5.1
> +   */
> +  public abstract MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config, long cacheTime,
> TimeUnit cacheTimeUnit);
> +
> +
> +  /**
>     * Factory method to create a Scanner connected to Accumulo.
>     *
>     * @param tableName
>
>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
> ----------------------------------------------------------------------
> diff --git
> a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
> b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
> index 1702082..89d2813 100644
> ---
> a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
> +++
> b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
> @@ -126,12 +126,24 @@ public class ConnectorImpl extends Connector {
>          .setMaxLatency(maxLatency,
> TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
>    }
>
> +  @Deprecated
> +  @Override
> +  public MultiTableBatchWriter createMultiTableBatchWriter(long
> maxMemory, long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit
> cacheTimeUnit) {
> +    return new MultiTableBatchWriterImpl(instance, credentials, new
> BatchWriterConfig().setMaxMemory(maxMemory)
> +        .setMaxLatency(maxLatency,
> TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads), cacheTime,
> cacheTimeUnit);
> +  }
> +
>    @Override
>    public MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config) {
>      return new MultiTableBatchWriterImpl(instance, credentials, config);
>    }
>
>    @Override
> +  public MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config, long timeToCache,
> TimeUnit timeUnit) {
> +    return new MultiTableBatchWriterImpl(instance, credentials, config,
> timeToCache, timeUnit);
> +  }
> +
> +  @Override
>    public Scanner createScanner(String tableName, Authorizations
> authorizations) throws TableNotFoundException {
>      ArgumentChecker.notNull(tableName, authorizations);
>      return new ScannerImpl(instance, credentials, getTableId(tableName),
> authorizations);
>
>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
> ----------------------------------------------------------------------
> diff --git
> a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
> b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
> index 4537ae8..06b6f75 100644
> ---
> a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
> +++
> b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
> @@ -16,7 +16,9 @@
>   */
>  package org.apache.accumulo.core.client.impl;
>
> -import java.util.HashMap;
> +import java.util.concurrent.ConcurrentHashMap;
> +import java.util.concurrent.ExecutionException;
> +import java.util.concurrent.TimeUnit;
>
>  import org.apache.accumulo.core.client.AccumuloException;
>  import org.apache.accumulo.core.client.AccumuloSecurityException;
> @@ -33,62 +35,97 @@ import
> org.apache.accumulo.core.security.thrift.TCredentials;
>  import org.apache.accumulo.core.util.ArgumentChecker;
>  import org.apache.log4j.Logger;
>
> +import com.google.common.cache.CacheBuilder;
> +import com.google.common.cache.CacheLoader;
> +import com.google.common.cache.LoadingCache;
> +
>  public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
> +  public static final long DEFAULT_CACHE_TIME = 60;
> +  public static final TimeUnit DEFAULT_CACHE_TIME_UNIT = TimeUnit.SECONDS;
> +
>    static final Logger log =
> Logger.getLogger(MultiTableBatchWriterImpl.class);
>    private boolean closed;
> -
> +
>    private class TableBatchWriter implements BatchWriter {
> -
> +
>      private String table;
> -
> +
>      TableBatchWriter(String table) {
>        this.table = table;
>      }
> -
> +
>      @Override
>      public void addMutation(Mutation m) throws MutationsRejectedException
> {
>        ArgumentChecker.notNull(m);
>        bw.addMutation(table, m);
>      }
> -
> +
>      @Override
>      public void addMutations(Iterable<Mutation> iterable) throws
> MutationsRejectedException {
>        bw.addMutation(table, iterable.iterator());
>      }
> -
> +
>      @Override
>      public void close() {
>        throw new UnsupportedOperationException("Must close all tables, can
> not close an individual table");
>      }
> -
> +
>      @Override
>      public void flush() {
>        throw new UnsupportedOperationException("Must flush all tables, can
> not flush an individual table");
>      }
> -
> +
>    }
> -
> +
> +  /**
> +   * CacheLoader which will look up the internal table ID for a given
> table name.
> +   */
> +  private class TableNameToIdLoader extends CacheLoader<String,String> {
> +
> +    @Override
> +    public String load(String tableName) throws Exception {
> +      String tableId = Tables.getNameToIdMap(instance).get(tableName);
> +
> +      if (tableId == null)
> +        throw new TableNotFoundException(tableId, tableName, null);
> +
> +      if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
> +        throw new TableOfflineException(instance, tableId);
> +
> +      return tableId;
> +    }
> +
> +  }
> +
>    private TabletServerBatchWriter bw;
> -  private HashMap<String,BatchWriter> tableWriters;
> +  private ConcurrentHashMap<String,BatchWriter> tableWriters;
>    private Instance instance;
> -
> +  private final LoadingCache<String,String> nameToIdCache;
> +
>    public MultiTableBatchWriterImpl(Instance instance, TCredentials
> credentials, BatchWriterConfig config) {
> -    ArgumentChecker.notNull(instance, credentials);
> +    this(instance, credentials, config, DEFAULT_CACHE_TIME,
> DEFAULT_CACHE_TIME_UNIT);
> +  }
> +
> +  public MultiTableBatchWriterImpl(Instance instance, TCredentials
> credentials, BatchWriterConfig config, long cacheTime, TimeUnit
> cacheTimeUnit) {
> +    ArgumentChecker.notNull(instance, credentials, config, cacheTimeUnit);
>      this.instance = instance;
>      this.bw = new TabletServerBatchWriter(instance, credentials, config);
> -    tableWriters = new HashMap<String,BatchWriter>();
> +    tableWriters = new ConcurrentHashMap<String,BatchWriter>();
>      this.closed = false;
> +
> +    nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime,
> cacheTimeUnit).concurrencyLevel(8).maximumSize(64).initialCapacity(16)
> +        .build(new TableNameToIdLoader());
>    }
> -
> +
>    public boolean isClosed() {
>      return this.closed;
>    }
> -
> +
>    public void close() throws MutationsRejectedException {
>      bw.close();
>      this.closed = true;
>    }
> -
> +
>    /**
>     * Warning: do not rely upon finalize to close this class. Finalize is
> not guaranteed to be called.
>     */
> @@ -105,16 +142,41 @@ public class MultiTableBatchWriterImpl implements
> MultiTableBatchWriter {
>      }
>    }
>
> +  /**
> +   * Returns the table ID for the given table name.
> +   * @param tableName The name of the table which to find the ID for
> +   * @return The table ID, or null if the table name doesn't exist
> +   */
> +  private String getId(String tableName) throws TableNotFoundException {
> +    try {
> +      return nameToIdCache.get(tableName);
> +    } catch (ExecutionException e) {
> +      Throwable cause = e.getCause();
> +
> +      if (null == cause) {
> +        throw new RuntimeException(e);
> +      }
> +
> +      if (cause instanceof TableNotFoundException) {
> +
> +        throw (TableNotFoundException) cause;
> +      }
> +
> +      if (cause instanceof TableOfflineException) {
> +        throw (TableOfflineException) cause;
> +      }
> +
> +      log.error("Unexpected exception when fetching table id for " +
> tableName);
> +
> +      throw new RuntimeException(e);
> +    }
> +  }
> +
>    @Override
> -  public synchronized BatchWriter getBatchWriter(String tableName) throws
> AccumuloException, AccumuloSecurityException, TableNotFoundException {
> +  public BatchWriter getBatchWriter(String tableName) throws
> AccumuloException, AccumuloSecurityException, TableNotFoundException {
>      ArgumentChecker.notNull(tableName);
> -    String tableId = Tables.getNameToIdMap(instance).get(tableName);
> -    if (tableId == null)
> -      throw new TableNotFoundException(tableId, tableName, null);
> -
> -    if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
> -      throw new TableOfflineException(instance, tableId);
> -
> +    String tableId = getId(tableName);
> +
>      BatchWriter tbw = tableWriters.get(tableId);
>      if (tbw == null) {
>        tbw = new TableBatchWriter(tableId);
> @@ -122,10 +184,10 @@ public class MultiTableBatchWriterImpl implements
> MultiTableBatchWriter {
>      }
>      return tbw;
>    }
> -
> +
>    @Override
>    public void flush() throws MutationsRejectedException {
>      bw.flush();
>    }
> -
> +
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
> ----------------------------------------------------------------------
> diff --git
> a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
> b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
> index 1179559..2aa6291 100644
> ---
> a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
> +++
> b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
> @@ -90,12 +90,23 @@ public class MockConnector extends Connector {
>      return new MockMultiTableBatchWriter(acu);
>    }
>
> +  @Deprecated
> +  @Override
> +  public MultiTableBatchWriter createMultiTableBatchWriter(long
> maxMemory, long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit
> cacheTimeUnit) {
> +    return new MockMultiTableBatchWriter(acu);
> +  }
> +
>    @Override
>    public MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config) {
>      return createMultiTableBatchWriter(config.getMaxMemory(),
> config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads());
>    }
>
>    @Override
> +  public MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config, long cacheTime,
> TimeUnit cacheTimeUnit) {
> +    return createMultiTableBatchWriter(config.getMaxMemory(),
> config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads(),
> cacheTime, cacheTimeUnit);
> +  }
> +
> +  @Override
>    public Scanner createScanner(String tableName, Authorizations
> authorizations) throws TableNotFoundException {
>      MockTable table = acu.tables.get(tableName);
>      if (table == null)
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message