accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cjno...@apache.org
Subject [4/4] accumulo git commit: Merge branch '1.6'
Date Thu, 08 Jan 2015 04:40:46 GMT
Merge branch '1.6'

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
	trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/44e2b2cd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/44e2b2cd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/44e2b2cd

Branch: refs/heads/master
Commit: 44e2b2cdf89b26a94540e653386bbe9ab269296f
Parents: 44ad36c 3bf4666
Author: Corey J. Nolet <cjnolet@gmail.com>
Authored: Wed Jan 7 23:40:33 2015 -0500
Committer: Corey J. Nolet <cjnolet@gmail.com>
Committed: Wed Jan 7 23:40:33 2015 -0500

----------------------------------------------------------------------
 .../core/client/impl/ActiveScanImpl.java        |   5 +-
 .../core/tabletserver/thrift/ActiveScan.java    | 102 +++++-
 core/src/main/thrift/tabletserver.thrift        |   1 +
 .../tserver/session/SessionManager.java         |   8 +-
 .../accumulo/test/functional/ScanIdIT.java      | 360 +++++++++++++++++++
 5 files changed, 469 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/44e2b2cd/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/44e2b2cd/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index c9445c6,0000000..13049e2
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@@ -1,313 -1,0 +1,317 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.session;
 +
 +import java.security.SecureRandom;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.TimerTask;
 +
 +import org.apache.accumulo.core.client.impl.Translator;
 +import org.apache.accumulo.core.client.impl.Translators;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.thrift.MultiScanResult;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanState;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanType;
 +import org.apache.accumulo.core.util.MapCounter;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.tserver.scan.ScanRunState;
 +import org.apache.accumulo.tserver.scan.ScanTask;
 +import org.apache.accumulo.tserver.tablet.ScanBatch;
 +import org.apache.log4j.Logger;
 +
 +public class SessionManager {
 +  private static final Logger log = Logger.getLogger(SessionManager.class);
 +
 +  private final SecureRandom random = new SecureRandom();
 +  private final Map<Long,Session> sessions = new HashMap<Long,Session>();
 +  private final long maxIdle;
 +  private final AccumuloConfiguration aconf;
 +
 +  public SessionManager(AccumuloConfiguration conf) {
 +    aconf = conf;
 +    maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 +
 +    Runnable r = new Runnable() {
 +      @Override
 +      public void run() {
 +        sweep(maxIdle);
 +      }
 +    };
 +
 +    SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
 +  }
 +
 +  public synchronized long createSession(Session session, boolean reserve) {
 +    long sid = random.nextLong();
 +
 +    while (sessions.containsKey(sid)) {
 +      sid = random.nextLong();
 +    }
 +
 +    sessions.put(sid, session);
 +
 +    session.reserved = reserve;
 +
 +    session.startTime = session.lastAccessTime = System.currentTimeMillis();
 +
 +    return sid;
 +  }
 +
 +  public long getMaxIdleTime() {
 +    return maxIdle;
 +  }
 +
 +  /**
 +   * while a session is reserved, it cannot be canceled or removed
 +   */
 +
 +  public synchronized Session reserveSession(long sessionId) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null) {
 +      if (session.reserved)
 +        throw new IllegalStateException();
 +      session.reserved = true;
 +    }
 +
 +    return session;
 +
 +  }
 +
 +  public synchronized Session reserveSession(long sessionId, boolean wait) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null) {
 +      while (wait && session.reserved) {
 +        try {
 +          wait(1000);
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException();
 +        }
 +      }
 +
 +      if (session.reserved)
 +        throw new IllegalStateException();
 +      session.reserved = true;
 +    }
 +
 +    return session;
 +
 +  }
 +
 +  public synchronized void unreserveSession(Session session) {
 +    if (!session.reserved)
 +      throw new IllegalStateException();
 +    notifyAll();
 +    session.reserved = false;
 +    session.lastAccessTime = System.currentTimeMillis();
 +  }
 +
 +  public synchronized void unreserveSession(long sessionId) {
 +    Session session = getSession(sessionId);
 +    if (session != null)
 +      unreserveSession(session);
 +  }
 +
 +  public synchronized Session getSession(long sessionId) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null)
 +      session.lastAccessTime = System.currentTimeMillis();
 +    return session;
 +  }
 +
 +  public Session removeSession(long sessionId) {
 +    return removeSession(sessionId, false);
 +  }
 +
 +  public Session removeSession(long sessionId, boolean unreserve) {
 +    Session session = null;
 +    synchronized (this) {
 +      session = sessions.remove(sessionId);
 +      if (unreserve && session != null)
 +        unreserveSession(session);
 +    }
 +
 +    // do clean up out side of lock..
 +    if (session != null)
 +      session.cleanup();
 +
 +    return session;
 +  }
 +
 +  private void sweep(long maxIdle) {
 +    List<Session> sessionsToCleanup = new ArrayList<Session>();
 +    synchronized (this) {
 +      Iterator<Session> iter = sessions.values().iterator();
 +      while (iter.hasNext()) {
 +        Session session = iter.next();
 +        long idleTime = System.currentTimeMillis() - session.lastAccessTime;
 +        if (idleTime > maxIdle && !session.reserved) {
 +          log.info("Closing idle session from user=" + session.getUser() + ", client=" +
session.client + ", idle=" + idleTime + "ms");
 +          iter.remove();
 +          sessionsToCleanup.add(session);
 +        }
 +      }
 +    }
 +
 +    // do clean up outside of lock
 +    for (Session session : sessionsToCleanup) {
 +      session.cleanup();
 +    }
 +  }
 +
 +  public synchronized void removeIfNotAccessed(final long sessionId, final long delay) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null) {
 +      final long removeTime = session.lastAccessTime;
 +      TimerTask r = new TimerTask() {
 +        @Override
 +        public void run() {
 +          Session sessionToCleanup = null;
 +          synchronized (SessionManager.this) {
 +            Session session2 = sessions.get(sessionId);
 +            if (session2 != null && session2.lastAccessTime == removeTime &&
!session2.reserved) {
 +              log.info("Closing not accessed session from user=" + session2.getUser() +
", client=" + session2.client + ", duration=" + delay + "ms");
 +              sessions.remove(sessionId);
 +              sessionToCleanup = session2;
 +            }
 +          }
 +
 +          // call clean up outside of lock
 +          if (sessionToCleanup != null)
 +            sessionToCleanup.cleanup();
 +        }
 +      };
 +
 +      SimpleTimer.getInstance(aconf).schedule(r, delay);
 +    }
 +  }
 +
 +  public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable()
{
 +    Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
 +    for (Entry<Long,Session> entry : sessions.entrySet()) {
 +
 +      Session session = entry.getValue();
 +      @SuppressWarnings("rawtypes")
 +      ScanTask nbt = null;
 +      String tableID = null;
 +
 +      if (session instanceof ScanSession) {
 +        ScanSession ss = (ScanSession) session;
 +        nbt = ss.nextBatchTask;
 +        tableID = ss.extent.getTableId().toString();
 +      } else if (session instanceof MultiScanSession) {
 +        MultiScanSession mss = (MultiScanSession) session;
 +        nbt = mss.lookupTask;
 +        tableID = mss.threadPoolExtent.getTableId().toString();
 +      }
 +
 +      if (nbt == null)
 +        continue;
 +
 +      ScanRunState srs = nbt.getScanRunState();
 +
 +      if (srs == ScanRunState.FINISHED)
 +        continue;
 +
 +      MapCounter<ScanRunState> stateCounts = counts.get(tableID);
 +      if (stateCounts == null) {
 +        stateCounts = new MapCounter<ScanRunState>();
 +        counts.put(tableID, stateCounts);
 +      }
 +
 +      stateCounts.increment(srs, 1);
 +    }
 +
 +    return counts;
 +  }
 +
 +  public synchronized List<ActiveScan> getActiveScans() {
 +
 +    List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
 +
 +    long ct = System.currentTimeMillis();
 +
 +    for (Entry<Long,Session> entry : sessions.entrySet()) {
 +      Session session = entry.getValue();
 +      if (session instanceof ScanSession) {
 +        ScanSession ss = (ScanSession) session;
 +
 +        ScanState state = ScanState.RUNNING;
 +
 +        ScanTask<ScanBatch> nbt = ss.nextBatchTask;
 +        if (nbt == null) {
 +          state = ScanState.IDLE;
 +        } else {
 +          switch (nbt.getScanRunState()) {
 +            case QUEUED:
 +              state = ScanState.QUEUED;
 +              break;
 +            case FINISHED:
 +              state = ScanState.IDLE;
 +              break;
 +            case RUNNING:
 +            default:
 +              /* do nothing */
 +              break;
 +          }
 +        }
 +
-         activeScans.add(new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(),
ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
-             state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT),
ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
++        ActiveScan activeScan = new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(),
ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
++            state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT),
ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB());
++
++        // scanId added by ACCUMULO-2641 is an optional thrift argument and not available
in ActiveScan constructor
++        activeScan.setScanId(entry.getKey());
++        activeScans.add(activeScan);
 +
 +      } else if (session instanceof MultiScanSession) {
 +        MultiScanSession mss = (MultiScanSession) session;
 +
 +        ScanState state = ScanState.RUNNING;
 +
 +        ScanTask<MultiScanResult> nbt = mss.lookupTask;
 +        if (nbt == null) {
 +          state = ScanState.IDLE;
 +        } else {
 +          switch (nbt.getScanRunState()) {
 +            case QUEUED:
 +              state = ScanState.QUEUED;
 +              break;
 +            case FINISHED:
 +              state = ScanState.IDLE;
 +              break;
 +            case RUNNING:
 +            default:
 +              /* do nothing */
 +              break;
 +          }
 +        }
 +
 +        activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(),
ct - mss.startTime, ct - mss.lastAccessTime,
 +            ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet,
Translators.CT), mss.ssiList, mss.ssio, mss.auths
 +                .getAuthorizationsBB()));
 +      }
 +    }
 +
 +    return activeScans;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/44e2b2cd/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
index 0000000,178cb30..fe2e8cb
mode 000000,100644..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@@ -1,0 -1,360 +1,360 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.functional;
+ 
++import java.util.EnumSet;
++import java.util.HashSet;
++import java.util.List;
++import java.util.Map;
++import java.util.Random;
++import java.util.Set;
++import java.util.SortedSet;
++import java.util.TreeSet;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++
+ import org.apache.accumulo.core.client.AccumuloException;
+ import org.apache.accumulo.core.client.AccumuloSecurityException;
+ import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.BatchWriterConfig;
+ import org.apache.accumulo.core.client.Connector;
+ import org.apache.accumulo.core.client.IteratorSetting;
+ import org.apache.accumulo.core.client.MutationsRejectedException;
+ import org.apache.accumulo.core.client.Scanner;
+ import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.client.admin.ActiveScan;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.Range;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.iterators.IteratorUtil;
+ import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.core.security.ColumnVisibility;
+ import org.apache.accumulo.core.util.UtilWaitThread;
+ import org.apache.accumulo.harness.AccumuloClusterIT;
+ import org.apache.hadoop.io.Text;
+ import org.junit.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
 -import java.util.EnumSet;
 -import java.util.HashSet;
 -import java.util.List;
 -import java.util.Map;
 -import java.util.Random;
 -import java.util.Set;
 -import java.util.SortedSet;
 -import java.util.TreeSet;
 -import java.util.concurrent.ConcurrentHashMap;
 -import java.util.concurrent.ExecutorService;
 -import java.util.concurrent.Executors;
 -
+ import static com.google.common.base.Charsets.UTF_8;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ /**
+  * ACCUMULO-2641 Integration test. ACCUMULO-2641 Adds scan id to thrift protocol so that
{@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()}
+  * returns a unique scan id.<p>
+  * <p/>
+  * The test uses the Minicluster and the {@code org.apache.accumulo.test.functional.SlowIterator}
to create multiple scan sessions.
+  * The test exercises multiple tablet servers with splits and multiple ranges to force the
scans to occur across multiple tablet servers
+  * for completeness.
+  * <p/>
+  * This patch modified thrift, the TraceRepoDeserializationTest test seems to fail unless
the following be added:
+  * <p/>
+  * private static final long serialVersionUID = -4659975753252858243l;
+  * <p/>
+  * back into org.apache.accumulo.trace.thrift.TInfo until that test signature is regenerated.
+  */
+ public class ScanIdIT extends AccumuloClusterIT {
+ 
+   private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
+ 
+   private static final int NUM_SCANNERS = 8;
+ 
+   private static final int NUM_DATA_ROWS = 100;
+ 
+   private static final Random random = new Random();
+ 
+   private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
+ 
+   private static volatile boolean testInProgress = true;
+ 
+   private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<Integer,Value>();
+ 
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 60;
+   }
+ 
+   /**
+    * @throws Exception any exception is a test failure.
+    */
+   @Test
+   public void testScanId() throws Exception {
+ 
+     final String tableName = getUniqueNames(1)[0];
+     Connector conn = getConnector();
+     conn.tableOperations().create(tableName);
+ 
+     addSplits(conn, tableName);
+ 
+     generateSampleData(conn, tableName);
+ 
+     attachSlowIterator(conn, tableName);
+ 
+     for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
+       ScannerThread st = new ScannerThread(conn, scannerIndex, tableName);
+       pool.submit(st);
+     }
+ 
+     // wait for scanners to report a result.
+     while (testInProgress) {
+ 
+       if (resultsByWorker.size() < NUM_SCANNERS) {
+         log.trace("Results reported {}", resultsByWorker.size());
+         UtilWaitThread.sleep(750);
+       } else {
+         // each worker has reported at least one result.
+         testInProgress = false;
+ 
+         log.debug("Final result count {}", resultsByWorker.size());
+ 
+         // delay to allow scanners to react to end of test and cleanly close.
+         UtilWaitThread.sleep(1000);
+       }
+ 
+     }
+ 
+     // all scanner have reported at least 1 result, so check for unique scan ids.
+     Set<Long> scanIds = new HashSet<Long>();
+ 
+     List<String> tservers = conn.instanceOperations().getTabletServers();
+ 
+     log.debug("tablet servers {}", tservers.toString());
+ 
+     for (String tserver : tservers) {
+ 
+       List<ActiveScan> activeScans = conn.instanceOperations().getActiveScans(tserver);
+ 
+       log.debug("TServer {} has {} active scans", tserver, activeScans.size());
+ 
+       for (ActiveScan scan : activeScans) {
+         log.debug("Tserver {} scan id {}", tserver, scan.getScanid());
+         scanIds.add(scan.getScanid());
+       }
+     }
+ 
+     assertTrue(NUM_SCANNERS <= scanIds.size());
+ 
+   }
+ 
+   /**
+    * Runs scanner in separate thread to allow multiple scanners to execute in parallel.
+    * <p/>
+    * The thread run method is terminated when the testInProgress flag is set to false.
+    */
+   private static class ScannerThread implements Runnable {
+ 
+     private final Connector connector;
+     private Scanner scanner = null;
+     private final int workerIndex;
+     private final String tablename;
+ 
+     public ScannerThread(final Connector connector, final int workerIndex, final String
tablename) {
+ 
+       this.connector = connector;
+       this.workerIndex = workerIndex;
+       this.tablename = tablename;
+ 
+     }
+ 
+     /**
+      * execute the scan across the sample data and put scan result into result map until
+      * testInProgress flag is set to false.
+      */
+     @Override public void run() {
+ 
+       /*
+       * set random initial delay of up to to
+       * allow scanners to proceed to different points.
+       */
+ 
+       long delay = random.nextInt(5000);
+ 
+       log.trace("Start delay for worker thread {} is {}", workerIndex, delay);
+ 
+       UtilWaitThread.sleep(delay);
+ 
+       try {
+ 
+         scanner = connector.createScanner(tablename, new Authorizations());
+ 
+         // Never start readahead
+         scanner.setReadaheadThreshold(Long.MAX_VALUE);
+         scanner.setBatchSize(1);
+ 
+         // create different ranges to try to hit more than one tablet.
+         scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
+ 
+       } catch (TableNotFoundException e) {
+         throw new IllegalStateException("Initialization failure. Could not create scanner",
e);
+       }
+ 
+       scanner.fetchColumnFamily(new Text("fam1"));
+ 
+       for (Map.Entry<Key,Value> entry : scanner) {
+ 
+         // exit when success condition is met.
+         if (!testInProgress) {
+           scanner.clearScanIterators();
+           scanner.close();
+ 
+           return;
+         }
+ 
+         Text row = entry.getKey().getRow();
+ 
+         log.trace("worker {}, row {}", workerIndex, row.toString());
+ 
+         if (entry.getValue() != null) {
+ 
+           Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
+ 
+           // value should always being increasing
+           if (prevValue != null) {
+ 
+             log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s",
prevValue, entry.getValue()));
+ 
+             assertTrue(prevValue.compareTo(entry.getValue()) > 0);
+           }
+         } else {
+           log.info("Scanner returned null");
+           fail("Scanner returned unexpected null value");
+         }
+ 
+       }
+ 
+       log.debug("Scanner ran out of data. (info only, not an error) ");
+ 
+     }
+   }
+ 
+   /**
+    * Create splits on table and force migration by taking table offline and then bring back
+    * online for test.
+    *
+    * @param conn Accumulo connector Accumulo connector to test cluster or MAC instance.
+    */
+   private void addSplits(final Connector conn, final String tableName) {
+ 
+     SortedSet<Text> splits = createSplits();
+ 
+     try {
+ 
+       conn.tableOperations().addSplits(tableName, splits);
+ 
+       conn.tableOperations().offline(tableName, true);
+ 
+       UtilWaitThread.sleep(2000);
+       conn.tableOperations().online(tableName, true);
+ 
+       for (Text split : conn.tableOperations().listSplits(tableName)) {
+         log.trace("Split {}", split);
+       }
+ 
+     } catch (AccumuloSecurityException e) {
+       throw new IllegalStateException("Initialization failed. Could not add splits to "
+ tableName, e);
+     } catch (TableNotFoundException e) {
+       throw new IllegalStateException("Initialization failed. Could not add splits to "
+ tableName, e);
+     } catch (AccumuloException e) {
+       throw new IllegalStateException("Initialization failed. Could not add splits to "
+ tableName, e);
+     }
+ 
+   }
+ 
+   /**
+    * Create splits to distribute data across multiple tservers.
+    *
+    * @return splits in sorted set for addSplits.
+    */
+   private SortedSet<Text> createSplits() {
+ 
+     SortedSet<Text> splits = new TreeSet<Text>();
+ 
+     for (int split = 0; split < 10; split++) {
+       splits.add(new Text(Integer.toString(split)));
+     }
+ 
+     return splits;
+   }
+ 
+   /**
+    * Generate some sample data using random row id to distribute across splits.
+    * <p/>
+    * The primary goal is to determine that each scanner is assigned a unique scan id.
+    * This test does check that the count value  for fam1 increases if a scanner reads multiple
value, but this is
+    * secondary consideration for this test, that is included for completeness.
+    *
+    * @param connector Accumulo connector Accumulo connector to test cluster or MAC instance.
+    */
+   private void generateSampleData(Connector connector, final String tablename) {
+ 
+     try {
+ 
+       BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig());
+ 
+       ColumnVisibility vis = new ColumnVisibility("public");
+ 
+       for (int i = 0; i < NUM_DATA_ROWS; i++) {
+ 
+         Text rowId = new Text(String.format("%d", ((random.nextInt(10) * 100) + i)));
+ 
+         Mutation m = new Mutation(rowId);
+         m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8)));
+         m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS
- i).getBytes(UTF_8)));
+         m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i
- NUM_DATA_ROWS).getBytes(UTF_8)));
+ 
+         log.trace("Added row {}", rowId);
+ 
+         bw.addMutation(m);
+       }
+ 
+       bw.close();
+     } catch (TableNotFoundException ex) {
+       throw new IllegalStateException("Initialization failed. Could not create test data",
ex);
+     } catch (MutationsRejectedException ex) {
+       throw new IllegalStateException("Initialization failed. Could not create test data",
ex);
+     }
+   }
+ 
+   /**
+    * Attach the test slow iterator so that we have time to read the scan id without creating
a large dataset. Uses a
+    * fairly large sleep and delay times because we are not concerned with how much data
is read and we do not read
+    * all of the data - the test stops once each scanner reports a scan id.
+    *
+    * @param connector Accumulo connector Accumulo connector to test cluster or MAC instance.
+    */
+   private void attachSlowIterator(Connector connector, final String tablename) {
+     try {
+ 
+       IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
+       slowIter.addOption("sleepTime", "200");
+       slowIter.addOption("seekSleepTime", "200");
+ 
+       connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));
+ 
+     } catch (AccumuloException ex) {
+       throw new IllegalStateException("Initialization failed. Could not attach slow iterator",
ex);
+     } catch (TableNotFoundException ex) {
+       throw new IllegalStateException("Initialization failed. Could not attach slow iterator",
ex);
+     } catch (AccumuloSecurityException ex) {
+       throw new IllegalStateException("Initialization failed. Could not attach slow iterator",
ex);
+     }
+   }
+ 
+ }


Mime
View raw message