hama-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tommaso Teofili <tommaso.teof...@gmail.com>
Subject Re: svn commit: r1679360 - in /hama/trunk/core/src: main/java/org/apache/hama/bsp/BSPJobClient.java main/java/org/apache/hama/bsp/PartitioningRunner.java test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
Date Thu, 14 May 2015 13:14:15 GMT
Hi Edward,

I see your latest commits are missing a commit message. It'd be good if you
could always add one, as that helps others understand such commits without
having to look too deeply into the code, and makes it easier to browse the
SVN history.

Thanks,
Tommaso

2015-05-14 15:01 GMT+02:00 <edwardyoon@apache.org>:

> Author: edwardyoon
> Date: Thu May 14 13:01:18 2015
> New Revision: 1679360
>
> URL: http://svn.apache.org/r1679360
> Log: (empty)
>
> Modified:
>     hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
>
> hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
>
> hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
>
> Modified:
> hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
> URL:
> http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1679360&r1=1679359&r2=1679360&view=diff
>
> ==============================================================================
> --- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
> (original)
> +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
> Thu May 14 13:01:18 2015
> @@ -471,7 +471,7 @@ public class BSPJobClient extends Config
>          partitioningJob.getConfiguration().setClass(
>              MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
>              MessageQueue.class);
> -
> +
>          partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
>          partitioningJob.setInputFormat(job.getInputFormat().getClass());
>          partitioningJob.setInputKeyClass(job.getInputKeyClass());
> @@ -578,6 +578,15 @@ public class BSPJobClient extends Config
>        DataOutputBuffer buffer = new DataOutputBuffer();
>        RawSplit rawSplit = new RawSplit();
>        for (InputSplit split : splits) {
> +
> +        // set partitionID to rawSplit
> +        if (split.getClass().getName().equals(FileSplit.class.getName())
> +            &&
> job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
> +            && job.get("bsp.partitioning.runner.job") == null) {
> +          String[] extractPartitionID = ((FileSplit)
> split).getPath().getName().split("[-]");
> +
> rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
> +        }
> +
>          rawSplit.setClassName(split.getClass().getName());
>          buffer.reset();
>          split.write(buffer);
> @@ -629,7 +638,10 @@ public class BSPJobClient extends Config
>      for (int i = 0; i < len; ++i) {
>        RawSplit split = new RawSplit();
>        split.readFields(in);
> -      result[i] = split;
> +      if (split.getPartitionID() != Integer.MIN_VALUE)
> +        result[split.getPartitionID()] = split;
> +      else
> +        result[i] = split;
>      }
>      return result;
>    }
> @@ -1075,12 +1087,21 @@ public class BSPJobClient extends Config
>      private String splitClass;
>      private BytesWritable bytes = new BytesWritable();
>      private String[] locations;
> +    private int partitionID = Integer.MIN_VALUE;
>      long dataLength;
>
>      public void setBytes(byte[] data, int offset, int length) {
>        bytes.set(data, offset, length);
>      }
>
> +    public void setPartitionID(int id) {
> +      this.partitionID = id;
> +    }
> +
> +    public int getPartitionID() {
> +      return partitionID;
> +    }
> +
>      public void setClassName(String className) {
>        splitClass = className;
>      }
>
> Modified:
> hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
> URL:
> http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1679360&r1=1679359&r2=1679360&view=diff
>
> ==============================================================================
> ---
> hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
> (original)
> +++
> hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
> Thu May 14 13:01:18 2015
> @@ -144,7 +144,6 @@ public class PartitioningRunner extends
>        raw = new MapWritable();
>        raw.put(rawRecord.getKey(), rawRecord.getValue());
>
> -      System.out.println(peer.getPeerName(index) + ", " +
> rawRecord.getKey() + ", " + rawRecord.getValue());
>        peer.send(peer.getPeerName(index), raw);
>      }
>
>
> Modified:
> hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
> URL:
> http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1679360&r1=1679359&r2=1679360&view=diff
>
> ==============================================================================
> ---
> hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
> (original)
> +++
> hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
> Thu May 14 13:01:18 2015
> @@ -78,8 +78,6 @@ public class TestKeyValueTextInputFormat
>
>          int expectedPeerId = Math.abs(key.hashCode() % numTasks);
>
> -        System.out.println(peer.getPeerName() + ": " + key + ", " + value
> + ", " + expectedPeerId);
> -        /*
>          if (expectedPeerId == peer.getPeerIndex()) {
>            expectedKeys.put(new Text(key), new Text(value));
>          } else {
> @@ -88,7 +86,6 @@ public class TestKeyValueTextInputFormat
>                new BooleanWritable(true));
>            break;
>          }
> -        */
>        }
>        message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES),
>            expectedKeys);
> @@ -106,7 +103,6 @@ public class TestKeyValueTextInputFormat
>          while ((msg = peer.getCurrentMessage()) != null) {
>            blValue = (BooleanWritable) msg.get(new Text(
>                KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
> -          System.out.println(">>>>> " + peer.getPeerName() + ", " +
> blValue.get());
>            assertEquals(false, blValue.get());
>            values = (MapWritable) msg.get(new Text(
>                KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));
>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message