Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5F682176DF for ; Fri, 9 Jan 2015 02:44:10 +0000 (UTC) Received: (qmail 3622 invoked by uid 500); 9 Jan 2015 02:44:11 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 3524 invoked by uid 500); 9 Jan 2015 02:44:11 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 767 invoked by uid 99); 9 Jan 2015 02:44:07 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Jan 2015 02:44:07 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DEDE390DE23; Fri, 9 Jan 2015 02:44:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Fri, 09 Jan 2015 02:44:50 -0000 Message-Id: <03d279381c3a48f1ab47b5795c9c21ef@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [46/66] [abbrv] accumulo git commit: ACCUMULO-3451 Format master branch (1.7.0-SNAPSHOT) http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index 8440c1c..f0d9e0c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -91,9 +91,9 @@ import org.slf4j.LoggerFactory; import com.google.common.net.HostAndPort; class ConditionalWriterImpl implements ConditionalWriter { - + private static ThreadPoolExecutor cleanupThreadPool = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue()); - + static { cleanupThreadPool.allowCoreThreadTimeOut(true); } @@ -101,7 +101,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private static final Logger log = LoggerFactory.getLogger(ConditionalWriterImpl.class); private static final int MAX_SLEEP = 30000; - + private Authorizations auths; private VisibilityEvaluator ve; @SuppressWarnings("unchecked") @@ -111,44 +111,44 @@ class ConditionalWriterImpl implements ConditionalWriter { private String tableId; private long timeout; private final Durability durability; - + private static class ServerQueue { BlockingQueue> queue = new LinkedBlockingQueue>(); boolean taskQueued = false; } - + private Map serverQueues; private DelayQueue failedMutations = new DelayQueue(); private ScheduledThreadPoolExecutor threadPool; - + private class RQIterator implements Iterator { - + private BlockingQueue rq; private int count; - + public RQIterator(BlockingQueue resultQueue, int count) { this.rq = resultQueue; this.count = count; } - + @Override public boolean hasNext() { return count > 0; } - + @Override public Result next() { if (count <= 0) throw new NoSuchElementException(); - + try { Result result = rq.poll(1, TimeUnit.SECONDS); while (result == null) { - + if (threadPool.isShutdown()) { throw new NoSuchElementException("ConditionalWriter closed"); } - + result = rq.poll(1, TimeUnit.SECONDS); } count--; @@ -157,32 +157,32 @@ class ConditionalWriterImpl implements ConditionalWriter { throw new RuntimeException(e); } } - + @Override public void remove() { throw new UnsupportedOperationException(); } - + } - + private static class QCMutation extends ConditionalMutation implements Delayed { private BlockingQueue resultQueue; private long resetTime; private long delay = 50; private long entryTime; - + QCMutation(ConditionalMutation cm, BlockingQueue resultQueue, long entryTime) { super(cm); this.resultQueue = resultQueue; this.entryTime = entryTime; } - + @Override public int compareTo(Delayed o) { QCMutation oqcm = (QCMutation) o; return Long.valueOf(resetTime).compareTo(Long.valueOf(oqcm.resetTime)); } - + @Override public boolean equals(Object o) { if (o instanceof QCMutation) @@ -194,45 +194,45 @@ class ConditionalWriterImpl implements ConditionalWriter { public long getDelay(TimeUnit unit) { return unit.convert(delay - (System.currentTimeMillis() - resetTime), TimeUnit.MILLISECONDS); } - + void resetDelay() { delay = Math.min(delay * 2, MAX_SLEEP); resetTime = System.currentTimeMillis(); } - + void queueResult(Result result) { resultQueue.add(result); } } - + private ServerQueue getServerQueue(String location) { ServerQueue serverQueue; synchronized (serverQueues) { serverQueue = serverQueues.get(location); if (serverQueue == null) { - + serverQueue = new ServerQueue(); serverQueues.put(location, serverQueue); } } return serverQueue; } - + private class CleanupTask implements Runnable { private List sessions; - + CleanupTask(List activeSessions) { this.sessions = activeSessions; } - + @Override public void run() { TabletClientService.Iface client = null; - + for (SessionID sid : sessions) { if (!sid.isActive()) continue; - + TInfo tinfo = Tracer.traceInfo(); try { client = getClient(sid.location); @@ -240,7 +240,7 @@ class ConditionalWriterImpl implements ConditionalWriter { } catch (Exception e) {} finally { ThriftUtil.returnClient((TServiceClient) client); } - + } } } @@ -248,11 +248,11 @@ class ConditionalWriterImpl implements ConditionalWriter { private void queueRetry(List mutations, HostAndPort server) { if (timeout < Long.MAX_VALUE) { - + long time = System.currentTimeMillis(); - + ArrayList mutations2 = new ArrayList(mutations.size()); - + for (QCMutation qcm : mutations) { qcm.resetDelay(); if (time + qcm.getDelay(TimeUnit.MILLISECONDS) > qcm.entryTime + timeout) { @@ -267,52 +267,52 @@ class ConditionalWriterImpl implements ConditionalWriter { mutations2.add(qcm); } } - + if (mutations2.size() > 0) failedMutations.addAll(mutations2); - + } else { for (QCMutation qcm : mutations) qcm.resetDelay(); failedMutations.addAll(mutations); } } - + private void queue(List mutations) { List failures = new ArrayList(); Map> binnedMutations = new HashMap>(); - + try { locator.binMutations(context, mutations, binnedMutations, failures); - + if (failures.size() == mutations.size()) if (!Tables.exists(context.getInstance(), tableId)) throw new TableDeletedException(tableId); else if (Tables.getTableState(context.getInstance(), tableId) == TableState.OFFLINE) throw new TableOfflineException(context.getInstance(), tableId); - + } catch (Exception e) { for (QCMutation qcm : mutations) qcm.queueResult(new Result(e, qcm, null)); - + // do not want to queue anything that was put in before binMutations() failed failures.clear(); binnedMutations.clear(); } - + if (failures.size() > 0) queueRetry(failures, null); - + for (Entry> entry : binnedMutations.entrySet()) { queue(entry.getKey(), entry.getValue()); } - + } - + private void queue(String location, TabletServerMutations mutations) { - + ServerQueue serverQueue = getServerQueue(location); - + synchronized (serverQueue) { serverQueue.queue.add(mutations); // never execute more than one task per server @@ -321,9 +321,9 @@ class ConditionalWriterImpl implements ConditionalWriter { serverQueue.taskQueued = true; } } - + } - + private void reschedule(SendTask task) { ServerQueue serverQueue = getServerQueue(task.location); // just finished processing work for this server, could reschedule if it has more work or immediately process the work @@ -331,31 +331,31 @@ class ConditionalWriterImpl implements ConditionalWriter { // more data that need to be processed... also it will give the current server time to build // up more data... the thinking is that rescheduling instead or processing immediately will result // in bigger batches and less RPC overhead - + synchronized (serverQueue) { if (serverQueue.queue.size() > 0) threadPool.execute(new LoggingRunnable(log, Trace.wrap(task))); else serverQueue.taskQueued = false; } - + } - + private TabletServerMutations dequeue(String location) { BlockingQueue> queue = getServerQueue(location).queue; - + ArrayList> mutations = new ArrayList>(); queue.drainTo(mutations); - + if (mutations.size() == 0) return null; - + if (mutations.size() == 1) { return mutations.get(0); } else { // merge multiple request to a single tablet server TabletServerMutations tsm = mutations.get(0); - + for (int i = 1; i < mutations.size(); i++) { for (Entry> entry : mutations.get(i).getMutations().entrySet()) { List list = tsm.getMutations().get(entry.getKey()); @@ -363,15 +363,15 @@ class ConditionalWriterImpl implements ConditionalWriter { list = new ArrayList(); tsm.getMutations().put(entry.getKey(), list); } - + list.addAll(entry.getValue()); } } - + return tsm; } } - + ConditionalWriterImpl(ClientContext context, String tableId, ConditionalWriterConfig config) { this.context = context; this.auths = config.getAuthorizations(); @@ -382,9 +382,9 @@ class ConditionalWriterImpl implements ConditionalWriter { this.tableId = tableId; this.timeout = config.getTimeout(TimeUnit.MILLISECONDS); this.durability = config.getDurability(); - + Runnable failureHandler = new Runnable() { - + @Override public void run() { List mutations = new ArrayList(); @@ -393,27 +393,27 @@ class ConditionalWriterImpl implements ConditionalWriter { queue(mutations); } }; - + failureHandler = new LoggingRunnable(log, failureHandler); - + threadPool.scheduleAtFixedRate(failureHandler, 250, 250, TimeUnit.MILLISECONDS); } - + @Override public Iterator write(Iterator mutations) { - + BlockingQueue resultQueue = new LinkedBlockingQueue(); - + List mutationList = new ArrayList(); - + int count = 0; - + long entryTime = System.currentTimeMillis(); - + mloop: while (mutations.hasNext()) { ConditionalMutation mut = mutations.next(); count++; - + if (mut.getConditions().size() == 0) throw new IllegalArgumentException("ConditionalMutation had no conditions " + new String(mut.getRow())); @@ -423,26 +423,26 @@ class ConditionalWriterImpl implements ConditionalWriter { continue mloop; } } - + // copy the mutations so that even if caller changes it, it will not matter mutationList.add(new QCMutation(mut, resultQueue, entryTime)); } - + queue(mutationList); - + return new RQIterator(resultQueue, count); - + } - + private class SendTask implements Runnable { - + String location; - + public SendTask(String location) { this.location = location; - + } - + @Override public void run() { try { @@ -454,18 +454,18 @@ class ConditionalWriterImpl implements ConditionalWriter { } } } - + private static class CMK { - + QCMutation cm; KeyExtent ke; - + public CMK(KeyExtent ke, QCMutation cm) { this.ke = ke; this.cm = cm; } } - + private static class SessionID { HostAndPort location; String lockId; @@ -473,7 +473,7 @@ class ConditionalWriterImpl implements ConditionalWriter { boolean reserved; long lastAccessTime; long ttl; - + boolean isActive() { return System.currentTimeMillis() - lastAccessTime < ttl * .95; } @@ -488,7 +488,7 @@ class ConditionalWriterImpl implements ConditionalWriter { if (sid != null) { if (sid.reserved) throw new IllegalStateException(); - + if (!sid.isActive()) { cachedSessionIDs.remove(location); } else { @@ -497,10 +497,10 @@ class ConditionalWriterImpl implements ConditionalWriter { } } } - + TConditionalSession tcs = client.startConditionalUpdate(tinfo, context.rpcCreds(), ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tableId, DurabilityImpl.toThrift(durability)); - + synchronized (cachedSessionIDs) { SessionID sid = new SessionID(); sid.reserved = true; @@ -510,17 +510,17 @@ class ConditionalWriterImpl implements ConditionalWriter { sid.location = location; if (cachedSessionIDs.put(location, sid) != null) throw new IllegalStateException(); - + return sid; } - + } private void invalidateSessionID(HostAndPort location) { synchronized (cachedSessionIDs) { cachedSessionIDs.remove(location); } - + } private void unreserveSessionID(HostAndPort location) { @@ -534,7 +534,7 @@ class ConditionalWriterImpl implements ConditionalWriter { } } } - + List getActiveSessions() { ArrayList activeSessions = new ArrayList(); for (SessionID sid : cachedSessionIDs.values()) @@ -554,23 +554,23 @@ class ConditionalWriterImpl implements ConditionalWriter { private void sendToServer(HostAndPort location, TabletServerMutations mutations) { TabletClientService.Iface client = null; - + TInfo tinfo = Tracer.traceInfo(); - + Map cmidToCm = new HashMap(); MutableLong cmid = new MutableLong(0); - + SessionID sessionId = null; - - try { + + try { Map> tmutations = new HashMap>(); - + CompressedIterators compressedIters = new CompressedIterators(); convertMutations(mutations, cmidToCm, cmid, tmutations, compressedIters); - - //getClient() call must come after converMutations in case it throws a TException + + // getClient() call must come after converMutations in case it throws a TException client = getClient(location); - + List tresults = null; while (tresults == null) { try { @@ -581,11 +581,11 @@ class ConditionalWriterImpl implements ConditionalWriter { invalidateSessionID(location); } } - + HashSet extentsToInvalidate = new HashSet(); - + ArrayList ignored = new ArrayList(); - + for (TCMResult tcmResult : tresults) { if (tcmResult.status == TCMStatus.IGNORED) { CMK cmk = cmidToCm.get(tcmResult.cmid); @@ -596,13 +596,13 @@ class ConditionalWriterImpl implements ConditionalWriter { qcm.queueResult(new Result(fromThrift(tcmResult.status), qcm, location.toString())); } } - + for (KeyExtent ke : extentsToInvalidate) { locator.invalidateCache(ke); } - + queueRetry(ignored, location); - + } catch (ThriftSecurityException tse) { AccumuloSecurityException ase = new AccumuloSecurityException(context.getCredentials().getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId( context.getInstance(), tableId), tse); @@ -618,7 +618,7 @@ class ConditionalWriterImpl implements ConditionalWriter { } catch (Exception e) { queueException(location, cmidToCm, e); } finally { - if(sessionId != null) + if (sessionId != null) unreserveSessionID(location); ThriftUtil.returnClient((TServiceClient) client); } @@ -649,22 +649,22 @@ class ConditionalWriterImpl implements ConditionalWriter { } } } - + /* * The purpose of this code is to ensure that a conditional mutation will not execute when its status is unknown. This allows a user to read the row when the * status is unknown and not have to worry about the tserver applying the mutation after the scan. - * + * * If a conditional mutation is taking a long time to process, then this method will wait for it to finish... unless this exceeds timeout. */ private void invalidateSession(SessionID sessionId, HostAndPort location) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { long sleepTime = 50; - + long startTime = System.currentTimeMillis(); - + Instance instance = context.getInstance(); LockID lid = new LockID(ZooUtil.getRoot(instance) + Constants.ZTSERVERS, sessionId.lockId); - + ZooCacheFactory zcf = new ZooCacheFactory(); while (true) { if (!ZooLock.isLockHeld(zcf.getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), lid)) { @@ -673,33 +673,33 @@ class ConditionalWriterImpl implements ConditionalWriter { locator.invalidateCache(context.getInstance(), location.toString()); return; } - + try { // if the mutation is currently processing, this method will block until its done or times out invalidateSession(sessionId.sessionID, location); - + return; } catch (TApplicationException tae) { throw new AccumuloServerException(location.toString(), tae); } catch (TException e) { locator.invalidateCache(context.getInstance(), location.toString()); } - + if ((System.currentTimeMillis() - startTime) + sleepTime > timeout) throw new TimedOutException(Collections.singleton(location.toString())); UtilWaitThread.sleep(sleepTime); sleepTime = Math.min(2 * sleepTime, MAX_SLEEP); - + } - + } private void invalidateSession(long sessionId, HostAndPort location) throws TException { TabletClientService.Iface client = null; - + TInfo tinfo = Tracer.traceInfo(); - + try { client = getClient(location); client.invalidateConditionalUpdate(tinfo, sessionId); @@ -707,7 +707,7 @@ class ConditionalWriterImpl implements ConditionalWriter { ThriftUtil.returnClient((TServiceClient) client); } } - + private Status fromThrift(TCMStatus status) { switch (status) { case ACCEPTED: @@ -720,63 +720,63 @@ class ConditionalWriterImpl implements ConditionalWriter { throw new IllegalArgumentException(status.toString()); } } - + private void convertMutations(TabletServerMutations mutations, Map cmidToCm, MutableLong cmid, Map> tmutations, CompressedIterators compressedIters) { - + for (Entry> entry : mutations.getMutations().entrySet()) { TKeyExtent tke = entry.getKey().toThrift(); ArrayList tcondMutaions = new ArrayList(); - + List condMutations = entry.getValue(); - + for (QCMutation cm : condMutations) { TMutation tm = cm.toThrift(); - + List conditions = convertConditions(cm, compressedIters); - + cmidToCm.put(cmid.longValue(), new CMK(entry.getKey(), cm)); TConditionalMutation tcm = new TConditionalMutation(conditions, tm, cmid.longValue()); cmid.increment(); tcondMutaions.add(tcm); } - + tmutations.put(tke, tcondMutaions); } } - + private List convertConditions(ConditionalMutation cm, CompressedIterators compressedIters) { List conditions = new ArrayList(cm.getConditions().size()); - + for (Condition cond : cm.getConditions()) { long ts = 0; boolean hasTs = false; - + if (cond.getTimestamp() != null) { ts = cond.getTimestamp(); hasTs = true; } - + ByteBuffer iters = compressedIters.compress(cond.getIterators()); - + TCondition tc = new TCondition(ByteBufferUtil.toByteBuffers(cond.getFamily()), ByteBufferUtil.toByteBuffers(cond.getQualifier()), ByteBufferUtil.toByteBuffers(cond.getVisibility()), ts, hasTs, ByteBufferUtil.toByteBuffers(cond.getValue()), iters); - + conditions.add(tc); } - + return conditions; } - + private boolean isVisible(ByteSequence cv) { Text testVis = new Text(cv.toArray()); if (testVis.getLength() == 0) return true; - + Boolean b = cache.get(testVis); if (b != null) return b; - + try { Boolean bb = ve.evaluate(new ColumnVisibility(testVis)); cache.put(new Text(testVis), bb); @@ -787,16 +787,16 @@ class ConditionalWriterImpl implements ConditionalWriter { return false; } } - + @Override public Result write(ConditionalMutation mutation) { return write(Collections.singleton(mutation).iterator()).next(); } - + @Override public void close() { threadPool.shutdownNow(); cleanupThreadPool.execute(new CleanupTask(getActiveSessions())); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/DurabilityImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/DurabilityImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/DurabilityImpl.java index b2a0a98..bc82629 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/DurabilityImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/DurabilityImpl.java @@ -64,5 +64,5 @@ public class DurabilityImpl { } return durability; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java index b95d8b2..7b3ee12 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java @@ -219,6 +219,6 @@ public class InstanceOperationsImpl implements InstanceOperations { // should never happen throw new RuntimeException("Unexpected exception thrown", ex); } - + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/IsolationException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/IsolationException.java b/core/src/main/java/org/apache/accumulo/core/client/impl/IsolationException.java index f09e1cb..c544ca3 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/IsolationException.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/IsolationException.java @@ -17,7 +17,7 @@ package org.apache.accumulo.core.client.impl; public class IsolationException extends RuntimeException { - + private static final long serialVersionUID = 1L; - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java index a9ad8a1..fcbf9f9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java @@ -86,8 +86,8 @@ public class MasterClient { } } - public static T execute(ClientContext context, ClientExecReturn exec) throws AccumuloException, - AccumuloSecurityException, TableNotFoundException { + public static T execute(ClientContext context, ClientExecReturn exec) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { MasterClientService.Client client = null; while (true) { try { @@ -118,8 +118,8 @@ public class MasterClient { } } - public static void executeGeneric(ClientContext context, ClientExec exec) throws AccumuloException, - AccumuloSecurityException, TableNotFoundException { + public static void executeGeneric(ClientContext context, ClientExec exec) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { MasterClientService.Client client = null; while (true) { try { @@ -151,13 +151,13 @@ public class MasterClient { } } - public static void executeTable(ClientContext context, ClientExec exec) throws AccumuloException, - AccumuloSecurityException, TableNotFoundException { + public static void executeTable(ClientContext context, ClientExec exec) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { executeGeneric(context, exec); } - public static void executeNamespace(ClientContext context, ClientExec exec) throws AccumuloException, - AccumuloSecurityException, NamespaceNotFoundException { + public static void executeNamespace(ClientContext context, ClientExec exec) throws AccumuloException, AccumuloSecurityException, + NamespaceNotFoundException { try { executeGeneric(context, exec); } catch (TableNotFoundException e) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java index ace8701..82aa714 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java @@ -155,7 +155,7 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { /** * Returns the table ID for the given table name. - * + * * @param tableName * The name of the table which to find the ID for * @return The table ID, or null if the table name doesn't exist @@ -203,7 +203,7 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { // cacheResetCount could change after this point in time, but I think thats ok because just want to ensure this methods sees changes // made before it was called. - + long internalResetCount = cacheLastState.get(); if (cacheResetCount > internalResetCount) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java index 2552682..d679ffa 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java @@ -17,6 +17,7 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -71,35 +72,35 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; class OfflineIterator implements Iterator> { - + static class OfflineIteratorEnvironment implements IteratorEnvironment { @Override public SortedKeyValueIterator reserveMapFileReader(String mapFileName) throws IOException { throw new NotImplementedException(); } - + @Override public AccumuloConfiguration getConfig() { return AccumuloConfiguration.getDefaultConfiguration(); } - + @Override public IteratorScope getIteratorScope() { return IteratorScope.scan; } - + @Override public boolean isFullMajorCompaction() { return false; } - + private ArrayList> topLevelIterators = new ArrayList>(); - + @Override public void registerSideChannel(SortedKeyValueIterator iter) { topLevelIterators.add(iter); } - + SortedKeyValueIterator getTopLevelIterator(SortedKeyValueIterator iter) { if (topLevelIterators.isEmpty()) return iter; @@ -108,7 +109,7 @@ class OfflineIterator implements Iterator> { return new MultiIterator(allIters, false); } } - + private SortedKeyValueIterator iter; private Range range; private KeyExtent currentExtent; @@ -124,83 +125,83 @@ class OfflineIterator implements Iterator> { this.options = new ScannerOptions(options); this.instance = instance; this.range = range; - + if (this.options.fetchedColumns.size() > 0) { this.range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last()); } - + this.tableId = table.toString(); this.authorizations = authorizations; this.readers = new ArrayList>(); - + try { conn = instance.getConnector(credentials.getPrincipal(), credentials.getToken()); config = new ConfigurationCopy(conn.instanceOperations().getSiteConfiguration()); nextTablet(); - + while (iter != null && !iter.hasTop()) nextTablet(); - + } catch (Exception e) { throw new RuntimeException(e); } } - + @Override public boolean hasNext() { return iter != null && iter.hasTop(); } - + @Override public Entry next() { try { byte[] v = iter.getTopValue().get(); // copy just like tablet server does, do this before calling next KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length)); - + iter.next(); - + while (iter != null && !iter.hasTop()) nextTablet(); - + return ret; } catch (Exception e) { throw new RuntimeException(e); } } - + private void nextTablet() throws TableNotFoundException, AccumuloException, IOException { - + Range nextRange = null; - + if (currentExtent == null) { Text startRow; - + if (range.getStartKey() != null) startRow = range.getStartKey().getRow(); else startRow = new Text(); - + nextRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); } else { - + if (currentExtent.getEndRow() == null) { iter = null; return; } - + if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) { iter = null; return; } - + nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false); } - + List relFiles = new ArrayList(); - + Pair eloc = getTabletFiles(nextRange, relFiles); - + while (eloc.getSecond() != null) { if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { Tables.clearCache(instance); @@ -208,18 +209,18 @@ class OfflineIterator implements Iterator> { throw new AccumuloException("Table is online " + tableId + " cannot scan tablet in offline mode " + eloc.getFirst()); } } - + UtilWaitThread.sleep(250); - + eloc = getTabletFiles(nextRange, relFiles); } - + KeyExtent extent = eloc.getFirst(); - + if (!extent.getTableId().toString().equals(tableId)) { throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent); } - + if (currentExtent != null && !extent.isPreviousExtent(currentExtent)) throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent); @@ -241,108 +242,108 @@ class OfflineIterator implements Iterator> { } } } - + iter = createIterator(extent, absFiles); iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns), options.fetchedColumns.size() == 0 ? false : true); currentExtent = extent; - + } - + private Pair getTabletFiles(Range nextRange, List relFiles) throws TableNotFoundException { Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); scanner.setBatchSize(100); scanner.setRange(nextRange); - + RowIterator rowIter = new RowIterator(scanner); Iterator> row = rowIter.next(); - + KeyExtent extent = null; String location = null; - + while (row.hasNext()) { Entry entry = row.next(); Key key = entry.getKey(); - + if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { relFiles.add(key.getColumnQualifier().toString()); } - + if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME) || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) { location = entry.getValue().toString(); } - + if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { extent = new KeyExtent(key.getRow(), entry.getValue()); } - + } return new Pair(extent, location); } - + private SortedKeyValueIterator createIterator(KeyExtent extent, List absFiles) throws TableNotFoundException, AccumuloException, IOException { - + // TODO share code w/ tablet - ACCUMULO-1303 AccumuloConfiguration acuTableConf = AccumuloConfiguration.getTableConfiguration(conn, tableId); - + Configuration conf = CachedConfiguration.getInstance(); for (SortedKeyValueIterator reader : readers) { ((FileSKVIterator) reader).close(); } - + readers.clear(); - + // TODO need to close files - ACCUMULO-1303 for (String file : absFiles) { FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem(); FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null); readers.add(reader); } - + MultiIterator multiIter = new MultiIterator(readers, extent); - + OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(); - + DeletingIterator delIter = new DeletingIterator(multiIter, false); - + ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); - + ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet(options.fetchedColumns)); - + byte[] defaultSecurityLabel; - + ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY)); defaultSecurityLabel = cv.getExpression(); - + VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel); - + return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList, options.serverSideIteratorOptions, iterEnv, false)); } - + @Override public void remove() { throw new UnsupportedOperationException(); } - + } /** - * + * */ public class OfflineScanner extends ScannerOptions implements Scanner { - + private int batchSize; private int timeOut; private Range range; - + private Instance instance; private Credentials credentials; private Authorizations authorizations; private Text tableId; - + public OfflineScanner(Instance instance, Credentials credentials, String tableId, Authorizations authorizations) { checkArgument(instance != null, "instance is null"); checkArgument(credentials != null, "credentials is null"); @@ -352,55 +353,55 @@ public class OfflineScanner extends ScannerOptions implements Scanner { this.credentials = credentials; this.tableId = new Text(tableId); this.range = new Range((Key) null, (Key) null); - + this.authorizations = authorizations; - + this.batchSize = Constants.SCAN_BATCH_SIZE; this.timeOut = Integer.MAX_VALUE; } - + @Deprecated @Override public void setTimeOut(int timeOut) { this.timeOut = timeOut; } - + @Deprecated @Override public int getTimeOut() { return timeOut; } - + @Override public void setRange(Range range) { this.range = range; } - + @Override public Range getRange() { return range; } - + @Override public void setBatchSize(int size) { this.batchSize = size; } - + @Override public int getBatchSize() { return batchSize; } - + @Override public void enableIsolation() { - + } - + @Override public void disableIsolation() { - + } - + @Override public Iterator> iterator() { return new OfflineIterator(this, instance, credentials, authorizations, tableId, range); @@ -415,5 +416,5 @@ public class OfflineScanner extends ScannerOptions implements Scanner { public void setReadaheadThreshold(long batches) { throw new UnsupportedOperationException(); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java index 95b71ee..a449389 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java @@ -115,7 +115,7 @@ public class ReplicationClient { /** * Attempt a single time to create a ReplicationServicer client to the given host - * + * * @param context * The client session for the peer replicant * @param server @@ -152,8 +152,8 @@ public class ReplicationClient { } } - public static T executeCoordinatorWithReturn(ClientContext context, ClientExecReturn exec) - throws AccumuloException, AccumuloSecurityException { + public static T executeCoordinatorWithReturn(ClientContext context, ClientExecReturn exec) throws AccumuloException, + AccumuloSecurityException { ReplicationCoordinator.Client client = null; for (int i = 0; i < 10; i++) { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java index 2a18765..49d5c9e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java @@ -41,10 +41,10 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; public class RootTabletLocator extends TabletLocator { - + private final TabletServerLockChecker lockChecker; private final ZooCacheFactory zcf; - + RootTabletLocator(TabletServerLockChecker lockChecker) { this(lockChecker, new ZooCacheFactory()); } @@ -53,7 +53,7 @@ public class RootTabletLocator extends TabletLocator { this.lockChecker = lockChecker; this.zcf = zcf; } - + @Override public void binMutations(ClientContext context, List mutations, Map> binnedMutations, List failures) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { @@ -68,11 +68,11 @@ public class RootTabletLocator extends TabletLocator { failures.addAll(mutations); } } - + @Override public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + TabletLocation rootTabletLocation = getRootTabletLocation(context); if (rootTabletLocation != null) { for (Range range : ranges) { @@ -82,38 +82,38 @@ public class RootTabletLocator extends TabletLocator { } return ranges; } - + @Override public void invalidateCache(KeyExtent failedExtent) {} - + @Override public void invalidateCache(Collection keySet) {} - + @Override public void invalidateCache(Instance instance, String server) { ZooCache zooCache = zcf.getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); String root = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; zooCache.clear(root + "/" + server); } - + @Override public void invalidateCache() {} - + protected TabletLocation getRootTabletLocation(ClientContext context) { Instance instance = context.getInstance(); String zRootLocPath = ZooUtil.getRoot(instance) + RootTable.ZROOT_TABLET_LOCATION; ZooCache zooCache = zcf.getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); - + OpTimer opTimer = new OpTimer(Logger.getLogger(this.getClass()), Level.TRACE).start("Looking up root tablet location in zookeeper."); byte[] loc = zooCache.get(zRootLocPath); opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%"); - + if (loc == null) { return null; } - + String[] tokens = new String(loc).split("\\|"); - + if (lockChecker.isLockHeld(tokens[0], tokens[1])) return new TabletLocation(RootTable.EXTENT, tokens[0], tokens[1]); else @@ -129,8 +129,8 @@ public class RootTabletLocator extends TabletLocator { UtilWaitThread.sleep(500); location = getRootTabletLocation(context); } - + return location; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java index 666a8af..3f73f04 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java @@ -32,29 +32,29 @@ import org.apache.hadoop.io.Text; /** * provides scanner functionality - * + * * "Clients can iterate over multiple column families, and there are several mechanisms for limiting the rows, columns, and timestamps traversed by a scan. For * example, we could restrict [a] scan ... to only produce anchors whose columns match [a] regular expression ..., or to only produce anchors whose timestamps * fall within ten days of the current time." - * + * */ public class ScannerImpl extends ScannerOptions implements Scanner { - + // keep a list of columns over which to scan // keep track of the last thing read // hopefully, we can track all the state in the scanner on the client // and just query for the next highest row from the tablet server - + private final ClientContext context; private Authorizations authorizations; private Text table; - + private int size; - + private Range range; private boolean isolated = false; private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD; - + public ScannerImpl(ClientContext context, String table, Authorizations authorizations) { checkArgument(context != null, "context is null"); checkArgument(table != null, "table is null"); @@ -63,21 +63,21 @@ public class ScannerImpl extends ScannerOptions implements Scanner { this.table = new Text(table); this.range = new Range((Key) null, (Key) null); this.authorizations = authorizations; - + this.size = Constants.SCAN_BATCH_SIZE; } - + @Override public synchronized void setRange(Range range) { checkArgument(range != null, "range is null"); this.range = range; } - + @Override public synchronized Range getRange() { return range; } - + @Override public synchronized void setBatchSize(int size) { if (size > 0) @@ -85,12 +85,12 @@ public class ScannerImpl extends ScannerOptions implements Scanner { else throw new IllegalArgumentException("size must be greater than zero"); } - + @Override public synchronized int getBatchSize() { return size; } - + /** * Returns an iterator over an accumulo table. This iterator uses the options that are currently set on the scanner for its lifetime. So setting options on a * Scanner object will have no effect on existing iterators. @@ -99,17 +99,17 @@ public class ScannerImpl extends ScannerOptions implements Scanner { public synchronized Iterator> iterator() { return new ScannerIterator(context, table, authorizations, range, size, getTimeOut(), this, isolated, readaheadThreshold); } - + @Override public synchronized void enableIsolation() { this.isolated = true; } - + @Override public synchronized void disableIsolation() { this.isolated = false; } - + @Deprecated @Override public void setTimeOut(int timeOut) { @@ -118,7 +118,7 @@ public class ScannerImpl extends ScannerOptions implements Scanner { else setTimeout(timeOut, TimeUnit.SECONDS); } - + @Deprecated @Override public int getTimeOut() { @@ -127,16 +127,16 @@ public class ScannerImpl extends ScannerOptions implements Scanner { return Integer.MAX_VALUE; return (int) timeout; } - + @Override public synchronized void setReadaheadThreshold(long batches) { if (0 > batches) { throw new IllegalArgumentException("Number of batches before read-ahead must be non-negative"); } - + readaheadThreshold = batches; } - + @Override public synchronized long getReadaheadThreshold() { return readaheadThreshold; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java index 1e0ac99..276e1d6 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java @@ -44,49 +44,49 @@ import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; public class ScannerIterator implements Iterator> { - + private static final Logger log = Logger.getLogger(ScannerIterator.class); - + // scanner options private Text tableId; private int timeOut; - + // scanner state private Iterator iter; private ScanState scanState; - + private ScannerOptions options; - + private ArrayBlockingQueue synchQ; - + private boolean finished = false; - + private boolean readaheadInProgress = false; private long batchCount = 0; private long readaheadThreshold; - + private static final List EMPTY_LIST = Collections.emptyList(); - + private static ThreadPoolExecutor readaheadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3l, TimeUnit.SECONDS, new SynchronousQueue(), new NamingThreadFactory("Accumulo scanner read ahead thread")); - + private class Reader implements Runnable { - + @Override public void run() { - + try { while (true) { List currentBatch = ThriftScanner.scan(scanState.context, scanState, timeOut); - + if (currentBatch == null) { synchQ.add(EMPTY_LIST); return; } - + if (currentBatch.size() == 0) continue; - + synchQ.add(currentBatch); return; } @@ -116,63 +116,62 @@ public class ScannerIterator implements Iterator> { synchQ.add(e); } } - + } - - ScannerIterator(ClientContext context, Text table, Authorizations authorizations, Range range, int size, int timeOut, ScannerOptions options, - boolean isolated) { + + ScannerIterator(ClientContext context, Text table, Authorizations authorizations, Range range, int size, int timeOut, ScannerOptions options, boolean isolated) { this(context, table, authorizations, range, size, timeOut, options, isolated, Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD); } - + ScannerIterator(ClientContext context, Text table, Authorizations authorizations, Range range, int size, int timeOut, ScannerOptions options, boolean isolated, long readaheadThreshold) { this.tableId = new Text(table); this.timeOut = timeOut; this.readaheadThreshold = readaheadThreshold; - + this.options = new ScannerOptions(options); - + synchQ = new ArrayBlockingQueue(1); - + if (this.options.fetchedColumns.size() > 0) { range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last()); } - + scanState = new ScanState(context, tableId, authorizations, new Range(range), options.fetchedColumns, size, options.serverSideIteratorList, options.serverSideIteratorOptions, isolated, readaheadThreshold); - + // If we want to start readahead immediately, don't wait for hasNext to be called if (0l == readaheadThreshold) { initiateReadAhead(); } iter = null; } - + private void initiateReadAhead() { readaheadInProgress = true; readaheadPool.execute(new Reader()); } - + @Override @SuppressWarnings("unchecked") public boolean hasNext() { if (finished) return false; - + if (iter != null && iter.hasNext()) { return true; } - + // this is done in order to find see if there is another batch to get - + try { if (!readaheadInProgress) { // no read ahead run, fetch the next batch right now new Reader().run(); } - + Object obj = synchQ.take(); - + if (obj instanceof Exception) { finished = true; if (obj instanceof RuntimeException) @@ -180,9 +179,9 @@ public class ScannerIterator implements Iterator> { else throw new RuntimeException((Exception) obj); } - + List currentBatch = (List) obj; - + if (currentBatch.size() == 0) { currentBatch = null; finished = true; @@ -190,31 +189,31 @@ public class ScannerIterator implements Iterator> { } iter = currentBatch.iterator(); batchCount++; - + if (batchCount > readaheadThreshold) { // start a thread to read the next batch initiateReadAhead(); } - + } catch (InterruptedException e1) { throw new RuntimeException(e1); } - + return true; } - + @Override public Entry next() { if (hasNext()) return iter.next(); throw new NoSuchElementException(); } - + // just here to satisfy the interface // could make this actually delete things from the database @Override public void remove() { throw new UnsupportedOperationException("remove is not supported in Scanner"); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java index 9726266..d6c50c7 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java @@ -17,6 +17,7 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -39,70 +40,70 @@ import org.apache.accumulo.core.util.TextUtil; import org.apache.hadoop.io.Text; public class ScannerOptions implements ScannerBase { - + protected List serverSideIteratorList = Collections.emptyList(); protected Map> serverSideIteratorOptions = Collections.emptyMap(); - + protected SortedSet fetchedColumns = new TreeSet(); - + protected long timeOut = Long.MAX_VALUE; - + private String regexIterName = null; - + protected ScannerOptions() {} - + public ScannerOptions(ScannerOptions so) { setOptions(this, so); } - + /** * Adds server-side scan iterators. - * + * */ @Override public synchronized void addScanIterator(IteratorSetting si) { checkArgument(si != null, "si is null"); if (serverSideIteratorList.size() == 0) serverSideIteratorList = new ArrayList(); - + for (IterInfo ii : serverSideIteratorList) { if (ii.iterName.equals(si.getName())) throw new IllegalArgumentException("Iterator name is already in use " + si.getName()); if (ii.getPriority() == si.getPriority()) throw new IllegalArgumentException("Iterator priority is already in use " + si.getPriority()); } - + serverSideIteratorList.add(new IterInfo(si.getPriority(), si.getIteratorClass(), si.getName())); - + if (serverSideIteratorOptions.size() == 0) serverSideIteratorOptions = new HashMap>(); - + Map opts = serverSideIteratorOptions.get(si.getName()); - + if (opts == null) { opts = new HashMap(); serverSideIteratorOptions.put(si.getName(), opts); } opts.putAll(si.getOptions()); } - + @Override public synchronized void removeScanIterator(String iteratorName) { checkArgument(iteratorName != null, "iteratorName is null"); // if no iterators are set, we don't have it, so it is already removed if (serverSideIteratorList.size() == 0) return; - + for (IterInfo ii : serverSideIteratorList) { if (ii.iterName.equals(iteratorName)) { serverSideIteratorList.remove(ii); break; } } - + serverSideIteratorOptions.remove(iteratorName); } - + /** * Override any existing options on the given named iterator */ @@ -113,29 +114,29 @@ public class ScannerOptions implements ScannerBase { checkArgument(value != null, "value is null"); if (serverSideIteratorOptions.size() == 0) serverSideIteratorOptions = new HashMap>(); - + Map opts = serverSideIteratorOptions.get(iteratorName); - + if (opts == null) { opts = new HashMap(); serverSideIteratorOptions.put(iteratorName, opts); } opts.put(key, value); } - + /** * Limit a scan to the specified column family. This can limit which locality groups are read on the server side. - * + * * To fetch multiple column families call this function multiple times. */ - + @Override public synchronized void fetchColumnFamily(Text col) { checkArgument(col != null, "col is null"); Column c = new Column(TextUtil.getBytes(col), null, null); fetchedColumns.add(c); } - + @Override public synchronized void fetchColumn(Text colFam, Text colQual) { checkArgument(colFam != null, "colFam is null"); @@ -143,21 +144,21 @@ public class ScannerOptions implements ScannerBase { Column c = new Column(TextUtil.getBytes(colFam), TextUtil.getBytes(colQual), null); fetchedColumns.add(c); } - + public synchronized void fetchColumn(Column column) { checkArgument(column != null, "column is null"); fetchedColumns.add(column); } - + @Override public synchronized void clearColumns() { fetchedColumns.clear(); } - + public synchronized SortedSet getFetchedColumns() { return fetchedColumns; } - + /** * Clears scan iterators prior to returning a scanner to the pool. */ @@ -167,14 +168,14 @@ public class ScannerOptions implements ScannerBase { serverSideIteratorOptions = Collections.emptyMap(); regexIterName = null; } - + protected static void setOptions(ScannerOptions dst, ScannerOptions src) { synchronized (dst) { synchronized (src) { dst.regexIterName = src.regexIterName; dst.fetchedColumns = new TreeSet(src.fetchedColumns); dst.serverSideIteratorList = new ArrayList(src.serverSideIteratorList); - + dst.serverSideIteratorOptions = new HashMap>(); Set>> es = src.serverSideIteratorOptions.entrySet(); for (Entry> entry : es) @@ -182,29 +183,29 @@ public class ScannerOptions implements ScannerBase { } } } - + @Override public Iterator> iterator() { throw new UnsupportedOperationException(); } - + @Override public void setTimeout(long timeout, TimeUnit timeUnit) { if (timeOut < 0) { throw new IllegalArgumentException("TimeOut must be positive : " + timeOut); } - + if (timeout == 0) this.timeOut = Long.MAX_VALUE; else this.timeOut = timeUnit.toMillis(timeout); } - + @Override public long getTimeout(TimeUnit timeunit) { return timeunit.convert(timeOut, TimeUnit.MILLISECONDS); } - + @Override public void close() { // Nothing needs to be closed http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java index 84124ca..bbf2d3f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java @@ -42,7 +42,7 @@ import org.apache.thrift.transport.TTransportException; public class ServerClient { private static final Logger log = Logger.getLogger(ServerClient.class); - + public static T execute(ClientContext context, ClientExecReturn exec) throws AccumuloException, AccumuloSecurityException { try { return executeRaw(context, exec); @@ -54,7 +54,7 @@ public class ServerClient { throw new AccumuloException(e); } } - + public static void execute(ClientContext context, ClientExec exec) throws AccumuloException, AccumuloSecurityException { try { executeRaw(context, exec); @@ -66,7 +66,7 @@ public class ServerClient { throw new AccumuloException(e); } } - + public static T executeRaw(ClientContext context, ClientExecReturn exec) throws Exception { while (true) { ClientService.Client client = null; @@ -85,7 +85,7 @@ public class ServerClient { } } } - + public static void executeRaw(ClientContext context, ClientExec exec) throws Exception { while (true) { ClientService.Client client = null; @@ -105,23 +105,23 @@ public class ServerClient { } } } - + static volatile boolean warnedAboutTServersBeingDown = false; public static Pair getConnection(ClientContext context) throws TTransportException { return getConnection(context, true); } - + public static Pair getConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException { return getConnection(context, preferCachedConnections, context.getClientTimeoutInMillis()); } - + public static Pair getConnection(ClientContext context, boolean preferCachedConnections, long rpcTimeout) throws TTransportException { checkArgument(context != null, "context is null"); // create list of servers ArrayList servers = new ArrayList(); - + // add tservers Instance instance = context.getInstance(); ZooCache zc = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); @@ -134,7 +134,7 @@ public class ServerClient { servers.add(new ThriftTransportKey(new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context)); } } - + boolean opened = false; try { Pair pair = ThriftTransportPool.getInstance().getAnyTransport(servers, preferCachedConnections); @@ -155,7 +155,7 @@ public class ServerClient { } } } - + public static void close(ClientService.Client client) { if (client != null && client.getInputProtocol() != null && client.getInputProtocol().getTransport() != null) { ThriftTransportPool.getInstance().returnTransport(client.getInputProtocol().getTransport()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsHelper.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsHelper.java index cea934d..4068dee 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsHelper.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsHelper.java @@ -17,6 +17,7 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; + import java.util.EnumSet; import java.util.HashMap; import java.util.Map; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java index dca40da..e32a9e1 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java @@ -220,15 +220,14 @@ public class TableOperationsImpl extends TableOperationsHelper { * @param ntc * specifies the new table's configuration. It determines whether the versioning iterator is enabled or disabled, logical or real-time based time * recording for entries in the table - * + * */ @Override public void create(String tableName, NewTableConfiguration ntc) throws AccumuloException, AccumuloSecurityException, TableExistsException { checkArgument(tableName != null, "tableName is null"); checkArgument(ntc != null, "ntc is null"); - List args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)), - ByteBuffer.wrap(ntc.getTimeType().name().getBytes(UTF_8))); + List args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)), ByteBuffer.wrap(ntc.getTimeType().name().getBytes(UTF_8))); Map opts = ntc.getProperties(); @@ -816,9 +815,9 @@ public class TableOperationsImpl extends TableOperationsHelper { if (config.getFlush()) _flush(tableId, start, end, true); - List args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes(UTF_8)), start == null ? EMPTY : TextUtil.getByteBuffer(start), - end == null ? EMPTY : TextUtil.getByteBuffer(end), ByteBuffer.wrap(IteratorUtil.encodeIteratorSettings(config.getIterators())), - ByteBuffer.wrap(CompactionStrategyConfigUtil.encode(config.getCompactionStrategy()))); + List args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes(UTF_8)), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY + : TextUtil.getByteBuffer(end), ByteBuffer.wrap(IteratorUtil.encodeIteratorSettings(config.getIterators())), ByteBuffer + .wrap(CompactionStrategyConfigUtil.encode(config.getCompactionStrategy()))); Map opts = new HashMap(); try { @@ -1474,11 +1473,11 @@ public class TableOperationsImpl extends TableOperationsHelper { throw new AccumuloSecurityException(e.getUser(), e.getCode()); } catch (TTransportException e) { // some sort of communication error occurred, retry - if (pair == null) { + if (pair == null) { log.debug("Disk usage request failed. Pair is null. Retrying request...", e); - } else { + } else { log.debug("Disk usage request failed " + pair.getFirst() + ", retrying ... ", e); - } + } UtilWaitThread.sleep(100); } catch (TException e) { // may be a TApplicationException which indicates error on the server side http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java index c2b8001..20b1639 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java @@ -43,7 +43,7 @@ public class Tables { private static final Logger log = Logger.getLogger(Tables.class); public static final String VALID_NAME_REGEX = "^(\\w+\\.)?(\\w+)$"; - + private static final SecurityPermission TABLES_PERMISSION = new SecurityPermission("tablesPermission"); private static final AtomicLong cacheResetCount = new AtomicLong(0); @@ -222,7 +222,7 @@ public class Tables { /** * Returns the namespace id for a given table ID. - * + * * @param instance * The Accumulo Instance * @param tableId http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java index 3cb0fde..782a599 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java @@ -39,66 +39,66 @@ import org.apache.accumulo.core.metadata.RootTable; import org.apache.hadoop.io.Text; public abstract class TabletLocator { - + public abstract TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException; - + public abstract void binMutations(ClientContext context, List mutations, Map> binnedMutations, List failures) throws AccumuloException, AccumuloSecurityException, TableNotFoundException; - + public abstract List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException; - + public abstract void invalidateCache(KeyExtent failedExtent); - + public abstract void invalidateCache(Collection keySet); - + /** * Invalidate entire cache */ public abstract void invalidateCache(); - + /** * Invalidate all metadata entries that point to server */ public abstract void invalidateCache(Instance instance, String server); - + private static class LocatorKey { String instanceId; Text tableName; - + LocatorKey(String instanceId, Text table) { this.instanceId = instanceId; this.tableName = table; } - + @Override public int hashCode() { return instanceId.hashCode() + tableName.hashCode(); } - + @Override public boolean equals(Object o) { if (o instanceof LocatorKey) return equals((LocatorKey) o); return false; } - + public boolean equals(LocatorKey lk) { return instanceId.equals(lk.instanceId) && tableName.equals(lk.tableName); } - + } - + private static HashMap locators = new HashMap(); - + public static synchronized TabletLocator getLocator(ClientContext context, Text tableId) { Instance instance = context.getInstance(); LocatorKey key = new LocatorKey(instance.getInstanceID(), tableId); TabletLocator tl = locators.get(key); if (tl == null) { MetadataLocationObtainer mlo = new MetadataLocationObtainer(); - + if (tableId.toString().equals(RootTable.ID)) { tl = new RootTabletLocator(new ZookeeperLockChecker(instance)); } else if (tableId.toString().equals(MetadataTable.ID)) { @@ -108,32 +108,32 @@ public abstract class TabletLocator { } locators.put(key, tl); } - + return tl; } - + public static class TabletLocations { - + private final List locations; private final List locationless; - + public TabletLocations(List locations, List locationless) { this.locations = locations; this.locationless = locationless; } - + public List getLocations() { return locations; } - + public List getLocationless() { return locationless; } } - + public static class TabletLocation implements Comparable { private static final WeakHashMap> tabletLocs = new WeakHashMap>(); - + private static String dedupeLocation(String tabletLoc) { synchronized (tabletLocs) { WeakReference lref = tabletLocs.get(tabletLoc); @@ -143,17 +143,17 @@ public abstract class TabletLocator { return loc; } } - + tabletLoc = new String(tabletLoc); tabletLocs.put(tabletLoc, new WeakReference(tabletLoc)); return tabletLoc; } } - + public final KeyExtent tablet_extent; public final String tablet_location; public final String tablet_session; - + public TabletLocation(KeyExtent tablet_extent, String tablet_location, String session) { checkArgument(tablet_extent != null, "tablet_extent is null"); checkArgument(tablet_location != null, "tablet_location is null"); @@ -162,7 +162,7 @@ public abstract class TabletLocator { this.tablet_location = dedupeLocation(tablet_location); this.tablet_session = dedupeLocation(session); } - + @Override public boolean equals(Object o) { if (o instanceof TabletLocation) { @@ -171,17 +171,17 @@ public abstract class TabletLocator { } return false; } - + @Override public int hashCode() { throw new UnsupportedOperationException("hashcode is not implemented for class " + this.getClass().toString()); } - + @Override public String toString() { return "(" + tablet_extent + "," + tablet_location + "," + tablet_session + ")"; } - + @Override public int compareTo(TabletLocation o) { int result = tablet_extent.compareTo(o.tablet_extent); @@ -193,7 +193,7 @@ public abstract class TabletLocator { return result; } } - + public static class TabletServerMutations { private Map> mutations; private String tserverSession; @@ -209,14 +209,14 @@ public abstract class TabletLocator { mutList = new ArrayList(); mutations.put(ke, mutList); } - + mutList.add(m); } - + public Map> getMutations() { return mutations; } - + final String getSession() { return tserverSession; }