hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1511158 - in /hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/
Date Wed, 07 Aug 2013 01:19:04 GMT
Author: edwardyoon
Date: Wed Aug  7 01:19:04 2013
New Revision: 1511158

URL: http://svn.apache.org/r1511158
Log:
HAMA-594: Semi-Clustering Algorithm Implementation

Added:
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1511158&r1=1511157&r2=1511158&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Aug  7 01:19:04 2013
@@ -4,6 +4,8 @@ Release 0.6.3 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-594: Semi-Clustering Algorithm Implementation (Renil Jeseph via edwardyoon)
+
   BUG FIXES
   
    HAMA-782: The arguments of DoubleVector.slice(int, int) method will mislead the user.
(Yexi Jiang)

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1511158&r1=1511157&r2=1511158&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Wed Aug
 7 01:19:04 2013
@@ -36,6 +36,7 @@ public class ExampleDriver {
       pgd.addClass("pagerank", PageRank.class, "PageRank");
       pgd.addClass("inlnkcount", InlinkCount.class, "InlinkCount");
       pgd.addClass("bipartite", BipartiteMatching.class, "Bipartite Matching");
+      pgd.addClass("semi", SemiClusterJobDriver.class, "Semi Clustering");
       pgd.addClass("kmeans", Kmeans.class, "K-Means Clustering");
       pgd.addClass("gd", GradientDescentExample.class, "Gradient Descent");
 

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java?rev=1511158&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java Wed
Aug  7 01:19:04 2013
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * The SemiClusterDetails class is used to store a Semi-Cluster ID and its
+ * score.This class implements Comparable interface which compares the score of
+ * the objects.
+ * 
+ */
+
+public class SemiClusterDetails implements
+    WritableComparable<SemiClusterDetails> {
+
+  private String semiClusterId;
+  private double semiClusterScore;
+
+  public SemiClusterDetails() {
+    this.semiClusterId = "";
+    this.semiClusterScore = 1.0;
+  }
+
+  public SemiClusterDetails(String semiClusterId, double semiClusterScore) {
+    this.semiClusterId = semiClusterId;
+    this.semiClusterScore = semiClusterScore;
+  }
+
+  public String getSemiClusterId() {
+    return semiClusterId;
+  }
+
+  public void setSemiClusterId(String semiClusterId) {
+    this.semiClusterId = semiClusterId;
+  }
+
+  public double getSemiClusterScore() {
+    return semiClusterScore;
+  }
+
+  public void setSemiClusterScore(double semiClusterScore) {
+    this.semiClusterScore = semiClusterScore;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((semiClusterId == null) ? 0 : semiClusterId.hashCode());
+    long temp;
+    temp = Double.doubleToLongBits(semiClusterScore);
+    result = prime * result + (int) (temp ^ (temp >>> 32));
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    SemiClusterDetails other = (SemiClusterDetails) obj;
+    if (semiClusterId == null) {
+      if (other.semiClusterId != null)
+        return false;
+    } else if (!semiClusterId.equals(other.semiClusterId))
+      return false;
+    if (Double.doubleToLongBits(semiClusterScore) != Double
+        .doubleToLongBits(other.semiClusterScore))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return semiClusterId;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    String semiClusterId = in.readUTF();
+    setSemiClusterId(semiClusterId);
+    double score = in.readDouble();
+    setSemiClusterScore(score);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(semiClusterId);
+    out.writeDouble(semiClusterScore);
+  }
+
+  @Override
+  public int compareTo(SemiClusterDetails sc) {
+    return (this.getSemiClusterId().compareTo(sc.getSemiClusterId()));
+  }
+}

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java?rev=1511158&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java Wed
Aug  7 01:19:04 2013
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.graph.GraphJob;
+
+public class SemiClusterJobDriver {
+
+  protected static final Log LOG = LogFactory
+      .getLog(SemiClusterJobDriver.class);
+  private static final String outputPathString = "semicluster.outputpath";
+  private static final String inputPathString = "semicluster.inputmatrixpath";
+  private static final String requestedGraphJobMaxIterationString = "hama.graph.max.iteration";
+  private static final String semiClusterMaximumVertexCount = "semicluster.max.vertex.count";
+  private static final String graphJobMessageSentCount = "semicluster.max.message.sent.count";
+  private static final String graphJobVertexMaxClusterCount = "vertex.max.cluster.count";
+
+  public static void startTask(HamaConfiguration conf) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class);
+    semiClusterJob
+        .setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class);
+    semiClusterJob.setJobName("SemiClusterJob");
+    semiClusterJob.setVertexClass(SemiClusteringVertex.class);
+    semiClusterJob.setInputPath(new Path(conf.get(inputPathString)));
+    semiClusterJob.setOutputPath(new Path(conf.get(outputPathString)));
+
+    semiClusterJob.set("hama.graph.self.ref", "true");
+    semiClusterJob.set("hama.graph.repair", "true");
+
+    semiClusterJob.setVertexIDClass(Text.class);
+    semiClusterJob.setVertexValueClass(SemiClusterMessage.class);
+    semiClusterJob.setEdgeValueClass(DoubleWritable.class);
+
+    semiClusterJob.setInputKeyClass(LongWritable.class);
+    semiClusterJob.setInputValueClass(Text.class);
+    semiClusterJob.setInputFormat(TextInputFormat.class);
+    semiClusterJob.setVertexInputReaderClass(SemiClusterTextReader.class);
+
+    semiClusterJob.setPartitioner(HashPartitioner.class);
+
+    semiClusterJob.setOutputFormat(TextOutputFormat.class);
+    semiClusterJob.setOutputKeyClass(Text.class);
+    semiClusterJob.setOutputValueClass(Text.class);
+
+    long startTime = System.currentTimeMillis();
+    if (semiClusterJob.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+  }
+
+  private static void printUsage() {
+    LOG.info("Usage: SemiClusterO <input path>  <output path> [number of tasks
(default max)] [Maximum number of vertices in a Semi Cluster (default 10)] [Number of messages
sent from a Vertex(default 10)][Maximum number of clusters in which a vertex can be containted(default
10)]");
+  }
+
+  /**
+   * Function parses command line in standart form.
+   */
+  private static void parseArgs(HamaConfiguration conf, String[] args) {
+    if (args.length < 2) {
+      printUsage();
+      System.exit(-1);
+    }
+
+    conf.set(inputPathString, args[0]);
+
+    Path path = new Path(args[1]);
+    conf.set(outputPathString, path.toString());
+
+    if (args.length >= 3) {
+      try {
+        int taskCount = Integer.parseInt(args[2]);
+        if (taskCount < 0) {
+          printUsage();
+          throw new IllegalArgumentException(
+              "The number of requested job maximum iteration count can't be negative. Actual
value: "
+                  + String.valueOf(taskCount));
+        }
+        conf.setInt(requestedGraphJobMaxIterationString, taskCount);
+        if (args.length >= 4) {
+          int maximumVertexCount = Integer.parseInt(args[3]);
+          if (maximumVertexCount < 0) {
+            printUsage();
+            throw new IllegalArgumentException(
+                "The number of  maximum vertex  count can't be negative. Actual value: "
+                    + String.valueOf(maximumVertexCount));
+          }
+          conf.setInt(semiClusterMaximumVertexCount, maximumVertexCount);
+          if (args.length >= 5) {
+            int messageSentCount = Integer.parseInt(args[4]);
+            if (messageSentCount < 0) {
+              printUsage();
+              throw new IllegalArgumentException(
+                  "The number of  maximum message sent count can't be negative. Actual value:
"
+                      + String.valueOf(messageSentCount));
+            }
+            conf.setInt(graphJobMessageSentCount, messageSentCount);
+            if (args.length == 6) {
+              int vertexClusterCount = Integer.parseInt(args[5]);
+              if (vertexClusterCount < 0) {
+                printUsage();
+                throw new IllegalArgumentException(
+                    "The maximum number of clusters in which a vertex can be containted can't
be negative. Actual value: "
+                        + String.valueOf(vertexClusterCount));
+              }
+              conf.setInt(graphJobVertexMaxClusterCount, vertexClusterCount);
+
+            }
+          }
+        }
+      } catch (NumberFormatException e) {
+        printUsage();
+        throw new IllegalArgumentException(
+            "The format of job maximum iteration count is int. Can not parse value: "
+                + args[2]);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    HamaConfiguration conf = new HamaConfiguration();
+    parseArgs(conf, args);
+    startTask(conf);
+  }
+}

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java?rev=1511158&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java Wed
Aug  7 01:19:04 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.graph.Vertex;
+
+/**
+ * The SemiClusterMessage class defines the structure of the value stored by
+ * each vertex in the graph Job which is same as the Message sent my each
+ * vertex.
+ * 
+ */
+public class SemiClusterMessage implements
+    WritableComparable<SemiClusterMessage> {
+
+  private String semiClusterId;
+  private double semiClusterScore;
+  private List<Vertex<Text, DoubleWritable, SemiClusterMessage>> semiClusterVertexList
= new ArrayList<Vertex<Text, DoubleWritable, SemiClusterMessage>>();
+  private Set<SemiClusterDetails> semiClusterContainThis = new TreeSet<SemiClusterDetails>();
+
+  public SemiClusterMessage(String scId,
+      List<Vertex<Text, DoubleWritable, SemiClusterMessage>> verticesEdges,
+      double score) {
+    this.semiClusterId = scId;
+    this.semiClusterVertexList = verticesEdges;
+    this.semiClusterScore = score;
+  }
+
+  public SemiClusterMessage(SemiClusterMessage msg) {
+    this.semiClusterId = msg.getScId();
+    for (Vertex<Text, DoubleWritable, SemiClusterMessage> v : msg
+        .getVertexList())
+      this.semiClusterVertexList.add(v);
+    this.semiClusterScore = msg.getScore();
+  }
+
+  public SemiClusterMessage(Set<SemiClusterDetails> semiClusterContainThis) {
+    this.semiClusterId = "";
+    this.semiClusterScore = 0.0;
+    this.semiClusterVertexList = null;
+    this.semiClusterContainThis = semiClusterContainThis;
+  }
+
+  public SemiClusterMessage() {
+  }
+
+  public double getScore() {
+    return semiClusterScore;
+  }
+
+  public void setScore(double score) {
+    this.semiClusterScore = score;
+  }
+
+  public List<Vertex<Text, DoubleWritable, SemiClusterMessage>> getVertexList()
{
+    return semiClusterVertexList;
+  }
+
+  public void addVertex(Vertex<Text, DoubleWritable, SemiClusterMessage> v) {
+    this.semiClusterVertexList.add(v);
+  }
+
+  public String getScId() {
+    return semiClusterId;
+  }
+
+  public void setScId(String scId) {
+    this.semiClusterId = scId;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    clear();
+    String semiClusterId = in.readUTF();
+    setScId(semiClusterId);
+    double score = in.readDouble();
+    setScore(score);
+    if (in.readBoolean()) {
+      int len = in.readInt();
+      if (len > 0) {
+        for (int i = 0; i < len; i++) {
+          SemiClusteringVertex v = new SemiClusteringVertex();
+          v.readFields(in);
+          semiClusterVertexList.add(v);
+        }
+      }
+    }
+    int len = in.readInt();
+    if (len > 0) {
+      for (int i = 0; i < len; i++) {
+        SemiClusterDetails sd = new SemiClusterDetails();
+        sd.readFields(in);
+        semiClusterContainThis.add(sd);
+      }
+    }
+
+  }
+
+  private void clear() {
+    semiClusterVertexList = new ArrayList<Vertex<Text, DoubleWritable, SemiClusterMessage>>();
+    semiClusterContainThis = new TreeSet<SemiClusterDetails>();
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(semiClusterId);
+    out.writeDouble(semiClusterScore);
+
+    if (this.semiClusterVertexList == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeInt(semiClusterVertexList.size());
+      for (Vertex<Text, DoubleWritable, SemiClusterMessage> v : semiClusterVertexList)
{
+        v.write(out);
+      }
+    }
+    out.writeInt(semiClusterContainThis.size());
+    Iterator<SemiClusterDetails> itr = semiClusterContainThis.iterator();
+    while (itr.hasNext())
+      itr.next().write(out);
+  }
+
+  public Set<SemiClusterDetails> getSemiClusterContainThis() {
+    return semiClusterContainThis;
+  }
+
+  public void setSemiClusterContainThis(
+      List<SemiClusterDetails> semiClusterContainThis,
+      int graphJobVertexMaxClusterCount) {
+    int clusterCountToBeRemoved = 0;
+    NavigableSet<SemiClusterDetails> setSort = new TreeSet<SemiClusterDetails>(
+        new Comparator<SemiClusterDetails>() {
+
+          @Override
+          public int compare(SemiClusterDetails o1, SemiClusterDetails o2) {
+            return (o1.getSemiClusterScore() == o2.getSemiClusterScore() ? 0
+                : o1.getSemiClusterScore() < o2.getSemiClusterScore() ? -1 : 1);
+          }
+        });
+    setSort.addAll(this.semiClusterContainThis);
+    setSort.addAll(semiClusterContainThis);
+    clusterCountToBeRemoved = setSort.size() - graphJobVertexMaxClusterCount;
+    Iterator<SemiClusterDetails> itr = setSort.descendingIterator();
+    while (clusterCountToBeRemoved > 0) {
+      itr.next();
+      itr.remove();
+      clusterCountToBeRemoved--;
+    }
+    this.semiClusterContainThis = setSort;
+
+  }
+
+  public int compareTo(SemiClusterMessage m) {
+    return (this.getScId().compareTo(m.getScId()));
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((semiClusterId == null) ? 0 : semiClusterId.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    SemiClusterMessage other = (SemiClusterMessage) obj;
+    if (semiClusterId == null) {
+      if (other.semiClusterId != null)
+        return false;
+    } else if (!semiClusterId.equals(other.semiClusterId))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "SCMessage [semiClusterId=" + semiClusterId + ", semiClusterScore="
+        + semiClusterScore + ", semiClusterVertexList=" + semiClusterVertexList
+        + ", semiClusterContainThis=" + semiClusterContainThis + "]";
+  }
+}

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java?rev=1511158&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java
(added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java
Wed Aug  7 01:19:04 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+
+/**
+ * SemiClusterTextReader defines the way in which data is to be read from the
+ * input file and store as a vertex with VertexId and Edges
+ * 
+ */
+public class SemiClusterTextReader extends
+    VertexInputReader<LongWritable, Text, Text, DoubleWritable, Text> {
+
+  String lastVertexId = null;
+  List<String> adjacents = new ArrayList<String>();
+
+  @Override
+  public boolean parseVertex(LongWritable key, Text value,
+      Vertex<Text, DoubleWritable, Text> vertex) {
+
+    String line = value.toString();
+    String[] lineSplit = line.split("\t");
+    if (!line.startsWith("#")) {
+      if (lastVertexId == null) {
+        lastVertexId = lineSplit[0];
+      }
+      if (lastVertexId.equals(lineSplit[0])) {
+        adjacents.add(lineSplit[1]);
+      } else {
+        vertex.setVertexID(new Text(lastVertexId));
+        for (String adjacent : adjacents) {
+          String[] ValueSplit = adjacent.split("-");
+          vertex.addEdge(new Edge<Text, DoubleWritable>(
+              new Text(ValueSplit[0]), new DoubleWritable(Double
+                  .parseDouble(ValueSplit[1]))));
+        }
+        adjacents.clear();
+        lastVertexId = lineSplit[0];
+        adjacents.add(lineSplit[1]);
+        return true;
+      }
+    }
+    return false;
+  }
+
+}

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java?rev=1511158&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java
(added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java
Wed Aug  7 01:19:04 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.graph.GraphJobMessage;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexOutputWriter;
+
+/**
+ * The VertexOutputWriter defines what parts of the vertex shall be written to
+ * the output format.
+ * 
+ * @param <V> the vertexID type.
+ * @param <E> the edge value type.
+ * @param <M> the vertex value type.
+ */
+@SuppressWarnings("rawtypes")
+public class SemiClusterVertexOutputWriter<KEYOUT extends Writable, VALUEOUT extends Writable,
V extends WritableComparable, E extends Writable, M extends Writable>
+    implements VertexOutputWriter<KEYOUT, VALUEOUT, V, E, M> {
+
+  @Override
+  public void setup(Configuration conf) {
+    // do nothing
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void write(Vertex<V, E, M> vertex,
+      BSPPeer<Writable, Writable, KEYOUT, VALUEOUT, GraphJobMessage> peer)
+      throws IOException {
+    SemiClusterMessage vertexValue = (SemiClusterMessage) vertex.getValue();
+    peer.write((KEYOUT) vertex.getVertexID(), (VALUEOUT) new Text(vertexValue
+        .getSemiClusterContainThis().toString()));
+  }
+
+}

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java?rev=1511158&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java Wed
Aug  7 01:19:04 2013
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.Vertex;
+
+/**
+ * SemiClusteringVertex Class defines each vertex in a Graph job and the
+ * compute() method is the function which is applied on each Vertex in the graph
+ * on each Super step of the job execution.
+ * 
+ */
+public class SemiClusteringVertex extends
+    Vertex<Text, DoubleWritable, SemiClusterMessage> {
+  private int semiClusterMaximumVertexCount;
+  private int graphJobMessageSentCount;
+  private int graphJobVertexMaxClusterCount;
+
+  @Override
+  public void setup(Configuration conf) {
+    semiClusterMaximumVertexCount = conf.getInt("semicluster.max.vertex.count",
+        10);
+    graphJobMessageSentCount = conf.getInt(
+        "semicluster.max.message.sent.count", 10);
+    graphJobVertexMaxClusterCount = conf.getInt("vertex.max.cluster.count", 10);
+  }
+
+  /**
+   * The user overrides the Compute() method, which will be executed at each
+   * active vertex in every superstep
+   */
+  @Override
+  public void compute(Iterable<SemiClusterMessage> messages) throws IOException {
+    if (this.getSuperstepCount() == 0) {
+      firstSuperStep();
+    }
+    if (this.getSuperstepCount() >= 1) {
+      Set<SemiClusterMessage> scListContainThis = new TreeSet<SemiClusterMessage>();
+      Set<SemiClusterMessage> scListNotContainThis = new TreeSet<SemiClusterMessage>();
+      List<SemiClusterMessage> scList = new ArrayList<SemiClusterMessage>();
+      for (SemiClusterMessage msg : messages) {
+        if (!isVertexInSc(msg)) {
+          scListNotContainThis.add(msg);
+          SemiClusterMessage msgNew = new SemiClusterMessage(msg);
+          msgNew.addVertex(this);
+          msgNew
+              .setScId("C" + createNewSemiClusterName(msgNew.getVertexList()));
+          msgNew.setScore(semiClusterScoreCalcuation(msgNew));
+          scListContainThis.add(msgNew);
+        } else {
+          scListContainThis.add(msg);
+        }
+      }
+      scList.addAll(scListContainThis);
+      scList.addAll(scListNotContainThis);
+      sendBestSCMsg(scList);
+      updatesVertexSemiClustersList(scListContainThis);
+    }
+  }
+
+  public List<SemiClusterMessage> addSCList(List<SemiClusterMessage> scList,
+      SemiClusterMessage msg) {
+    return scList;
+  }
+
+  /**
+   * This function create a new Semi-cluster ID for a semi-cluster from the list
+   * of vertices in the cluster.It first take all the vertexIds as a list sort
+   * the list and then find the HashCode of the Sorted List.
+   */
+  public int createNewSemiClusterName(
+      List<Vertex<Text, DoubleWritable, SemiClusterMessage>> semiClusterVertexList)
{
+    List<String> vertexIDList = getSemiClusterVerticesIdList(semiClusterVertexList);
+    Collections.sort(vertexIDList);
+    return (vertexIDList.hashCode());
+  }
+
+  /**
+   * Function which is executed in the first SuperStep
+   * 
+   * @throws IOException
+   */
+  public void firstSuperStep() throws IOException {
+    Vertex<Text, DoubleWritable, SemiClusterMessage> v = this;
+    List<Vertex<Text, DoubleWritable, SemiClusterMessage>> lV = new ArrayList<Vertex<Text,
DoubleWritable, SemiClusterMessage>>();
+    lV.add(v);
+    String newClusterName = "C" + createNewSemiClusterName(lV);
+    SemiClusterMessage initialClusters = new SemiClusterMessage(newClusterName,
+        lV, 1);
+    sendMessageToNeighbors(initialClusters);
+    Set<SemiClusterDetails> scList = new TreeSet<SemiClusterDetails>();
+    scList.add(new SemiClusterDetails(newClusterName, 1.0));
+    SemiClusterMessage vertexValue = new SemiClusterMessage(scList);
+    this.setValue(vertexValue);
+  }
+
+  /**
+   * Vertex V updates its list of semi-clusters with the semi- clusters from c1
+   * , ..., ck , c'1 , ..., c'k that contain V
+   */
+  public void updatesVertexSemiClustersList(
+      Set<SemiClusterMessage> scListContainThis) throws IOException {
+    List<SemiClusterDetails> scList = new ArrayList<SemiClusterDetails>();
+    Set<SemiClusterMessage> sortedSet = new TreeSet<SemiClusterMessage>(
+        new Comparator<SemiClusterMessage>() {
+
+          @Override
+          public int compare(SemiClusterMessage o1, SemiClusterMessage o2) {
+            return (o1.getScore() == o2.getScore() ? 0
+                : o1.getScore() < o2.getScore() ? -1 : 1);
+          }
+        });
+    sortedSet.addAll(scListContainThis);
+    int count = 0;
+    for (SemiClusterMessage msg : sortedSet) {
+      scList.add(new SemiClusterDetails(msg.getScId(), msg.getScore()));
+      if (count > graphJobMessageSentCount)
+        break;
+    }
+
+    SemiClusterMessage vertexValue = this.getValue();
+    vertexValue
+        .setSemiClusterContainThis(scList, graphJobVertexMaxClusterCount);
+    this.setValue(vertexValue);
+  }
+
+  /**
+   * Function to calcualte the Score of a semi-cluster
+   * 
+   * @param message
+   * @return
+   */
+  public double semiClusterScoreCalcuation(SemiClusterMessage message) {
+    double iC = 0.0, bC = 0.0, fB = 0.0, sC = 0.0;
+    int vC = 0, eC = 0;
+    List<String> vertexId = getSemiClusterVerticesIdList(message
+        .getVertexList());
+    vC = vertexId.size();
+    for (Vertex<Text, DoubleWritable, SemiClusterMessage> v : message
+        .getVertexList()) {
+      List<Edge<Text, DoubleWritable>> eL = v.getEdges();
+      for (Edge<Text, DoubleWritable> e : eL) {
+        eC++;
+        if (vertexId.contains(e.getDestinationVertexID().toString())
+            && e.getCost() != null) {
+          iC = iC + e.getCost().get();
+        } else if (e.getCost() != null) {
+          bC = bC + e.getCost().get();
+        }
+      }
+    }
+    if (vC > 1)
+      sC = ((iC - fB * bC) / ((vC * (vC - 1)) / 2)) / eC;
+    return sC;
+  }
+
+  /**
+   * Returns a Array List of vertexIds from a List of Vertex<Text,
+   * DoubleWritable, SCMessage> Objects
+   * 
+   * @param lV
+   * @return
+   */
+  public List<String> getSemiClusterVerticesIdList(
+      List<Vertex<Text, DoubleWritable, SemiClusterMessage>> lV) {
+    Iterator<Vertex<Text, DoubleWritable, SemiClusterMessage>> vertexItrator
= lV
+        .iterator();
+    List<String> vertexId = new ArrayList<String>();
+    while (vertexItrator.hasNext()) {
+      vertexId.add(vertexItrator.next().getVertexID().toString());
+    }
+    return vertexId;
+  }
+
+  /**
+   * If a semi-cluster c does not already contain V , and Vc < Mmax , then V is
+   * added to c to form c' .
+   */
+  public boolean isVertexInSc(SemiClusterMessage msg) {
+    List<String> vertexId = getSemiClusterVerticesIdList(msg.getVertexList());
+    if (vertexId.contains(this.getVertexID().toString())
+        && vertexId.size() < semiClusterMaximumVertexCount)
+      return true;
+    else
+      return false;
+  }
+
+  /**
+   * The semi-clusters c1 , ..., ck , c'1 , ..., c'k are sorted by their scores,
+   * and the best ones are sent to V ?? neighbors.
+   */
+  public void sendBestSCMsg(List<SemiClusterMessage> scList) throws IOException {
+    Collections.sort(scList, new Comparator<SemiClusterMessage>() {
+
+      @Override
+      public int compare(SemiClusterMessage o1, SemiClusterMessage o2) {
+        return (o1.getScore() == o2.getScore() ? 0 : o1.getScore() < o2
+            .getScore() ? -1 : 1);
+      }
+    });
+    Iterator<SemiClusterMessage> scItr = scList.iterator();
+    int count = 0;
+    while (scItr.hasNext()) {
+      sendMessageToNeighbors(scItr.next());
+      count++;
+      if (count > graphJobMessageSentCount)
+        break;
+    }
+  }
+}



Mime
View raw message