chukwa-commits mailing list archives

From: ey...@apache.org
Subject: svn commit: r786353 - in /hadoop/chukwa: branches/chukwa-0.2/ branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/writer/ trunk/ trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/
Date: Fri, 19 Jun 2009 01:56:54 GMT
Author: eyang
Date: Fri Jun 19 01:56:53 2009
New Revision: 786353

URL: http://svn.apache.org/viewvc?rev=786353&view=rev
Log:
CHUKWA-313. Removed the 10 second ack from SeqFileWriter. (Eric Yang)
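
[Editor's note] The substance of the change: the fixed-rate ClientAck timer, which flushed the sink every 5 seconds and made add() block on wait4Ack() for up to 10 seconds, is gone. Rotation is now driven by a one-shot Timer that rotate() reschedules for itself, and add() returns as soon as the chunks are written. A minimal compilable sketch of the new rotation pattern follows; it is not part of the commit, the class name is hypothetical, and the field names mirror SeqFileWriter.

import java.util.Timer;
import java.util.TimerTask;

// Sketch only: RotationSketch is a hypothetical stand-in for SeqFileWriter.
public class RotationSketch {
  private Timer rotateTimer = null;                  // one-shot timer, recreated each cycle
  private final int rotateInterval = 1000 * 60 * 5;  // 5 minutes, the patch default

  synchronized void rotate() {
    if (rotateTimer != null) {
      rotateTimer.cancel();  // drop any rotation still pending from the last cycle
    }
    // ... close the previous sink file, open a new one ...

    // Schedule exactly one follow-up rotation instead of running a
    // fixed-rate ack/flush timer.
    rotateTimer = new Timer();
    rotateTimer.schedule(new TimerTask() {
      public void run() {
        rotate();
      }
    }, rotateInterval);
  }
}

Cancelling any pending timer at the top of rotate() keeps an explicit call (init() invokes rotate() directly) from stacking a second rotation cycle.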

Modified:
    hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java

Modified: hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt?rev=786353&r1=786352&r2=786353&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt (original)
+++ hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt Fri Jun 19 01:56:53 2009
@@ -34,6 +34,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-313. Removed the 10 second ack from SeqFileWriter. (Eric Yang)
+
     CHUKWA-5. Adaptors have durable names. (asrabkin)
 
     CHUKWA-278. Improve post process manager and metric data loader to support data loading from pig aggregation. (Eric Yang)

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=786353&r1=786352&r2=786353&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Fri Jun 19 01:56:53 2009
@@ -19,15 +19,14 @@
 package org.apache.hadoop.chukwa.datacollection.writer;
 
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.Calendar;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
@@ -45,15 +44,15 @@
  * 
  */
 public class SeqFileWriter implements ChukwaWriter {
+  static Logger log = Logger.getLogger(SeqFileWriter.class);
   public static final boolean ENABLE_ROTATION = true;
 
   static final int STAT_INTERVAL_SECONDS = 30;
   static String localHostAddr = null;
   
-  static final Object lock = new Object();
-
-  static Logger log = Logger.getLogger(SeqFileWriter.class);
+  final Object lock = new Object();
 
+  
   private FileSystem fs = null;
   private Configuration conf = null;
 
@@ -63,23 +62,19 @@
   private Path currentPath = null;
   private String currentFileName = null;
   private FSDataOutputStream currentOutputStr = null;
-  private static SequenceFile.Writer seqFileWriter = null;
-
-  private static ClientAck clientAck = new ClientAck();
-  private static long nextRotate = 0;
-  private static int rotateInterval = 1000 * 60;
-
-  private static Timer clientAckTimer = null;
-
-  private Timer timer = null;
+  private SequenceFile.Writer seqFileWriter = null;
 
+  private int rotateInterval = 1000 * 60;
+  private long timePeriod = -1;
+  private long nextTimePeriodComputation = -1;
+  
+  private Timer rotateTimer = null;  
   private Timer statTimer = null;
+  
   private volatile long dataSize = 0;
-
-  private int initWriteChunkRetries = 10;
-  private int writeChunkRetries = initWriteChunkRetries;
-  private boolean chunksWrittenThisRotate = false;
-
+  private volatile boolean chunksWrittenThisRotate = false;
+  private volatile boolean isRunning = false;
+  
   static {
     try {
       localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
@@ -98,11 +93,7 @@
 
     rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
         1000 * 60 * 5);// defaults to 5 minutes
-    nextRotate = System.currentTimeMillis() + rotateInterval;
-
-    initWriteChunkRetries = conf
-        .getInt("chukwaCollector.writeChunkRetries", 10);
-    writeChunkRetries = initWriteChunkRetries;
+    
 
     // check if they've told us the file system to use
     String fsname = conf.get("writer.hdfs.filesystem");
@@ -137,43 +128,11 @@
     // Setup everything by rotating
     rotate();
 
-    clientAckTimer = new Timer();
-    clientAckTimer.schedule(new TimerTask() {
-      public void run() {
-        synchronized (lock) {
-          ClientAck previous = clientAck;
-          SeqFileWriter.clientAck = new ClientAck();
-
-          try {
-            // SeqFile is uncompressed for now
-            // So we can flush every xx secs
-            // But if we're using block Compression
-            // this is not true anymore
-            // because this will trigger
-            // the compression
-            if (currentOutputStr != null) {
-              currentOutputStr.flush();
-            }
-            previous.releaseLock(ClientAck.OK, null);
-            long now = System.currentTimeMillis();
-            if (now >= nextRotate) {
-              nextRotate = System.currentTimeMillis() + rotateInterval;
-              rotate();
-            }
-          } catch (Throwable e) {
-            previous.releaseLock(ClientAck.KO, e);
-            log.warn("Exception when flushing ", e);
-            e.printStackTrace();
-          }
-        }
-      }
-
-    }, (5 * 1000), (5 * 1000));
-
     statTimer = new Timer();
     statTimer.schedule(new StatReportingTask(), 1000,
         STAT_INTERVAL_SECONDS * 1000);
 
+    isRunning = true;
   }
 
   private class StatReportingTask extends TimerTask {
@@ -195,6 +154,10 @@
   };
 
   void rotate() {
+    if (rotateTimer != null) {
+      rotateTimer.cancel();
+    }
+    
     calendar.setTimeInMillis(System.currentTimeMillis());
 
     log.info("start Date [" + calendar.getTime() + "]");
@@ -208,8 +171,11 @@
     newName = newName.replace(".", "");
     newName = outputDir + "/" + newName.trim();
 
+    boolean bailOut = false;
+
     synchronized (lock) {
       try {
+
         FSDataOutputStream previousOutputStr = currentOutputStr;
         Path previousPath = currentPath;
         String previousFileName = currentFileName;
@@ -217,6 +183,7 @@
         if (previousOutputStr != null) {
           previousOutputStr.close();
           if (chunksWrittenThisRotate) {
+            log.info("rotate file on HDFS");
             fs.rename(previousPath, new Path(previousFileName + ".done"));
           } else {
             log.info("no chunks written to " + previousPath + ", deleting");
@@ -233,45 +200,64 @@
         seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
             ChukwaArchiveKey.class, ChunkImpl.class,
             SequenceFile.CompressionType.NONE, null);
-
-      } catch (IOException e) {
-        log.fatal("IO Exception in rotate. Exiting!");
-        e.printStackTrace();
-        // TODO
-        // As discussed for now:
-        // Everytime this happen in the past it was because HDFS was down,
-        // so there's nothing we can do
-        // Shutting down the collector for now
-        // Watchdog will re-start it automatically
-        DaemonWatcher.bailout(-1);
+      } catch (Throwable e) {
+        log.warn("Got an exception in rotate", e);
+        bailOut = true;
+        isRunning = false;
       }
     }
+    
+    if (bailOut) {
+      log.fatal("IO Exception in rotate. Exiting!");
+      // As discussed for now:
+      // Every time this happened in the past it was because HDFS was down,
+      // so there's nothing we can do
+      // Shutting down the collector for now
+      // Watchdog will re-start it automatically
+      DaemonWatcher.bailout(-1);
+    }   
 
-    log.debug("finished rotate()");
+    // Schedule the next timer
+    rotateTimer = new Timer();
+    rotateTimer.schedule(new TimerTask() {
+      public void run() {
+        rotate();
+      }
+    }, rotateInterval);
+    
   }
 
+  
+  protected void computeTimePeriod() {
+    synchronized (calendar) {
+      calendar.setTimeInMillis(System.currentTimeMillis());
+      calendar.set(Calendar.MINUTE, 0);
+      calendar.set(Calendar.SECOND, 0);
+      calendar.set(Calendar.MILLISECOND, 0);
+      timePeriod = calendar.getTimeInMillis();
+      calendar.add(Calendar.HOUR, 1);
+      nextTimePeriodComputation = calendar.getTimeInMillis();
+    }
+  }
+  
   @Override
   public void add(List<Chunk> chunks) throws WriterException {
+    if (!isRunning) {
+      log.info("Collector not ready");
+      throw new WriterException("Collector not ready");
+    }
+
     if (chunks != null) {
       try {
         chunksWrittenThisRotate = true;
         ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-
-        // FIXME compute this once an hour
-        // 
-        synchronized (calendar) {
-          calendar.setTimeInMillis(System.currentTimeMillis());
-          calendar.set(Calendar.MINUTE, 0);
-          calendar.set(Calendar.SECOND, 0);
-          calendar.set(Calendar.MILLISECOND, 0);
-
-          archiveKey.setTimePartition(calendar.getTimeInMillis());
+        
+        if (System.currentTimeMillis() >= nextTimePeriodComputation) {
+          computeTimePeriod();
         }
-
-        ClientAck localClientAck = null;
         synchronized (lock) {
-          localClientAck = SeqFileWriter.clientAck;
           for (Chunk chunk : chunks) {
+            archiveKey.setTimePartition(timePeriod);
             archiveKey.setDataType(chunk.getDataType());
             archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
                 + "/" + chunk.getStreamName());
@@ -285,54 +271,43 @@
 
           }
         }// End synchro
-
-        localClientAck.wait4Ack();
-        if (localClientAck.getStatus() != ClientAck.OK) {
-          log
-              .warn("Exception after notyfyAll on the lock - Thread:"
-                  + Thread.currentThread().getName(), localClientAck
-                  .getException());
-          throw new WriterException(localClientAck.getException());
-        } else {
-          // sucess
-          writeChunkRetries = initWriteChunkRetries;
-        }
-
-      } catch (IOException e) {
-        writeChunkRetries--;
-        log.error("Could not save the chunk. ", e);
-
-        if (writeChunkRetries < 0) {
-          log
-              .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
-          DaemonWatcher.bailout(-1);
-        }
-        throw new WriterException(e);
+      } catch (Throwable e) {
+        // We don't want to lose anything
+        log.fatal("IOException when trying to write a chunk, Collector is going to exit!");
+        DaemonWatcher.bailout(-1);
+        isRunning = false;
       }
     }
-
   }
 
   public void close() {
-    synchronized (lock) {
-      if (timer != null)
-        timer.cancel();
-      if (statTimer != null)
-        statTimer.cancel();
-      if (clientAckTimer != null)
-        clientAckTimer.cancel();
-      try {
+    
+    isRunning = false;
 
+    if (statTimer != null) {
+      statTimer.cancel();
+    }
+
+    if (rotateTimer != null) {
+      rotateTimer.cancel();
+    }
+
+    // If we are here it's either because of an HDFS exception
+    // or because the Agent has received a kill -TERM.
+    // In both cases we will not be able to execute any
+    // HDFS command, so there's no point in trying to
+    // close or rename the file (see the HDFS shutdownHook JIRA),
+    // but we try anyway in case someone fixes that issue.
+    try {
+      synchronized(lock) {
         if (this.currentOutputStr != null) {
           this.currentOutputStr.close();
         }
-
-        clientAck.releaseLock(ClientAck.OK, null);
         fs.rename(currentPath, new Path(currentFileName + ".done"));
-      } catch (IOException e) {
-        clientAck.releaseLock(ClientAck.OK, e);
-        log.error("failed to close and rename stream", e);
       }
+    } catch (Throwable e) {
+     log.warn("cannot rename dataSink file:" + currentPath,e);
     }
   }
 

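[Editor's note] Beyond dropping the ack, the patch also stops doing calendar arithmetic on every add(): the archive key's time partition is cached in timePeriod and recomputed only once the hour boundary stored in nextTimePeriodComputation passes. A compilable sketch of that cache follows; it is not part of the commit, the class name and currentTimePartition() are hypothetical, and computeTimePeriod() matches the patch.

import java.util.Calendar;

// Sketch only: TimePartitionSketch is a hypothetical stand-in for SeqFileWriter.
public class TimePartitionSketch {
  private final Calendar calendar = Calendar.getInstance();
  private long timePeriod = -1;
  private long nextTimePeriodComputation = -1;

  // Called on every add(); only does calendar work once per hour.
  long currentTimePartition() {
    if (System.currentTimeMillis() >= nextTimePeriodComputation) {
      computeTimePeriod();
    }
    return timePeriod;
  }

  protected void computeTimePeriod() {
    synchronized (calendar) {
      calendar.setTimeInMillis(System.currentTimeMillis());
      calendar.set(Calendar.MINUTE, 0);        // truncate to the hour
      calendar.set(Calendar.SECOND, 0);
      calendar.set(Calendar.MILLISECOND, 0);
      timePeriod = calendar.getTimeInMillis();
      calendar.add(Calendar.HOUR, 1);          // next recomputation at the hour boundary
      nextTimePeriodComputation = calendar.getTimeInMillis();
    }
  }
}
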
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=786353&r1=786352&r2=786353&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Fri Jun 19 01:56:53 2009
@@ -32,6 +32,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-313. Removed the 10 second ack from SeqFileWriter. (Eric Yang)
+
     CHUKWA-5. Adaptors have durable names. (asrabkin)
 
     CHUKWA-280. Added end to end test to detect iostat overflow. (Eric Yang)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=786353&r1=786352&r2=786353&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Fri Jun 19 01:56:53 2009
@@ -19,15 +19,14 @@
 package org.apache.hadoop.chukwa.datacollection.writer;
 
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.Calendar;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
@@ -45,15 +44,15 @@
  * 
  */
 public class SeqFileWriter implements ChukwaWriter {
+  static Logger log = Logger.getLogger(SeqFileWriter.class);
   public static final boolean ENABLE_ROTATION = true;
 
   static final int STAT_INTERVAL_SECONDS = 30;
   static String localHostAddr = null;
   
-  static final Object lock = new Object();
-
-  static Logger log = Logger.getLogger(SeqFileWriter.class);
+  final Object lock = new Object();
 
+  
   private FileSystem fs = null;
   private Configuration conf = null;
 
@@ -63,23 +62,19 @@
   private Path currentPath = null;
   private String currentFileName = null;
   private FSDataOutputStream currentOutputStr = null;
-  private static SequenceFile.Writer seqFileWriter = null;
-
-  private static ClientAck clientAck = new ClientAck();
-  private static long nextRotate = 0;
-  private static int rotateInterval = 1000 * 60;
-
-  private static Timer clientAckTimer = null;
-
-  private Timer timer = null;
+  private SequenceFile.Writer seqFileWriter = null;
 
+  private int rotateInterval = 1000 * 60;
+  private long timePeriod = -1;
+  private long nextTimePeriodComputation = -1;
+  
+  private Timer rotateTimer = null;  
   private Timer statTimer = null;
+  
   private volatile long dataSize = 0;
-
-  private int initWriteChunkRetries = 10;
-  private int writeChunkRetries = initWriteChunkRetries;
-  private boolean chunksWrittenThisRotate = false;
-
+  private volatile boolean chunksWrittenThisRotate = false;
+  private volatile boolean isRunning = false;
+  
   static {
     try {
       localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
@@ -98,11 +93,7 @@
 
     rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
         1000 * 60 * 5);// defaults to 5 minutes
-    nextRotate = System.currentTimeMillis() + rotateInterval;
-
-    initWriteChunkRetries = conf
-        .getInt("chukwaCollector.writeChunkRetries", 10);
-    writeChunkRetries = initWriteChunkRetries;
+    
 
     // check if they've told us the file system to use
     String fsname = conf.get("writer.hdfs.filesystem");
@@ -137,43 +128,11 @@
     // Setup everything by rotating
     rotate();
 
-    clientAckTimer = new Timer();
-    clientAckTimer.schedule(new TimerTask() {
-      public void run() {
-        synchronized (lock) {
-          ClientAck previous = clientAck;
-          SeqFileWriter.clientAck = new ClientAck();
-
-          try {
-            // SeqFile is uncompressed for now
-            // So we can flush every xx secs
-            // But if we're using block Compression
-            // this is not true anymore
-            // because this will trigger
-            // the compression
-            if (currentOutputStr != null) {
-              currentOutputStr.flush();
-            }
-            previous.releaseLock(ClientAck.OK, null);
-            long now = System.currentTimeMillis();
-            if (now >= nextRotate) {
-              nextRotate = System.currentTimeMillis() + rotateInterval;
-              rotate();
-            }
-          } catch (Throwable e) {
-            previous.releaseLock(ClientAck.KO, e);
-            log.warn("Exception when flushing ", e);
-            e.printStackTrace();
-          }
-        }
-      }
-
-    }, (5 * 1000), (5 * 1000));
-
     statTimer = new Timer();
     statTimer.schedule(new StatReportingTask(), 1000,
         STAT_INTERVAL_SECONDS * 1000);
 
+    isRunning = true;
   }
 
   private class StatReportingTask extends TimerTask {
@@ -195,6 +154,10 @@
   };
 
   void rotate() {
+    if (rotateTimer != null) {
+      rotateTimer.cancel();
+    }
+    
     calendar.setTimeInMillis(System.currentTimeMillis());
 
     log.info("start Date [" + calendar.getTime() + "]");
@@ -208,8 +171,11 @@
     newName = newName.replace(".", "");
     newName = outputDir + "/" + newName.trim();
 
+    boolean bailOut = false;
+
     synchronized (lock) {
       try {
+
         FSDataOutputStream previousOutputStr = currentOutputStr;
         Path previousPath = currentPath;
         String previousFileName = currentFileName;
@@ -217,6 +183,7 @@
         if (previousOutputStr != null) {
           previousOutputStr.close();
           if (chunksWrittenThisRotate) {
+            log.info("rotate file on HDFS");
             fs.rename(previousPath, new Path(previousFileName + ".done"));
           } else {
             log.info("no chunks written to " + previousPath + ", deleting");
@@ -233,45 +200,64 @@
         seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
             ChukwaArchiveKey.class, ChunkImpl.class,
             SequenceFile.CompressionType.NONE, null);
-
-      } catch (IOException e) {
-        log.fatal("IO Exception in rotate. Exiting!");
-        e.printStackTrace();
-        // TODO
-        // As discussed for now:
-        // Everytime this happen in the past it was because HDFS was down,
-        // so there's nothing we can do
-        // Shutting down the collector for now
-        // Watchdog will re-start it automatically
-        DaemonWatcher.bailout(-1);
+      } catch (Throwable e) {
+        log.warn("Got an exception in rotate", e);
+        bailOut = true;
+        isRunning = false;
       }
     }
+    
+    if (bailOut) {
+      log.fatal("IO Exception in rotate. Exiting!");
+      // As discussed for now:
+      // Every time this happened in the past it was because HDFS was down,
+      // so there's nothing we can do
+      // Shutting down the collector for now
+      // Watchdog will re-start it automatically
+      DaemonWatcher.bailout(-1);
+    }   
 
-    log.debug("finished rotate()");
+    // Schedule the next timer
+    rotateTimer = new Timer();
+    rotateTimer.schedule(new TimerTask() {
+      public void run() {
+        rotate();
+      }
+    }, rotateInterval);
+    
   }
 
+  
+  protected void computeTimePeriod() {
+    synchronized (calendar) {
+      calendar.setTimeInMillis(System.currentTimeMillis());
+      calendar.set(Calendar.MINUTE, 0);
+      calendar.set(Calendar.SECOND, 0);
+      calendar.set(Calendar.MILLISECOND, 0);
+      timePeriod = calendar.getTimeInMillis();
+      calendar.add(Calendar.HOUR, 1);
+      nextTimePeriodComputation = calendar.getTimeInMillis();
+    }
+  }
+  
   @Override
   public void add(List<Chunk> chunks) throws WriterException {
+    if (!isRunning) {
+      log.info("Collector not ready");
+      throw new WriterException("Collector not ready");
+    }
+
     if (chunks != null) {
       try {
         chunksWrittenThisRotate = true;
         ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-
-        // FIXME compute this once an hour
-        // 
-        synchronized (calendar) {
-          calendar.setTimeInMillis(System.currentTimeMillis());
-          calendar.set(Calendar.MINUTE, 0);
-          calendar.set(Calendar.SECOND, 0);
-          calendar.set(Calendar.MILLISECOND, 0);
-
-          archiveKey.setTimePartition(calendar.getTimeInMillis());
+        
+        if (System.currentTimeMillis() >= nextTimePeriodComputation) {
+          computeTimePeriod();
         }
-
-        ClientAck localClientAck = null;
         synchronized (lock) {
-          localClientAck = SeqFileWriter.clientAck;
           for (Chunk chunk : chunks) {
+            archiveKey.setTimePartition(timePeriod);
             archiveKey.setDataType(chunk.getDataType());
             archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
                 + "/" + chunk.getStreamName());
@@ -285,54 +271,43 @@
 
           }
         }// End synchro
-
-        localClientAck.wait4Ack();
-        if (localClientAck.getStatus() != ClientAck.OK) {
-          log
-              .warn("Exception after notyfyAll on the lock - Thread:"
-                  + Thread.currentThread().getName(), localClientAck
-                  .getException());
-          throw new WriterException(localClientAck.getException());
-        } else {
-          // sucess
-          writeChunkRetries = initWriteChunkRetries;
-        }
-
-      } catch (IOException e) {
-        writeChunkRetries--;
-        log.error("Could not save the chunk. ", e);
-
-        if (writeChunkRetries < 0) {
-          log
-              .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
-          DaemonWatcher.bailout(-1);
-        }
-        throw new WriterException(e);
+      } catch (Throwable e) {
+        // We don't want to lose anything
+        log.fatal("IOException when trying to write a chunk, Collector is going to exit!");
+        DaemonWatcher.bailout(-1);
+        isRunning = false;
       }
     }
-
   }
 
   public void close() {
-    synchronized (lock) {
-      if (timer != null)
-        timer.cancel();
-      if (statTimer != null)
-        statTimer.cancel();
-      if (clientAckTimer != null)
-        clientAckTimer.cancel();
-      try {
+    
+    isRunning = false;
 
+    if (statTimer != null) {
+      statTimer.cancel();
+    }
+
+    if (rotateTimer != null) {
+      rotateTimer.cancel();
+    }
+
+    // If we are here it's either because of an HDFS exception
+    // or because the Agent has received a kill -TERM.
+    // In both cases we will not be able to execute any
+    // HDFS command, so there's no point in trying to
+    // close or rename the file (see the HDFS shutdownHook JIRA),
+    // but we try anyway in case someone fixes that issue.
+    try {
+      synchronized(lock) {
         if (this.currentOutputStr != null) {
           this.currentOutputStr.close();
         }
-
-        clientAck.releaseLock(ClientAck.OK, null);
         fs.rename(currentPath, new Path(currentFileName + ".done"));
-      } catch (IOException e) {
-        clientAck.releaseLock(ClientAck.OK, e);
-        log.error("failed to close and rename stream", e);
       }
+    } catch (Throwable e) {
+     log.warn("cannot rename dataSink file:" + currentPath,e);
     }
   }
 



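[Editor's note] The writeChunkRetries counter is likewise gone from both copies of the file; add() now gates on a volatile isRunning flag that init() sets and that close() or a failed rotate() clears, failing fast with a WriterException instead of retrying. A minimal sketch of that gate follows; it is not part of the commit, the class name is hypothetical, and a plain Exception stands in for Chukwa's WriterException.

import java.util.List;

// Sketch only: FailFastSketch is a hypothetical stand-in for SeqFileWriter.
public class FailFastSketch {
  private volatile boolean isRunning = false;

  public void init() {
    // ... open the sink file, start the stat and rotate timers ...
    isRunning = true;               // only now may add() proceed
  }

  public void add(List<String> chunks) throws Exception {
    if (!isRunning) {
      // Fail fast instead of counting down writeChunkRetries as before.
      throw new Exception("Collector not ready");
    }
    // ... append the chunks to the sequence file under the lock ...
  }

  public void close() {
    isRunning = false;              // turn away new adds before tearing down
    // ... cancel timers, close and rename the current sink file ...
  }
}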