apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pra...@apache.org
Subject [1/2] incubator-apex-malhar git commit: MLHR-1825 #comment handling case when no open part is updated but stream is not created
Date Fri, 04 Sep 2015 05:00:58 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 cdc8b28af -> 30e4b1ff6


MLHR-1825 #comment handling case when no open part is updated but stream is not created


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c6da4a05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c6da4a05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c6da4a05

Branch: refs/heads/devel-3
Commit: c6da4a05e3df215a39eb06752e31760bb206528b
Parents: 91321ce
Author: Chandni Singh <csingh@apache.org>
Authored: Tue Sep 1 15:57:10 2015 -0700
Committer: Chandni Singh <csingh@apache.org>
Committed: Wed Sep 2 13:48:12 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 36 +++++++++++++-------
 1 file changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c6da4a05/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 3f45afb..a589751 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -484,34 +484,40 @@ public abstract class AbstractFileOutputOperator<INPUT> extends
BaseOperator imp
         //of this operator.
         for(String seenFileName: endOffsets.keySet()) {
           try {
-            Integer part = openPart.get(seenFileName).getValue() + 1;
+            Integer fileOpenPart = this.openPart.get(seenFileName).getValue();
+            int nextPart = fileOpenPart + 1;
             String seenPartFileName;
             while (true) {
-              seenPartFileName = getPartFileName(seenFileName, part);
-              Path activePath;
+              seenPartFileName = getPartFileName(seenFileName, nextPart);
+              Path activePath = null;
               if (alwaysWriteToTmp) {
                 String tmpFileName = fileNameToTmpName.get(seenPartFileName);
-                activePath = new Path(filePath + Path.SEPARATOR + tmpFileName);
+                if (tmpFileName != null) {
+                  activePath = new Path(filePath + Path.SEPARATOR + tmpFileName);
+                }
               } else {
                 activePath = new Path(filePath + Path.SEPARATOR + seenPartFileName);
               }
-              if (!fs.exists(activePath)) {
+              if (activePath == null || !fs.exists(activePath)) {
                 break;
               }
 
               fs.delete(activePath, true);
-              part = part + 1;
+              nextPart++;
             }
 
-            seenPartFileName = getPartFileName(seenFileName, openPart.get(seenFileName).intValue());
-            Path activeFilePath;
+            seenPartFileName = getPartFileName(seenFileName, fileOpenPart);
+            Path activePath = null;
             if (alwaysWriteToTmp) {
-              activeFilePath = new Path(filePath + Path.SEPARATOR + fileNameToTmpName.get(seenPartFileName));
+              String tmpFileName = fileNameToTmpName.get(seenPartFileName);
+              if (tmpFileName != null) {
+                activePath = new Path(filePath + Path.SEPARATOR + fileNameToTmpName.get(seenPartFileName));
+              }
             } else {
-              activeFilePath = new Path(filePath + Path.SEPARATOR + seenPartFileName);
+              activePath = new Path(filePath + Path.SEPARATOR + seenPartFileName);
             }
 
-            if (fs.getFileStatus(activeFilePath).getLen() > maxLength) {
+            if (activePath != null && fs.getFileStatus(activePath).getLen() >
maxLength) {
               //Handle the case when restoring to a checkpoint where the current rolling
file
               //already has a length greater than max length.
               LOG.debug("rotating file at setup.");
@@ -1143,8 +1149,12 @@ public abstract class AbstractFileOutputOperator<INPUT> extends
BaseOperator imp
     for (FileStatus status : statuses) {
       String statusName = status.getPath().getName();
       if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(fileName))
{
-        LOG.debug("deleting vagrant file {}", statusName);
-        fs.delete(status.getPath(), true);
+        //a tmp file has tmp extension always preceded by timestamp
+        String actualFileName = statusName.substring(0, statusName.lastIndexOf('.', statusName.lastIndexOf('.')
- 1));
+        if (fileName.equals(actualFileName)) {
+          LOG.debug("deleting vagrant file {}", statusName);
+          fs.delete(status.getPath(), true);
+        }
       }
     }
   }


Mime
View raw message