hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1503292 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
Date Mon, 15 Jul 2013 15:21:19 GMT
Author: edwardyoon
Date: Mon Jul 15 15:21:19 2013
New Revision: 1503292

URL: http://svn.apache.org/r1503292
Log:
HAMA-772: When selected KeyValueTextInputFormat, workers get only one value for key

Added:
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1503292&r1=1503291&r2=1503292&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Jul 15 15:21:19 2013
@@ -6,6 +6,7 @@ Release 0.6.3 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-772: When selected KeyValueTextInputFormat, workers get only one value for key (Ikhtiyor Ahmedov via edwardyoon)
    HAMA-776: localQueue is set as Send queue in init function of AbstractMessageManager (kennethxian)
    HAMA-769: Intermediate queue's close method is not called, clean work may be omitted (kennethxian)
    HAMA-771: Determining the count of active vertices (edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1503292&r1=1503291&r2=1503292&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Mon Jul 15 15:21:19 2013
@@ -19,6 +19,8 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -45,7 +47,7 @@ public class PartitioningRunner extends
   private FileSystem fs = null;
   private Path partitionDir;
   private RecordConverter converter;
-  private Map<Integer, Map<Writable, Writable>> values = new HashMap<Integer, Map<Writable, Writable>>();
+  private Map<Integer,LinkedList<KeyValuePair<Writable,Writable>>> values = new HashMap<Integer, LinkedList<KeyValuePair<Writable,Writable>>>();
 
   @Override
   public final void setup(
@@ -101,6 +103,11 @@ public class PartitioningRunner extends
      *         needed.
      */
     public Map<Writable, Writable> newMap();
+    
+    /**
+     * @return a list implementation, so order will not be changed in subclasses
+     */
+    public List<KeyValuePair<Writable, Writable>> newList();
   }
 
   /**
@@ -133,6 +140,11 @@ public class PartitioningRunner extends
     public Map<Writable, Writable> newMap() {
       return new HashMap<Writable, Writable>();
     }
+
+    @Override
+    public List<KeyValuePair<Writable, Writable>> newList() {
+      return new LinkedList<KeyValuePair<Writable,Writable>>();
+    }
   }
 
   @Override
@@ -160,23 +172,23 @@ public class PartitioningRunner extends
 
       int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
           desiredNum);
-
-      Map<Writable, Writable> map = values.get(index);
-      if (map == null) {
-        map = converter.newMap();
-        values.put(index, map);
+      
+      LinkedList<KeyValuePair<Writable, Writable>> list = values.get(index);
+      if (list == null) {
+        list = (LinkedList<KeyValuePair<Writable, Writable>>) converter.newList();
+        values.put(index, list);
       }
-      map.put(pair.getKey(), pair.getValue());
+      list.add(new KeyValuePair<Writable, Writable>(pair.getKey(), pair.getValue()));
     }
 
     // The reason of use of Memory is to reduce file opens
-    for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
+    for (Map.Entry<Integer, LinkedList<KeyValuePair<Writable, Writable>>> e : values.entrySet()) {
       Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
           + peer.getPeerIndex());
       SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
           destFile, keyClass, valueClass, CompressionType.NONE);
 
-      for (Map.Entry<Writable, Writable> v : e.getValue().entrySet()) {
+      for (KeyValuePair<Writable, Writable> v : e.getValue()) {
         writer.append(v.getKey(), v.getValue());
       }
       writer.close();

Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1503292&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java Mon Jul 15 15:21:19 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestKeyValueTextInputFormat extends TestCase {
+
+  public static class KeyValueHashPartitionedBSP 
+      extends 
+      BSP<Text, Text, NullWritable, NullWritable, MapWritable> {
+    public static final String TEST_INPUT_VALUES = "test.bsp.max.input";
+    public static final String TEST_UNEXPECTED_KEYS = "test.bsp.keys.unexpected";
+    public static final String TEST_MAX_VALUE = "test.bsp.keys.max";
+
+    private int numTasks = 0;
+    private int maxValue = 0;
+    private MapWritable expectedKeys = new MapWritable();
+    //private Set<Text> expectedKeys = new HashSet<Text>();
+
+    @Override
+    public void setup(
+        BSPPeer<Text, Text, NullWritable, NullWritable, MapWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+      Configuration conf = peer.getConfiguration();
+      maxValue = conf.getInt(KeyValueHashPartitionedBSP.TEST_MAX_VALUE, 1000);
+      numTasks = peer.getNumPeers();
+    }
+
+    @Override
+    public void bsp(
+        BSPPeer<Text, Text, NullWritable, NullWritable, MapWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+      Text key = null;
+      Text value = null;
+      MapWritable message = new MapWritable();
+      message.put(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS), new BooleanWritable(false));
+      KeyValuePair<Text, Text> tmp = null;
+
+      while ( (tmp = peer.readNext()) != null) {
+        key = tmp.getKey();
+        value = tmp.getValue();
+
+        int expectedPeerId = Math.abs(key.hashCode() % numTasks);
+
+        if (expectedPeerId == peer.getPeerIndex()) {
+          if (expectedKeys.containsKey(key)) {
+            // same key twice, incorrect
+            message.put(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS), new BooleanWritable(true));
+            break;
+          } else {
+            expectedKeys.put(new Text(key), new Text(value));
+          }
+        } else {
+          message.put(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS), new BooleanWritable(true));
+          break;
+        } //if (expectedPeerId == peer.getPeerIndex())
+      } //while (peer.readNext(key, value) != false)
+      message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES), expectedKeys);
+      
+      int master = peer.getNumPeers()/2;
+      String masterName = peer.getPeerName(master);
+      peer.send(masterName, message);
+      peer.sync();
+
+      if(peer.getPeerIndex() == master) {
+        MapWritable msg = null;
+        MapWritable values = null;
+        BooleanWritable blValue = null;
+        HashMap<Integer, Integer> input = new HashMap<Integer, Integer>();
+
+        while ( (msg = peer.getCurrentMessage()) != null ) {
+          blValue = (BooleanWritable) msg.get(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
+          assertEquals(false, blValue.get());
+          values = (MapWritable) msg.get(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));
+          for (Map.Entry<Writable,Writable> w : values.entrySet()) {
+            input.put( Integer.valueOf( w.getKey().toString() ), Integer.valueOf( w.getValue().toString() ));
+          }
+        }
+        
+        for (int i=0; i<maxValue; i++) {
+          assertEquals(true, input.containsKey(Integer.valueOf(i)));
+          assertEquals(i*i, input.get(Integer.valueOf(i)).intValue());
+        }
+      }
+      peer.sync();
+    }
+  }
+  
+  @Test
+  public void testInput() {
+    
+    Configuration fsConf = new Configuration();
+    String strDataPath = "/tmp/test_keyvalueinputformat";
+    Path dataPath = new Path(strDataPath);
+    Path outPath = new Path("/tmp/test_keyvalueinputformat_out");
+    
+    int maxValue = 1000;
+    
+    try {
+      URI uri = new URI(strDataPath);
+      FileSystem fs = FileSystem.get(uri, fsConf);
+      fs.delete(dataPath, true);
+      FSDataOutputStream fileOut = fs.create(dataPath, true);
+
+      StringBuilder str = new StringBuilder();
+      for (int i = 0; i < maxValue; ++i) {
+        str.append(i);
+        str.append("\t");
+        str.append(i*i);
+        str.append("\n");
+      }
+      fileOut.writeBytes(str.toString());
+      fileOut.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    
+    
+    try {
+      HamaConfiguration conf = new HamaConfiguration();
+      conf.setInt(KeyValueHashPartitionedBSP.TEST_MAX_VALUE, maxValue);
+      BSPJob job = new BSPJob(conf, TestKeyValueTextInputFormat.class);
+      job.setJobName("Test KeyValueTextInputFormat together with HashPartitioner");
+      job.setBspClass(KeyValueHashPartitionedBSP.class);
+      
+      job.setPartitioner(HashPartitioner.class);
+
+      job.setInputPath(dataPath);
+      job.setInputFormat(KeyValueTextInputFormat.class);
+      job.setInputKeyClass(Text.class);
+      job.setInputValueClass(Text.class);
+
+      job.setOutputPath(outPath);
+      job.setOutputFormat(SequenceFileOutputFormat.class);
+      job.setOutputKeyClass(NullWritable.class);
+      job.setOutputValueClass(NullWritable.class);
+
+      BSPJobClient jobClient = new BSPJobClient(conf);
+      ClusterStatus cluster = jobClient.getClusterStatus(true);
+      job.setNumBspTask(cluster.getMaxTasks());
+      
+      assertEquals(true, job.waitForCompletion(true));
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1503292&r1=1503291&r2=1503292&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java Mon Jul 15 15:21:19 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.graph;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -103,4 +105,9 @@ public abstract class VertexInputReader<
     return new TreeMap<Writable, Writable>();
   }
 
+  @Override
+  public List<KeyValuePair<Writable, Writable>> newList() {
+    return new LinkedList<KeyValuePair<Writable,Writable>>();
+  }
+
 }



Mime
View raw message