nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [1/2] nifi git commit: NIFI-744: Do not allow StandardContentClaim's offset to be updated. Data should be read-only once it has been written
Date Mon, 03 Aug 2015 19:56:46 GMT
Repository: nifi
Updated Branches:
  refs/heads/NIFI-744 53a6e962d -> 05d4b067a


NIFI-744: Do not allow StandardContentClaim's offset to be updated. Data should be read-only
once it has been written


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84aae10a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84aae10a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84aae10a

Branch: refs/heads/NIFI-744
Commit: 84aae10a17148785d0d8a45d2f9cfa8018a04850
Parents: b8cee51
Author: Mark Payne <markap14@hotmail.com>
Authored: Fri Jul 31 13:48:32 2015 -0400
Committer: Mark Payne <markap14@hotmail.com>
Committed: Fri Jul 31 17:42:19 2015 -0400

----------------------------------------------------------------------
 .../repository/ContentRepository.java           | 10 ++-
 .../SynchronizedByteCountingOutputStream.java   | 66 +++++++++++++++++
 .../repository/FileSystemRepository.java        | 23 +++---
 .../repository/StandardProcessSession.java      | 27 ++++---
 .../repository/claim/StandardContentClaim.java  | 18 ++---
 .../repository/TestFileSystemRepository.java    | 76 --------------------
 6 files changed, 105 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/84aae10a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index da87d75..8d0bdb3 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -173,9 +173,13 @@ public interface ContentRepository {
      * @param content to import from
      * @param claim the claim to write imported content to
      * @param append if true, the content will be appended to the claim; if
-     * false, the content will replace the contents of the claim
+     *        false, the content will replace the contents of the claim
      * @throws IOException if unable to read content
+     *
+     * @deprecated if needing to append to a content claim, the contents of the claim should
be
+     *             copied to a new claim and then the data to append should be written to
that new claim.
      */
+    @Deprecated
     long importFrom(Path content, ContentClaim claim, boolean append) throws IOException;
 
     /**
@@ -198,7 +202,11 @@ public interface ContentRepository {
      * @param append whether to append or replace
      * @return length of data imported in bytes
      * @throws IOException if failure to read or write stream
+     *
+     * @deprecated if needing to append to a content claim, the contents of the claim should
be
+     * copied to a new claim and then the data to append should be written to that new claim.
      */
+    @Deprecated
     long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException;
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/84aae10a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java
b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java
new file mode 100644
index 0000000..e617829
--- /dev/null
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class SynchronizedByteCountingOutputStream extends ByteCountingOutputStream {
+
+    public SynchronizedByteCountingOutputStream(final OutputStream out) {
+        super(out);
+    }
+
+    public SynchronizedByteCountingOutputStream(final OutputStream out, final long byteCount)
{
+        super(out, byteCount);
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
+        super.flush();
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        super.close();
+    }
+
+    @Override
+    public synchronized long getBytesWritten() {
+        return super.getBytesWritten();
+    }
+
+    @Override
+    public synchronized OutputStream getWrappedStream() {
+        return super.getWrappedStream();
+    }
+
+    @Override
+    public synchronized void write(final byte[] b) throws IOException {
+        super.write(b);
+    }
+
+    @Override
+    public synchronized void write(final int b) throws IOException {
+        super.write(b);
+    }
+
+    @Override
+    public synchronized void write(final byte[] b, final int off, final int len) throws IOException
{
+        super.write(b, off, len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/84aae10a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 3a6338c..31835dc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -67,6 +67,7 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.LongHolder;
 import org.apache.nifi.util.NiFiProperties;
@@ -764,7 +765,7 @@ public class FileSystemRepository implements ContentRepository {
 
         // see javadocs for claim.getLength() as to why we do this.
         if (claim.getLength() < 0) {
-            return Files.size(getPath(claim, true));
+			return Files.size(getPath(claim, true)) - claim.getOffset();
         }
 
         return claim.getLength();
@@ -806,6 +807,9 @@ public class FileSystemRepository implements ContentRepository {
         }
 
         final StandardContentClaim scc = (StandardContentClaim) claim;
+        if (claim.getLength() > 0) {
+            throw new IllegalArgumentException("Cannot write to " + claim + " because it
has already been written to.");
+        }
 
         // we always append because there may be another ContentClaim using the same resource
claim.
         // However, we know that we will never write to the same claim from two different
threads
@@ -816,14 +820,14 @@ public class FileSystemRepository implements ContentRepository {
         final long initialLength;
         if (claimStream == null) {
             final File file = getPath(scc).toFile();
-            claimStream = new ByteCountingOutputStream(new FileOutputStream(file, true),
file.length());
+            // use a synchronized stream because we want to pass this OutputStream out from
one thread to another.
+            claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file,
true), file.length());
             initialLength = 0L;
         } else {
             if (append) {
                 initialLength = Math.max(0, scc.getLength());
             } else {
                 initialLength = 0;
-                scc.setOffset(claimStream.getBytesWritten());
             }
         }
 
@@ -924,22 +928,15 @@ public class FileSystemRepository implements ContentRepository {
                     // the queue because we need to ensure that the latter operation does
not cause problems
                     // with the former.
                     final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(),
resourceClaimLength);
-                    final boolean enqueued;
-                    if (writableClaimQueue.contains(pair)) {
-                        // may already exist on the queue, if the content claim is written
to multiple times.
-                        enqueued = true;
-                    } else {
-                        enqueued = writableClaimQueue.offer(pair);
-                    }
+                    final boolean enqueued = writableClaimQueue.offer(pair);
 
                     if (enqueued) {
                         writableClaimStreams.put(scc.getResourceClaim(), bcos);
                         LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams",
this);
                     } else {
-                        writableClaimStreams.remove(scc.getResourceClaim());
                         bcos.close();
 
-                        LOG.debug("Claim length less than max; Closing {}", this);
+                        LOG.debug("Claim length less than max; Closing {} because could not
add back to queue", this);
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Stack trace: ", new RuntimeException("Stack Trace
for closing " + this));
                         }
@@ -947,7 +944,7 @@ public class FileSystemRepository implements ContentRepository {
                 } else {
                     // we've reached the limit for this claim. Don't add it back to our queue.
                     // Instead, just remove it and move on.
-                    writableClaimStreams.remove(scc.getResourceClaim());
+
                     // ensure that the claim is no longer on the queue
                     writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(),
resourceClaimLength));
                     bcos.close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/84aae10a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3e2868e..62a001c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1880,8 +1880,6 @@ public final class StandardProcessSession implements ProcessSession,
ProvenanceE
     public FlowFile write(final FlowFile source, final OutputStreamCallback writer) {
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
-        long newSize = 0L;
-        final long claimOffset = 0L;
 
         ContentClaim newClaim = null;
         final LongHolder writtenHolder = new LongHolder(0L);
@@ -1898,7 +1896,6 @@ public final class StandardProcessSession implements ProcessSession,
ProvenanceE
             } finally {
                 recursionSet.remove(source);
             }
-            newSize = context.getContentRepository().size(newClaim);
         } catch (final ContentNotFoundException nfe) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
             destroyContent(newClaim);
@@ -1920,7 +1917,13 @@ public final class StandardProcessSession implements ProcessSession,
ProvenanceE
         }
 
         removeTemporaryClaim(record);
-        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build();
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+        	.fromFlowFile(record.getCurrent())
+        	.contentClaim(newClaim)
+        	.contentClaimOffset(0)
+        	.size(writtenHolder.getValue())
+        	.build();
+
         record.setWorking(newFile);
         return newFile;
     }
@@ -2068,8 +2071,6 @@ public final class StandardProcessSession implements ProcessSession,
ProvenanceE
         final ContentClaim currClaim = record.getCurrentClaim();
 
         ContentClaim newClaim = null;
-        long newSize = 0L;
-        final long claimOffset = 0L;
         final LongHolder writtenHolder = new LongHolder(0L);
         try {
             newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
@@ -2109,8 +2110,6 @@ public final class StandardProcessSession implements ProcessSession,
ProvenanceE
                     }
                 }
             }
-
-            newSize = context.getContentRepository().size(newClaim);
         } catch (final ContentNotFoundException nfe) {
             destroyContent(newClaim);
             handleContentNotFound(nfe, record);
@@ -2128,7 +2127,13 @@ public final class StandardProcessSession implements ProcessSession,
ProvenanceE
         }
 
         removeTemporaryClaim(record);
-        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build();
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+        	.fromFlowFile(record.getCurrent())
+        	.contentClaim(newClaim)
+        	.contentClaimOffset(0L)
+        	.size(writtenHolder.getValue())
+        	.build();
+
         record.setWorking(newFile);
         return newFile;
     }
@@ -2157,7 +2162,7 @@ public final class StandardProcessSession implements ProcessSession,
ProvenanceE
         claimOffset = 0L;
         long newSize = 0L;
         try {
-            newSize = context.getContentRepository().importFrom(source, newClaim, false);
+            newSize = context.getContentRepository().importFrom(source, newClaim);
             bytesWritten.increment(newSize);
             bytesRead.increment(newSize);
         } catch (final Throwable t) {
@@ -2191,7 +2196,7 @@ public final class StandardProcessSession implements ProcessSession,
ProvenanceE
                 newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
                 claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim,
destination);
 
-                newSize = context.getContentRepository().importFrom(source, newClaim, false);
+                newSize = context.getContentRepository().importFrom(source, newClaim);
                 bytesWritten.increment(newSize);
             } catch (final IOException e) {
                 throw new FlowFileAccessException("Unable to create ContentClaim due to "
+ e.toString(), e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/84aae10a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index 753e818..62ff276 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -28,29 +28,24 @@ package org.apache.nifi.controller.repository.claim;
 public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim>
{
 
     private final ResourceClaim resourceClaim;
-    private final int hashCode;
-    private volatile long offset;
+    private final long offset;
     private volatile long length;
 
     public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) {
         this.resourceClaim = resourceClaim;
         this.offset = offset;
         this.length = -1L;
-        this.hashCode = calculateHashCode();
     }
 
     public void setLength(final long length) {
         this.length = length;
     }
 
-    public void setOffset(final long offset) {
-        this.offset = offset;
-    }
-
-    private int calculateHashCode() {
+	@Override
+	public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + hashCode;
+		result = prime * result;
         result = prime * result + (int) (length ^ length >>> 32);
         result = prime * result + (int) (offset ^ offset >>> 32);
         result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode());
@@ -58,11 +53,6 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont
     }
 
     @Override
-    public int hashCode() {
-        return this.hashCode;
-    }
-
-    @Override
     public boolean equals(final Object obj) {
         if (this == obj) {
             return true;

http://git-wip-us.apache.org/repos/asf/nifi/blob/84aae10a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 519ba9c..5ffcb3d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -36,10 +36,8 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Random;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.util.DiskUtils;
 import org.apache.nifi.stream.io.StreamUtils;
@@ -136,54 +134,6 @@ public class TestFileSystemRepository {
         assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
     }
 
-    @Test
-    public void testRewriteContentClaim() throws IOException {
-        final ContentClaim claim1 = repository.create(false);
-        assertEquals(1, repository.getClaimantCount(claim1));
-
-        try (final OutputStream out = repository.write(claim1)) {
-            out.write("abc".getBytes());
-        }
-        assertEquals(1, repository.getClaimantCount(claim1));
-
-        try (final OutputStream out = repository.write(claim1)) {
-            out.write("cba".getBytes());
-        }
-        assertEquals(1, repository.getClaimantCount(claim1));
-
-        try (final InputStream in = repository.read(claim1)) {
-            assertEquals('c', in.read());
-            assertEquals('b', in.read());
-            assertEquals('a', in.read());
-        }
-        assertEquals(1, repository.getClaimantCount(claim1));
-
-        assertEquals(3, repository.size(claim1));
-
-        final byte[] oneMB = new byte[1024 * 1024 - 6];
-        new Random().nextBytes(oneMB);
-        try (final OutputStream out = repository.write(claim1)) {
-            out.write(oneMB);
-        }
-        assertEquals(1, repository.getClaimantCount(claim1));
-
-        assertEquals(1024 * 1024 - 6, repository.size(claim1));
-        try (final InputStream in = repository.read(claim1)) {
-            final byte[] buff = new byte[oneMB.length];
-            StreamUtils.fillBuffer(in, buff);
-            assertTrue(Arrays.equals(buff, oneMB));
-        }
-
-        final ResourceClaim resourceClaim = claim1.getResourceClaim();
-        final Path path = rootFile.toPath().resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
-        assertTrue(Files.exists(path));
-        assertEquals(0, repository.decrementClaimantCount(claim1));
-        assertTrue(repository.remove(claim1));
-        assertFalse(Files.exists(path));
-
-        final ContentClaim claim2 = repository.create(false);
-        assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
-    }
 
     @Test
     public void testWriteWithNoContent() throws IOException {
@@ -292,32 +242,6 @@ public class TestFileSystemRepository {
         assertTrue(Arrays.equals(expected, baos.toByteArray()));
     }
 
-    @Test
-    public void testImportFromFileWithAppend() throws IOException {
-        final ContentClaim claim = repository.create(false);
-        final File hello = new File("src/test/resources/hello.txt");
-        final File goodbye = new File("src/test/resources/bye.txt");
-
-        repository.importFrom(hello.toPath(), claim, true);
-        assertContentEquals(claim, "Hello, World");
-
-        repository.importFrom(goodbye.toPath(), claim, true);
-        assertContentEquals(claim, "Hello, WorldGood-Bye, World!");
-
-        repository.importFrom(hello.toPath(), claim, true);
-        assertContentEquals(claim, "Hello, WorldGood-Bye, World!Hello, World");
-
-        repository.importFrom(goodbye.toPath(), claim, false);
-        assertContentEquals(claim, "Good-Bye, World!");
-    }
-
-    private void assertContentEquals(final ContentClaim claim, final String expected) throws
IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final InputStream in = repository.read(claim)) {
-            StreamUtils.copy(in, baos);
-        }
-        assertEquals(expected, new String(baos.toByteArray()));
-    }
 
     @Test
     public void testImportFromStream() throws IOException {


Mime
View raw message