hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r393865 - in /lucene/hadoop/trunk: conf/hadoop-default.xml src/java/org/apache/hadoop/dfs/DFSClient.java src/java/org/apache/hadoop/dfs/DataNode.java
Date Thu, 13 Apr 2006 17:44:24 GMT
Author: cutting
Date: Thu Apr 13 10:44:22 2006
New Revision: 393865

URL: http://svn.apache.org/viewcvs?rev=393865&view=rev
Log:
Fix HADOOP-128.  Improved DFS error handling.  Contributed by Owen O'Malley.

Modified:
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=393865&r1=393864&r2=393865&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Thu Apr 13 10:44:22 2006
@@ -118,6 +118,14 @@
   <description>Disk usage statistics refresh interval in msec.</description>
 </property>
 
+<property>
+  <name>dfs.client.block.write.retries</name>
+  <value>3</value>
+  <description>The number of retries for writing blocks to the data nodes, 
+  before we signal failure to the application.
+  </description>
+</property>
+
 <!-- map/reduce properties -->
 
 <property>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=393865&r1=393864&r2=393865&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Apr 13 10:44:22 2006
@@ -564,6 +564,7 @@
         private Block block;
         private long filePos = 0;
         private int bytesWrittenToBlock = 0;
+        private String datanodeName;
 
         /**
          * Create a new output stream to the given DataNode.
@@ -628,7 +629,8 @@
                 try {
                     s = new Socket();
                     s.connect(target, READ_TIMEOUT);
-                    s.setSoTimeout(READ_TIMEOUT);
+                    s.setSoTimeout(replication * READ_TIMEOUT);
+                    datanodeName = nodes[0].getName().toString();
                 } catch (IOException ie) {
                     // Connection failed.  Let's wait a little bit and retry
                     try {
@@ -764,8 +766,10 @@
             //
             // Send it to datanode
             //
-            boolean mustRecover = true;
-            while (mustRecover) {
+            boolean sentOk = false;
+            int remainingAttempts = 
+               conf.getInt("dfs.client.block.write.retries", 3);
+            while (!sentOk) {
                 nextBlockOutputStream();
                 InputStream in = new FileInputStream(backupFile);
                 try {
@@ -777,9 +781,13 @@
                         bytesRead = in.read(buf);
                     }
                     internalClose();
-                    mustRecover = false;
+                    sentOk = true;
                 } catch (IOException ie) {
                     handleSocketException(ie);
+                    remainingAttempts -= 1;
+                    if (remainingAttempts == 0) {
+                      throw ie;
+                    }
                 } finally {
                   in.close();
                 }
@@ -798,6 +806,7 @@
          * Close down stream to remote datanode.
          */
         private synchronized void internalClose() throws IOException {
+          try {
             blockStream.writeLong(0);
             blockStream.flush();
 
@@ -806,6 +815,13 @@
                 LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
                 throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
             }
+          } catch (IOException ie) {
+            throw (IOException)
+                  new IOException("failure closing block of file " +
+                                  src.toString() + " to node " +
+                                  (datanodeName == null ? "?" : datanodeName)
+                                 ).initCause(ie);
+          }
                     
             LocatedBlock lb = new LocatedBlock();
             lb.readFields(blockReplyStream);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=393865&r1=393864&r2=393865&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Apr 13 10:44:22 2006
@@ -80,6 +80,13 @@
         return new InetSocketAddress(host, port);
     }
 
+    private static String stringifyException(Exception e) {
+      StringWriter stm = new StringWriter();
+      PrintWriter wrt = new PrintWriter(stm);
+      e.printStackTrace(wrt);
+      wrt.close();
+      return stm.toString();
+    }
 
     private static Vector subThreadList = null;
     DatanodeProtocol namenode;
@@ -91,7 +98,6 @@
     Daemon dataXceiveServer = null;
     long blockReportInterval;
     private long datanodeStartupPeriod;
-    private Configuration fConf;
 
     /**
      * Create the DataNode given a configuration and a dataDir.
@@ -160,10 +166,8 @@
      * forever calling remote NameNode functions.
      */
     public void offerService() throws Exception {
-        long wakeups = 0;
         long lastHeartbeat = 0, lastBlockReport = 0;
         long sendStart = System.currentTimeMillis();
-        int heartbeatsSent = 0;
         LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
 
         //
@@ -321,275 +325,9 @@
                 try {
                     byte op = (byte) in.read();
                     if (op == OP_WRITE_BLOCK) {
-                        //
-                        // Read in the header
-                        //
-                        DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-                        try {
-                            boolean shouldReportBlock = in.readBoolean();
-                            Block b = new Block();
-                            b.readFields(in);
-                            int numTargets = in.readInt();
-                            if (numTargets <= 0) {
-                                throw new IOException("Mislabelled incoming datastream.");
-                            }
-                            DatanodeInfo targets[] = new DatanodeInfo[numTargets];
-                            for (int i = 0; i < targets.length; i++) {
-                                DatanodeInfo tmp = new DatanodeInfo();
-                                tmp.readFields(in);
-                                targets[i] = tmp;
-                            }
-                            byte encodingType = (byte) in.read();
-                            long len = in.readLong();
-
-                            //
-                            // Make sure curTarget is equal to this machine
-                            //
-                            DatanodeInfo curTarget = targets[0];
-
-                            //
-                            // Track all the places we've successfully written the block
-                            //
-                            Vector mirrors = new Vector();
-
-                            //
-                            // Open local disk out
-                            //
-                            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));
-                            InetSocketAddress mirrorTarget = null;
-                            try {
-                                //
-                                // Open network conn to backup machine, if 
-                                // appropriate
-                                //
-                                DataInputStream in2 = null;
-                                DataOutputStream out2 = null;
-                                if (targets.length > 1) {
-                                    // Connect to backup machine
-                                    mirrorTarget = createSocketAddr(targets[1].getName().toString());
-                                    try {
-                                        Socket s2 = new Socket();
-                                        s2.connect(mirrorTarget, READ_TIMEOUT);
-                                        s2.setSoTimeout(READ_TIMEOUT);
-                                        out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
-                                        in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
-
-                                        // Write connection header
-                                        out2.write(OP_WRITE_BLOCK);
-                                        out2.writeBoolean(shouldReportBlock);
-                                        b.write(out2);
-                                        out2.writeInt(targets.length - 1);
-                                        for (int i = 1; i < targets.length; i++) {
-                                            targets[i].write(out2);
-                                        }
-                                        out2.write(encodingType);
-                                        out2.writeLong(len);
-                                    } catch (IOException ie) {
-                                        if (out2 != null) {
-                                            try {
-                                                out2.close();
-                                                in2.close();
-                                            } catch (IOException out2close) {
-                                            } finally {
-                                                out2 = null;
-                                                in2 = null;
-                                            }
-                                        }
-                                    }
-                                }
-
-                                //
-                                // Process incoming data, copy to disk and
-                                // maybe to network.
-                                //
-                                try {
-                                    boolean anotherChunk = len != 0;
-                                    byte buf[] = new byte[BUFFER_SIZE];
-
-                                    while (anotherChunk) {
-                                        while (len > 0) {
-                                            int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));
-                                            if (bytesRead < 0) {
-                                              throw new EOFException("EOF reading from "+s.toString());
-                                            }
-                                            if (bytesRead > 0) {
-                                                try {
-                                                    out.write(buf, 0, bytesRead);
-                                                } catch (IOException iex) {
-                                                    shutdown();
-                                                    throw iex;
-                                                }
-                                                if (out2 != null) {
-                                                    try {
-                                                        out2.write(buf, 0, bytesRead);
-                                                    } catch (IOException out2e) {
-                                                        //
-                                                        // If stream-copy fails, continue 
-                                                        // writing to disk.  We shouldn't 
-                                                        // interrupt client write.
-                                                        //
-                                                        try {
-                                                            out2.close();
-                                                            in2.close();
-                                                        } catch (IOException out2close) {
-                                                        } finally {
-                                                            out2 = null;
-                                                            in2 = null;
-                                                        }
-                                                    }
-                                                }
-                                                len -= bytesRead;
-                                            }
-                                        }
-
-                                        if (encodingType == RUNLENGTH_ENCODING) {
-                                            anotherChunk = false;
-                                        } else if (encodingType == CHUNKED_ENCODING) {
-                                            len = in.readLong();
-                                            if (out2 != null) {
-                                                out2.writeLong(len);
-                                            }
-                                            if (len == 0) {
-                                                anotherChunk = false;
-                                            }
-                                        }
-                                    }
-
-                                    if (out2 == null) {
-                                        LOG.info("Received block " + b + " from " + s.getInetAddress());
-                                    } else {
-                                        out2.flush();
-                                        long complete = in2.readLong();
-                                        if (complete != WRITE_COMPLETE) {
-                                            LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
-                                        }
-                                        LocatedBlock newLB = new LocatedBlock();
-                                        newLB.readFields(in2);
-                                        DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
-                                        for (int k = 0; k < mirrorsSoFar.length; k++) {
-                                            mirrors.add(mirrorsSoFar[k]);
-                                        }
-                                        LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget);
-                                    }
-                                } finally {
-                                    if (out2 != null) {
-                                        out2.close();
-                                        in2.close();
-                                    }
-                                }
-                            } finally {
-                                try {
-                                    out.close();
-                                } catch (IOException iex) {
-                                    shutdown();
-                                    throw iex;
-                                }
-                            }
-                            data.finalizeBlock(b);
-
-                            // 
-                            // Tell the namenode that we've received this block 
-                            // in full, if we've been asked to.  This is done
-                            // during NameNode-directed block transfers, but not
-                            // client writes.
-                            //
-                            if (shouldReportBlock) {
-                                synchronized (receivedBlockList) {
-                                    receivedBlockList.add(b);
-                                    receivedBlockList.notifyAll();
-                                }
-                            }
-
-                            //
-                            // Tell client job is done, and reply with
-                            // the new LocatedBlock.
-                            //
-                            reply.writeLong(WRITE_COMPLETE);
-                            mirrors.add(curTarget);
-                            LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
-                            newLB.write(reply);
-                        } finally {
-                            reply.close();
-                        }
+                        writeBlock(in);
                     } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {
-                        //
-                        // Read in the header
-                        //
-                        Block b = new Block();
-                        b.readFields(in);
-
-                        long toSkip = 0;
-                        if (op == OP_READSKIP_BLOCK) {
-                            toSkip = in.readLong();
-                        }
-
-                        //
-                        // Open reply stream
-                        //
-                        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-                        try {
-                            //
-                            // Write filelen of -1 if error
-                            //
-                            if (! data.isValidBlock(b)) {
-                                out.writeLong(-1);
-                            } else {
-                                //
-                                // Get blockdata from disk
-                                //
-                                long len = data.getLength(b);
-                                DataInputStream in2 = new DataInputStream(data.getBlockData(b));
-                                out.writeLong(len);
-
-                                if (op == OP_READSKIP_BLOCK) {
-                                    if (toSkip > len) {
-                                        toSkip = len;
-                                    }
-                                    long amtSkipped = 0;
-                                    try {
-                                        amtSkipped = in2.skip(toSkip);
-                                    } catch (IOException iex) {
-                                        shutdown();
-                                        throw iex;
-                                    }
-                                    out.writeLong(amtSkipped);
-                                }
-
-                                byte buf[] = new byte[BUFFER_SIZE];
-                                try {
-                                    int bytesRead = 0;
-                                    try {
-                                        bytesRead = in2.read(buf);
-                                    } catch (IOException iex) {
-                                        shutdown();
-                                        throw iex;
-                                    }
-                                    while (bytesRead >= 0) {
-                                        out.write(buf, 0, bytesRead);
-                                        len -= bytesRead;
-                                        try {
-                                            bytesRead = in2.read(buf);
-                                        } catch (IOException iex) {
-                                            shutdown();
-                                            throw iex;
-                                        }
-                                    }
-                                } catch (SocketException se) {
-                                    // This might be because the reader
-                                    // closed the stream early
-                                } finally {
-                                    try {
-                                        in2.close();
-                                    } catch (IOException iex) {
-                                        shutdown();
-                                        throw iex;
-                                    }
-                                }
-                            }
-                            LOG.info("Served block " + b + " to " + s.getInetAddress());
-                        } finally {
-                            out.close();
-                        }
+                        readBlock(in, op);
                     } else {
                         while (op >= 0) {
                             System.out.println("Faulty op: " + op);
@@ -608,6 +346,326 @@
                 } catch (IOException ie2) {
                 }
             }
+        }
+
+        /**
+         * Read a block from the disk
+         * @param in The stream to read from
+         * @param op OP_READ_BLOCK or OP_READ_SKIPBLOCK
+         * @throws IOException
+         */
+        private void readBlock(DataInputStream in, byte op) throws IOException {
+          //
+          // Read in the header
+          //
+          Block b = new Block();
+          b.readFields(in);
+
+          long toSkip = 0;
+          if (op == OP_READSKIP_BLOCK) {
+              toSkip = in.readLong();
+          }
+
+          //
+          // Open reply stream
+          //
+          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+          try {
+              //
+              // Write filelen of -1 if error
+              //
+              if (! data.isValidBlock(b)) {
+                  out.writeLong(-1);
+              } else {
+                  //
+                  // Get blockdata from disk
+                  //
+                  long len = data.getLength(b);
+                  DataInputStream in2 = new DataInputStream(data.getBlockData(b));
+                  out.writeLong(len);
+
+                  if (op == OP_READSKIP_BLOCK) {
+                      if (toSkip > len) {
+                          toSkip = len;
+                      }
+                      long amtSkipped = 0;
+                      try {
+                          amtSkipped = in2.skip(toSkip);
+                      } catch (IOException iex) {
+                          shutdown();
+                          throw iex;
+                      }
+                      out.writeLong(amtSkipped);
+                  }
+
+                  byte buf[] = new byte[BUFFER_SIZE];
+                  try {
+                      int bytesRead = 0;
+                      try {
+                          bytesRead = in2.read(buf);
+                      } catch (IOException iex) {
+                          shutdown();
+                          throw iex;
+                      }
+                      while (bytesRead >= 0) {
+                          out.write(buf, 0, bytesRead);
+                          len -= bytesRead;
+                          try {
+                              bytesRead = in2.read(buf);
+                          } catch (IOException iex) {
+                              shutdown();
+                              throw iex;
+                          }
+                      }
+                  } catch (SocketException se) {
+                      // This might be because the reader
+                      // closed the stream early
+                  } finally {
+                      try {
+                          in2.close();
+                      } catch (IOException iex) {
+                          shutdown();
+                          throw iex;
+                      }
+                  }
+              }
+              LOG.info("Served block " + b + " to " + s.getInetAddress());
+          } finally {
+              out.close();
+          }
+        }
+
+        /**
+         * Write a block to disk.
+         * @param in The stream to read from
+         * @throws IOException
+         */
+        private void writeBlock(DataInputStream in) throws IOException {
+          //
+          // Read in the header
+          //
+          DataOutputStream reply = 
+            new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+          try {
+            boolean shouldReportBlock = in.readBoolean();
+            Block b = new Block();
+            b.readFields(in);
+            int numTargets = in.readInt();
+            if (numTargets <= 0) {
+              throw new IOException("Mislabelled incoming datastream.");
+            }
+            DatanodeInfo targets[] = new DatanodeInfo[numTargets];
+            for (int i = 0; i < targets.length; i++) {
+              DatanodeInfo tmp = new DatanodeInfo();
+              tmp.readFields(in);
+              targets[i] = tmp;
+            }
+            byte encodingType = (byte) in.read();
+            long len = in.readLong();
+            
+            //
+            // Make sure curTarget is equal to this machine
+            //
+            DatanodeInfo curTarget = targets[0];
+            
+            //
+            // Track all the places we've successfully written the block
+            //
+            Vector mirrors = new Vector();
+            
+            //
+            // Open local disk out
+            //
+            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));
+            InetSocketAddress mirrorTarget = null;
+            String mirrorNode = null;
+            try {
+              //
+              // Open network conn to backup machine, if 
+              // appropriate
+              //
+              DataInputStream in2 = null;
+              DataOutputStream out2 = null;
+              if (targets.length > 1) {
+                // Connect to backup machine
+                mirrorNode = targets[1].getName().toString();
+                mirrorTarget = createSocketAddr(mirrorNode);
+                try {
+                  Socket s2 = new Socket();
+                  s2.connect(mirrorTarget, READ_TIMEOUT);
+                  s2.setSoTimeout(READ_TIMEOUT);
+                  out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
+                  in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
+                  
+                  // Write connection header
+                  out2.write(OP_WRITE_BLOCK);
+                  out2.writeBoolean(shouldReportBlock);
+                  b.write(out2);
+                  out2.writeInt(targets.length - 1);
+                  for (int i = 1; i < targets.length; i++) {
+                    targets[i].write(out2);
+                  }
+                  out2.write(encodingType);
+                  out2.writeLong(len);
+                } catch (IOException ie) {
+                  if (out2 != null) {
+                    LOG.info("Exception connecting to mirror " + mirrorNode 
+                             + "\n" + stringifyException(ie));
+                    try {
+                      out2.close();
+                      in2.close();
+                    } catch (IOException out2close) {
+                    } finally {
+                      out2 = null;
+                      in2 = null;
+                    }
+                  }
+                }
+              }
+              
+              //
+              // Process incoming data, copy to disk and
+              // maybe to network.
+              //
+              boolean anotherChunk = len != 0;
+              byte buf[] = new byte[BUFFER_SIZE];
+              
+              while (anotherChunk) {
+                while (len > 0) {
+                  int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));
+                  if (bytesRead < 0) {
+                    throw new EOFException("EOF reading from "+s.toString());
+                  }
+                  if (bytesRead > 0) {
+                    try {
+                      out.write(buf, 0, bytesRead);
+                    } catch (IOException iex) {
+                      shutdown();
+                      throw iex;
+                    }
+                    if (out2 != null) {
+                      try {
+                        out2.write(buf, 0, bytesRead);
+                      } catch (IOException out2e) {
+                        LOG.info("Exception writing to mirror " + mirrorNode 
+                            + "\n" + stringifyException(out2e));
+                        //
+                        // If stream-copy fails, continue 
+                        // writing to disk.  We shouldn't 
+                        // interrupt client write.
+                        //
+                        try {
+                          out2.close();
+                          in2.close();
+                        } catch (IOException out2close) {
+                        } finally {
+                          out2 = null;
+                          in2 = null;
+                        }
+                      }
+                    }
+                    len -= bytesRead;
+                  }
+                }
+                
+                if (encodingType == RUNLENGTH_ENCODING) {
+                  anotherChunk = false;
+                } else if (encodingType == CHUNKED_ENCODING) {
+                  len = in.readLong();
+                  if (out2 != null) {
+                    try {
+                      out2.writeLong(len);
+                    } catch (IOException ie) {
+                      LOG.info("Exception writing to mirror " + mirrorNode 
+                          + "\n" + stringifyException(ie));
+                      try {
+                        out2.close();
+                        in2.close();
+                      } catch (IOException ie2) {
+                        // NOTHING
+                      } finally {
+                        out2 = null;
+                        in2 = null;
+                      }
+                    }
+                  }
+                  if (len == 0) {
+                    anotherChunk = false;
+                  }
+                }
+              }
+              
+              if (out2 != null) {
+                try {
+                  out2.flush();
+                  long complete = in2.readLong();
+                  if (complete != WRITE_COMPLETE) {
+                    LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
+                  }
+                  LocatedBlock newLB = new LocatedBlock();
+                  newLB.readFields(in2);
+                  in2.close();
+                  out2.close();
+                  DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
+                  for (int k = 0; k < mirrorsSoFar.length; k++) {
+                    mirrors.add(mirrorsSoFar[k]);
+                  }
+                } catch (IOException ie) {
+                  LOG.info("Exception writing to mirror " + mirrorNode 
+                      + "\n" + stringifyException(ie));
+                  try {
+                    out2.close();
+                    in2.close();
+                  } catch (IOException ie2) {
+                    // NOTHING
+                  } finally {
+                    out2 = null;
+                    in2 = null;
+                  }
+                }
+              }
+              if (out2 == null) {
+                LOG.info("Received block " + b + " from " + 
+                    s.getInetAddress());
+              } else {
+                LOG.info("Received block " + b + " from " + 
+                    s.getInetAddress() + 
+                    " and mirrored to " + mirrorTarget);
+              }
+            } finally {
+              try {
+                out.close();
+              } catch (IOException iex) {
+                shutdown();
+                throw iex;
+              }
+            }
+            data.finalizeBlock(b);
+            
+            // 
+            // Tell the namenode that we've received this block 
+            // in full, if we've been asked to.  This is done
+            // during NameNode-directed block transfers, but not
+            // client writes.
+            //
+            if (shouldReportBlock) {
+              synchronized (receivedBlockList) {
+                receivedBlockList.add(b);
+                receivedBlockList.notifyAll();
+              }
+            }
+            
+            //
+            // Tell client job is done, and reply with
+            // the new LocatedBlock.
+            //
+            reply.writeLong(WRITE_COMPLETE);
+            mirrors.add(curTarget);
+            LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
+            newLB.write(reply);
+          } finally {
+            reply.close();
+          }
         }
     }
 



Mime
View raw message