Subject: svn commit: r1401071 [3/7] - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Date: Mon, 22 Oct 2012 20:43:30 -0000
To: mapreduce-commits@hadoop.apache.org
From: suresh@apache.org
Message-Id: <20121022204335.1034A2388BA6@eris.apache.org>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java?rev=1401071&r1=1401070&r2=1401071&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java Mon Oct 22 20:43:16 2012
@@ -1,120 +1,120 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapreduce; - -import java.io.IOException; - -import junit.framework.TestCase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.LocalJobRunner; -import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; -import org.junit.Test; - -public class TestClientProtocolProviderImpls extends TestCase { - - @Test - public void testClusterWithLocalClientProvider() throws Exception { - - Configuration conf = new Configuration(); - - try { - conf.set(MRConfig.FRAMEWORK_NAME, "incorrect"); - new Cluster(conf); - fail("Cluster should not be initialized with incorrect framework name"); - } catch (IOException e) { - - } - - try { - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); - conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0"); - - new Cluster(conf); - fail("Cluster with Local Framework name should use local JT address"); - } catch (IOException e) { - - } - - try { - conf.set(JTConfig.JT_IPC_ADDRESS, "local"); - Cluster cluster = new Cluster(conf); - assertTrue(cluster.getClient() instanceof LocalJobRunner); - cluster.close(); - } catch (IOException e) { - - } - } - - @Test - public void testClusterWithJTClientProvider() throws Exception { - - Configuration conf = new Configuration(); - try { - conf.set(MRConfig.FRAMEWORK_NAME, "incorrect"); - new Cluster(conf); - fail("Cluster should not be initialized with incorrect framework name"); - - } catch (IOException e) { - - } - - try { - conf.set(MRConfig.FRAMEWORK_NAME, "classic"); - conf.set(JTConfig.JT_IPC_ADDRESS, "local"); - new Cluster(conf); - fail("Cluster with classic Framework name shouldnot use local JT address"); - - } catch (IOException e) { - - } - - try { - conf = new Configuration(); - conf.set(MRConfig.FRAMEWORK_NAME, "classic"); - conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0"); - Cluster cluster = new Cluster(conf); - cluster.close(); - } catch (IOException e) { - - } - } - - @Test - public void testClusterException() { - - Configuration conf = new Configuration(); - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME); - conf.set(JTConfig.JT_IPC_ADDRESS, "local"); - - // initializing a cluster with this conf should throw an error. - // However the exception thrown should not be specific to either - // the job tracker client provider or the local provider - boolean errorThrown = false; - try { - Cluster cluster = new Cluster(conf); - cluster.close(); - fail("Not expected - cluster init should have failed"); - } catch (IOException e) { - errorThrown = true; - assert(e.getMessage().contains("Cannot initialize Cluster. Please check")); - } - assert(errorThrown); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.LocalJobRunner; +import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.junit.Test; + +public class TestClientProtocolProviderImpls extends TestCase { + + @Test + public void testClusterWithLocalClientProvider() throws Exception { + + Configuration conf = new Configuration(); + + try { + conf.set(MRConfig.FRAMEWORK_NAME, "incorrect"); + new Cluster(conf); + fail("Cluster should not be initialized with incorrect framework name"); + } catch (IOException e) { + + } + + try { + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); + conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0"); + + new Cluster(conf); + fail("Cluster with Local Framework name should use local JT address"); + } catch (IOException e) { + + } + + try { + conf.set(JTConfig.JT_IPC_ADDRESS, "local"); + Cluster cluster = new Cluster(conf); + assertTrue(cluster.getClient() instanceof LocalJobRunner); + cluster.close(); + } catch (IOException e) { + + } + } + + @Test + public void testClusterWithJTClientProvider() throws Exception { + + Configuration conf = new Configuration(); + try { + conf.set(MRConfig.FRAMEWORK_NAME, "incorrect"); + new Cluster(conf); + fail("Cluster should not be initialized with incorrect framework name"); + + } catch (IOException e) { + + } + + try { + conf.set(MRConfig.FRAMEWORK_NAME, "classic"); + conf.set(JTConfig.JT_IPC_ADDRESS, "local"); + new Cluster(conf); + fail("Cluster with classic Framework name shouldnot use local JT address"); + + } catch (IOException e) { + + } + + try { + conf = new Configuration(); + conf.set(MRConfig.FRAMEWORK_NAME, "classic"); + conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0"); + Cluster cluster = new Cluster(conf); + cluster.close(); + } catch (IOException e) { + + } + } + + @Test + public void testClusterException() { + + Configuration conf = new Configuration(); + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME); + conf.set(JTConfig.JT_IPC_ADDRESS, "local"); + + // initializing a cluster with this conf should throw an error. + // However the exception thrown should not be specific to either + // the job tracker client provider or the local provider + boolean errorThrown = false; + try { + Cluster cluster = new Cluster(conf); + cluster.close(); + fail("Not expected - cluster init should have failed"); + } catch (IOException e) { + errorThrown = true; + assert(e.getMessage().contains("Cannot initialize Cluster. 
Please check")); + } + assert(errorThrown); + } +} Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java?rev=1401071&r1=1401070&r2=1401071&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java Mon Oct 22 20:43:16 2012 @@ -1,129 +1,129 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapreduce; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import junit.framework.TestCase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.LocalJobRunner; -import org.apache.hadoop.mapred.ResourceMgrDelegate; -import org.apache.hadoop.mapred.YARNRunner; -import org.apache.hadoop.mapreduce.protocol.ClientProtocol; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; -import org.apache.hadoop.yarn.api.records.DelegationToken; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.junit.Test; - -public class TestYarnClientProtocolProvider extends TestCase { - - private static final RecordFactory recordFactory = RecordFactoryProvider. 
- getRecordFactory(null); - - @Test - public void testClusterWithYarnClientProtocolProvider() throws Exception { - - Configuration conf = new Configuration(false); - Cluster cluster = null; - - try { - cluster = new Cluster(conf); - } catch (Exception e) { - throw new Exception( - "Failed to initialize a local runner w/o a cluster framework key", e); - } - - try { - assertTrue("client is not a LocalJobRunner", - cluster.getClient() instanceof LocalJobRunner); - } finally { - if (cluster != null) { - cluster.close(); - } - } - - try { - conf = new Configuration(); - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); - cluster = new Cluster(conf); - ClientProtocol client = cluster.getClient(); - assertTrue("client is a YARNRunner", client instanceof YARNRunner); - } catch (IOException e) { - - } finally { - if (cluster != null) { - cluster.close(); - } - } - } - - - @Test - public void testClusterGetDelegationToken() throws Exception { - - Configuration conf = new Configuration(false); - Cluster cluster = null; - try { - conf = new Configuration(); - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); - cluster = new Cluster(conf); - YARNRunner yrunner = (YARNRunner) cluster.getClient(); - GetDelegationTokenResponse getDTResponse = - recordFactory.newRecordInstance(GetDelegationTokenResponse.class); - DelegationToken rmDTToken = recordFactory.newRecordInstance( - DelegationToken.class); - rmDTToken.setIdentifier(ByteBuffer.wrap(new byte[2])); - rmDTToken.setKind("Testclusterkind"); - rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes())); - rmDTToken.setService("0.0.0.0:8032"); - getDTResponse.setRMDelegationToken(rmDTToken); - final ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class); - when(cRMProtocol.getDelegationToken(any( - GetDelegationTokenRequest.class))).thenReturn(getDTResponse); - ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate( - new YarnConfiguration(conf)) { - @Override - public synchronized void start() { - this.rmClient = cRMProtocol; - } - }; - yrunner.setResourceMgrDelegate(rmgrDelegate); - Token t = cluster.getDelegationToken(new Text(" ")); - assertTrue("Token kind is instead " + t.getKind().toString(), - "Testclusterkind".equals(t.getKind().toString())); - } finally { - if (cluster != null) { - cluster.close(); - } - } - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.LocalJobRunner; +import org.apache.hadoop.mapred.ResourceMgrDelegate; +import org.apache.hadoop.mapred.YARNRunner; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.junit.Test; + +public class TestYarnClientProtocolProvider extends TestCase { + + private static final RecordFactory recordFactory = RecordFactoryProvider. + getRecordFactory(null); + + @Test + public void testClusterWithYarnClientProtocolProvider() throws Exception { + + Configuration conf = new Configuration(false); + Cluster cluster = null; + + try { + cluster = new Cluster(conf); + } catch (Exception e) { + throw new Exception( + "Failed to initialize a local runner w/o a cluster framework key", e); + } + + try { + assertTrue("client is not a LocalJobRunner", + cluster.getClient() instanceof LocalJobRunner); + } finally { + if (cluster != null) { + cluster.close(); + } + } + + try { + conf = new Configuration(); + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + cluster = new Cluster(conf); + ClientProtocol client = cluster.getClient(); + assertTrue("client is a YARNRunner", client instanceof YARNRunner); + } catch (IOException e) { + + } finally { + if (cluster != null) { + cluster.close(); + } + } + } + + + @Test + public void testClusterGetDelegationToken() throws Exception { + + Configuration conf = new Configuration(false); + Cluster cluster = null; + try { + conf = new Configuration(); + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + cluster = new Cluster(conf); + YARNRunner yrunner = (YARNRunner) cluster.getClient(); + GetDelegationTokenResponse getDTResponse = + recordFactory.newRecordInstance(GetDelegationTokenResponse.class); + DelegationToken rmDTToken = recordFactory.newRecordInstance( + DelegationToken.class); + rmDTToken.setIdentifier(ByteBuffer.wrap(new byte[2])); + rmDTToken.setKind("Testclusterkind"); + rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes())); + rmDTToken.setService("0.0.0.0:8032"); + getDTResponse.setRMDelegationToken(rmDTToken); + final ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class); + when(cRMProtocol.getDelegationToken(any( + GetDelegationTokenRequest.class))).thenReturn(getDTResponse); + ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate( + new YarnConfiguration(conf)) { + @Override + public synchronized void start() { + this.rmClient = cRMProtocol; + } + }; + yrunner.setResourceMgrDelegate(rmgrDelegate); + Token t = cluster.getDelegationToken(new Text(" ")); + assertTrue("Token kind is instead " + t.getKind().toString(), + "Testclusterkind".equals(t.getKind().toString())); + } finally { + if (cluster != null) { + cluster.close(); 
+ } + } + } + +} Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1401071&r1=1401070&r2=1401071&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Mon Oct 22 20:43:16 2012 @@ -317,7 +317,7 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test0): " + split); } - assertEquals(splits.size(), 1); + assertEquals(1, splits.size()); CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0); assertEquals(2, fileSplit.getNumPaths()); assertEquals(1, fileSplit.getLocations().length); @@ -347,24 +347,24 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test1): " + split); } - assertEquals(splits.size(), 2); + assertEquals(2, splits.size()); fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(fileSplit.getNumPaths(), 2); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file2.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file2.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(fileSplit.getNumPaths(), 1); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file1.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1 + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file2.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 // create another file on 3 datanodes and 3 racks. 
dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null); @@ -378,37 +378,37 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test2): " + split); } - assertEquals(splits.size(), 3); + assertEquals(3, splits.size()); fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(fileSplit.getNumPaths(), 3); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getPath(2).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE); - assertEquals(fileSplit.getLength(2), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(fileSplit.getNumPaths(), 2); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file2.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file2.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(fileSplit.getNumPaths(), 1); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file1.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1 + assertEquals(3, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file3.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(file3.getName(), fileSplit.getPath(2).getName()); + assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2)); + assertEquals(BLOCKSIZE, fileSplit.getLength(2)); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file2.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 // create file4 on all three racks Path file4 = 
new Path(dir4 + "/file4"); @@ -420,37 +420,37 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test3): " + split); } - assertEquals(splits.size(), 3); + assertEquals(3, splits.size()); fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(fileSplit.getNumPaths(), 6); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getPath(2).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE); - assertEquals(fileSplit.getLength(2), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(fileSplit.getNumPaths(), 2); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file2.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file2.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(fileSplit.getNumPaths(), 1); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file1.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1 + assertEquals(6, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file3.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(file3.getName(), fileSplit.getPath(2).getName()); + assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2)); + assertEquals(BLOCKSIZE, fileSplit.getLength(2)); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file2.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 // maximum split size is 2 blocks inFormat = new DummyInputFormat(); @@ -462,35 
+462,35 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test4): " + split); } - assertEquals(splits.size(), 5); + assertEquals(5, splits.size()); fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(fileSplit.getNumPaths(), 2); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(fileSplit.getPath(0).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(0), 2 * BLOCKSIZE); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file4.getName()); - assertEquals(fileSplit.getOffset(1), 0); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(fileSplit.getNumPaths(), 2); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file4.getName()); - assertEquals(fileSplit.getOffset(0), BLOCKSIZE); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file4.getName()); - assertEquals(fileSplit.getOffset(1), 2 * BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file3.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file4.getName(), fileSplit.getPath(1).getName()); + assertEquals(0, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file4.getName(), fileSplit.getPath(0).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file4.getName(), fileSplit.getPath(1).getName()); + assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); // maximum split size is 3 blocks inFormat = new DummyInputFormat(); @@ -502,48 +502,48 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test5): " + split); } - assertEquals(splits.size(), 4); + assertEquals(4, splits.size()); fileSplit = (CombineFileSplit) splits.get(0); - 
assertEquals(fileSplit.getNumPaths(), 3); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getPath(2).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE); - assertEquals(fileSplit.getLength(2), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(fileSplit.getPath(0).getName(), file4.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file4.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getPath(2).getName(), file4.getName()); - assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE); - assertEquals(fileSplit.getLength(2), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(fileSplit.getNumPaths(), 2); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file2.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file2.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], "host2.rack2.com"); + assertEquals(3, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file3.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(file3.getName(), fileSplit.getPath(2).getName()); + assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2)); + assertEquals(BLOCKSIZE, fileSplit.getLength(2)); + assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(file4.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file4.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(file4.getName(), fileSplit.getPath(2).getName()); + assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(2)); + assertEquals(BLOCKSIZE, fileSplit.getLength(2)); + assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file2.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + 
assertEquals("host2.rack2.com", fileSplit.getLocations()[0]); fileSplit = (CombineFileSplit) splits.get(3); - assertEquals(fileSplit.getNumPaths(), 1); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file1.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], "host1.rack1.com"); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals("host1.rack1.com", fileSplit.getLocations()[0]); // maximum split size is 4 blocks inFormat = new DummyInputFormat(); @@ -553,42 +553,42 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test6): " + split); } - assertEquals(splits.size(), 3); + assertEquals(3, splits.size()); fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(fileSplit.getNumPaths(), 4); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getPath(2).getName(), file3.getName()); - assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE); - assertEquals(fileSplit.getLength(2), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(fileSplit.getNumPaths(), 4); - assertEquals(fileSplit.getPath(0).getName(), file2.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getPath(1).getName(), file2.getName()); - assertEquals(fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(fileSplit.getLength(1), BLOCKSIZE); - assertEquals(fileSplit.getPath(2).getName(), file4.getName()); - assertEquals(fileSplit.getOffset(2), BLOCKSIZE); - assertEquals(fileSplit.getLength(2), BLOCKSIZE); - assertEquals(fileSplit.getPath(3).getName(), file4.getName()); - assertEquals(fileSplit.getOffset(3), 2 * BLOCKSIZE); - assertEquals(fileSplit.getLength(3), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], "host2.rack2.com"); - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(fileSplit.getNumPaths(), 1); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getPath(0).getName(), file1.getName()); - assertEquals(fileSplit.getOffset(0), 0); - assertEquals(fileSplit.getLength(0), BLOCKSIZE); - assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1 + assertEquals(4, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file3.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(file3.getName(), fileSplit.getPath(2).getName()); + assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2)); + assertEquals(BLOCKSIZE, fileSplit.getLength(2)); + 
assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(4, fileSplit.getNumPaths()); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file2.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(file4.getName(), fileSplit.getPath(2).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(2)); + assertEquals(BLOCKSIZE, fileSplit.getLength(2)); + assertEquals(file4.getName(), fileSplit.getPath(3).getName()); + assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(3)); + assertEquals(BLOCKSIZE, fileSplit.getLength(3)); + assertEquals("host2.rack2.com", fileSplit.getLocations()[0]); + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 // maximum split size is 7 blocks and min is 3 blocks inFormat = new DummyInputFormat(); @@ -601,15 +601,15 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test7): " + split); } - assertEquals(splits.size(), 2); + assertEquals(2, splits.size()); fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(fileSplit.getNumPaths(), 6); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(fileSplit.getNumPaths(), 3); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getLocations()[0], "host1.rack1.com"); + assertEquals(6, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(3, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals("host1.rack1.com", fileSplit.getLocations()[0]); // Rack 1 has file1, file2 and file3 and file4 // Rack 2 has file2 and file3 and file4 @@ -624,19 +624,19 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test1): " + split); } - assertEquals(splits.size(), 3); + assertEquals(3, splits.size()); fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(fileSplit.getNumPaths(), 2); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(fileSplit.getNumPaths(), 1); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(fileSplit.getNumPaths(), 6); - assertEquals(fileSplit.getLocations().length, 1); - assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3 + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(1, 
fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(6, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 // measure performance when there are multiple pools and // many files in each pool. @@ -669,7 +669,7 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test8): " + split); } - assertEquals(6, splits.size()); + assertEquals(splits.size(), 6); } finally { if (dfs != null) { @@ -750,7 +750,7 @@ public class TestCombineFileInputFormat for (InputSplit split : splits) { System.out.println("File split(Test0): " + split); } - assertEquals(splits.size(), 1); + assertEquals(1, splits.size()); CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0); assertEquals(2, fileSplit.getNumPaths()); assertEquals(1, fileSplit.getLocations().length); @@ -1135,7 +1135,7 @@ public class TestCombineFileInputFormat Job job = Job.getInstance(conf); FileInputFormat.setInputPaths(job, "test"); List splits = inFormat.getSplits(job); - assertEquals(splits.size(), 1); + assertEquals(1, splits.size()); CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0); assertEquals(1, fileSplit.getNumPaths()); assertEquals(file.getName(), fileSplit.getPath(0).getName()); Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java?rev=1401071&r1=1401070&r2=1401071&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java Mon Oct 22 20:43:16 2012 @@ -1,196 +1,196 @@ -package org.apache.hadoop.examples; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.StringTokenizer; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -public class WordMean extends Configured implements Tool { - - private double mean = 0; - - private final static Text COUNT = new Text("count"); - private final static Text LENGTH = new Text("length"); - private final static LongWritable ONE = new LongWritable(1); - - /** - * Maps words from line of text into 2 key-value pairs; one key-value pair for - * counting the word, another for counting its length. - */ - public static class WordMeanMapper extends - Mapper { - - private LongWritable wordLen = new LongWritable(); - - /** - * Emits 2 key-value pairs for counting the word and its length. Outputs are - * (Text, LongWritable). - * - * @param value - * This will be a line of text coming in from our input file. - */ - public void map(Object key, Text value, Context context) - throws IOException, InterruptedException { - StringTokenizer itr = new StringTokenizer(value.toString()); - while (itr.hasMoreTokens()) { - String string = itr.nextToken(); - this.wordLen.set(string.length()); - context.write(LENGTH, this.wordLen); - context.write(COUNT, ONE); - } - } - } - - /** - * Performs integer summation of all the values for each key. - */ - public static class WordMeanReducer extends - Reducer { - - private LongWritable sum = new LongWritable(); - - /** - * Sums all the individual values within the iterator and writes them to the - * same key. - * - * @param key - * This will be one of 2 constants: LENGTH_STR or COUNT_STR. - * @param values - * This will be an iterator of all the values associated with that - * key. - */ - public void reduce(Text key, Iterable values, Context context) - throws IOException, InterruptedException { - - int theSum = 0; - for (LongWritable val : values) { - theSum += val.get(); - } - sum.set(theSum); - context.write(key, sum); - } - } - - /** - * Reads the output file and parses the summation of lengths, and the word - * count, to perform a quick calculation of the mean. - * - * @param path - * The path to find the output file in. Set in main to the output - * directory. - * @throws IOException - * If it cannot access the output directory, we throw an exception. 
- */ - private double readAndCalcMean(Path path, Configuration conf) - throws IOException { - FileSystem fs = FileSystem.get(conf); - Path file = new Path(path, "part-r-00000"); - - if (!fs.exists(file)) - throw new IOException("Output not found!"); - - BufferedReader br = null; - - // average = total sum / number of elements; - try { - br = new BufferedReader(new InputStreamReader(fs.open(file))); - - long count = 0; - long length = 0; - - String line; - while ((line = br.readLine()) != null) { - StringTokenizer st = new StringTokenizer(line); - - // grab type - String type = st.nextToken(); - - // differentiate - if (type.equals(COUNT.toString())) { - String countLit = st.nextToken(); - count = Long.parseLong(countLit); - } else if (type.equals(LENGTH.toString())) { - String lengthLit = st.nextToken(); - length = Long.parseLong(lengthLit); - } - } - - double theMean = (((double) length) / ((double) count)); - System.out.println("The mean is: " + theMean); - return theMean; - } finally { - br.close(); - } - } - - public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new WordMean(), args); - } - - @Override - public int run(String[] args) throws Exception { - if (args.length != 2) { - System.err.println("Usage: wordmean "); - return 0; - } - - Configuration conf = getConf(); - - @SuppressWarnings("deprecation") - Job job = new Job(conf, "word mean"); - job.setJarByClass(WordMean.class); - job.setMapperClass(WordMeanMapper.class); - job.setCombinerClass(WordMeanReducer.class); - job.setReducerClass(WordMeanReducer.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - FileInputFormat.addInputPath(job, new Path(args[0])); - Path outputpath = new Path(args[1]); - FileOutputFormat.setOutputPath(job, outputpath); - boolean result = job.waitForCompletion(true); - mean = readAndCalcMean(outputpath, conf); - - return (result ? 0 : 1); - } - - /** - * Only valuable after run() called. - * - * @return Returns the mean value. - */ - public double getMean() { - return mean; - } +package org.apache.hadoop.examples; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.StringTokenizer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class WordMean extends Configured implements Tool { + + private double mean = 0; + + private final static Text COUNT = new Text("count"); + private final static Text LENGTH = new Text("length"); + private final static LongWritable ONE = new LongWritable(1); + + /** + * Maps words from line of text into 2 key-value pairs; one key-value pair for + * counting the word, another for counting its length. + */ + public static class WordMeanMapper extends + Mapper { + + private LongWritable wordLen = new LongWritable(); + + /** + * Emits 2 key-value pairs for counting the word and its length. Outputs are + * (Text, LongWritable). + * + * @param value + * This will be a line of text coming in from our input file. + */ + public void map(Object key, Text value, Context context) + throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + String string = itr.nextToken(); + this.wordLen.set(string.length()); + context.write(LENGTH, this.wordLen); + context.write(COUNT, ONE); + } + } + } + + /** + * Performs integer summation of all the values for each key. + */ + public static class WordMeanReducer extends + Reducer { + + private LongWritable sum = new LongWritable(); + + /** + * Sums all the individual values within the iterator and writes them to the + * same key. + * + * @param key + * This will be one of 2 constants: LENGTH_STR or COUNT_STR. + * @param values + * This will be an iterator of all the values associated with that + * key. + */ + public void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + + int theSum = 0; + for (LongWritable val : values) { + theSum += val.get(); + } + sum.set(theSum); + context.write(key, sum); + } + } + + /** + * Reads the output file and parses the summation of lengths, and the word + * count, to perform a quick calculation of the mean. + * + * @param path + * The path to find the output file in. Set in main to the output + * directory. + * @throws IOException + * If it cannot access the output directory, we throw an exception. 
+ */ + private double readAndCalcMean(Path path, Configuration conf) + throws IOException { + FileSystem fs = FileSystem.get(conf); + Path file = new Path(path, "part-r-00000"); + + if (!fs.exists(file)) + throw new IOException("Output not found!"); + + BufferedReader br = null; + + // average = total sum / number of elements; + try { + br = new BufferedReader(new InputStreamReader(fs.open(file))); + + long count = 0; + long length = 0; + + String line; + while ((line = br.readLine()) != null) { + StringTokenizer st = new StringTokenizer(line); + + // grab type + String type = st.nextToken(); + + // differentiate + if (type.equals(COUNT.toString())) { + String countLit = st.nextToken(); + count = Long.parseLong(countLit); + } else if (type.equals(LENGTH.toString())) { + String lengthLit = st.nextToken(); + length = Long.parseLong(lengthLit); + } + } + + double theMean = (((double) length) / ((double) count)); + System.out.println("The mean is: " + theMean); + return theMean; + } finally { + br.close(); + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new WordMean(), args); + } + + @Override + public int run(String[] args) throws Exception { + if (args.length != 2) { + System.err.println("Usage: wordmean "); + return 0; + } + + Configuration conf = getConf(); + + @SuppressWarnings("deprecation") + Job job = new Job(conf, "word mean"); + job.setJarByClass(WordMean.class); + job.setMapperClass(WordMeanMapper.class); + job.setCombinerClass(WordMeanReducer.class); + job.setReducerClass(WordMeanReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + FileInputFormat.addInputPath(job, new Path(args[0])); + Path outputpath = new Path(args[1]); + FileOutputFormat.setOutputPath(job, outputpath); + boolean result = job.waitForCompletion(true); + mean = readAndCalcMean(outputpath, conf); + + return (result ? 0 : 1); + } + + /** + * Only valuable after run() called. + * + * @return Returns the mean value. + */ + public double getMean() { + return mean; + } } \ No newline at end of file Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java?rev=1401071&r1=1401070&r2=1401071&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java Mon Oct 22 20:43:16 2012 @@ -1,208 +1,208 @@ -package org.apache.hadoop.examples; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class WordMedian extends Configured implements Tool {
-
-  private double median = 0;
-  private final static IntWritable ONE = new IntWritable(1);
-
-  /**
-   * Maps words from line of text into a key-value pair; the length of the word
-   * as the key, and 1 as the value.
-   */
-  public static class WordMedianMapper extends
-      Mapper<Object, Text, IntWritable, IntWritable> {
-
-    private IntWritable length = new IntWritable();
-
-    /**
-     * Emits a key-value pair for counting the word. Outputs are (IntWritable,
-     * IntWritable).
-     *
-     * @param value
-     *          This will be a line of text coming in from our input file.
-     */
-    public void map(Object key, Text value, Context context)
-        throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(value.toString());
-      while (itr.hasMoreTokens()) {
-        String string = itr.nextToken();
-        length.set(string.length());
-        context.write(length, ONE);
-      }
-    }
-  }
-
-  /**
-   * Performs integer summation of all the values for each key.
-   */
-  public static class WordMedianReducer extends
-      Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
-
-    private IntWritable val = new IntWritable();
-
-    /**
-     * Sums all the individual values within the iterator and writes them to the
-     * same key.
-     *
-     * @param key
-     *          This will be a length of a word that was read.
-     * @param values
-     *          This will be an iterator of all the values associated with that
-     *          key.
-     */
-    public void reduce(IntWritable key, Iterable<IntWritable> values,
-        Context context) throws IOException, InterruptedException {
-
-      int sum = 0;
-      for (IntWritable value : values) {
-        sum += value.get();
-      }
-      val.set(sum);
-      context.write(key, val);
-    }
-  }
-
-  /**
-   * This is a standard program to read and find a median value based on a file
-   * of word counts such as: 1 456, 2 132, 3 56... Where the first values are
-   * the word lengths and the following values are the number of times that
-   * words of that length appear.
-   *
-   * @param path
-   *          The path to read the HDFS file from (part-r-00000...00001...etc).
-   * @param medianIndex1
-   *          The first length value to look for.
-   * @param medianIndex2
-   *          The second length value to look for (will be the same as the first
-   *          if there are an even number of words total).
-   * @throws IOException
-   *           If file cannot be found, we throw an exception.
-   * */
-  private double readAndFindMedian(String path, int medianIndex1,
-      int medianIndex2, Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path file = new Path(path, "part-r-00000");
-
-    if (!fs.exists(file))
-      throw new IOException("Output not found!");
-
-    BufferedReader br = null;
-
-    try {
-      br = new BufferedReader(new InputStreamReader(fs.open(file)));
-      int num = 0;
-
-      String line;
-      while ((line = br.readLine()) != null) {
-        StringTokenizer st = new StringTokenizer(line);
-
-        // grab length
-        String currLen = st.nextToken();
-
-        // grab count
-        String lengthFreq = st.nextToken();
-
-        int prevNum = num;
-        num += Integer.parseInt(lengthFreq);
-
-        if (medianIndex2 >= prevNum && medianIndex1 <= num) {
-          System.out.println("The median is: " + currLen);
-          br.close();
-          return Double.parseDouble(currLen);
-        } else if (medianIndex2 >= prevNum && medianIndex1 < num) {
-          String nextCurrLen = st.nextToken();
-          double theMedian = (Integer.parseInt(currLen) + Integer
-              .parseInt(nextCurrLen)) / 2.0;
-          System.out.println("The median is: " + theMedian);
-          br.close();
-          return theMedian;
-        }
-      }
-    } finally {
-      br.close();
-    }
-    // error, no median found
-    return -1;
-  }
-
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new WordMedian(), args);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: wordmedian <in> <out>");
-      return 0;
-    }
-
-    setConf(new Configuration());
-    Configuration conf = getConf();
-
-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf, "word median");
-    job.setJarByClass(WordMedian.class);
-    job.setMapperClass(WordMedianMapper.class);
-    job.setCombinerClass(WordMedianReducer.class);
-    job.setReducerClass(WordMedianReducer.class);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(IntWritable.class);
-    FileInputFormat.addInputPath(job, new Path(args[0]));
-    FileOutputFormat.setOutputPath(job, new Path(args[1]));
-    boolean result = job.waitForCompletion(true);
-
-    // Wait for JOB 1 -- get middle value to check for Median
-
-    long totalWords = job.getCounters()
-        .getGroup(TaskCounter.class.getCanonicalName())
-        .findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();
-    int medianIndex1 = (int) Math.ceil((totalWords / 2.0));
-    int medianIndex2 = (int) Math.floor((totalWords / 2.0));
-
-    median = readAndFindMedian(args[1], medianIndex1, medianIndex2, conf);
-
-    return (result ? 0 : 1);
-  }
-
-  public double getMedian() {
-    return median;
-  }
-}
+package org.apache.hadoop.examples;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class WordMedian extends Configured implements Tool {
+
+  private double median = 0;
+  private final static IntWritable ONE = new IntWritable(1);
+
+  /**
+   * Maps words from line of text into a key-value pair; the length of the word
+   * as the key, and 1 as the value.
+   */
+  public static class WordMedianMapper extends
+      Mapper<Object, Text, IntWritable, IntWritable> {
+
+    private IntWritable length = new IntWritable();
+
+    /**
+     * Emits a key-value pair for counting the word. Outputs are (IntWritable,
+     * IntWritable).
+     *
+     * @param value
+     *          This will be a line of text coming in from our input file.
+     */
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        String string = itr.nextToken();
+        length.set(string.length());
+        context.write(length, ONE);
+      }
+    }
+  }
+
+  /**
+   * Performs integer summation of all the values for each key.
+   */
+  public static class WordMedianReducer extends
+      Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+    private IntWritable val = new IntWritable();
+
+    /**
+     * Sums all the individual values within the iterator and writes them to the
+     * same key.
+     *
+     * @param key
+     *          This will be a length of a word that was read.
+     * @param values
+     *          This will be an iterator of all the values associated with that
+     *          key.
+     */
+    public void reduce(IntWritable key, Iterable<IntWritable> values,
+        Context context) throws IOException, InterruptedException {
+
+      int sum = 0;
+      for (IntWritable value : values) {
+        sum += value.get();
+      }
+      val.set(sum);
+      context.write(key, val);
+    }
+  }
+
+  /**
+   * This is a standard program to read and find a median value based on a file
+   * of word counts such as: 1 456, 2 132, 3 56... Where the first values are
+   * the word lengths and the following values are the number of times that
+   * words of that length appear.
+   *
+   * @param path
+   *          The path to read the HDFS file from (part-r-00000...00001...etc).
+   * @param medianIndex1
+   *          The first length value to look for.
+   * @param medianIndex2
+   *          The second length value to look for (will be the same as the first
+   *          if there are an even number of words total).
+   * @throws IOException
+   *           If file cannot be found, we throw an exception.
+   * */
+  private double readAndFindMedian(String path, int medianIndex1,
+      int medianIndex2, Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(path, "part-r-00000");
+
+    if (!fs.exists(file))
+      throw new IOException("Output not found!");
+
+    BufferedReader br = null;
+
+    try {
+      br = new BufferedReader(new InputStreamReader(fs.open(file)));
+      int num = 0;
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        StringTokenizer st = new StringTokenizer(line);
+
+        // grab length
+        String currLen = st.nextToken();
+
+        // grab count
+        String lengthFreq = st.nextToken();
+
+        int prevNum = num;
+        num += Integer.parseInt(lengthFreq);
+
+        if (medianIndex2 >= prevNum && medianIndex1 <= num) {
+          System.out.println("The median is: " + currLen);
+          br.close();
+          return Double.parseDouble(currLen);
+        } else if (medianIndex2 >= prevNum && medianIndex1 < num) {
+          String nextCurrLen = st.nextToken();
+          double theMedian = (Integer.parseInt(currLen) + Integer
+              .parseInt(nextCurrLen)) / 2.0;
+          System.out.println("The median is: " + theMedian);
+          br.close();
+          return theMedian;
+        }
+      }
+    } finally {
+      br.close();
+    }
+    // error, no median found
+    return -1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new WordMedian(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println("Usage: wordmedian <in> <out>");
+      return 0;
+    }
+
+    setConf(new Configuration());
+    Configuration conf = getConf();
+
+    @SuppressWarnings("deprecation")
+    Job job = new Job(conf, "word median");
+    job.setJarByClass(WordMedian.class);
+    job.setMapperClass(WordMedianMapper.class);
+    job.setCombinerClass(WordMedianReducer.class);
+    job.setReducerClass(WordMedianReducer.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    boolean result = job.waitForCompletion(true);
+
+    // Wait for JOB 1 -- get middle value to check for Median
+
+    long totalWords = job.getCounters()
+        .getGroup(TaskCounter.class.getCanonicalName())
+        .findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();
+    int medianIndex1 = (int) Math.ceil((totalWords / 2.0));
+    int medianIndex2 = (int) Math.floor((totalWords / 2.0));
+
+    median = readAndFindMedian(args[1], medianIndex1, medianIndex2, conf);
+
+    return (result ? 0 : 1);
+  }
+
+  public double getMedian() {
+    return median;
+  }
+}
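
Both WordMean and WordMedian above are ordinary Tool implementations, so besides the command line they can be driven directly from Java. The listing below is a minimal illustrative sketch, not part of the commit: the driver class name (WordStatsDriver) and the input/output paths are placeholders, it assumes the two example classes and a usable Hadoop client configuration (local job runner or a cluster) are on the classpath, and it only uses the entry points visible in the diffs (ToolRunner.run, getMean(), getMedian()).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordMean;
import org.apache.hadoop.examples.WordMedian;
import org.apache.hadoop.util.ToolRunner;

public class WordStatsDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Run the mean job; arguments are <in> <out>, placeholder paths here.
    WordMean wordMean = new WordMean();
    int meanRc = ToolRunner.run(conf, wordMean, new String[] { "in", "out-mean" });

    // Run the median job on the same input, writing to a separate directory.
    WordMedian wordMedian = new WordMedian();
    int medianRc = ToolRunner.run(conf, wordMedian, new String[] { "in", "out-median" });

    // getMean()/getMedian() are only meaningful after run() has completed,
    // since each reads its own job's part-r-00000 output.
    System.out.println("mean word length   = " + wordMean.getMean());
    System.out.println("median word length = " + wordMedian.getMedian());
    System.exit(meanRc == 0 && medianRc == 0 ? 0 : 1);
  }
}

Where the examples jar registers these classes with its example driver, the equivalent command-line form is typically "hadoop jar hadoop-mapreduce-examples-*.jar wordmean <in> <out>" (and likewise for wordmedian), which is how the Usage strings in run() are meant to be exercised.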