incubator-chukwa-commits mailing list archives

From: asrab...@apache.org
Subject: svn commit: r941034 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/ src/java/org/apache/hadoop/chukwa/util/ src/test/org/apache/hadoop/chukwa/util/
Date: Tue, 04 May 2010 20:47:38 GMT
Author: asrabkin
Date: Tue May  4 20:47:38 2010
New Revision: 941034

URL: http://svn.apache.org/viewvc?rev=941034&view=rev
Log:
CHUKWA-4. Collectors don't finish writing .dones on close. Contributed by Ahmed Fathalla.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CopySequenceFile.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCopySequenceFile.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
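
Before the diffs, a note on the local sink-file lifecycle this change manages: a collector's local output directory can now hold files in four states, identified by suffix. The sketch below is not part of the commit; the suffix names and their meanings are taken from the code and comments that follow, and the enum itself is purely illustrative.

    // Illustrative only -- not part of this commit.
    public enum LocalSinkFileState {
      CHUKWA,       // .chukwa      : sink file the collector is (or was) writing
      RECOVER,      // .recover     : partial copy left by a failed recovery attempt; safe to delete
      RECOVER_DONE, // .recoverDone : fully recovered, valid sink file awaiting rename to .done
      DONE          // .done        : complete sink file, ready to be moved to the remote HDFS
    }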

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=941034&r1=941033&r2=941034&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Tue May  4 20:47:38 2010
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    CHUKWA-4. Collectors don't finish writing .dones on close. (Ahmed Fathalla via asrabkin)
+
     CHUKWA-485. Fix TSProcessor handling of some dates. (Bill Graham via asrabkin)
 
     CHUKWA-478. Fixed TestSocketTee intermittent failure.  (Chris Douglas via Eric Yang)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java?rev=941034&r1=941033&r2=941034&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java Tue May  4 20:47:38 2010
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.net.URI;
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.hadoop.chukwa.util.CopySequenceFile;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -138,20 +139,57 @@ public class LocalToRemoteHdfsMover exte
       String fileName = null;
       for (FileStatus file: files) {
         fileName = file.getPath().getName();
-        if (fileName.endsWith(".done")) {
-          moveFile(localOutputDir + fileName);
-        } else if (fileName.endsWith(".chukwa")) {
-          long lastPeriod = System.currentTimeMillis() - rotateInterval - (2*60*1000);
-          if (file.getModificationTime() < lastPeriod) {
-            log.info("Moving .chukwa file over, " + localOutputDir + fileName);
-            moveFile(localOutputDir + fileName);
-          }
+       
+        if (fileName.endsWith(".recover")) {
+          //.recover files indicate a previously failed copying attempt of .chukwa files
+        	
+          Path recoverPath= new Path(localOutputDir+fileName);
+          localFs.delete(recoverPath, false);
+          log.info("Deleted .recover file, " + localOutputDir + fileName);
+        } else if (fileName.endsWith(".recoverDone")) {
+            //.recoverDone files are valid sink files that have not been renamed to .done
+        	// First, check if there are still any .chukwa files with the same name
+         	
+            String chukwaFileName= fileName.replace(".recoverDone", ".chukwa");
+        	Boolean fileNotFound=true;
+        	int i=0;
+        	while (i<files.length && fileNotFound) {
+        	   String currentFileName = files[i].getPath().getName();
+        	   
+        	   if (currentFileName.equals(chukwaFileName)){
+        	      //Remove the .chukwa file found as it has already been recovered
+        	      
+        	     fileNotFound = false;
+        	     Path chukwaFilePath = new Path(localOutputDir+chukwaFileName);
+        	     localFs.delete(chukwaFilePath,false);	
+        	     log.info(".recoverDone file exists, deleted duplicate .chukwa file, "
+        	    		 + localOutputDir + fileName);
+        	   }
+        	   i++;
+        	}
+        	 //Finally, rename .recoverDone file to .done
+        	 
+        	String doneFileName= fileName.replace(".recoverDone", ".done");
+        	Path donePath= new Path(localOutputDir+doneFileName);
+        	Path recoverDonePath= new Path(localOutputDir+fileName);
+        	localFs.rename(recoverDonePath, donePath);
+        	log.info("Renamed .recoverDone file to .done, "+ localOutputDir + fileName);
+         }  else if (fileName.endsWith(".done")) {
+              moveFile(localOutputDir + fileName);
+            }  else if (fileName.endsWith(".chukwa")) {
+                 long lastPeriod = System.currentTimeMillis() - rotateInterval - (2*60*1000);
+                 if (file.getModificationTime() < lastPeriod) { 
+                   //.chukwa file has not been modified for some time; may indicate the collector previously crashed
+
+                   log.info("Copying .chukwa file to valid sink file before moving, " + localOutputDir + fileName);
+                   CopySequenceFile.createValidSequenceFile(conf,localOutputDir,fileName,localFs);
+                  }
+               } 
         }
+    } catch (Exception e) {
+        log.warn("Cannot copy to the remote HDFS",e);
+        throw e;
       }
-    }catch (Exception e) {
-      log.warn("Cannot copy to the remote HDFS",e);
-      throw e;
-    }
   }
   
   @Override
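
To summarize the control flow added above: each pass over the local output directory now handles four suffixes instead of two, with .recover and .recoverDone checked before the original .done and .chukwa cases. The condensed sketch below restates that logic outside the class; the class name, method name, and parameter list are hypothetical, the existence check on the matching .chukwa file is collapsed into an unconditional delete, logging is omitted, and the HDFS move for .done files is reduced to a comment. Only the suffix handling and CopySequenceFile.createValidSequenceFile come from the patch.

    import java.io.IOException;

    import org.apache.hadoop.chukwa.util.CopySequenceFile;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical helper: a condensed restatement of the dispatch added above.
    public class SinkFileDispatchSketch {
      // localOutputDir is expected to end with "/" (as in the real class).
      public static void dispatch(Configuration conf, FileSystem localFs, String localOutputDir,
                                  FileStatus[] files, long rotateInterval) throws IOException {
        for (FileStatus file : files) {
          String name = file.getPath().getName();
          if (name.endsWith(".recover")) {
            // Leftover of a failed copy attempt: discard it so the .chukwa file is re-copied.
            localFs.delete(new Path(localOutputDir + name), false);
          } else if (name.endsWith(".recoverDone")) {
            // Fully copied sink file: drop the now-redundant .chukwa original (a no-op if it
            // is already gone), then promote the .recoverDone file to .done.
            localFs.delete(new Path(localOutputDir + name.replace(".recoverDone", ".chukwa")), false);
            localFs.rename(new Path(localOutputDir + name),
                           new Path(localOutputDir + name.replace(".recoverDone", ".done")));
          } else if (name.endsWith(".done")) {
            // Ready to ship to the remote HDFS; the real class calls its moveFile(...) helper here.
          } else if (name.endsWith(".chukwa")) {
            // Stale sink file, likely from a crashed collector: rewrite its valid chunks.
            long lastPeriod = System.currentTimeMillis() - rotateInterval - (2 * 60 * 1000);
            if (file.getModificationTime() < lastPeriod) {
              CopySequenceFile.createValidSequenceFile(conf, localOutputDir, name, localFs);
            }
          }
        }
      }
    }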

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CopySequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CopySequenceFile.java?rev=941034&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CopySequenceFile.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CopySequenceFile.java Tue May  4 20:47:38 2010
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.chukwa.util;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+
+/**
+ * This class is used by LocalToRemoteHdfsMover to
+ * convert .chukwa files to .done before moving them. 
+ * By creating a new sequence file and copying all valid chunks to it,
+ * it makes sure that no corrupt sequence files get into HDFS.
+ */
+public class CopySequenceFile {
+  static Logger log = Logger.getLogger(CopySequenceFile.class);
+  private static SequenceFile.Writer seqFileWriter = null;
+  private static SequenceFile.Reader seqFileReader = null; 
+  private static FSDataOutputStream newOutputStr = null;
+  
+  public static void createValidSequenceFile(Configuration conf,
+		                                     String originalFileDir, 
+		                                     String originalFileName,
+		                                     FileSystem localFs) {
+    try {
+      if (!originalFileDir.endsWith("/")) {
+    	      originalFileDir += "/";
+      }	
+	  String originalCompleteDir= originalFileDir + originalFileName;
+	  Path originalPath= new Path(originalCompleteDir);
+	  int extensionIndex= originalFileName.indexOf(".chukwa",0);
+      
+	  String recoverFileName=originalFileName.substring(0, extensionIndex)+".recover";
+	  String recoverDir= originalFileDir + recoverFileName;
+	  Path recoverPath= new Path(recoverDir);
+	  String recoverDoneFileName=originalFileName.substring(0, extensionIndex)+".recoverDone";
+ 	  String recoverDoneDir= originalFileDir + recoverDoneFileName;
+ 	  Path recoverDonePath= new Path(recoverDoneDir);
+	  String doneFileName=originalFileName.substring(0, extensionIndex)+".done";
+	  String doneDir= originalFileDir + doneFileName;
+	  Path donePath= new Path(doneDir);
+	  
+	  ChukwaArchiveKey key = new ChukwaArchiveKey();
+      ChunkImpl evt = ChunkImpl.getBlankChunk();
+
+	  newOutputStr = localFs.create(recoverPath);
+      seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
+                                                ChukwaArchiveKey.class, ChunkImpl.class,
+                                                SequenceFile.CompressionType.NONE, null);
+      seqFileReader = new SequenceFile.Reader(localFs, originalPath, conf);
+        
+      System.out.println("key class name is " + seqFileReader.getKeyClassName());
+      System.out.println("value class name is " + seqFileReader.getValueClassName());
+      try { 
+        while (seqFileReader.next(key, evt)) {
+          seqFileWriter.append(key, evt);
+        }
+       } catch (ChecksumException e) { //The exception occurs when we read a bad chunk while copying
+           log.info("Encountered Bad Chunk while copying .chukwa file, continuing",e);	 
+       }
+       seqFileReader.close();
+	   seqFileWriter.close();
+	   newOutputStr.close();
+       try {
+	     localFs.rename(recoverPath, recoverDonePath); //Rename the destination file from .recover to .recoverDone
+   	     localFs.delete(originalPath,false); //Delete Original .chukwa file
+	     localFs.rename(recoverDonePath, donePath); //rename .recoverDone to .done
+       } catch (Exception e) {
+           log.warn("Error occurred while renaming .recover to .recoverDone/.done or deleting .chukwa",e);
+    	   e.printStackTrace();
+       }
+
+	} catch(Exception e) {
+	    log.warn("Error during .chukwa file recovery",e);	 
+	    e.printStackTrace();
+	}	
+  }
+}
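
As a usage note, the new utility can be driven directly against a local directory; the .done file it produces carries only the chunks that could be read back cleanly. The driver below is a minimal sketch: the class name, directory, and file name are placeholders, and only the createValidSequenceFile signature comes from the file added above.

    import org.apache.hadoop.chukwa.util.CopySequenceFile;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Hypothetical driver, for illustration only.
    public class RecoverStaleSinkFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem localFs = FileSystem.getLocal(conf);
        // Rewrites the readable chunks of 20100504.chukwa into a fresh sequence file,
        // passing through .recover and .recoverDone before ending up as 20100504.done;
        // the original .chukwa file is deleted along the way.
        CopySequenceFile.createValidSequenceFile(conf, "/var/chukwa/local/", "20100504.chukwa", localFs);
      }
    }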

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCopySequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCopySequenceFile.java?rev=941034&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCopySequenceFile.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCopySequenceFile.java Tue May  4 20:47:38 2010
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.util;
+
+import junit.framework.TestCase;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.io.IOException;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.util.CopySequenceFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class TestCopySequenceFile extends TestCase {
+  File doneFile = null;
+  File tempDir = null;
+  String tempFileName = null;
+	
+  public void testCopySequenceFile() throws IOException {
+    //Create a .chukwa sequence file 
+	  
+    tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+    File tempFile = File.createTempFile("testcopy", ".chukwa", tempDir); 
+    tempFile.deleteOnExit(); // Will delete this file if test fails and file is not renamed to .done
+    tempFileName=tempFile.getName();
+    Configuration conf = new Configuration();
+    Path path = new Path(tempFile.getAbsolutePath());
+    List<ChunkImpl> chunks = new ArrayList<ChunkImpl>();
+    byte[] dat = "test".getBytes();
+    
+    ChunkImpl c = new ChunkImpl("Data", "aname", dat.length, dat, null);
+    chunks.add(c);
+    
+    dat = "ing".getBytes();
+    c = new ChunkImpl("Data", "aname", dat.length+4, dat, null);
+    chunks.add(c);
+    
+    //Utilize the writeSeqFile method to create a valid .chukwa sequence file
+    
+    writeSeqFile(conf, FileSystem.getLocal(conf), path, chunks);
+    
+    //Call CopySequenceFile to convert .chukwa to .done
+    
+    CopySequenceFile.createValidSequenceFile(conf, tempDir.getAbsolutePath(), tempFile.getName(), FileSystem.getLocal(conf));
+	
+    //Assert that the chukwa file has been deleted
+    
+	assertFalse("File " + tempFile.getAbsolutePath() + " has not been deleted", tempFile.exists());
+	
+	String doneFilePath= tempDir.getAbsolutePath()+"/"+tempFileName.replace(".chukwa", ".done");
+	doneFile= new File(doneFilePath);
+	
+	//Assert that the done file has been created
+	
+    assertTrue("File " + doneFilePath + " has not been created", doneFile.exists()); 
+		
+  }
+  public static void writeSeqFile(Configuration conf, FileSystem fileSys, Path dest,
+	                              List<ChunkImpl> chunks) throws IOException {
+    FSDataOutputStream out = fileSys.create(dest);
+
+	Calendar calendar = Calendar.getInstance();
+	SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
+	                                    ChukwaArchiveKey.class, ChunkImpl.class,
+	                                    SequenceFile.CompressionType.NONE, null);
+	    
+	for (ChunkImpl chunk: chunks) {
+	  ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+	      
+      calendar.set(Calendar.SECOND, 0);
+      calendar.set(Calendar.MILLISECOND, 0);
+      archiveKey.setTimePartition(calendar.getTimeInMillis());
+      
+      archiveKey.setDataType(chunk.getDataType());
+      archiveKey.setStreamName(chunk.getStreamName());
+      archiveKey.setSeqId(chunk.getSeqID());
+      seqFileWriter.append(archiveKey, chunk);
+	}
+    seqFileWriter.close();
+    out.close();
+  }
+  protected void tearDown() {
+	if (doneFile != null && doneFile.exists()){
+		  doneFile.delete();
+		} else { //Cleanup any files that may have been created during a failed copy attempt 
+			File recoverFile = new File(tempDir.getAbsolutePath()+"/"+tempFileName.replace(".chukwa", ".recover"));
+			if (recoverFile.exists()){
+			  recoverFile.delete();
+			} else {
+			    File recoverDoneFile = new File(tempDir.getAbsolutePath()+"/"+tempFileName.replace(".chukwa", ".recoverDone"));
+			    if (recoverDoneFile.exists()){
+			      recoverDoneFile.delete();
+			    }
+			  }
+		 }
+  }
+}	
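
For completeness: the test extends junit.framework.TestCase and writes its temporary files under the test.build.data system property (defaulting to /tmp), so besides the project's regular test target it can be run with the JUnit 3 text runner. The small driver below is illustrative only.

    // Illustrative only: run TestCopySequenceFile with the JUnit 3 text runner.
    public class RunCopySequenceFileTest {
      public static void main(String[] args) {
        // Optional: point the test's scratch directory somewhere other than /tmp.
        System.setProperty("test.build.data", "/tmp/chukwa-test");
        junit.textui.TestRunner.run(org.apache.hadoop.chukwa.util.TestCopySequenceFile.class);
      }
    }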


