Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A1C36107EA for ; Thu, 7 Nov 2013 05:24:06 +0000 (UTC) Received: (qmail 28978 invoked by uid 500); 7 Nov 2013 05:24:06 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 28866 invoked by uid 500); 7 Nov 2013 05:24:05 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 28758 invoked by uid 99); 7 Nov 2013 05:24:03 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Nov 2013 05:24:03 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 755E14953D; Thu, 7 Nov 2013 05:24:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Thu, 07 Nov 2013 05:24:05 -0000 Message-Id: <0c8674cabf344f56a2ea083e13b386b5@git.apache.org> In-Reply-To: <105f52588d72461ca92f5d9433f8bdc2@git.apache.org> References: <105f52588d72461ca92f5d9433f8bdc2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/5] git commit: ACCUMULO-1783 Lift some pig test classes to write a better "functional" test that ensures that joins actually work. ACCUMULO-1783 Lift some pig test classes to write a better "functional" test that ensures that joins actually work. 
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/9b398d4a Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/9b398d4a Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/9b398d4a Branch: refs/heads/ACCUMULO-1783 Commit: 9b398d4a32e50d3503b4ecb2f86a306e9db0221b Parents: d72e1cb Author: Josh Elser Authored: Tue Nov 5 17:14:33 2013 -0500 Committer: Josh Elser Committed: Tue Nov 5 17:14:33 2013 -0500 ---------------------------------------------------------------------- .../accumulo/pig/AccumuloPigClusterTest.java | 165 +++++++++++++++++++ .../java/org/apache/pig/test/MiniCluster.java | 86 ++++++++++ .../org/apache/pig/test/MiniGenericCluster.java | 123 ++++++++++++++ 3 files changed, 374 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9b398d4a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java new file mode 100644 index 0000000..0e2abb5 --- /dev/null +++ b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java @@ -0,0 +1,165 @@ +package org.apache.accumulo.pig; + +import java.io.File; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.data.Mutation; +import 
org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.accumulo.minicluster.MiniAccumuloConfig; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.data.Tuple; +import org.apache.pig.test.MiniCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.io.Files; + +public class AccumuloPigClusterTest { + + private static final File tmpdir = Files.createTempDir(); + private static MiniAccumuloCluster accumuloCluster; + private static MiniCluster cluster; + private static Configuration conf; + private PigServer pig; + + @BeforeClass + public static void setupClusters() throws Exception { + MiniAccumuloConfig macConf = new MiniAccumuloConfig(tmpdir, "password"); + macConf.setNumTservers(1); + + accumuloCluster = new MiniAccumuloCluster(macConf); + accumuloCluster.start(); + + // This is needed by Pig + cluster = MiniCluster.buildCluster(); + conf = cluster.getConfiguration(); + } + + @Before + public void beforeTest() throws Exception { + AccumuloInputFormat.resetCounters(); + AccumuloOutputFormat.resetCounters(); + pig = new PigServer(ExecType.LOCAL, conf); + } + + @AfterClass + public static void stopClusters() throws Exception { + accumuloCluster.stop(); + FileUtils.deleteDirectory(tmpdir); + } + + private void loadTestData() throws Exception { + ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCluster.getInstanceName(), accumuloCluster.getZooKeepers()); + Connector c = inst.getConnector("root", "password"); + + TableOperations tops = c.tableOperations(); + if (!tops.exists("airports")) { + tops.create("airports"); + } + + if (!tops.exists("flights")) { + tops.create("flights"); + } + + @SuppressWarnings("unchecked") + 
final List> airportData = Lists.newArrayList(ImmutableMap.of("code", "SJC", "name", "San Jose"), + ImmutableMap.of("code", "SFO", "name", "San Francisco"), ImmutableMap.of("code", "MDO", "name", "Orlando"), + ImmutableMap.of("code", "MDW", "name", "Chicago-Midway"), ImmutableMap.of("code", "JFK", "name", "JFK International"), + ImmutableMap.of("code", "BWI", "name", "Baltimore-Washington")); + + BatchWriter bw = c.createBatchWriter("airports", 100000l, 1000l, 1); + try { + int i = 1; + for (Map record : airportData) { + Mutation m = new Mutation(Integer.toString(i)); + + for (Entry entry : record.entrySet()) { + m.put(entry.getKey(), "", entry.getValue()); + } + + bw.addMutation(m); + i++; + } + } finally { + if (null != bw) { + bw.close(); + } + } + + @SuppressWarnings("unchecked") + final List> flightData = Lists.newArrayList(ImmutableMap.of("origin", "BWI", "destination", "SFO"), + ImmutableMap.of("origin", "BWI", "destination", "SJC"), ImmutableMap.of("origin", "MDW", "destination", "MDO"), + ImmutableMap.of("origin", "MDO", "destination", "SJC"), ImmutableMap.of("origin", "SJC", "destination", "JFK"), + ImmutableMap.of("origin", "JFK", "destination", "MDW")); + + bw = c.createBatchWriter("flights", 100000l, 1000l, 1); + try { + int i = 1; + for (Map record : flightData) { + Mutation m = new Mutation(Integer.toString(i)); + + for (Entry entry : record.entrySet()) { + m.put(entry.getKey(), "", entry.getValue()); + } + + bw.addMutation(m); + i++; + } + } finally { + if (null != bw) { + bw.close(); + } + } + } + + @Test + public void test() throws Exception { + loadTestData(); + + final String loadFlights = "flights = LOAD 'accumulo://flights?instance=" + accumuloCluster.getInstanceName() + + "&user=root&password=password&zookeepers=" + accumuloCluster.getZooKeepers() + "' using org.apache.accumulo.pig.AccumuloStorage()" + + " as (rowKey:chararray, column_map:map[]);"; + + final String loadAirports = "airports = LOAD 'accumulo://airports?instance=" + 
accumuloCluster.getInstanceName() + + "&user=root&password=password&zookeepers=" + accumuloCluster.getZooKeepers() + "' using org.apache.accumulo.pig.AccumuloStorage()" + + " as (rowKey:chararray, column_map:map[]);"; + + final String joinQuery = "joined = JOIN flights BY column_map#'origin', airports BY column_map#'code';"; + + // System.out.println(query); + + pig.registerQuery(loadFlights); + pig.registerQuery(loadAirports); + pig.registerQuery(joinQuery); + + Iterator it = pig.openIterator("joined"); + + int i = 0; + while (it.hasNext()) { + Tuple t = it.next(); + System.out.println(t); + i++; + } + + // TODO actually verify something here + Assert.assertTrue("Should have found records but found none", i > 0); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9b398d4a/src/test/java/org/apache/pig/test/MiniCluster.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pig/test/MiniCluster.java b/src/test/java/org/apache/pig/test/MiniCluster.java new file mode 100644 index 0000000..64467ae --- /dev/null +++ b/src/test/java/org/apache/pig/test/MiniCluster.java @@ -0,0 +1,86 @@ +package org.apache.pig.test; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.MiniMRCluster; + +public class MiniCluster extends MiniGenericCluster { + private static final File CONF_DIR = new File("build/classes"); + private static final File CONF_FILE = new File(CONF_DIR, "hadoop-site.xml"); + + private MiniMRCluster m_mr = null; + public MiniCluster() { + super(); + } + + @Override + protected void setupMiniDfsAndMrClusters() { + try { + System.setProperty("hadoop.log.dir", "build/test/logs"); + final int dataNodes = 4; // There will be 4 data nodes + final int taskTrackers = 4; // There will be 4 task tracker nodes + + // Create the dir that holds hadoop-site.xml file + // Delete if hadoop-site.xml exists already + CONF_DIR.mkdirs(); + if(CONF_FILE.exists()) { + CONF_FILE.delete(); + } + + // Builds and starts the mini dfs and mapreduce clusters + Configuration config = new Configuration(); + m_dfs = new MiniDFSCluster(config, dataNodes, true, null); + m_fileSys = m_dfs.getFileSystem(); + m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1); + + // Write the necessary config info to hadoop-site.xml + m_conf = m_mr.createJobConf(); + m_conf.setInt("mapred.submit.replication", 2); + m_conf.set("dfs.datanode.address", "0.0.0.0:0"); + m_conf.set("dfs.datanode.http.address", "0.0.0.0:0"); + m_conf.set("mapred.map.max.attempts", "2"); + m_conf.set("mapred.reduce.max.attempts", "2"); + m_conf.set("pig.jobcontrol.sleep", "100"); + m_conf.writeXml(new FileOutputStream(CONF_FILE)); + + // Set the system properties needed by Pig + System.setProperty("cluster", m_conf.get("mapred.job.tracker")); + System.setProperty("namenode", m_conf.get("fs.default.name")); + System.setProperty("junit.hadoop.conf", 
CONF_DIR.getPath()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void shutdownMiniMrClusters() { + // Delete hadoop-site.xml on shutDown + if(CONF_FILE.exists()) { + CONF_FILE.delete(); + } + if (m_mr != null) { m_mr.shutdown(); } + m_mr = null; + } +} http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/9b398d4a/src/test/java/org/apache/pig/test/MiniGenericCluster.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pig/test/MiniGenericCluster.java b/src/test/java/org/apache/pig/test/MiniGenericCluster.java new file mode 100644 index 0000000..584631a --- /dev/null +++ b/src/test/java/org/apache/pig/test/MiniGenericCluster.java @@ -0,0 +1,123 @@ +package org.apache.pig.test; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import java.io.*; +import java.util.Properties; + +import org.apache.hadoop.hdfs.MiniDFSCluster; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; + +/** + * This class builds a single instance of itself with the Singleton + * design pattern. 
While building the single instance, it sets up a + * mini cluster that actually consists of a mini DFS cluster and a + * mini MapReduce cluster on the local machine and also sets up the + * environment for Pig to run on top of the mini cluster. + * + * This class is the base class for MiniCluster, which differs slightly + * among different versions of hadoop. The MiniCluster implementation + * is located in $PIG_HOME/shims. + */ +abstract public class MiniGenericCluster { + protected MiniDFSCluster m_dfs = null; + protected FileSystem m_fileSys = null; + protected Configuration m_conf = null; + + protected final static MiniCluster INSTANCE = new MiniCluster(); + protected static boolean isSetup = true; + + protected MiniGenericCluster() { + setupMiniDfsAndMrClusters(); + } + + abstract protected void setupMiniDfsAndMrClusters(); + + /** + * Returns the single instance of class MiniCluster that + * represents the resources for a mini dfs cluster and a mini + * mapreduce cluster. + */ + public static MiniCluster buildCluster() { + if(! 
isSetup){ + INSTANCE.setupMiniDfsAndMrClusters(); + isSetup = true; + } + return INSTANCE; + } + + public void shutDown(){ + INSTANCE.shutdownMiniDfsAndMrClusters(); + } + + protected void finalize() { + shutdownMiniDfsAndMrClusters(); + } + + protected void shutdownMiniDfsAndMrClusters() { + isSetup = false; + shutdownMiniDfsClusters(); + shutdownMiniMrClusters(); + } + + protected void shutdownMiniDfsClusters() { + try { + if (m_fileSys != null) { m_fileSys.close(); } + } catch (IOException e) { + e.printStackTrace(); + } + if (m_dfs != null) { m_dfs.shutdown(); } + m_fileSys = null; + m_dfs = null; + } + + abstract protected void shutdownMiniMrClusters(); + + public Properties getProperties() { + errorIfNotSetup(); + return ConfigurationUtil.toProperties(m_conf); + } + + public Configuration getConfiguration() { + return new Configuration(m_conf); + } + + public void setProperty(String name, String value) { + errorIfNotSetup(); + m_conf.set(name, value); + } + + public FileSystem getFileSystem() { + errorIfNotSetup(); + return m_fileSys; + } + + /** + * Throw RunTimeException if isSetup is false + */ + private void errorIfNotSetup(){ + if(isSetup) + return; + String msg = "function called on MiniCluster that has been shutdown"; + throw new RuntimeException(msg); + } +}