nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From exceptionfact...@apache.org
Subject [nifi] branch main updated: NIFI-8499 - Added encrypted FlowFile repository swap file implementation
Date Tue, 08 Jun 2021 18:24:33 GMT
This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a3c1cd0  NIFI-8499 - Added encrypted FlowFile repository swap file implementation
a3c1cd0 is described below

commit a3c1cd074bafe6ed1975cef2a08793b2a47e4dff
Author: Paul Grey <greyp9@yahoo.com>
AuthorDate: Thu Jun 3 15:37:27 2021 -0400

    NIFI-8499 - Added encrypted FlowFile repository swap file implementation
    
    This closes #5122
    
    Signed-off-by: David Handermann <exceptionfactory@apache.org>
---
 .../src/main/asciidoc/administration-guide.adoc    |   6 +-
 .../controller/EncryptedFileSystemSwapManager.java | 104 +++++++++++++++++++
 .../nifi/controller/FileSystemSwapManager.java     |  21 ++--
 ....nifi.controller.repository.FlowFileSwapManager |   3 +-
 .../TestEncryptedFileSystemSwapManager.java        | 115 +++++++++++++++++++++
 5 files changed, 239 insertions(+), 10 deletions(-)

diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 6b68282..52d8008 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -427,7 +427,7 @@ To enable authentication via SAML the following properties must be configured in
 |`nifi.security.user.saml.group.attribute.name`| The name of a SAML assertion attribute containing
group names the user belongs to. This property is optional, but if populated the groups will
be passed along to the authorization process.
 |`nifi.security.user.saml.metadata.signing.enabled`| Enables signing of the generated service
provider metadata.
 |`nifi.security.user.saml.request.signing.enabled`| Controls the value of `AuthnRequestsSigned`
in the generated service provider metadata from `nifi-api/access/saml/metadata`. This indicates
that the service provider (i.e. NiFi) should not sign authentication requests sent to the
identity provider, but the requests may still need to be signed if the identity provider indicates
`WantAuthnRequestSigned=true`.
-|`nifi.security.user.saml.want.assertions.signed`| Controls the value of `WantAssertionsSigned`
in the generated service provider metadata from `nifi-api/access/saml/metadata`. This indictaes
that the identity provider should sign assertions, but some identity providers may provide
their own configuration for controlling whether assertions are signed.
+|`nifi.security.user.saml.want.assertions.signed`| Controls the value of `WantAssertionsSigned`
in the generated service provider metadata from `nifi-api/access/saml/metadata`. This indicates
that the identity provider should sign assertions, but some identity providers may provide
their own configuration for controlling whether assertions are signed.
 |`nifi.security.user.saml.signature.algorithm`| The algorithm to use when signing SAML messages.
Reference the link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open
SAML Signature Constants] for a list of valid values. If not specified, a default of SHA-256
will be used.
 |`nifi.security.user.saml.signature.digest.algorithm`| The digest algorithm to use when signing
SAML messages. Reference the link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open
SAML Signature Constants] for a list of valid values. If not specified, a default of SHA-256
will be used.
 |`nifi.security.user.saml.message.logging.enabled`| Enables logging of SAML messages for
debugging purposes.
@@ -2961,7 +2961,9 @@ available again. These properties govern how that process occurs.
 
 |====
 |*Property*|*Description*
-|`nifi.swap.manager.implementation`|The Swap Manager implementation. The default value is
`org.apache.nifi.controller.FileSystemSwapManager` and should not be changed.
+|`nifi.swap.manager.implementation`| The Swap Manager implementation. The default value is `org.apache.nifi.controller.FileSystemSwapManager`.
+There is an alternate implementation, `EncryptedFileSystemSwapManager`, that encrypts the swap file content on
+disk.  The encryption key configured for the FlowFile repository is used to perform the encryption, using the AES-GCM algorithm.
 |`nifi.queue.swap.threshold`|The queue threshold at which NiFi starts to swap FlowFile information
to disk. The default value is `20000`.
 |====
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java
new file mode 100644
index 0000000..6e064e6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.security.kms.CryptoUtils;
+import org.apache.nifi.security.kms.EncryptionException;
+import org.apache.nifi.security.kms.KeyProvider;
+import org.apache.nifi.security.repository.RepositoryEncryptorUtils;
+import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.GCMParameterSpec;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+
+/**
+ * <p>
+ * An implementation of {@link FlowFileSwapManager} that swaps FlowFiles
+ * to/from local disk.  The swap file is encrypted using AES/GCM, using the
+ * encryption key defined in nifi.properties for the FlowFile repository.
+ * </p>
+ */
+public class EncryptedFileSystemSwapManager extends FileSystemSwapManager {
+
+    private static final String CIPHER_TRANSFORMATION = "AES/GCM/NoPadding";
+    private static final int SIZE_IV_AES_BYTES = 16;
+    private static final int SIZE_TAG_GCM_BITS = 128;
+
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedFileSystemSwapManager.class);
+    private static final SecureRandom secureRandom = new SecureRandom();
+
+    private final SecretKey secretKey;
+
+    public EncryptedFileSystemSwapManager(final NiFiProperties nifiProperties)
+            throws IOException, EncryptionException, GeneralSecurityException {
+        super(nifiProperties);
+        // acquire reference to FlowFileRepository key
+        final FlowFileRepositoryEncryptionConfiguration configuration = new FlowFileRepositoryEncryptionConfiguration(nifiProperties);
+        if (!CryptoUtils.isValidRepositoryEncryptionConfiguration(configuration)) {
+            logger.error("The flowfile repository encryption configuration is not valid (see above). Shutting down...");
+            throw new EncryptionException("The flowfile repository encryption configuration is not valid");
+        }
+        final KeyProvider keyProvider = RepositoryEncryptorUtils.validateAndBuildRepositoryKeyProvider(configuration);
+        this.secretKey = keyProvider.getKey(configuration.getEncryptionKeyId());
+    }
+
+    protected InputStream getInputStream(final File file) throws IOException {
+        final FileInputStream fis = new FileInputStream(file);
+        try {
+            final byte[] iv = new byte[SIZE_IV_AES_BYTES];
+            final int ivBytesRead = fis.read(iv);
+            if (ivBytesRead != SIZE_IV_AES_BYTES) {
+                throw new IOException(String.format(
+                        "problem reading IV [expected=%d, actual=%d]", SIZE_IV_AES_BYTES, ivBytesRead));
+            }
+            final Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION);
+            cipher.init(Cipher.DECRYPT_MODE, secretKey, new GCMParameterSpec(SIZE_TAG_GCM_BITS, iv));
+            return new CipherInputStream(fis, cipher);
+        } catch (GeneralSecurityException e) {
+            throw new IOException(String.format("Preparing Cipher Failed for File [%s]", file.getAbsolutePath()), e);
+        }
+    }
+
+    protected OutputStream getOutputStream(final File file) throws IOException {
+        final byte[] iv = new byte[SIZE_IV_AES_BYTES];
+        secureRandom.nextBytes(iv);
+        final FileOutputStream fos = new FileOutputStream(file);
+        fos.write(iv);
+        try {
+            final Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION);
+            cipher.init(Cipher.ENCRYPT_MODE, secretKey, new GCMParameterSpec(SIZE_TAG_GCM_BITS, iv));
+            return new CipherOutputStream(fos, cipher);
+        } catch (GeneralSecurityException e) {
+            throw new IOException(String.format("Preparing Cipher Failed for File [%s]", file.getAbsolutePath()), e);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 8885877..b01bdc9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -119,6 +119,13 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         this.flowFileRepository = initializationContext.getFlowFileRepository();
     }
 
+    protected InputStream getInputStream(final File file) throws IOException {
+        return new FileInputStream(file);
+    }
+
+    protected OutputStream getOutputStream(final File file) throws IOException {
+        return new FileOutputStream(file);
+    }
 
     @Override
     public String swapOut(final List<FlowFileRecord> toSwap, final FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
@@ -135,14 +142,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         final String swapLocation = swapFile.getAbsolutePath();
 
         final SwapSerializer serializer = new SchemaSwapSerializer();
-        try (final FileOutputStream fos = new FileOutputStream(swapTempFile);
-            final OutputStream out = new BufferedOutputStream(fos)) {
+        try (final OutputStream os = getOutputStream(swapTempFile);
+            final OutputStream out = new BufferedOutputStream(os)) {
             out.write(MAGIC_HEADER);
             final DataOutputStream dos = new DataOutputStream(out);
             dos.writeUTF(serializer.getSerializationName());
 
             serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
-            fos.getFD().sync();
+            out.flush();
         } catch (final IOException ioe) {
             // we failed to write out the entire swap file. Delete the temporary file, if we can.
             swapTempFile.delete();
@@ -188,8 +195,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found");
         }
 
-        try (final InputStream fis = new FileInputStream(swapFile);
-                final InputStream bis = new BufferedInputStream(fis);
+        try (final InputStream is = getInputStream(swapFile);
+                final InputStream bis = new BufferedInputStream(is);
                 final DataInputStream in = new DataInputStream(bis)) {
 
             final SwapDeserializer deserializer = createSwapDeserializer(in);
@@ -318,7 +325,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             }
 
             // Read the queue identifier from the swap file to check if the swap file is for this queue
-            try (final InputStream fis = new FileInputStream(swapFile);
+            try (final InputStream fis = getInputStream(swapFile);
                     final InputStream bufferedIn = new BufferedInputStream(fis);
                     final DataInputStream in = new DataInputStream(bufferedIn)) {
 
@@ -351,7 +358,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         final File swapFile = new File(swapLocation);
 
         // read record from disk via the swap file
-        try (final InputStream fis = new FileInputStream(swapFile);
+        try (final InputStream fis = getInputStream(swapFile);
                 final InputStream bufferedIn = new BufferedInputStream(fis);
                 final DataInputStream in = new DataInputStream(bufferedIn)) {
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
index e5c63ac..b0ba624 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.controller.FileSystemSwapManager
\ No newline at end of file
+org.apache.nifi.controller.FileSystemSwapManager
+org.apache.nifi.controller.EncryptedFileSystemSwapManager
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java
new file mode 100644
index 0000000..6ef5115
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.security.kms.EncryptionException;
+import org.apache.nifi.security.kms.StaticKeyProvider;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link EncryptedFileSystemSwapManager}.
+ */
+public class TestEncryptedFileSystemSwapManager {
+    private static final Logger logger = Logger.getLogger(TestEncryptedFileSystemSwapManager.class.getName());
+
+    /**
+     * Test a simple swap to disk / swap from disk operation.  Configured to use {@link StaticKeyProvider}.
+     */
+    @Test
+    public void testSwapOutSwapIn() throws GeneralSecurityException, EncryptionException, IOException {
+        // use temp folder on filesystem to temporarily hold swap content (clean up after test)
+        final File folderRepository = Files.createTempDirectory(getClass().getSimpleName()).toFile();
+        logger.info(folderRepository.getPath());
+        folderRepository.deleteOnExit();
+        new File(folderRepository, "swap").deleteOnExit();
+
+        // configure a nifi properties for encrypted swap file
+        final Properties properties = new Properties();
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS, StaticKeyProvider.class.getName());
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID, NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY);
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY, StringUtils.repeat("00", 32));
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, folderRepository.getPath());
+        final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, properties);
+
+        // generate some flow file content to swap to disk
+        final List<FlowFileRecord> flowFiles = new ArrayList<>();
+        for (int i = 0; (i < 100); ++i) {
+            flowFiles.add(new StandardFlowFileRecord.Builder().id(i).build());
+        }
+
+        // setup for test case
+        final FlowFileSwapManager swapManager = createSwapManager(nifiProperties);
+        final String queueIdentifier = UUID.randomUUID().toString();
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        when(flowFileQueue.getIdentifier()).thenReturn(queueIdentifier);
+
+        // swap out to disk; pull content back from disk
+        final String swapPath = swapManager.swapOut(flowFiles, flowFileQueue, "partition-1");
+        final SwapContents swapContents = swapManager.swapIn(swapPath, flowFileQueue);
+
+        // verify recovery of original content
+        final List<FlowFileRecord> flowFilesRecovered = swapContents.getFlowFiles();
+        Assert.assertEquals(flowFiles.size(), flowFilesRecovered.size());
+        Assert.assertTrue(flowFilesRecovered.containsAll(flowFiles));
+        Assert.assertTrue(flowFiles.containsAll(flowFilesRecovered));
+    }
+
+    /**
+     * Borrowed from "nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java".
+     */
+    private FlowFileSwapManager createSwapManager(NiFiProperties nifiProperties)
+            throws IOException, GeneralSecurityException, EncryptionException {
+        final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+        when(flowFileRepo.isValidSwapLocationSuffix(any())).thenReturn(true);
+
+        final FileSystemSwapManager swapManager = new EncryptedFileSystemSwapManager(nifiProperties);
+        final ResourceClaimManager resourceClaimManager = Mockito.mock(ResourceClaimManager.class);
+        final SwapManagerInitializationContext context = Mockito.mock(SwapManagerInitializationContext.class);
+        when(context.getResourceClaimManager()).thenReturn(resourceClaimManager);
+        when(context.getFlowFileRepository()).thenReturn(flowFileRepo);
+        when(context.getEventReporter()).thenReturn(EventReporter.NO_OP);
+        swapManager.initialize(context);
+
+        return swapManager;
+    }
+}

Mime
View raw message