accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [5/6] accumulo git commit: Merge branch '1.7' into 1.8
Date Fri, 28 Jul 2017 00:28:17 GMT
Merge branch '1.7' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a205559d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a205559d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a205559d

Branch: refs/heads/1.8
Commit: a205559dd3609d5aa8c4161877fd182d9581829f
Parents: 3f01418 f88f5cf
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Thu Jul 27 20:11:32 2017 -0400
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Thu Jul 27 20:11:32 2017 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/client/impl/ThriftScanner.java    | 1 -
 .../accumulo/core/iterators/system/LocalityGroupIterator.java  | 4 ++--
 .../accumulo/core/iterators/system/ColumnFilterTest.java       | 6 +++---
 .../server/master/balancer/HostRegexTableLoadBalancer.java     | 6 +++---
 .../org/apache/accumulo/server/security/UserImpersonation.java | 1 -
 .../master/balancer/BaseHostRegexTableLoadBalancerTest.java    | 4 ++--
 .../server/master/balancer/HostRegexTableLoadBalancerTest.java | 6 +++---
 .../src/main/java/org/apache/accumulo/master/Master.java       | 2 +-
 .../org/apache/accumulo/test/BalanceWithOfflineTableIT.java    | 1 -
 .../apache/accumulo/test/functional/FunctionalTestUtils.java   | 2 +-
 10 files changed, 15 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a205559d/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index fc3be57,bdf381d..be98327
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@@ -162,15 -152,16 +162,14 @@@ public class ThriftScanner 
  
      Map<String,Map<String,String>> serverSideIteratorOptions;
  
 -    public ScanState(ClientContext context, Text tableId, Authorizations authorizations,
Range range, SortedSet<Column> fetchedColumns, int size,
 -        List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>>
serverSideIteratorOptions, boolean isolated) {
 -      this(context, tableId, authorizations, range, fetchedColumns, size, serverSideIteratorList,
serverSideIteratorOptions, isolated,
 -          Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD);
 -    }
 +    SamplerConfiguration samplerConfig;
  
 -    public ScanState(ClientContext context, Text tableId, Authorizations authorizations,
Range range, SortedSet<Column> fetchedColumns, int size,
 -        List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>>
serverSideIteratorOptions, boolean isolated, long readaheadThreshold) {
 +    public ScanState(ClientContext context, String tableId, Authorizations authorizations,
Range range, SortedSet<Column> fetchedColumns, int size,
 +        List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>>
serverSideIteratorOptions, boolean isolated, long readaheadThreshold,
 +        SamplerConfiguration samplerConfig, long batchTimeOut, String classLoaderContext)
{
        this.context = context;
- 
        this.authorizations = authorizations;
 +      this.classLoaderContext = classLoaderContext;
  
        columns = new ArrayList<>(fetchedColumns.size());
        for (Column column : fetchedColumns) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a205559d/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
index dba3a32,ac8355b..e1cd610
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
@@@ -70,69 -65,14 +70,69 @@@ public class LocalityGroupIterator exte
      private InterruptibleIterator iterator;
    }
  
 -  private LocalityGroup groups[];
 -  private Set<ByteSequence> nonDefaultColumnFamilies;
 +  public static class LocalityGroupContext {
 +    final List<LocalityGroup> groups;
 +    final LocalityGroup defaultGroup;
 +    final Map<ByteSequence,LocalityGroup> groupByCf;
 +
 +    public LocalityGroupContext(LocalityGroup[] groups) {
 +      this.groups = Collections.unmodifiableList(Arrays.asList(groups));
-       this.groupByCf = new HashMap<ByteSequence,LocalityGroup>();
++      this.groupByCf = new HashMap<>();
 +      LocalityGroup foundDefault = null;
 +
 +      for (LocalityGroup group : groups) {
 +        if (group.isDefaultLocalityGroup && group.columnFamilies == null) {
 +          if (foundDefault != null) {
 +            throw new IllegalStateException("Found multiple default locality groups");
 +          }
 +          foundDefault = group;
 +        } else {
 +          for (Entry<ByteSequence,MutableLong> entry : group.columnFamilies.entrySet())
{
 +            if (entry.getValue().longValue() > 0) {
 +              if (groupByCf.containsKey(entry.getKey())) {
 +                throw new IllegalStateException("Found the same cf in multiple locality
groups");
 +              }
 +              groupByCf.put(entry.getKey(), group);
 +            }
 +          }
 +        }
 +      }
 +      defaultGroup = foundDefault;
 +    }
 +  }
 +
 +  /**
 +   * This will cache the arguments used in the seek call along with the locality groups
seeked.
 +   */
 +  public static class LocalityGroupSeekCache {
 +    private ImmutableSet<ByteSequence> lastColumnFamilies;
 +    private volatile boolean lastInclusive;
 +    private Collection<LocalityGroup> lastUsed;
 +
 +    public ImmutableSet<ByteSequence> getLastColumnFamilies() {
 +      return lastColumnFamilies;
 +    }
 +
 +    public boolean isLastInclusive() {
 +      return lastInclusive;
 +    }
 +
 +    public Collection<LocalityGroup> getLastUsed() {
 +      return lastUsed;
 +    }
 +
 +    public int getNumLGSeeked() {
 +      return (lastUsed == null ? 0 : lastUsed.size());
 +    }
 +  }
 +
 +  private final LocalityGroupContext lgContext;
 +  private LocalityGroupSeekCache lgCache;
    private AtomicBoolean interruptFlag;
  
 -  public LocalityGroupIterator(LocalityGroup groups[], Set<ByteSequence> nonDefaultColumnFamilies)
{
 +  public LocalityGroupIterator(LocalityGroup groups[]) {
      super(groups.length);
 -    this.groups = groups;
 -    this.nonDefaultColumnFamilies = nonDefaultColumnFamilies;
 +    this.lgContext = new LocalityGroupContext(groups);
    }
  
    @Override
@@@ -173,116 -97,51 +173,116 @@@
      else
        cfSet = Collections.emptySet();
  
 -    for (LocalityGroup lgr : groups) {
 -      // when include is set to true it means this locality groups contains
 -      // wanted column families
 -      boolean include = false;
 +    // determine the set of groups to use
 +    Collection<LocalityGroup> groups = Collections.emptyList();
  
 -      if (cfSet.size() == 0) {
 -        include = !inclusive;
 -      } else if (lgr.isDefaultLocalityGroup && lgr.columnFamilies == null) {
 -        // do not know what column families are in the default locality group,
 -        // only know what column families are not in it
 +    // if no column families specified, then include all groups unless !inclusive
 +    if (cfSet.size() == 0) {
 +      if (!inclusive) {
 +        groups = lgContext.groups;
 +      }
 +    } else {
-       groups = new HashSet<LocalityGroup>();
++      groups = new HashSet<>();
  
 +      // do not know what column families are in the default locality group,
 +      // only know what column families are not in it
 +      if (lgContext.defaultGroup != null) {
          if (inclusive) {
 -          if (!nonDefaultColumnFamilies.containsAll(cfSet)) {
 +          if (!lgContext.groupByCf.keySet().containsAll(cfSet)) {
              // default LG may contain wanted and unwanted column families
 -            include = true;
 +            groups.add(lgContext.defaultGroup);
            }// else - everything wanted is in other locality groups, so nothing to do
          } else {
 -          // must include, if all excluded column families are in other locality groups
 -          // then there are not unwanted column families in default LG
 -          include = true;
 +          // must include the default group as it may include cfs not in our cfSet
 +          groups.add(lgContext.defaultGroup);
 +        }
 +      }
 +
 +      /*
 +       * Need to consider the following cases for inclusive and exclusive (lgcf:locality
group column family set, cf:column family set) lgcf and cf are disjoint
 +       * lgcf and cf are the same cf contains lgcf lgcf contains cf lgcf and cf intersect
but neither is a subset of the other
 +       */
 +      if (!inclusive) {
 +        for (Entry<ByteSequence,LocalityGroup> entry : lgContext.groupByCf.entrySet())
{
 +          if (!cfSet.contains(entry.getKey())) {
 +            groups.add(entry.getValue());
 +          }
 +        }
 +      } else if (lgContext.groupByCf.size() <= cfSet.size()) {
 +        for (Entry<ByteSequence,LocalityGroup> entry : lgContext.groupByCf.entrySet())
{
 +          if (cfSet.contains(entry.getKey())) {
 +            groups.add(entry.getValue());
 +          }
          }
        } else {
 -        /*
 -         * Need to consider the following cases for inclusive and exclusive (lgcf:locality
group column family set, cf:column family set) lgcf and cf are
 -         * disjoint lgcf and cf are the same cf contains lgcf lgcf contains cf lgccf and
cf intersect but neither is a subset of the other
 -         */
 -
 -        for (Entry<ByteSequence,MutableLong> entry : lgr.columnFamilies.entrySet())
 -          if (entry.getValue().longValue() > 0)
 -            if (cfSet.contains(entry.getKey())) {
 -              if (inclusive)
 -                include = true;
 -            } else if (!inclusive) {
 -              include = true;
 -            }
 +        for (ByteSequence cf : cfSet) {
 +          LocalityGroup group = lgContext.groupByCf.get(cf);
 +          if (group != null) {
 +            groups.add(group);
 +          }
 +        }
        }
 +    }
  
 -      if (include) {
 +    for (LocalityGroup lgr : groups) {
 +      lgr.getIterator().seek(range, EMPTY_CF_SET, false);
 +      hiter.addSource(lgr.getIterator());
 +    }
 +
 +    return groups;
 +  }
 +
 +  /**
 +   * This seek method will reuse the supplied LocalityGroupSeekCache if it can. Otherwise
it will delegate to the _seek method.
 +   *
 +   * @param hiter
 +   *          The heap iterator
 +   * @param lgContext
 +   *          The locality groups
 +   * @param range
 +   *          The range to seek
 +   * @param columnFamilies
 +   *          The column fams to seek
 +   * @param inclusive
 +   *          The inclusiveness of the column fams
 +   * @param lgSeekCache
 +   *          A cache returned by the previous call to this method
 +   * @return A cache for this seek call
 +   * @throws IOException
 +   *           thrown if a locality group seek fails
 +   */
 +  public static LocalityGroupSeekCache seek(HeapIterator hiter, LocalityGroupContext lgContext,
Range range, Collection<ByteSequence> columnFamilies,
 +      boolean inclusive, LocalityGroupSeekCache lgSeekCache) throws IOException {
 +    if (lgSeekCache == null)
 +      lgSeekCache = new LocalityGroupSeekCache();
 +
 +    // determine if the arguments have changed since the last time
 +    boolean sameArgs = false;
 +    ImmutableSet<ByteSequence> cfSet = null;
 +    if (lgSeekCache.lastUsed != null && inclusive == lgSeekCache.lastInclusive)
{
 +      if (columnFamilies instanceof Set) {
 +        sameArgs = lgSeekCache.lastColumnFamilies.equals(columnFamilies);
 +      } else {
 +        cfSet = ImmutableSet.copyOf(columnFamilies);
 +        sameArgs = lgSeekCache.lastColumnFamilies.equals(cfSet);
 +      }
 +    }
 +
 +    // if the column families and inclusiveness have not changed, then we can simply re-seek
the
 +    // locality groups we discovered last round and rebuild the heap.
 +    if (sameArgs) {
 +      hiter.clear();
 +      for (LocalityGroup lgr : lgSeekCache.lastUsed) {
          lgr.getIterator().seek(range, EMPTY_CF_SET, false);
          hiter.addSource(lgr.getIterator());
 -        numLGSeeked++;
 -      }// every column family is excluded, zero count, or not present
 +      }
 +    } else { // otherwise capture the parameters, and use the static seek method to locate
the locality groups to use.
 +      lgSeekCache.lastColumnFamilies = (cfSet == null ? ImmutableSet.copyOf(columnFamilies)
: cfSet);
 +      lgSeekCache.lastInclusive = inclusive;
 +      lgSeekCache.lastUsed = _seek(hiter, lgContext, range, columnFamilies, inclusive);
      }
  
 -    return numLGSeeked;
 +    return lgSeekCache;
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a205559d/server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java
index 030598a,d7702f2..433f843
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java
@@@ -101,12 -87,9 +101,12 @@@ public class HostRegexTableLoadBalance
    private Map<String,String> tableIdToTableName = null;
    private Map<String,Pattern> poolNameToRegexPattern = null;
    private volatile long lastOOBCheck = System.currentTimeMillis();
 -  private boolean isIpBasedRegex = false;
 +  private volatile boolean isIpBasedRegex = false;
    private Map<String,SortedMap<TServerInstance,TabletServerStatus>> pools =
new HashMap<>();
 -  private int maxTServerMigrations = HOST_BALANCER_REGEX_MAX_MIGRATIONS_DEFAULT;
 +  private volatile int maxTServerMigrations = HOST_BALANCER_REGEX_MAX_MIGRATIONS_DEFAULT;
 +  private volatile int maxOutstandingMigrations = DEFAULT_OUTSTANDING_MIGRATIONS;
-   private final Map<KeyExtent,TabletMigration> migrationsFromLastPass = new HashMap<KeyExtent,TabletMigration>();
-   private final Map<String,Long> tableToTimeSinceNoMigrations = new HashMap<String,Long>();
++  private final Map<KeyExtent,TabletMigration> migrationsFromLastPass = new HashMap<>();
++  private final Map<String,Long> tableToTimeSinceNoMigrations = new HashMap<>();
  
    /**
     * Group the set of current tservers by pool name. Tservers that don't match a regex are
put into a default pool. This could be expensive in the terms of the
@@@ -464,33 -391,6 +464,33 @@@
      return minBalanceTime;
    }
  
 +  /**
 +   * Get a mutable table info for the specified table and server
 +   */
 +  private TableInfo getTableInfo(SortedMap<TServerInstance,TabletServerStatus> currentCopy,
Multimap<TServerInstance,String> serverTableIdCopied,
 +      String tableId, TServerInstance server) {
 +    TableInfo newInfo = null;
 +    if (currentCopy.containsKey(server)) {
 +      Map<String,TableInfo> newTableMap = currentCopy.get(server).getTableMap();
 +      if (newTableMap != null) {
 +        newInfo = newTableMap.get(tableId);
 +        if (newInfo != null) {
 +          Collection<String> tableIdCopied = serverTableIdCopied.get(server);
 +          if (tableIdCopied.isEmpty()) {
-             newTableMap = new HashMap<String,TableInfo>(newTableMap);
++            newTableMap = new HashMap<>(newTableMap);
 +            currentCopy.get(server).setTableMap(newTableMap);
 +          }
 +          if (!tableIdCopied.contains(tableId)) {
 +            newInfo = new TableInfo(newInfo);
 +            newTableMap.put(tableId, newInfo);
 +            tableIdCopied.add(tableId);
 +          }
 +        }
 +      }
 +    }
 +    return newInfo;
 +  }
 +
    @Override
    public void propertyChanged(String key) {
      parseConfiguration(this.configuration);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a205559d/server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java
----------------------------------------------------------------------
diff --cc server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java
index e44491d,d41099e..44cf76b
--- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java
@@@ -181,24 -174,6 +181,24 @@@ public abstract class BaseHostRegexTabl
      }
    }
  
 +  protected class TestDefaultBalancer extends DefaultLoadBalancer {
 +    @Override
 +    public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String
tableId) throws ThriftSecurityException, TException {
 +      String tableName = idToTableName(tableId);
 +      TServerInstance initialLocation = initialTableLocation.get(tableName);
 +      if (tserver.equals(initialLocation)) {
-         List<TabletStats> list = new ArrayList<TabletStats>(5);
++        List<TabletStats> list = new ArrayList<>(5);
 +        for (KeyExtent extent : tableExtents.get(tableName)) {
 +          TabletStats stats = new TabletStats();
 +          stats.setExtent(extent.toThrift());
 +          list.add(stats);
 +        }
 +        return list;
 +      }
 +      return null;
 +    }
 +  }
 +
    protected static final Table FOO = new Table("foo", "1");
    protected static final Table BAR = new Table("bar", "2");
    protected static final Table BAZ = new Table("baz", "3");
@@@ -339,21 -296,7 +339,21 @@@
      String base = "192.168.0.";
      TreeMap<TServerInstance,TabletServerStatus> current = new TreeMap<>();
      for (int i = 1; i <= numTservers; i++) {
 -      current.put(new TServerInstance(base + i + ":9997", 1), new TabletServerStatus());
 +      TabletServerStatus status = new TabletServerStatus();
-       Map<String,TableInfo> tableMap = new HashMap<String,TableInfo>();
++      Map<String,TableInfo> tableMap = new HashMap<>();
 +      tableMap.put(FOO.getId(), new TableInfo());
 +      tableMap.put(BAR.getId(), new TableInfo());
 +      tableMap.put(BAZ.getId(), new TableInfo());
 +      status.setTableMap(tableMap);
 +      current.put(new TServerInstance(base + i + ":9997", 1), status);
 +    }
 +    // now put all of the tablets on one server
 +    for (Map.Entry<String,TServerInstance> entry : initialTableLocation.entrySet())
{
 +      TabletServerStatus status = current.get(entry.getValue());
 +      if (status != null) {
 +        String tableId = getTableOperations().tableIdMap().get(entry.getKey());
 +        status.getTableMap().get(tableId).setOnlineTablets(5);
 +      }
      }
      return current;
    }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a205559d/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java
----------------------------------------------------------------------
diff --cc server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java
index 094ada8,24054f5..1f622f4
--- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java
@@@ -75,58 -73,12 +75,58 @@@ public class HostRegexTableLoadBalancer
    }
  
    @Test
 -  public void testBalanceWithMigrations() {
 -    List<TabletMigration> migrations = new ArrayList<>();
 +  public void testBalance() {
      init(factory);
-     Set<KeyExtent> migrations = new HashSet<KeyExtent>();
-     List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
 -    long wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(2)), Collections.singleton(new
KeyExtent()), migrations);
++    Set<KeyExtent> migrations = new HashSet<>();
++    List<TabletMigration> migrationsOut = new ArrayList<>();
 +    long wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(15)), migrations,
migrationsOut);
      Assert.assertEquals(20000, wait);
 -    Assert.assertEquals(0, migrations.size());
 +    // should balance four tablets in one of the tables before reaching max
 +    Assert.assertEquals(4, migrationsOut.size());
 +
 +    // now balance again passing in the new migrations
 +    for (TabletMigration m : migrationsOut) {
 +      migrations.add(m.tablet);
 +    }
 +    migrationsOut.clear();
 +    wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(15)), migrations,
migrationsOut);
 +    Assert.assertEquals(20000, wait);
 +    // should balance four tablets in one of the other tables before reaching max
 +    Assert.assertEquals(4, migrationsOut.size());
 +
 +    // now balance again passing in the new migrations
 +    for (TabletMigration m : migrationsOut) {
 +      migrations.add(m.tablet);
 +    }
 +    migrationsOut.clear();
 +    wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(15)), migrations,
migrationsOut);
 +    Assert.assertEquals(20000, wait);
 +    // should balance four tablets in one of the other tables before reaching max
 +    Assert.assertEquals(4, migrationsOut.size());
 +
 +    // now balance again passing in the new migrations
 +    for (TabletMigration m : migrationsOut) {
 +      migrations.add(m.tablet);
 +    }
 +    migrationsOut.clear();
 +    wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(15)), migrations,
migrationsOut);
 +    Assert.assertEquals(20000, wait);
 +    // no more balancing to do
 +    Assert.assertEquals(0, migrationsOut.size());
 +  }
 +
 +  @Test
 +  public void testBalanceWithTooManyOutstandingMigrations() {
 +    List<TabletMigration> migrationsOut = new ArrayList<>();
 +    init(factory);
 +    // let's say we already have migrations ongoing for the FOO and BAR table extents (should
be 5 of each of them) for a total of 10
-     Set<KeyExtent> migrations = new HashSet<KeyExtent>();
++    Set<KeyExtent> migrations = new HashSet<>();
 +    migrations.addAll(tableExtents.get(FOO.getTableName()));
 +    migrations.addAll(tableExtents.get(BAR.getTableName()));
 +    long wait = this.balance(Collections.unmodifiableSortedMap(createCurrent(15)), migrations,
migrationsOut);
 +    Assert.assertEquals(20000, wait);
 +    // no migrations should have occurred as 10 is the maxOutstandingMigrations
 +    Assert.assertEquals(0, migrationsOut.size());
    }
  
    @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a205559d/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 8009d66,6a43e93..165f926
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -737,19 -718,8 +737,19 @@@ public class Master extends AccumuloSer
    }
  
    static enum TabletGoalState {
 -    HOSTED, UNASSIGNED, DELETED
 +    HOSTED(TUnloadTabletGoal.UNKNOWN), UNASSIGNED(TUnloadTabletGoal.UNASSIGNED), DELETED(TUnloadTabletGoal.DELETED),
SUSPENDED(TUnloadTabletGoal.SUSPENDED);
 +
 +    private final TUnloadTabletGoal unloadGoal;
 +
 +    TabletGoalState(TUnloadTabletGoal unloadGoal) {
 +      this.unloadGoal = unloadGoal;
 +    }
 +
 +    /** The purpose of unloading this tablet. */
 +    public TUnloadTabletGoal howUnload() {
 +      return unloadGoal;
 +    }
-   };
+   }
  
    TabletGoalState getSystemGoalState(TabletLocationState tls) {
      switch (getMasterState()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a205559d/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
index 9dad4e9,0000000..fc1719f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
@@@ -1,90 -1,0 +1,89 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.functional.ConfigurableMacBase;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +// ACCUMULO-3692
 +public class BalanceWithOfflineTableIT extends ConfigurableMacBase {
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 30;
 +  }
 +
 +  @Override
 +  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {}
 +
 +  @Test
 +  public void test() throws Exception {
 +    final String tableNames[] = getUniqueNames(2);
 +    final String tableName = tableNames[0];
 +    // create a table with a bunch of splits
 +
 +    final Connector c = getConnector();
 +    log.info("Creating table " + tableName);
 +    c.tableOperations().create(tableName);
- 
 +    final SortedSet<Text> splits = new TreeSet<>();
 +    for (String split : "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(","))
{
 +      splits.add(new Text(split));
 +    }
 +    log.info("Splitting table " + tableName);
 +    c.tableOperations().addSplits(tableName, splits);
 +    log.info("Balancing");
 +    c.instanceOperations().waitForBalance();
 +    log.info("Balanced");
 +
 +    // create a new table which will unbalance the cluster
 +    final String table2 = tableNames[1];
 +    log.info("Creating table " + table2);
 +    c.tableOperations().create(table2);
 +    log.info("Creating splits " + table2);
 +    c.tableOperations().addSplits(table2, splits);
 +
 +    // offline the table, hopefully while there are some migrations going on
 +    log.info("Offlining " + table2);
 +    c.tableOperations().offline(table2, true);
 +    log.info("Offlined " + table2);
 +
 +    log.info("Waiting for balance");
 +
 +    SimpleThreadPool pool = new SimpleThreadPool(1, "waitForBalance");
 +    Future<Boolean> wait = pool.submit(new Callable<Boolean>() {
 +      @Override
 +      public Boolean call() throws Exception {
 +        c.instanceOperations().waitForBalance();
 +        return true;
 +      }
 +    });
 +    wait.get(20, TimeUnit.SECONDS);
 +    log.info("Balance succeeded with an offline table");
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a205559d/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 8659922,0000000..9782ff1
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@@ -1,216 -1,0 +1,216 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.Assert.assertFalse;
 +
 +import java.io.FileInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.cluster.AccumuloCluster;
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.AdminUtil;
 +import org.apache.accumulo.fate.AdminUtil.FateStatus;
 +import org.apache.accumulo.fate.ZooStore;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl.LogWriter;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
 +import org.apache.accumulo.test.TestIngest;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.zookeeper.KeeperException;
 +import org.junit.Assert;
 +
 +import com.google.common.collect.Iterators;
 +
 +public class FunctionalTestUtils {
 +
 +  public static int countRFiles(Connector c, String tableName) throws Exception {
 +    Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    String tableId = c.tableOperations().tableIdMap().get(tableName);
 +    scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId));
 +    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
 +
 +    return Iterators.size(scanner.iterator());
 +  }
 +
 +  static void checkRFiles(Connector c, String tableName, int minTablets, int maxTablets,
int minRFiles, int maxRFiles) throws Exception {
 +    Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    String tableId = c.tableOperations().tableIdMap().get(tableName);
 +    scanner.setRange(new Range(new Text(tableId + ";"), true, new Text(tableId + "<"),
true));
 +    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
 +    MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
 +
 +    HashMap<Text,Integer> tabletFileCounts = new HashMap<>();
 +
 +    for (Entry<Key,Value> entry : scanner) {
 +
 +      Text row = entry.getKey().getRow();
 +
 +      Integer count = tabletFileCounts.get(row);
 +      if (count == null)
 +        count = 0;
 +      if (entry.getKey().getColumnFamily().equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME))
{
 +        count = count + 1;
 +      }
 +
 +      tabletFileCounts.put(row, count);
 +    }
 +
 +    if (tabletFileCounts.size() < minTablets || tabletFileCounts.size() > maxTablets)
{
 +      throw new Exception("Did not find expected number of tablets " + tabletFileCounts.size());
 +    }
 +
 +    Set<Entry<Text,Integer>> es = tabletFileCounts.entrySet();
 +    for (Entry<Text,Integer> entry : es) {
 +      if (entry.getValue() > maxRFiles || entry.getValue() < minRFiles) {
 +        throw new Exception("tablet " + entry.getKey() + " has " + entry.getValue() + "
map files");
 +      }
 +    }
 +  }
 +
 +  static public void bulkImport(Connector c, FileSystem fs, String table, String dir) throws
Exception {
 +    String failDir = dir + "_failures";
 +    Path failPath = new Path(failDir);
 +    fs.delete(failPath, true);
 +    fs.mkdirs(failPath);
 +
 +    // Ensure server can read/modify files
 +    c.tableOperations().importDirectory(table, dir, failDir, false);
 +
 +    if (fs.listStatus(failPath).length > 0) {
 +      throw new Exception("Some files failed to bulk import");
 +    }
 +
 +  }
 +
 +  static public void checkSplits(Connector c, String table, int min, int max) throws Exception
{
 +    Collection<Text> splits = c.tableOperations().listSplits(table);
 +    if (splits.size() < min || splits.size() > max) {
 +      throw new Exception("# of table splits points out of range, #splits=" + splits.size()
+ " table=" + table + " min=" + min + " max=" + max);
 +    }
 +  }
 +
 +  static public void createRFiles(final Connector c, final FileSystem fs, String path, int
rows, int splits, int threads) throws Exception {
 +    fs.delete(new Path(path), true);
 +    ExecutorService threadPool = Executors.newFixedThreadPool(threads);
 +    final AtomicBoolean fail = new AtomicBoolean(false);
 +    for (int i = 0; i < rows; i += rows / splits) {
 +      final TestIngest.Opts opts = new TestIngest.Opts();
 +      opts.outputFile = String.format("%s/mf%s", path, i);
 +      opts.random = 56;
 +      opts.timestamp = 1;
 +      opts.dataSize = 50;
 +      opts.rows = rows / splits;
 +      opts.startRow = i;
 +      opts.cols = 1;
 +      threadPool.execute(new Runnable() {
 +        @Override
 +        public void run() {
 +          try {
 +            TestIngest.ingest(c, fs, opts, new BatchWriterOpts());
 +          } catch (Exception e) {
 +            fail.set(true);
 +          }
 +        }
 +      });
 +    }
 +    threadPool.shutdown();
 +    threadPool.awaitTermination(1, TimeUnit.HOURS);
 +    assertFalse(fail.get());
 +  }
 +
 +  static public String readAll(InputStream is) throws IOException {
 +    byte[] buffer = new byte[4096];
 +    StringBuilder result = new StringBuilder();
 +    while (true) {
 +      int n = is.read(buffer);
 +      if (n <= 0)
 +        break;
 +      result.append(new String(buffer, 0, n));
 +    }
 +    return result.toString();
 +  }
 +
 +  public static String readAll(MiniAccumuloClusterImpl c, Class<?> klass, Process
p) throws Exception {
 +    for (LogWriter writer : c.getLogWriters())
 +      writer.flush();
 +    return readAll(new FileInputStream(c.getConfig().getLogDir() + "/" + klass.getSimpleName()
+ "_" + p.hashCode() + ".out"));
 +  }
 +
 +  static Mutation nm(String row, String cf, String cq, Value value) {
 +    Mutation m = new Mutation(new Text(row));
 +    m.put(new Text(cf), new Text(cq), value);
 +    return m;
 +  }
 +
 +  static Mutation nm(String row, String cf, String cq, String value) {
 +    return nm(row, cf, cq, new Value(value.getBytes()));
 +  }
 +
 +  public static SortedSet<Text> splits(String[] splits) {
 +    SortedSet<Text> result = new TreeSet<>();
 +    for (String split : splits)
 +      result.add(new Text(split));
 +    return result;
 +  }
 +
 +  public static void assertNoDanglingFateLocks(Instance instance, AccumuloCluster cluster)
{
 +    FateStatus fateStatus = getFateStatus(instance, cluster);
 +    Assert.assertEquals("Dangling FATE locks : " + fateStatus.getDanglingHeldLocks(), 0,
fateStatus.getDanglingHeldLocks().size());
 +    Assert.assertEquals("Dangling FATE locks : " + fateStatus.getDanglingWaitingLocks(),
0, fateStatus.getDanglingWaitingLocks().size());
 +  }
 +
 +  private static FateStatus getFateStatus(Instance instance, AccumuloCluster cluster) {
 +    try {
 +      AdminUtil<String> admin = new AdminUtil<>(false);
 +      String secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
 +      IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(),
instance.getZooKeepersSessionTimeOut(), secret);
-       ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) +
Constants.ZFATE, zk);
++      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE,
zk);
 +      FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS,
null, null);
 +      return fateStatus;
 +    } catch (KeeperException | InterruptedException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +}


Mime
View raw message