From: billgraham@apache.org
To: chukwa-commits@incubator.apache.org
Subject: svn commit: r1038719 - in /incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection: collector/servlet/ServletCollector.java writer/SeqFileWriter.java
Date: Wed, 24 Nov 2010 17:47:47 -0000
Message-Id: <20101124174747.E7D6723888E8@eris.apache.org>

Author: billgraham
Date: Wed Nov 24 17:47:47 2010
New Revision: 1038719

URL: http://svn.apache.org/viewvc?rev=1038719&view=rev
Log:
CHUKWA-533. Improve fault-tolerance of collector

Modified:
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=1038719&r1=1038718&r2=1038719&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Wed Nov 24 17:47:47 2010
@@ -147,14 +147,19 @@ public class ServletCollector extends Ht
       }
     }
 
+    int responseStatus = HttpServletResponse.SC_OK;
+
     // write new data to data sync file
     if (writer != null) {
       ChukwaWriter.CommitStatus result = writer.add(events);
-      numberchunks += events.size();
-      lifetimechunks += events.size();
+
       // this is where we ACK this connection
       if(result == ChukwaWriter.COMMIT_OK) {
+        // only count the chunks if result is commit or commit pending
+        numberchunks += events.size();
+        lifetimechunks += events.size();
+
         for(Chunk receivedChunk: events) {
           sb.append(ACK_PREFIX);
           sb.append(receivedChunk.getData().length);
@@ -162,10 +167,18 @@ public class ServletCollector extends Ht
           sb.append(receivedChunk.getSeqID() - 1).append("\n");
         }
       } else if(result instanceof ChukwaWriter.COMMIT_PENDING) {
+
+        // only count the chunks if result is commit or commit pending
+        numberchunks += events.size();
+        lifetimechunks += events.size();
+
         for(String s: ((ChukwaWriter.COMMIT_PENDING) result).pendingEntries)
           sb.append(s);
+      } else if(result == ChukwaWriter.COMMIT_FAIL) {
+        sb.append("Commit failed");
+        responseStatus = HttpServletResponse.SC_SERVICE_UNAVAILABLE;
       }
-      
+
       l_out.print(sb.toString());
     } else {
       l_out.println("can't write: no writer");
@@ -175,7 +188,7 @@ public class ServletCollector extends Ht
       diagnosticPage.doneWithPost();
     }
 
-    resp.setStatus(200);
+    resp.setStatus(responseStatus);
 
   } catch (Throwable e) {
     log.warn("Exception talking to " + req.getRemoteHost() + " at t="
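With this change, a failed commit surfaces to the sending agent as HTTP 503
(SC_SERVICE_UNAVAILABLE) instead of an unconditional 200, so an agent can keep
its chunks buffered and re-send them. A minimal sender-side sketch of that
retry behavior (illustrative only: postOnce(), the endpoint URL, and the
retry/backoff policy are assumptions, not Chukwa's actual agent code):

// Illustrative sketch: how a sender might react to the collector's new
// COMMIT_FAIL signal. postOnce() and the /chukwa endpoint are assumed
// names, not part of this commit.
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class RetryingSenderSketch {

  // Posts one payload; true only when the collector answers 200 (OK).
  static boolean postOnce(URL collector, byte[] payload) throws Exception {
    HttpURLConnection conn = (HttpURLConnection) collector.openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    OutputStream out = conn.getOutputStream();
    try {
      out.write(payload);
    } finally {
      out.close();
    }
    // HTTP_UNAVAILABLE (503) now means the writer returned COMMIT_FAIL:
    // the chunks were NOT committed and must be re-sent later.
    return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
  }

  public static void main(String[] args) throws Exception {
    URL collector = new URL("http://localhost:8080/chukwa"); // assumed endpoint
    byte[] payload = "example chunk bytes".getBytes();
    for (int attempt = 1; attempt <= 3; attempt++) {
      if (postOnce(collector, payload)) {
        break;                       // ACKed; safe to drop the buffered chunks
      }
      Thread.sleep(5000L * attempt); // back off, then retry the same data
    }
  }
}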
Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=1038719&r1=1038718&r2=1038719&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Wed Nov 24 17:47:47 2010
@@ -28,6 +28,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.io.IOException;
 
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
@@ -80,7 +81,7 @@ public class SeqFileWriter extends Pipel
   protected volatile long dataSize = 0;
   protected volatile long bytesThisRotate = 0;
   protected volatile boolean isRunning = false;
-  
+
   static {
     try {
       localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
@@ -126,7 +127,7 @@ public class SeqFileWriter extends Pipel
       if (fs == null) {
         log.error("can't connect to HDFS at " + fs.getUri() + " bail out!");
         DaemonWatcher.bailout(-1);
-      }      
+      }
     } catch (Throwable e) {
       log.error(
           "can't connect to HDFS, trying default file system instead (likely to be local)", e);
@@ -183,9 +184,7 @@ public class SeqFileWriter extends Pipel
     newName = newName.replace(".", "");
     newName = outputDir + "/" + newName.trim();
 
-    boolean bailOut = false;
-
-    try {
+    try {
       lock.acquire();
 
       FSDataOutputStream previousOutputStr = currentOutputStr;
@@ -193,43 +192,52 @@ public class SeqFileWriter extends Pipel
       String previousFileName = currentFileName;
 
       if (previousOutputStr != null) {
-        previousOutputStr.close();
+        boolean closed = false;
+        try {
+          log.info("closing sink file " + previousFileName);
+          previousOutputStr.close();
+          closed = true;
+        } catch (Throwable e) {
+          log.error("couldn't close file " + previousFileName, e);
+          // we probably have an orphaned 0 byte file at this point due to an
+          // intermittent HDFS outage. Once HDFS comes up again we'll be able
+          // to close it, although it will be empty.
+        }
+
         if (bytesThisRotate > 0) {
-          log.info("rotating sink file " + previousPath);
-          fs.rename(previousPath, new Path(previousFileName + ".done"));
+          if (closed) {
+            log.info("rotating sink file " + previousPath);
+            fs.rename(previousPath, new Path(previousFileName + ".done"));
+          }
+          else {
+            log.warn(bytesThisRotate + " bytes potentially lost, since " +
+                previousPath + " could not be closed.");
+          }
         } else {
           log.info("no chunks written to " + previousPath + ", deleting");
           fs.delete(previousPath, false);
         }
       }
+
       Path newOutputPath = new Path(newName + ".chukwa");
       FSDataOutputStream newOutputStr = fs.create(newOutputPath);
-      currentOutputStr = newOutputStr;
-      currentPath = newOutputPath;
-      currentFileName = newName;
-      bytesThisRotate = 0;
       // Uncompressed for now
       seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
           ChukwaArchiveKey.class, ChunkImpl.class,
           SequenceFile.CompressionType.NONE, null);
+
+      // reset these once we know that seqFileWriter was created
+      currentOutputStr = newOutputStr;
+      currentPath = newOutputPath;
+      currentFileName = newName;
+      bytesThisRotate = 0;
     } catch (Throwable e) {
-      log.warn("Got an exception in rotate",e);
-      bailOut = true;
-      isRunning = false;
+      log.warn("Got an exception trying to rotate. Will try again in " +
+          rotateInterval/1000 + " seconds.", e);
     } finally {
       lock.release();
     }
-    if (bailOut) {
-      log.fatal("IO Exception in rotate. Exiting!");
-      // As discussed for now:
-      // Everytime this happen in the past it was because HDFS was down,
-      // so there's nothing we can do
-      // Shutting down the collector for now
-      // Watchdog will re-start it automatically
-      DaemonWatcher.bailout(-1);
-    }
-
     // Schedule the next timer
     rotateTimer = new Timer();
     rotateTimer.schedule(new TimerTask() {
@@ -237,7 +245,7 @@ public class SeqFileWriter extends Pipel
         rotate();
       }
     }, rotateInterval);
-    
+
   }
 
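The net effect of the rotate() hunks above: a failed rotation no longer exits
the collector. Because the rotate timer is rescheduled unconditionally after
the try/finally, logging the Throwable and falling through amounts to "retry
in rotateInterval milliseconds". A condensed, self-contained sketch of that
control flow (the HDFS close/rename/create details are elided, and System.err
stands in for log4j):

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;

// Condensed sketch of the retry-on-next-tick pattern from rotate() above.
public class RotateRetrySketch {
  private final Semaphore lock = new Semaphore(1, true);
  private final long rotateInterval = 300000L; // ms, stand-in value
  private Timer rotateTimer = new Timer();

  void rotate() {
    try {
      lock.acquire();
      // close previous sink file, rename it to .done, create new .chukwa file
    } catch (Throwable e) {
      // Before this commit: bailOut = true; DaemonWatcher.bailout(-1).
      // Now: log and fall through; the reschedule below *is* the retry.
      System.err.println("rotate failed, retrying in "
          + rotateInterval / 1000 + "s: " + e);
    } finally {
      lock.release();
    }
    // Scheduled on success and failure alike, so a transient HDFS outage
    // is retried on the next tick instead of taking the collector down.
    rotateTimer = new Timer();
    rotateTimer.schedule(new TimerTask() {
      public void run() {
        rotate();
      }
    }, rotateInterval);
  }
}

The design choice follows the comment deleted by this patch: every past
rotate failure came from an HDFS outage, so retrying on the next tick is
cheaper than the old bailout-and-watchdog-restart cycle.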
@@ -277,17 +285,26 @@ public class SeqFileWriter extends Pipel
         archiveKey.setSeqId(chunk.getSeqID());
 
         if (chunk != null) {
-          // compute size for stats
+          seqFileWriter.append(archiveKey, chunk);
+
+          // compute size for stats only if append succeeded. Note though that
+          // seqFileWriter.append can continue taking data for quite some time
+          // after HDFS goes down while the client is trying to reconnect. Hence
+          // these stats might not reflect reality during an HDFS outage.
           dataSize += chunk.getData().length;
           bytesThisRotate += chunk.getData().length;
-          seqFileWriter.append(archiveKey, chunk);
 
           String futureName = currentPath.getName().replace(".chukwa", ".done");
           result.addPend(futureName, currentOutputStr.getPos());
         }
       }
-    } catch (Throwable e) {
+    }
+    catch (IOException e) {
+      log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
+      return COMMIT_FAIL;
+    }
+    catch (Throwable e) {
       // We don't want to lose anything
       log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
       DaemonWatcher.bailout(-1);
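Together with the ServletCollector change, the add() hunk defines the failure
contract between writer and servlet: an IOException from the HDFS client maps
to COMMIT_FAIL, which the servlet turns into HTTP 503, while any other
Throwable still brings the collector down for the watchdog to restart. A
minimal sketch of that contract (simplified to two statuses, whereas the real
writer can also return COMMIT_PENDING; the class and stub names here are
illustrative, not from this commit):

// Illustrative sketch of the writer-side failure contract after this commit.
import java.io.IOException;
import java.util.List;

public class FailureContractSketch {
  enum CommitStatus { COMMIT_OK, COMMIT_FAIL }

  CommitStatus add(List<byte[]> chunks) {
    try {
      for (byte[] chunk : chunks) {
        appendToSequenceFile(chunk); // may throw IOException if HDFS is down
      }
    } catch (IOException e) {
      // Transient storage failure: report it, keep the collector running.
      // The servlet maps this to HTTP 503 so the agent re-sends the chunks.
      return CommitStatus.COMMIT_FAIL;
    } catch (Throwable e) {
      // Anything else is still fatal: exit and let the watchdog restart us.
      System.exit(-1); // stand-in for DaemonWatcher.bailout(-1)
    }
    return CommitStatus.COMMIT_OK;
  }

  private void appendToSequenceFile(byte[] chunk) throws IOException {
    // elided: seqFileWriter.append(archiveKey, chunk) and stats bookkeeping
  }
}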