hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r674589 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Date Mon, 07 Jul 2008 18:50:11 GMT
Author: shv
Date: Mon Jul  7 11:50:10 2008
New Revision: 674589

URL: http://svn.apache.org/viewvc?rev=674589&view=rev
Log:
HADOOP-3633. Correct exception handling in DataXceiveServer, and throttle the number of xceiver
threads in a data-node. Contributed by Konstantin Shvachko.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=674589&r1=674588&r2=674589&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jul  7 11:50:10 2008
@@ -817,6 +817,9 @@
     HADOOP-3645. MetricsTimeVaryingRate returns wrong value for
     metric_avg_time. (Lohit Vijayarenu via hairong)
 
+    HADOOP-3633. Correct exception handling in DataXceiveServer, and throttle
+    the number of xceiver threads in a data-node. (shv)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=674589&r1=674588&r2=674589&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Jul  7 11:50:10 2008
@@ -187,10 +187,6 @@
     return System.currentTimeMillis();
   }
 
-
-
-
-    
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
    * 'dataDirs' is where the blocks are stored.
@@ -560,8 +556,7 @@
           this.threadGroup.interrupt();
           LOG.info("Waiting for threadgroup to exit, active threads is " +
                    this.threadGroup.activeCount());
-          if (this.threadGroup.isDestroyed() ||
-              this.threadGroup.activeCount() == 0) {
+          if (this.threadGroup.activeCount() == 0) {
             break;
           }
           try {
@@ -631,18 +626,18 @@
     shutdown();
   }
     
-  private static class Count {
-    int value = 0;
-    Count(int init) { value = init; }
-    synchronized void incr() { value++; }
-    synchronized void decr() { value--; }
-    @Override
-    public String toString() { return Integer.toString(value); }
-    public int getValue() { return value; }
+  /**
+   * Maximal number of concurrent xceivers per node.
+   * Enforcing the limit is required in order to avoid data-node
+   * running out of memory.
+   */
+  private final static int MAX_XCEIVER_COUNT = 256;
+
+  /** Number of concurrent xceivers per node. */
+  int getXceiverCount() {
+    return threadGroup == null ? 0 : threadGroup.activeCount();
   }
     
-  Count xceiverCount = new Count(0);
-    
   /**
    * Main loop for the DataNode.  Runs until shutdown,
    * forever calling remote NameNode functions.
@@ -677,7 +672,7 @@
                                                        data.getDfsUsed(),
                                                        data.getRemaining(),
                                                        xmitsInProgress,
-                                                       xceiverCount.getValue());
+                                                       getXceiverCount());
           myMetrics.heartbeats.inc(now() - startTime);
           //LOG.info("Just sent heartbeat, with name " + localName);
           lastHeartbeat = startTime;
@@ -973,15 +968,25 @@
     /**
      */
     public void run() {
-      try {
-        while (shouldRun) {
+      while (shouldRun) {
+        try {
           Socket s = ss.accept();
           s.setTcpNoDelay(true);
           new Daemon(threadGroup, new DataXceiver(s)).start();
+        } catch (IOException ie) {
+          LOG.warn(dnRegistration + ":DataXceiveServer: " 
+                                  + StringUtils.stringifyException(ie));
+        } catch (Throwable te) {
+          LOG.error(dnRegistration + ":DataXceiveServer: Exiting due to:" 
+                                   + StringUtils.stringifyException(te));
+          shouldRun = false;
         }
+      }
+      try {
         ss.close();
       } catch (IOException ie) {
-        LOG.info(dnRegistration + ":Exiting DataXceiveServer due to " + ie.toString());
+        LOG.warn(dnRegistration + ":DataXceiveServer: " 
+                                + StringUtils.stringifyException(ie));
       }
     }
     public void kill() {
@@ -989,14 +994,16 @@
         "shoudRun should be set to false before killing";
       try {
         this.ss.close();
-      } catch (IOException iex) {
+      } catch (IOException ie) {
+        LOG.warn(dnRegistration + ":DataXceiveServer.kill(): " 
+                                + StringUtils.stringifyException(ie));
       }
 
       // close all the sockets that were accepted earlier
       synchronized (childSockets) {
-        for (Iterator it = childSockets.values().iterator();
+        for (Iterator<Socket> it = childSockets.values().iterator();
              it.hasNext();) {
-          Socket thissock = (Socket) it.next();
+          Socket thissock = it.next();
           try {
             thissock.close();
           } catch (IOException e) {
@@ -1019,7 +1026,7 @@
       InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress();
       remoteAddress = isock.toString();
       localAddress = s.getInetAddress() + ":" + s.getLocalPort();
-      LOG.debug("Number of active connections is: "+xceiverCount);
+      LOG.debug("Number of active connections is: " + getXceiverCount());
     }
 
     /**
@@ -1037,6 +1044,13 @@
         }
         boolean local = s.getInetAddress().equals(s.getLocalAddress());
         byte op = in.readByte();
+        // Make sure the xciver count is not exceeded
+        int curXceiverCount = getXceiverCount();
+        if(curXceiverCount > MAX_XCEIVER_COUNT) {
+          throw new IOException("xceiverCount " + curXceiverCount
+                                + " exceeds the limit of concurrent xcievers "
+                                + MAX_XCEIVER_COUNT);
+        }
         long startTime = now();
         switch ( op ) {
         case OP_READ_BLOCK:
@@ -1073,7 +1087,8 @@
       } catch (Throwable t) {
         LOG.error(dnRegistration + ":DataXceiver: " + StringUtils.stringifyException(t));
       } finally {
-        LOG.debug(dnRegistration + ":Number of active connections is: "+xceiverCount);
+        LOG.debug(dnRegistration + ":Number of active connections is: "
+                                 + getXceiverCount());
         IOUtils.closeStream(in);
         IOUtils.closeSocket(s);
         childSockets.remove(s);
@@ -1086,7 +1101,6 @@
      * @throws IOException
      */
     private void readBlock(DataInputStream in) throws IOException {
-      xceiverCount.incr();
       //
       // Read in the header
       //
@@ -1140,7 +1154,6 @@
                   StringUtils.stringifyException(ioe) );
         throw ioe;
       } finally {
-        xceiverCount.decr();
         IOUtils.closeStream(out);
         IOUtils.closeStream(blockSender);
       }
@@ -1153,7 +1166,6 @@
      * @throws IOException
      */
     private void writeBlock(DataInputStream in) throws IOException {
-      xceiverCount.incr();
       DatanodeInfo srcDataNode = null;
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
                 " tcp no delay " + s.getTcpNoDelay());
@@ -1316,8 +1328,6 @@
         IOUtils.closeStream(replyOut);
         IOUtils.closeSocket(mirrorSock);
         IOUtils.closeStream(blockReceiver);
-        // decrement counter
-        xceiverCount.decr();
       }
     }
 
@@ -1326,8 +1336,6 @@
      * @param in
      */
     void readMetadata(DataInputStream in) throws IOException {
-      xceiverCount.incr();
-
       Block block = new Block( in.readLong(), 0 , in.readLong());
       MetaDataInputStream checksumIn = null;
       DataOutputStream out = null;
@@ -1356,7 +1364,6 @@
         //last DATA_CHUNK
         out.writeInt(0);
       } finally {
-        xceiverCount.decr();
         IOUtils.closeStream(out);
         IOUtils.closeStream(checksumIn);
       }
@@ -2917,6 +2924,7 @@
     }
         
     LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
+    shutdown();
   }
     
   /** Start a single datanode daemon and wait for it to finish.



Mime
View raw message