Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CE48A107F4 for ; Thu, 7 Nov 2013 05:24:32 +0000 (UTC) Received: (qmail 29462 invoked by uid 500); 7 Nov 2013 05:24:27 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 29368 invoked by uid 500); 7 Nov 2013 05:24:24 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 29286 invoked by uid 99); 7 Nov 2013 05:24:24 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Nov 2013 05:24:24 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7027149552; Thu, 7 Nov 2013 05:24:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Thu, 07 Nov 2013 05:24:27 -0000 Message-Id: <624e6a2e2ec74f45bbcdee86b2a9aad6@git.apache.org> In-Reply-To: <616f456d7be34cbe8e27debfec6606c7@git.apache.org> References: <616f456d7be34cbe8e27debfec6606c7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/6] git commit: ACCUMULO-1854 Ended up re-implementing some of the old approach to get back to a functional state. ACCUMULO-1854 Ended up re-implementing some of the old approach to get back to a functional state. Couldn't rely solely on the Configuration, because getting the same Configuration each time getSplits is called isn't guaranteed. Since getSplits is always called serially by one client, we can use that fact to keep some state and avoid reading the same data many times. 
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c50a2229 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c50a2229 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c50a2229 Branch: refs/heads/ACCUMULO-1854-multi-aif Commit: c50a22296d80042a86639129a02e2b9468dc3330 Parents: 0f10a6f Author: Josh Elser Authored: Thu Nov 7 00:16:17 2013 -0500 Committer: Josh Elser Committed: Thu Nov 7 00:20:00 2013 -0500 ---------------------------------------------------------------------- .../core/client/mapreduce/InputFormatBase.java | 38 ++++++++++++++--- .../client/mapreduce/SequencedFormatHelper.java | 45 +++++++++++++++++--- .../mapreduce/AccumuloOutputFormatTest.java | 6 +++ .../client/mapreduce/InputFormatBaseTest.java | 1 + 4 files changed, 77 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java index 5c87c13..9ce98ba 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@ -42,8 +42,8 @@ import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Set; import java.util.StringTokenizer; - -import javax.servlet.jsp.jstl.core.Config; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; @@ -83,7 +83,6 @@ import 
org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -151,15 +150,22 @@ public abstract class InputFormatBase extends InputFormat { private static final String ITERATORS_DELIM = ","; private static final String SEQ_DELIM = "."; - private static final String READ_OFFLINE = PREFIX + ".read.offline"; + + private static final AtomicBoolean DEFAULT_SEQUENCE_READ = new AtomicBoolean(false); + private static final AtomicInteger SEQUENCES_READ = new AtomicInteger(0); protected static String merge(String name, Integer sequence) { return name + SEQ_DELIM + sequence; } + protected static void resetInternals() { + DEFAULT_SEQUENCE_READ.set(false); + SEQUENCES_READ.set(0); + } + /** * Get a unique identifier for these configurations * @@ -169,8 +175,25 @@ public abstract class InputFormatBase extends InputFormat { return SequencedFormatHelper.nextSequence(conf, PREFIX); } - protected static int nextSequenceToProcess(Configuration conf) { - return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX); + protected static synchronized int nextSequenceToProcess(Configuration conf) { + boolean isDefaultSequenceUsed = SequencedFormatHelper.isDefaultSequenceUsed(conf, PREFIX); + + if (isDefaultSequenceUsed && !DEFAULT_SEQUENCE_READ.get()) { + DEFAULT_SEQUENCE_READ.set(true); + return 0; + } + + Integer[] configuredSequences = SequencedFormatHelper.configuredSequences(conf, PREFIX); + + int sequenceOffset = SEQUENCES_READ.getAndAdd(1); + + if (0 == configuredSequences.length && !isDefaultSequenceUsed) { + throw new NoSuchElementException(); + } else if (sequenceOffset >= configuredSequences.length) { + return -1; + } + + return configuredSequences[sequenceOffset]; } protected static void 
setDefaultSequenceUsed(Configuration conf) { @@ -1791,6 +1814,9 @@ public abstract class InputFormatBase extends InputFormat { */ @Override public List getSplits(JobContext job) throws IOException { + // Disclaimer: the only reason this works as it does is because getSplits is + // called serially by the JobClient before the job starts (one node, one thread). + // If it was called by multiple nodes, this approach would fail miserably. final Configuration conf = job.getConfiguration(); final int sequence = nextSequenceToProcess(conf); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java index ff18754..ab6dd3a 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java @@ -16,11 +16,11 @@ public class SequencedFormatHelper { private static final String COMMA = ","; private static final String TRUE = "true"; - protected static final int DEFAULT_SEQUENCE = 0; + public static final int DEFAULT_SEQUENCE = 0; - private static final String DEFAULT_SEQ_USED = ".defaultSequenceUsed"; - private static final String CONFIGURED_SEQUENCES = ".configuredSeqs"; - private static final String PROCESSED_SEQUENCES = ".processedSeqs"; + public static final String DEFAULT_SEQ_USED = ".defaultSequenceUsed"; + public static final String CONFIGURED_SEQUENCES = ".configuredSeqs"; + public static final String PROCESSED_SEQUENCES = ".processedSeqs"; /** * Get a unique identifier for these configurations @@ -44,15 +44,46 @@ public class SequencedFormatHelper { return newValue; } } + + /** + 
* Returns all configured sequences but not the default sequence + * @param conf + * @param prefix + * @return + */ + public static Integer[] configuredSequences(Configuration conf, String prefix) { + ArgumentChecker.notNull(conf, prefix); + + final String configuredSequences = prefix + CONFIGURED_SEQUENCES; + String[] values = conf.getStrings(configuredSequences); + if (null == values) { + return new Integer[0]; + } + + Integer[] intValues = new Integer[values.length]; + for (int i = 0; i < values.length; i++) { + intValues[i] = Integer.parseInt(values[i]); + } + + return intValues; + } + protected static boolean isDefaultSequenceUsed(Configuration conf, String prefix) { + ArgumentChecker.notNull(conf, prefix); + + final String defaultSequenceUsedKey = prefix + DEFAULT_SEQ_USED; + + return conf.getBoolean(defaultSequenceUsedKey, false); + } + protected static void setDefaultSequenceUsed(Configuration conf, String prefix) { ArgumentChecker.notNull(conf, prefix); - final String configuredSequences = prefix + DEFAULT_SEQ_USED; + final String defaultSequenceUsedKey = prefix + DEFAULT_SEQ_USED; - String value = conf.get(configuredSequences); + String value = conf.get(defaultSequenceUsedKey); if (null == value || !TRUE.equals(value)) { - conf.setBoolean(configuredSequences, true); + conf.setBoolean(defaultSequenceUsedKey, true); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java index 5599cae..c4c2e76 100644 --- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java +++ 
b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java @@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.junit.Before; import org.junit.Test; /** @@ -77,6 +78,11 @@ public class AccumuloOutputFormatTest { } } + @Before + public void clearInputFormatState() { + InputFormatBase.resetInternals(); + } + @Test public void testMR() throws Exception { MockInstance mockInstance = new MockInstance("testmrinstance"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java ---------------------------------------------------------------------- diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java index 9d167a9..f52c7a1 100644 --- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java +++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java @@ -14,6 +14,7 @@ public class InputFormatBaseTest { @Before public void setup() { + InputFormatBase.resetInternals(); conf = new Configuration(); }