giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1370438 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/utils/ src/test/java/org/apache/giraph/comm/
Date Tue, 07 Aug 2012 19:12:08 GMT
Author: aching
Date: Tue Aug  7 19:12:07 2012
New Revision: 1370438

URL: http://svn.apache.org/viewvc?rev=1370438&view=rev
Log:
GIRAPH-288: Bandwidth tracking - subset of GIRAPH-262. (aching)

Added:
    giraph/trunk/src/main/java/org/apache/giraph/comm/ByteCounter.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug  7 19:12:07 2012
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-288: Bandwidth tracking - subset of GIRAPH-262. (aching)
+
   GIRAPH-289: Add thread and channel pooling to NettyClient and
   NettyServer. (ekoontz via aching)
 

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/ByteCounter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ByteCounter.java?rev=1370438&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ByteCounter.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ByteCounter.java Tue Aug  7 19:12:07 2012
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.text.DecimalFormat;
+import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Keep track of the bytes sent/received and provide some metrics when
+ * desired as part of the Netty Channel stack.
+ */
+public class ByteCounter extends SimpleChannelHandler {
+  /** Megabyte in bytes */
+  public static final double MEGABYTE = 1024f * 1024f;
+  /** Helper to format the doubles */
+  private static final DecimalFormat DOUBLE_FORMAT =
+      new DecimalFormat("#######.####");
+  /** All bytes ever sent */
+  private final AtomicLong bytesSent = new AtomicLong();
+  /** Total sent requests */
+  private final AtomicLong sentRequests = new AtomicLong();
+  /** All bytes ever received */
+  private final AtomicLong bytesReceived = new AtomicLong();
+  /** Total received requests */
+  private final AtomicLong receivedRequests = new AtomicLong();
+  /** Start time (for bandwidth calculation) */
+  private final AtomicLong startMsecs =
+      new AtomicLong(System.currentTimeMillis());
+
+  @Override
+  public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
+    throws Exception {
+    if (e instanceof MessageEvent &&
+        ((MessageEvent) e).getMessage() instanceof ChannelBuffer) {
+      ChannelBuffer b = (ChannelBuffer) ((MessageEvent) e).getMessage();
+      bytesReceived.addAndGet(b.readableBytes());
+      receivedRequests.incrementAndGet();
+    }
+
+    super.handleUpstream(ctx, e);
+  }
+
+  @Override
+  public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
+    throws Exception {
+    if (e instanceof MessageEvent &&
+        ((MessageEvent) e).getMessage() instanceof ChannelBuffer) {
+      ChannelBuffer b = (ChannelBuffer) ((MessageEvent) e).getMessage();
+      bytesSent.addAndGet(b.readableBytes());
+      sentRequests.incrementAndGet();
+    }
+
+    super.handleDownstream(ctx, e);
+  }
+
+  /**
+   * Reset all the bytes kept track of.
+   */
+  private void resetBytes() {
+    bytesSent.set(0);
+    sentRequests.set(0);
+    bytesReceived.set(0);
+    receivedRequests.set(0);
+  }
+
+  /**
+   * Reset the start msecs.
+   */
+  private void resetStartMsecs() {
+    startMsecs.set(System.currentTimeMillis());
+  }
+
+  /**
+   * Reset everything this object keeps track of
+   */
+  public void resetAll() {
+    resetBytes();
+    resetStartMsecs();
+  }
+
+  public long getBytesSent() {
+    return bytesSent.get();
+  }
+
+  public long getBytesReceived() {
+    return bytesReceived.get();
+  }
+
+  /**
+   * @return Mbytes sent / sec in the current interval
+   */
+  public double getMbytesPerSecSent() {
+    return bytesSent.get() * 1000f /
+        (1 + System.currentTimeMillis() - startMsecs.get()) / MEGABYTE;
+  }
+
+  /**
+   * @return Mbytes received / sec in the current interval
+   */
+  public double getMbytesPerSecReceived() {
+    return bytesReceived.get() * 1000f /
+        (1 + System.currentTimeMillis() - startMsecs.get()) / MEGABYTE;
+  }
+
+  /**
+   * @return A string containing all the metrics
+   */
+  public String getMetrics() {
+    double mBytesSent = bytesSent.get() / MEGABYTE;
+    double mBytesReceived = bytesReceived.get() / MEGABYTE;
+    long curSentRequests = sentRequests.get();
+    long curReceivedRequests = receivedRequests.get();
+    double mBytesSentPerReq =
+        (curSentRequests == 0) ? 0 : mBytesSent / curSentRequests;
+    double mBytesReceivedPerReq =
+        (curReceivedRequests == 0) ? 0 : mBytesReceived / curReceivedRequests;
+    return "MBytes/sec sent = " +
+        DOUBLE_FORMAT.format(getMbytesPerSecSent()) +
+        ", MBytes/sec received = " +
+        DOUBLE_FORMAT.format(getMbytesPerSecReceived()) +
+        ", MBytesSent = " + DOUBLE_FORMAT.format(mBytesSent) +
+        ", MBytesReceived = " + DOUBLE_FORMAT.format(mBytesReceived) +
+        ", ave sent request MBytes = " +
+        DOUBLE_FORMAT.format(mBytesSentPerReq) +
+        ", ave received request MBytes = " +
+        DOUBLE_FORMAT.format(mBytesReceivedPerReq) +
+        ", secs waited = " +
+        ((System.currentTimeMillis() - startMsecs.get()) / 1000f);
+  }
+
+  /**
+   * Get the metrics if a given window of time has passed.  Return null
+   * otherwise.  If the window is met, reset the metrics.
+   *
+   * @param minMsecsWindow Msecs of the minimum window
+   * @return Metrics or else null if the window wasn't met
+   */
+  public String getMetricsWindow(int minMsecsWindow) {
+    if (System.currentTimeMillis() - startMsecs.get() > minMsecsWindow) {
+      String metrics = getMetrics();
+      resetAll();
+      return metrics;
+    }
+
+    return null;
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Tue Aug  7 19:12:07 2012
@@ -73,6 +73,8 @@ public class NettyClient<I extends Writa
       Maps.newHashMap();
   /** Number of channels per server */
   private final int channelsPerServer;
+  /** Byte counter for this client */
+  private final ByteCounter byteCounter = new ByteCounter();
   /** Send buffer size */
   private final int sendBufferSize;
   /** Receive buffer size */
@@ -107,6 +109,7 @@ public class NettyClient<I extends Writa
       @Override
       public ChannelPipeline getPipeline() throws Exception {
         return Channels.pipeline(
+            byteCounter,
             new RequestEncoder(),
             new ResponseClientHandler(waitingRequestCount));
       }
@@ -209,6 +212,9 @@ public class NettyClient<I extends Writa
    */
   public void sendWritableRequest(InetSocketAddress remoteServer,
                                   WritableRequest<I, V, E, M> request) {
+    if (waitingRequestCount.get() == 0) {
+      byteCounter.resetAll();
+    }
     waitingRequestCount.incrementAndGet();
     Channel channel = addressChannelMap.get(remoteServer).nextChannel();
     if (channel == null) {
@@ -226,6 +232,11 @@ public class NettyClient<I extends Writa
   public void waitAllRequests() {
     synchronized (waitingRequestCount) {
       while (waitingRequestCount.get() != 0) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("waitAllRequests: Waiting interval of " +
+              WAITING_REQUEST_MSECS + " msecs and still waiting on " +
+              waitingRequestCount + " requests, " + byteCounter.getMetrics());
+        }
         try {
           waitingRequestCount.wait(WAITING_REQUEST_MSECS);
         } catch (InterruptedException e) {
@@ -235,6 +246,10 @@ public class NettyClient<I extends Writa
         context.progress();
       }
     }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("waitAllRequests: Finished all requests. " +
+          byteCounter.getMetrics());
+    }
   }
 
   /**

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Tue Aug  7 19:12:07 2012
@@ -82,6 +82,8 @@ public class NettyServer<I extends Writa
   private final ServerData<I, V, E, M> serverData;
   /** Server bootstrap */
   private ServerBootstrap bootstrap;
+  /** Byte counter for this client */
+  private final ByteCounter byteCounter = new ByteCounter();
   /** Send buffer size */
   private final int sendBufferSize;
   /** Receive buffer size */
@@ -142,8 +144,9 @@ public class NettyServer<I extends Writa
       @Override
       public ChannelPipeline getPipeline() throws Exception {
         return Channels.pipeline(
+            byteCounter,
             new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
-            new RequestDecoder<I, V, E, M>(conf, requestRegistry),
+            new RequestDecoder<I, V, E, M>(conf, requestRegistry, byteCounter),
             new RequestServerHandler<I, V, E, M>(serverData));
       }
     });

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java Tue Aug  7 19:12:07 2012
@@ -48,16 +48,21 @@ public class RequestDecoder<I extends Wr
   private final Configuration conf;
   /** Registry of requests */
   private final RequestRegistry requestRegistry;
+  /** Byte counter to output */
+  private final ByteCounter byteCounter;
 
   /**
    * Constructor.
    *
    * @param conf Configuration
    * @param requestRegistry Request registry
+   * @param byteCounter Keeps track of the decoded bytes
    */
-  public RequestDecoder(Configuration conf, RequestRegistry requestRegistry) {
+  public RequestDecoder(Configuration conf, RequestRegistry requestRegistry,
+                        ByteCounter byteCounter) {
     this.conf = conf;
     this.requestRegistry = requestRegistry;
+    this.byteCounter = byteCounter;
   }
 
   @Override
@@ -67,6 +72,14 @@ public class RequestDecoder<I extends Wr
       throw new IllegalStateException("decode: Got illegal message " + msg);
     }
 
+    // Output metrics every 1/2 minute
+    String metrics = byteCounter.getMetricsWindow(30000);
+    if (metrics != null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("decode: Server window metrics " + metrics);
+      }
+    }
+
     ChannelBuffer buffer = (ChannelBuffer) msg;
     ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
     int enumValue = inputStream.readByte();

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java Tue Aug  7 19:12:07 2012
@@ -25,6 +25,7 @@ import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
@@ -93,4 +94,11 @@ public class ResponseClientHandler exten
       }
     }
   }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    throw new IllegalStateException("exceptionCaught: Channel failed with " +
+        "remote address " + ctx.getChannel().getRemoteAddress() + " with " +
+        "cause " + e.getCause());
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java Tue Aug  7 19:12:07 2012
@@ -110,9 +110,11 @@ public class SendVertexRequest<I extends
     Collection<Vertex<I, V, E, M>> vertexMap =
         partitionVertexMap.get(partitionId);
     if (vertexMap == null) {
-      vertexMap = partitionVertexMap.putIfAbsent(partitionId, vertices);
+      final Collection<Vertex<I, V, E, M>> tmpVertices  =
+          Lists.newArrayListWithCapacity(vertices.size());
+      vertexMap = partitionVertexMap.putIfAbsent(partitionId, tmpVertices);
       if (vertexMap == null) {
-        return;
+        vertexMap = tmpVertices;
       }
     }
     synchronized (vertexMap) {

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Aug  7 19:12:07 2012
@@ -452,7 +452,7 @@ public class BspServiceWorker<I extends 
           vertexReader.getCurrentVertex();
       if (readerVertex.getId() == null) {
         throw new IllegalArgumentException(
-            "loadVertices: Vertex reader returned a vertex " +
+            "readVerticesFromInputSplit: Vertex reader returned a vertex " +
                 "without an id!  - " + readerVertex);
       }
       if (readerVertex.getValue() == null) {

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Aug  7 19:12:07 2012
@@ -26,6 +26,7 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.TimedLogger;
 import org.apache.giraph.zk.ZooKeeperManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -589,6 +590,8 @@ public class GraphMapper<I extends Writa
       }
 
       partitionStatsList.clear();
+      TimedLogger partitionLogger = new TimedLogger(15000, LOG);
+      int completedPartitions = 0;
       for (Partition<I, V, E, M> partition :
         serviceWorker.getPartitionMap().values()) {
         PartitionStats partitionStats =
@@ -633,6 +636,10 @@ public class GraphMapper<I extends Writa
         }
 
         partitionStatsList.add(partitionStats);
+        ++completedPartitions;
+        partitionLogger.info("map: Completed " + completedPartitions + " of " +
+            serviceWorker.getPartitionMap().size() + " partitions " +
+            MemoryUtils.getRuntimeMemoryStats());
       }
     } while (!serviceWorker.finishSuperstep(partitionStatsList));
     if (LOG.isInfoEnabled()) {

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java?rev=1370438&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java Tue Aug  7 19:12:07 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Print log messages only if the time is met.  Thread-safe.
+ */
+public class TimedLogger {
+  /** Last time printed */
+  private volatile long lastPrint = System.currentTimeMillis();
+  /** Minimum interval of time to wait before printing */
+  private final int msecs;
+  /** Logger */
+  private final Logger log;
+
+  /**
+   * Constructor of the timed logger
+   *
+   * @param msecs Msecs to wait before printing again
+   * @param log Logger to print to
+   */
+  public TimedLogger(int msecs, Logger log) {
+    this.msecs = msecs;
+    this.log = log;
+  }
+
+  /**
+   * Print to the info log level if the minimum waiting time was reached.
+   *
+   * @param msg Message to print
+   */
+  public void info(String msg) {
+    if (System.currentTimeMillis() > lastPrint + msecs) {
+      log.info(msg);
+      lastPrint = System.currentTimeMillis();
+    }
+  }
+}

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Tue Aug  7 19:12:07 2012
@@ -82,6 +82,7 @@ public class ConnectionTest {
     @SuppressWarnings("rawtypes")
     Context context = mock(Context.class);
     when(context.getConfiguration()).thenReturn(conf);
+
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
             (SimpleMessageStore.newFactory(



Mime
View raw message