incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1231088 - in /incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp: BSPPeerImpl.java BSPTask.java GroomServer.java message/HadoopMessageManagerImpl.java
Date Fri, 13 Jan 2012 14:24:06 GMT
Author: tjungblut
Date: Fri Jan 13 14:24:05 2012
New Revision: 1231088

URL: http://svn.apache.org/viewvc?rev=1231088&view=rev
Log:
Finalizing classes again, hanging problem is now fixed.

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1231088&r1=1231087&r2=1231088&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Fri Jan 13
14:24:05 2012
@@ -45,7 +45,7 @@ import org.apache.hama.util.KeyValuePair
 /**
  * This class represents a BSP peer.
  */
-public class BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
+public final class BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
     BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
@@ -146,7 +146,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   }
 
   @SuppressWarnings("unchecked")
-  public void initialize() throws Exception {
+  public final void initialize() throws Exception {
     syncClient = SyncServiceFactory.getSyncClient(conf);
     syncClient.init(conf, taskId.getJobID(), taskId);
 
@@ -154,8 +154,8 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
 
     // just output something when the user configured it
     if (conf.get("bsp.output.dir") != null) {
-      Path outdir = new Path(conf.get("bsp.output.dir"), Task
-          .getOutputName(partition));
+      Path outdir = new Path(conf.get("bsp.output.dir"),
+          Task.getOutputName(partition));
       outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob,
           outdir.makeQualified(fs).toString());
       final RecordWriter<KEYOUT, VALUEOUT> finalOut = outWriter;
@@ -170,7 +170,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   }
 
   @SuppressWarnings("unchecked")
-  public void initInput() throws IOException {
+  public final void initInput() throws IOException {
     // just read input if the user defined one
     if (conf.get("bsp.input.dir") != null) {
       InputSplit inputSplit = null;
@@ -196,16 +196,16 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   }
 
   @Override
-  public BSPMessage getCurrentMessage() throws IOException {
+  public final BSPMessage getCurrentMessage() throws IOException {
     return messenger.getCurrentMessage();
   }
 
   @Override
-  public void send(String peerName, BSPMessage msg) throws IOException {
+  public final void send(String peerName, BSPMessage msg) throws IOException {
     messenger.send(peerName, msg);
   }
 
-  private String checkpointedPath() {
+  private final String checkpointedPath() {
     String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
     String ckptPath = backup + bspJob.getJobID().toString() + "/"
         + getSuperstepCount() + "/" + this.taskId.toString();
@@ -214,7 +214,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     return ckptPath;
   }
 
-  void checkpoint(String checkpointedPath, BSPMessageBundle bundle) {
+  final void checkpoint(String checkpointedPath, BSPMessageBundle bundle) {
     FSDataOutputStream out = null;
     try {
       out = this.fs.create(new Path(checkpointedPath));
@@ -236,7 +236,8 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
    * @see org.apache.hama.bsp.BSPPeerInterface#sync()
    */
   @Override
-  public void sync() throws IOException, SyncException, InterruptedException {
+  public final void sync() throws IOException, SyncException,
+      InterruptedException {
     enterBarrier();
     Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> it = messenger
         .getMessageIterator();
@@ -259,7 +260,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     }
 
     leaveBarrier();
-    
+
     incrCounter(PeerCounter.SUPERSTEPS, 1);
     currentTaskStatus.setCounters(counters);
 
@@ -268,11 +269,11 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     messenger.clearOutgoingQueues();
   }
 
-  private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
+  private final BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
     if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
         Combiner.class)) {
-      Combiner combiner = (Combiner) ReflectionUtils.newInstance(conf.getClass(
-          "bsp.combiner.class", Combiner.class), conf);
+      Combiner combiner = (Combiner) ReflectionUtils.newInstance(
+          conf.getClass("bsp.combiner.class", Combiner.class), conf);
 
       return combiner.combine(messages);
     } else {
@@ -284,17 +285,18 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     }
   }
 
-  protected void enterBarrier() throws SyncException {
-    syncClient.enterBarrier(taskId.getJobID(), taskId, currentTaskStatus
-        .getSuperstepCount());
+  protected final void enterBarrier() throws SyncException {
+    syncClient.enterBarrier(taskId.getJobID(), taskId,
+        currentTaskStatus.getSuperstepCount());
   }
 
-  protected void leaveBarrier() throws SyncException {
-    syncClient.leaveBarrier(taskId.getJobID(), taskId, currentTaskStatus
-        .getSuperstepCount());
+  protected final void leaveBarrier() throws SyncException {
+    syncClient.leaveBarrier(taskId.getJobID(), taskId,
+        currentTaskStatus.getSuperstepCount());
   }
 
-  public void close() throws SyncException, IOException, InterruptedException {
+  public final void close() throws SyncException, IOException,
+      InterruptedException {
     if (in != null) {
       in.close();
     }
@@ -308,36 +310,36 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   }
 
   @Override
-  public void clear() {
+  public final void clear() {
     messenger.clearOutgoingQueues();
   }
 
   /**
    * @return the string as host:port of this Peer
    */
-  public String getPeerName() {
+  public final String getPeerName() {
     return peerAddress.getHostName() + ":" + peerAddress.getPort();
   }
 
   @Override
-  public String[] getAllPeerNames() {
+  public final String[] getAllPeerNames() {
     initPeerNames();
     return allPeers;
   }
 
   @Override
-  public String getPeerName(int index) {
+  public final String getPeerName(int index) {
     initPeerNames();
     return allPeers[index];
   }
 
   @Override
-  public int getNumPeers() {
+  public final int getNumPeers() {
     initPeerNames();
     return allPeers.length;
   }
 
-  private void initPeerNames() {
+  private final void initPeerNames() {
     if (allPeers == null) {
       allPeers = syncClient.getAllPeerNames(taskId);
     }
@@ -347,7 +349,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
    * @return the number of messages
    */
   @Override
-  public int getNumCurrentMessages() {
+  public final int getNumCurrentMessages() {
     return messenger.getNumCurrentMessages();
   }
 
@@ -356,14 +358,14 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
    * 
    * @param currentTaskStatus
    */
-  public void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
+  public final void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
     this.currentTaskStatus = currentTaskStatus;
   }
 
   /**
    * @return the count of current super-step
    */
-  public long getSuperstepCount() {
+  public final long getSuperstepCount() {
     return currentTaskStatus.getSuperstepCount();
   }
 
@@ -372,7 +374,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
    * 
    * @return the conf
    */
-  public Configuration getConfiguration() {
+  public final Configuration getConfiguration() {
     return conf;
   }
 
@@ -381,17 +383,17 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
    */
 
   @Override
-  public void write(KEYOUT key, VALUEOUT value) throws IOException {
+  public final void write(KEYOUT key, VALUEOUT value) throws IOException {
     collector.collect(key, value);
   }
 
   @Override
-  public boolean readNext(KEYIN key, VALUEIN value) throws IOException {
+  public final boolean readNext(KEYIN key, VALUEIN value) throws IOException {
     return in.next(key, value);
   }
 
   @Override
-  public KeyValuePair<KEYIN, VALUEIN> readNext() throws IOException {
+  public final KeyValuePair<KEYIN, VALUEIN> readNext() throws IOException {
     KEYIN k = in.createKey();
     VALUEIN v = in.createValue();
     if (in.next(k, v)) {
@@ -402,17 +404,17 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   }
 
   @Override
-  public void reopenInput() throws IOException {
+  public final void reopenInput() throws IOException {
     initInput();
   }
 
   @Override
-  public Counter getCounter(Enum<?> name) {
+  public final Counter getCounter(Enum<?> name) {
     return counters == null ? null : counters.findCounter(name);
   }
 
   @Override
-  public Counter getCounter(String group, String name) {
+  public final Counter getCounter(String group, String name) {
     Counters.Counter counter = null;
     if (counters != null) {
       counter = counters.findCounter(group, name);
@@ -421,14 +423,14 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   }
 
   @Override
-  public void incrCounter(Enum<?> key, long amount) {
+  public final void incrCounter(Enum<?> key, long amount) {
     if (counters != null) {
       counters.incrCounter(key, amount);
     }
   }
 
   @Override
-  public void incrCounter(String group, String counter, long amount) {
+  public final void incrCounter(String group, String counter, long amount) {
     if (counters != null) {
       counters.incrCounter(group, counter, amount);
     }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1231088&r1=1231087&r2=1231088&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Fri Jan 13 14:24:05
2012
@@ -32,7 +32,7 @@ import org.apache.hama.ipc.BSPPeerProtoc
 /**
  * Base class for tasks.
  */
-public class BSPTask extends Task {
+public final class BSPTask extends Task {
 
   public static final Log LOG = LogFactory.getLog(BSPTask.class);
 
@@ -55,12 +55,12 @@ public class BSPTask extends Task {
   }
 
   @Override
-  public BSPTaskRunner createRunner(GroomServer groom) {
+  public final BSPTaskRunner createRunner(GroomServer groom) {
     return new BSPTaskRunner(this, groom, this.conf);
   }
 
   @Override
-  public void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?> bspPeer,
+  public final void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?> bspPeer,
       BSPPeerProtocol umbilical) throws IOException, SyncException,
       ClassNotFoundException, InterruptedException {
     runBSP(job, bspPeer, split, umbilical);
@@ -69,15 +69,15 @@ public class BSPTask extends Task {
   }
 
   @SuppressWarnings("unchecked")
-  private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runBSP(final BSPJob job,
-      BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> bspPeer,
+  private final <KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runBSP(
+      final BSPJob job, BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> bspPeer,
       final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
       throws IOException, SyncException, ClassNotFoundException,
       InterruptedException {
 
     BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT>)
ReflectionUtils
-        .newInstance(job.getConf().getClass("bsp.work.class", BSP.class), job
-            .getConf());
+        .newInstance(job.getConf().getClass("bsp.work.class", BSP.class),
+            job.getConf());
 
     bsp.setup(bspPeer);
     bsp.bsp(bspPeer);
@@ -86,16 +86,16 @@ public class BSPTask extends Task {
     bspPeer.close();
   }
 
-  public BSPJob getConf() {
+  public final BSPJob getConf() {
     return conf;
   }
 
-  public void setConf(BSPJob conf) {
+  public final void setConf(BSPJob conf) {
     this.conf = conf;
   }
 
   @Override
-  public void write(DataOutput out) throws IOException {
+  public final void write(DataOutput out) throws IOException {
     super.write(out);
     if (split != null) {
       out.writeBoolean(true);
@@ -108,7 +108,7 @@ public class BSPTask extends Task {
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
+  public final void readFields(DataInput in) throws IOException {
     super.readFields(in);
     if (in.readBoolean()) {
       splitClass = Text.readString(in);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1231088&r1=1231087&r2=1231088&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Fri Jan 13
14:24:05 2012
@@ -227,8 +227,8 @@ public class GroomServer implements Runn
     // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
 
     try {
-      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf
-          .getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
+          conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
     } catch (IOException e) {
       LOG.error("Exception during reinitialization!", e);
     }
@@ -240,8 +240,9 @@ public class GroomServer implements Runn
     }
 
     if (localHostname == null) {
-      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
-          "default"), conf.get("bsp.dns.nameserver", "default"));
+      this.localHostname = DNS.getDefaultHost(
+          conf.get("bsp.dns.interface", "default"),
+          conf.get("bsp.dns.nameserver", "default"));
     }
     // check local disk
     checkLocalDirs(getLocalDirs());
@@ -517,17 +518,17 @@ public class GroomServer implements Runn
 
     synchronized (rjob) {
       if (!rjob.localized) {
-        
+
         FileSystem localFs = FileSystem.getLocal(conf);
         Path jobDir = localJobFile.getParent();
-        if (localFs.exists(jobDir)){
+        if (localFs.exists(jobDir)) {
           localFs.delete(jobDir, true);
           boolean b = localFs.mkdirs(jobDir);
           if (!b)
             throw new IOException("Not able to create job directory "
-                                  + jobDir.toString());
+                + jobDir.toString());
         }
-        
+
         Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
             + task.getTaskID() + "/" + "job.jar");
         systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
@@ -930,13 +931,13 @@ public class GroomServer implements Runn
   /**
    * The main() for BSPPeer child processes.
    */
-  public static class BSPPeerChild {
+  public static final class BSPPeerChild {
 
     public static void main(String[] args) throws Throwable {
       if (LOG.isDebugEnabled())
         LOG.debug("BSPPeerChild starting");
 
-      HamaConfiguration defaultConf = new HamaConfiguration();
+      final HamaConfiguration defaultConf = new HamaConfiguration();
       // report address
       String host = args[0];
       int port = Integer.parseInt(args[1]);
@@ -948,7 +949,7 @@ public class GroomServer implements Runn
           BSPPeerProtocol.class, BSPPeerProtocol.versionID, address,
           defaultConf);
 
-      BSPTask task = (BSPTask) umbilical.getTask(taskid);
+      final BSPTask task = (BSPTask) umbilical.getTask(taskid);
       int peerPort = umbilical.getAssignedPortNum(taskid);
 
       defaultConf.addResource(new Path(task.getJobFile()));
@@ -967,9 +968,9 @@ public class GroomServer implements Runn
 
         // instantiate and init our peer
         @SuppressWarnings("rawtypes")
-        BSPPeerImpl<?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, defaultConf,
-            taskid, umbilical, task.partition, task.splitClass, task.split,
-            task.getCounters());
+        final BSPPeerImpl<?, ?, ?, ?> bspPeer = new BSPPeerImpl(job,
+            defaultConf, taskid, umbilical, task.partition, task.splitClass,
+            task.split, task.getCounters());
 
         task.run(job, bspPeer, umbilical); // run the task
 
@@ -979,7 +980,7 @@ public class GroomServer implements Runn
       } catch (SyncException e) {
         LOG.fatal("SyncError from child", e);
         umbilical.fatalError(taskid, e.toString());
-        
+
         // Report back any failures, for diagnostic purposes
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         e.printStackTrace(new PrintStream(baos));

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1231088&r1=1231087&r2=1231088&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
Fri Jan 13 14:24:05 2012
@@ -39,7 +39,7 @@ import org.apache.hama.util.BSPNetUtils;
  * Implementation of the {@link HadoopMessageManager}.
  * 
  */
-public class HadoopMessageManagerImpl implements MessageManager,
+public final class HadoopMessageManagerImpl implements MessageManager,
     HadoopMessageManager {
 
   private static final Log LOG = LogFactory
@@ -57,12 +57,13 @@ public class HadoopMessageManagerImpl im
   private final ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new
ConcurrentLinkedQueue<BSPMessage>();
 
   @Override
-  public void init(Configuration conf, InetSocketAddress peerAddress) {
+  public final void init(Configuration conf, InetSocketAddress peerAddress) {
     this.conf = conf;
     startRPCServer(conf, peerAddress);
   }
 
-  private void startRPCServer(Configuration conf, InetSocketAddress peerAddress) {
+  private final void startRPCServer(Configuration conf,
+      InetSocketAddress peerAddress) {
     try {
       this.server = RPC.getServer(this, peerAddress.getHostName(),
           peerAddress.getPort(), conf);
@@ -76,19 +77,19 @@ public class HadoopMessageManagerImpl im
   }
 
   @Override
-  public void close() {
+  public final void close() {
     if (server != null) {
       server.stop();
     }
   }
 
   @Override
-  public BSPMessage getCurrentMessage() throws IOException {
+  public final BSPMessage getCurrentMessage() throws IOException {
     return localQueue.poll();
   }
 
   @Override
-  public void send(String peerName, BSPMessage msg) throws IOException {
+  public final void send(String peerName, BSPMessage msg) throws IOException {
     LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
     InetSocketAddress targetPeerAddress = null;
     // Get socket for target peer.
@@ -107,12 +108,12 @@ public class HadoopMessageManagerImpl im
   }
 
   @Override
-  public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator()
{
+  public final Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>>
getMessageIterator() {
     return this.outgoingQueues.entrySet().iterator();
   }
 
-  protected HadoopMessageManager getBSPPeerConnection(InetSocketAddress addr)
-      throws IOException {
+  protected final HadoopMessageManager getBSPPeerConnection(
+      InetSocketAddress addr) throws IOException {
     HadoopMessageManager peer = peers.get(addr);
     if (peer == null) {
       peer = (HadoopMessageManager) RPC.getProxy(HadoopMessageManager.class,
@@ -123,7 +124,7 @@ public class HadoopMessageManagerImpl im
   }
 
   @Override
-  public void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+  public final void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
       throws IOException {
 
     HadoopMessageManager bspPeerConnection = this.getBSPPeerConnection(addr);
@@ -137,31 +138,32 @@ public class HadoopMessageManagerImpl im
   }
 
   @Override
-  public void clearOutgoingQueues() {
+  public final void clearOutgoingQueues() {
     this.outgoingQueues.clear();
     localQueue.addAll(localQueueForNextIteration);
     localQueueForNextIteration.clear();
   }
 
   @Override
-  public void put(BSPMessage msg) {
+  public final void put(BSPMessage msg) {
     this.localQueueForNextIteration.add(msg);
   }
 
   @Override
-  public void put(BSPMessageBundle messages) {
+  public final void put(BSPMessageBundle messages) {
     for (BSPMessage message : messages.getMessages()) {
       this.localQueueForNextIteration.add(message);
     }
   }
 
   @Override
-  public int getNumCurrentMessages() {
+  public final int getNumCurrentMessages() {
     return localQueue.size();
   }
 
   @Override
-  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+  public final long getProtocolVersion(String arg0, long arg1)
+      throws IOException {
     return versionID;
   }
 



Mime
View raw message