Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E2DF410A64 for ; Tue, 23 Jul 2013 16:54:46 +0000 (UTC) Received: (qmail 53508 invoked by uid 500); 23 Jul 2013 16:54:36 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 53015 invoked by uid 500); 23 Jul 2013 16:54:33 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 52201 invoked by uid 99); 23 Jul 2013 16:54:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Jul 2013 16:54:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id EBE828B217B; Tue, 23 Jul 2013 16:54:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Tue, 23 Jul 2013 16:55:10 -0000 Message-Id: <24b07e1f3a0d41b88890aa830bbf44c2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [45/50] git commit: ACCUMULO-1000 fixed a lot of odds and ends ACCUMULO-1000 fixed a lot of odds and ends Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5e908585 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5e908585 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5e908585 Branch: refs/heads/ACCUMULO-1000 Commit: 5e908585aa76b605840c87e2f769e2aff642b3a6 Parents: 7bb5f8f Author: Keith Turner Authored: Mon Jul 22 16:37:03 2013 -0400 Committer: Keith Turner Committed: Mon Jul 22 16:37:03 2013 -0400 ---------------------------------------------------------------------- .../core/client/impl/ConditionalWriterImpl.java | 43 +++-- .../iterators/system/ColumnQualifierFilter.java | 5 +- .../accumulo/server/tabletserver/Tablet.java | 8 +- .../server/tabletserver/TabletServer.java | 173 ++++++++++--------- 4 files changed, 123 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index c87c865..ed20054 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -65,6 +65,7 @@ import org.apache.accumulo.core.util.BadArgumentException; import org.apache.accumulo.core.util.ByteBufferUtil; import org.apache.accumulo.core.util.LoggingRunnable; import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.trace.instrument.Tracer; import org.apache.accumulo.trace.thrift.TInfo; import org.apache.commons.collections.map.LRUMap; @@ -81,6 +82,10 @@ class ConditionalWriterImpl implements ConditionalWriter { private static final Logger log = Logger.getLogger(ConditionalWriterImpl.class); + private static final int MAX_SLEEP = 5000; + + private static final long SESSION_CACHE_TIME = 60000; + private Authorizations auths; private VisibilityEvaluator ve; @SuppressWarnings("unchecked") @@ -167,7 +172,7 @@ class ConditionalWriterImpl implements ConditionalWriter { void resetDelay() { // TODO eventually timeout a mutation - delay = Math.min(delay * 2, 5000); + delay = Math.min(delay * 2, MAX_SLEEP); resetTime = System.currentTimeMillis(); } } @@ -231,7 +236,7 @@ class ConditionalWriterImpl implements ConditionalWriter { synchronized (serverQueue) { serverQueue.queue.add(mutations); - //never execute more that one task per server + // never execute more than one task per server if(!serverQueue.taskQueued){ threadPool.execute(new LoggingRunnable(log, new SendTask(location))); serverQueue.taskQueued = true; @@ -357,12 +362,13 @@ class ConditionalWriterImpl implements ConditionalWriter { @Override public void run() { - TabletServerMutations mutations = dequeue(location); - if (mutations != null) - sendToServer(location, mutations); - - //TODO if exception is thrown, will not reschedule - reschedule(this); + try { + TabletServerMutations mutations = dequeue(location); + if (mutations != null) + sendToServer(location, mutations); + } finally { + reschedule(this); + } } } @@ -380,6 +386,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private static class SessionID { long sessionID; boolean reserved; + long lastAccessTime; } private HashMap cachedSessionIDs = new HashMap(); @@ -392,8 +399,12 @@ class ConditionalWriterImpl implements ConditionalWriter { if (sid.reserved) throw new IllegalStateException(); - sid.reserved = true; - return sid.sessionID; + if (System.currentTimeMillis() - sid.lastAccessTime > SESSION_CACHE_TIME) { + cachedSessionIDs.remove(location); + } else { + sid.reserved = true; + return sid.sessionID; + } } } @@ -423,6 +434,7 @@ class ConditionalWriterImpl implements ConditionalWriter { if(!sid.reserved) throw new IllegalStateException(); sid.reserved = false; + sid.lastAccessTime = System.currentTimeMillis(); } } @@ -470,9 +482,6 @@ class ConditionalWriterImpl implements ConditionalWriter { } } - - // TODO maybe have thrift call return bad extents - for (KeyExtent ke : extentsToInvalidate) { locator.invalidateCache(ke); } @@ -533,14 +542,14 @@ class ConditionalWriterImpl implements ConditionalWriter { */ private void invalidateSession(long sessionId, String location, TabletServerMutations mutations) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - // TODO could assume tserver will invalidate sessions after a given time period - ArrayList mutList = new ArrayList(); for (List tml : mutations.getMutations().values()) { mutList.addAll(tml); } + long sleepTime = 50; + while (true) { Map> binnedMutations = new HashMap>(); List failures = new ArrayList(); @@ -565,7 +574,9 @@ class ConditionalWriterImpl implements ConditionalWriter { locator.invalidateCache(location); } - //TODO sleep + UtilWaitThread.sleep(sleepTime); + sleepTime = Math.min(2 * sleepTime, MAX_SLEEP); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java index 1595f5a..d5ca3b4 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.iterators.system; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Set; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; @@ -36,7 +37,7 @@ public class ColumnQualifierFilter extends Filter { public ColumnQualifierFilter() {} - public ColumnQualifierFilter(SortedKeyValueIterator iterator, HashSet columns) { + public ColumnQualifierFilter(SortedKeyValueIterator iterator, Set columns) { setSource(iterator); init(columns); } @@ -63,7 +64,7 @@ public class ColumnQualifierFilter extends Filter { return cfset != null && cfset.contains(key.getColumnFamilyData()); } - public void init(HashSet columns) { + public void init(Set columns) { this.columnFamilies = new HashSet(); this.columnsQualifiers = new HashMap>(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java index 1305be6..035d9b0 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java @@ -1651,7 +1651,7 @@ public class Tablet { } } - private Batch nextBatch(SortedKeyValueIterator iter, Range range, int num, HashSet columns) throws IOException { + private Batch nextBatch(SortedKeyValueIterator iter, Range range, int num, Set columns) throws IOException { // log.info("In nextBatch.."); @@ -1739,7 +1739,7 @@ public class Tablet { public long numBytes; } - Scanner createScanner(Range range, int num, HashSet columns, Authorizations authorizations, List ssiList, + Scanner createScanner(Range range, int num, Set columns, Authorizations authorizations, List ssiList, Map> ssio, boolean isolated, AtomicBoolean interruptFlag) { // do a test to see if this range falls within the tablet, if it does not // then clip will throw an exception @@ -1873,14 +1873,14 @@ public class Tablet { // scan options Authorizations authorizations; byte[] defaultLabels; - HashSet columnSet; + Set columnSet; List ssiList; Map> ssio; AtomicBoolean interruptFlag; int num; boolean isolated; - ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, HashSet columnSet, List ssiList, + ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set columnSet, List ssiList, Map> ssio, AtomicBoolean interruptFlag, boolean isolated) { this.num = num; this.authorizations = authorizations; http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e908585/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java index 013639e..8f33488 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java @@ -750,6 +750,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public TCredentials credentials; public Authorizations auths; public String tableId; + public AtomicBoolean interruptFlag; + + @Override + public void cleanup() { + interruptFlag.set(true); + } } private static class UpdateSession extends Session { @@ -901,6 +907,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu WriteTracker writeTracker = new WriteTracker(); + private RowLocks rowLocks = new RowLocks(); + ThriftClientHandler() { super(instance, watcher); log.debug(ThriftClientHandler.class.getName() + " created"); @@ -1730,16 +1738,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu writeTracker.finishWrite(opid); } } - - private RowLocks rowLocks = new RowLocks(); - private void checkConditions(Map> updates, ArrayList results, Authorizations authorizations, - List symbols) { + private void checkConditions(Map> updates, ArrayList results, ConditionalSession cs, + List symbols) throws IOException { Iterator>> iter = updates.entrySet().iterator(); - // TODO use constant - HashSet columns = new HashSet(); - CompressedIterators compressedIters = new CompressedIterators(symbols); while (iter.hasNext()) { @@ -1752,97 +1755,91 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu iter.remove(); } else { List okMutations = new ArrayList(entry.getValue().size()); - - // TODO extract to method - for (ServerConditionalMutation scm : entry.getValue()) { - boolean add = true; - for(TCondition tc : scm.getConditions()){ - - Range range; - if (tc.hasTimestamp) - range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs()); - else - range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv())); - - AtomicBoolean interruptFlag = new AtomicBoolean(); - - IterConfig ic = compressedIters.decompress(tc.iterators); - //TODO use one iterator per tablet, push checks into tablet? - Scanner scanner = tablet.createScanner(range, 1, columns, authorizations, ic.ssiList, ic.ssio, false, interruptFlag); - - try { - ScanBatch batch = scanner.read(); - - Value val = null; - - for (KVEntry entry2 : batch.results) { - val = entry2.getValue(); - break; - } - - if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) { - results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED)); - add = false; - break; - } - - } catch (TabletClosedException e) { - // TODO ignore rest of tablets mutations - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - add = false; - break; - } catch (IterationInterruptedException iie) { - // TODO determine why this happened, ignore rest of tablets mutations? - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - add = false; - break; - } catch (TooManyFilesException tmfe) { - // TODO handle differently? - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - add = false; - break; - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally { - scanner.close(); - } - } - - if (add) + for (ServerConditionalMutation scm : entry.getValue()) { + if (checkCondition(results, cs, compressedIters, tablet, scm)) okMutations.add(scm); } - // TODO just rebuild map - entry.getValue().clear(); - entry.getValue().addAll(okMutations); + entry.setValue(okMutations); } } } - private void writeConditionalMutations(Map> updates, ArrayList results, TCredentials credentials) { + boolean checkCondition(ArrayList results, ConditionalSession cs, CompressedIterators compressedIters, + Tablet tablet, ServerConditionalMutation scm) throws IOException { + boolean add = true; + + Set emptyCols = Collections.emptySet(); + + for(TCondition tc : scm.getConditions()){ + + Range range; + if (tc.hasTimestamp) + range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs()); + else + range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv())); + + IterConfig ic = compressedIters.decompress(tc.iterators); + + //TODO use one iterator per tablet, push checks into tablet? + Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag); + + try { + ScanBatch batch = scanner.read(); + + Value val = null; + + for (KVEntry entry2 : batch.results) { + val = entry2.getValue(); + break; + } + + if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) { + results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED)); + add = false; + break; + } + + } catch (TabletClosedException e) { + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + add = false; + break; + } catch (IterationInterruptedException iie) { + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + add = false; + break; + } catch (TooManyFilesException tmfe) { + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + add = false; + break; + } + } + return add; + } + + private void writeConditionalMutations(Map> updates, ArrayList results, ConditionalSession sess) { Set>> es = updates.entrySet(); Map> sendables = new HashMap>(); // TODO stats + boolean sessionCanceled = sess.interruptFlag.get(); + for (Entry> entry : es) { Tablet tablet = onlineTablets.get(entry.getKey()); - if (tablet == null || tablet.isClosed()) { + if (tablet == null || tablet.isClosed() || sessionCanceled) { for (ServerConditionalMutation scm : entry.getValue()) results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); } else { - // TODO write tracker - try { List mutations = (List) (List) entry.getValue(); if (mutations.size() > 0) { - CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations); + CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations); if (cs == null) { for (ServerConditionalMutation scm : entry.getValue()) @@ -1889,8 +1886,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } - private Map> conditionalUpdate(TCredentials credentials, Authorizations authorizations, - Map> updates, ArrayList results, List symbols) { + private Map> conditionalUpdate(ConditionalSession cs, Map> updates, + ArrayList results, List symbols) throws IOException { // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows. ConditionalMutationSet.sortConditionalMutations(updates); @@ -1902,8 +1899,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu // get as many locks as possible w/o blocking... defer any rows that are locked List locks = rowLocks.acquireRowlocks(updates, deferred); try { - checkConditions(updates, results, authorizations, symbols); - writeConditionalMutations(updates, results, credentials); + checkConditions(updates, results, cs, symbols); + writeConditionalMutations(updates, results, cs); } finally { rowLocks.releaseRowLocks(locks); } @@ -1926,6 +1923,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu cs.auths = new Authorizations(authorizations); cs.credentials = credentials; cs.tableId = tableID; + cs.interruptFlag = new AtomicBoolean(); return sessionManager.createSession(cs, false); } @@ -1934,34 +1932,36 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public List conditionalUpdate(TInfo tinfo, long sessID, Map> mutations, List symbols) throws NoSuchScanIDException, TException { // TODO sessions, should show up in list scans - // TODO timeout like scans do ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID); if(cs == null) throw new NoSuchScanIDException(); - + Text tid = new Text(cs.tableId); + long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null))); try{ Map> updates = Translator.translate(mutations, Translator.TKET, new Translator.ListTranslator(ServerConditionalMutation.TCMT)); - - Text tid = new Text(cs.tableId); + for(KeyExtent ke : updates.keySet()) if(!ke.getTableId().equals(tid)) throw new IllegalArgumentException("Unexpected table id "+tid+" != "+ke.getTableId()); ArrayList results = new ArrayList(); - Map> deferred = conditionalUpdate(cs.credentials, cs.auths, updates, results, symbols); + Map> deferred = conditionalUpdate(cs, updates, results, symbols); while (deferred.size() > 0) { - deferred = conditionalUpdate(cs.credentials, cs.auths, deferred, results, symbols); + deferred = conditionalUpdate(cs, deferred, results, symbols); } return results; + } catch (IOException ioe) { + throw new TException(ioe); }finally{ + writeTracker.finishWrite(opid); sessionManager.unreserveSession(sessID); } } @@ -1970,7 +1970,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException { //this method should wait for any running conditional update to complete //after this method returns a conditional update should not be able to start - ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID, true); + + ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID); + if (cs != null) + cs.interruptFlag.set(true); + + cs = (ConditionalSession) sessionManager.reserveSession(sessID, true); if(cs != null) sessionManager.removeSession(sessID, true); }