Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5552210C04 for ; Sun, 24 Nov 2013 05:53:38 +0000 (UTC) Received: (qmail 13723 invoked by uid 500); 24 Nov 2013 05:53:37 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 13638 invoked by uid 500); 24 Nov 2013 05:53:32 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 13624 invoked by uid 99); 24 Nov 2013 05:53:30 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 24 Nov 2013 05:53:30 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 376099041B2; Sun, 24 Nov 2013 05:53:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Sun, 24 Nov 2013 05:53:30 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: ACCUMULO-1783 Update code to work with upstream 1.5 changes. Updated Branches: refs/heads/ACCUMULO-1783-1.5 [created] cb720e850 ACCUMULO-1783 Update code to work with upstream 1.5 changes. Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/0ce6fb34 Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/0ce6fb34 Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/0ce6fb34 Branch: refs/heads/ACCUMULO-1783-1.5 Commit: 0ce6fb34a852050643aae5ad5feb5c21c4b6b94e Parents: 170229a Author: Josh Elser Authored: Fri Nov 22 18:00:15 2013 -0500 Committer: Josh Elser Committed: Fri Nov 22 18:00:15 2013 -0500 ---------------------------------------------------------------------- pom.xml | 11 ++- .../accumulo/pig/AbstractAccumuloStorage.java | 79 ++++++++++++-------- .../apache/accumulo/pig/AccumuloStorage.java | 7 +- .../accumulo/pig/AccumuloWholeRowStorage.java | 6 +- .../pig/AbstractAccumuloStorageTest.java | 48 ++++++++---- 5 files changed, 98 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0c82c39..587bf1e 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ 4.0.0 org.apache.accumulo accumulo-pig - 1.4.4-SNAPSHOT + 1.5.1-SNAPSHOT @@ -65,7 +65,7 @@ org.apache.accumulo accumulo-core - 1.4.5-SNAPSHOT + 1.5.1-SNAPSHOT joda-time @@ -78,6 +78,11 @@ 15.0 + log4j + log4j + 1.2.16 + + org.apache.zookeeper zookeeper 3.4.5 @@ -86,7 +91,7 @@ org.apache.accumulo accumulo-minicluster - 1.4.5-SNAPSHOT + 1.5.1-SNAPSHOT test http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java index a829d4a..890abf3 100644 --- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java +++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java @@ -26,10 +26,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.Properties; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; @@ -77,7 +81,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF private static final String COLON = ":", COMMA = ","; private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(); - private Configuration conf; private RecordReader reader; private RecordWriter writer; @@ -104,6 +107,10 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF public AbstractAccumuloStorage() {} + protected Map getInputFormatEntries(Configuration conf) { + return getEntries(conf, INPUT_PREFIX); + } + @Override public Tuple getNext() throws IOException { try { @@ -202,10 +209,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF return writer; } - protected Map getInputFormatEntries(Configuration conf) { - return getEntries(conf, INPUT_PREFIX); - } - protected Map getEntries(Configuration conf, String prefix) { Map entries = new HashMap(); @@ -221,44 +224,50 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF @Override public void setLocation(String location, Job job) throws IOException { - conf = job.getConfiguration(); setLocationFromUri(location); - Map entries = getInputFormatEntries(conf); - + Map entries = getInputFormatEntries(job.getConfiguration()); for (String key : entries.keySet()) { - conf.unset(key); + job.getConfiguration().unset(key); } - AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations); - AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers); + try { + AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password)); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + + AccumuloInputFormat.setInputTableName(job, table); + AccumuloInputFormat.setScanAuthorizations(job, authorizations); + AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers); + if (columnFamilyColumnQualifierPairs.size() > 0) { LOG.info("columns: " + columnFamilyColumnQualifierPairs); - AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs); + AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs); } Collection ranges = Collections.singleton(new Range(start, end)); LOG.info("Scanning Accumulo for " + ranges + " for table " + table); - AccumuloInputFormat.setRanges(conf, ranges); + AccumuloInputFormat.setRanges(job, ranges); - configureInputFormat(conf); + configureInputFormat(job); } /** - * Method to allow specific implementations to add more elements to the Configuration for reading data from Accumulo. + * Method to allow specific implementations to add more elements to the Job for reading data from Accumulo. * - * @param conf + * @param job */ - protected void configureInputFormat(Configuration conf) {} + protected void configureInputFormat(Job job) {} /** - * Method to allow specific implementations to add more elements to the Configuration for writing data to Accumulo. + * Method to allow specific implementations to add more elements to the Job for writing data to Accumulo. * - * @param conf + * @param job */ - protected void configureOutputFormat(Configuration conf) {} + protected void configureOutputFormat(Job job) {} @Override public String relativeToAbsolutePath(String location, Path curDir) throws IOException { @@ -288,20 +297,30 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF } public void setStoreLocation(String location, Job job) throws IOException { - conf = job.getConfiguration(); setLocationFromUri(location); - - // TODO If Pig ever uses a MultipleOutputs-esque construct, this approach will fall apart - if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) { - AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table); - AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers); - AccumuloOutputFormat.setMaxLatency(conf, maxLatency); - AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize); - AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads); + + // If Pig ever uses an approach like they handle inputs (load), this will fall apart. + // Currently, it appears that multiple stores will get new m/r jobs + if (job.getConfiguration().get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) { + try { + AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password)); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } + + // AccumuloOutputFormat.setCreateTables(job, true); + // AccumuloOutputFormat.setDefaultTableName(job, table); + AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers); + + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS); + bwConfig.setMaxMemory(maxMutationBufferSize); + bwConfig.setMaxWriteThreads(maxWriteThreads); + AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig); LOG.info("Writing data to " + table); - configureOutputFormat(conf); + configureOutputFormat(job); } } http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java index 8e9cfef..742480c 100644 --- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java +++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java @@ -18,15 +18,14 @@ import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.Logger; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.builtin.Utf8StorageConverter; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; -import org.apache.pig.data.InternalMap; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; @@ -142,8 +141,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage { } @Override - protected void configureInputFormat(Configuration conf) { - AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class)); + protected void configureInputFormat(Job job) { + AccumuloInputFormat.addIterator(job, new IteratorSetting(50, WholeRowIterator.class)); } @Override http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java index a6db638..784904f 100644 --- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java +++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java @@ -33,8 +33,8 @@ import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DefaultDataBag; @@ -84,8 +84,8 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage { } @Override - protected void configureInputFormat(Configuration conf) { - AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class)); + protected void configureInputFormat(Job job) { + AccumuloInputFormat.addIterator(job, new IteratorSetting(50, WholeRowIterator.class)); } @Override http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java index 1b5b81a..21d4fc7 100644 --- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java +++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java @@ -20,9 +20,13 @@ import java.io.IOException; import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; @@ -33,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.data.Tuple; +import org.junit.Assert; import org.junit.Test; public class AbstractAccumuloStorageTest { @@ -42,12 +47,19 @@ public class AbstractAccumuloStorageTest { Collection ranges = new LinkedList(); ranges.add(new Range(start, end)); - Job expected = new Job(); - Configuration expectedConf = expected.getConfiguration(); - AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations); - AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers); - AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs); - AccumuloInputFormat.setRanges(expectedConf, ranges); + Job expected = new Job(new Configuration()); + + try { + AccumuloInputFormat.setConnectorInfo(expected, user, new PasswordToken(password)); + } catch (AccumuloSecurityException e) { + Assert.fail(e.getMessage()); + } + AccumuloInputFormat.setInputTableName(expected, table); + AccumuloInputFormat.setScanAuthorizations(expected, authorizations); + AccumuloInputFormat.setZooKeeperInstance(expected, inst, zookeepers); + AccumuloInputFormat.fetchColumns(expected, columnFamilyColumnQualifierPairs); + AccumuloInputFormat.setRanges(expected, ranges); + return expected; } @@ -72,13 +84,23 @@ public class AbstractAccumuloStorageTest { public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads, int maxWriteLatencyMS) throws IOException { - Job expected = new Job(); - Configuration expectedConf = expected.getConfiguration(); - AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table); - AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers); - AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS); - AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize); - AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads); + + Job expected = new Job(new Configuration()); + + try { + AccumuloOutputFormat.setConnectorInfo(expected, user, new PasswordToken(password)); + } catch (AccumuloSecurityException e) { + Assert.fail(e.getMessage()); + } + + AccumuloOutputFormat.setZooKeeperInstance(expected, inst, zookeepers); + + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxLatency(maxWriteLatencyMS, TimeUnit.MILLISECONDS); + bwConfig.setMaxMemory(maxWriteBufferSize); + bwConfig.setMaxWriteThreads(writeThreads); + + AccumuloOutputFormat.setBatchWriterOptions(expected, bwConfig); return expected; }