From commits-return-44743-archive-asf-public=cust-asf.ponee.io@nifi.apache.org Tue Jun 8 18:24:35 2021
Date: Tue, 08 Jun 2021 18:24:33 +0000
From: exceptionfactory@apache.org
To: "commits@nifi.apache.org"
Reply-To: dev@nifi.apache.org
Subject: [nifi] branch main updated: NIFI-8499 - Added encrypted FlowFile repository swap file implementation
Message-ID: <162317667300.361.12745109423338967680@gitbox.apache.org>
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
X-Git-Repo: nifi
X-Git-Refname: refs/heads/main
X-Git-Oldrev: 07ff4f2592b26a5b556bb1297c42325233bdb101
X-Git-Newrev: a3c1cd074bafe6ed1975cef2a08793b2a47e4dff
Auto-Submitted: auto-generated

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

The following commit(s) were added to refs/heads/main by this push:
     new a3c1cd0  NIFI-8499 - Added encrypted FlowFile repository swap file implementation
a3c1cd0 is described below

commit a3c1cd074bafe6ed1975cef2a08793b2a47e4dff
Author: Paul Grey
AuthorDate: Thu Jun 3 15:37:27 2021 -0400

    NIFI-8499 - Added encrypted FlowFile repository swap file implementation

    This closes #5122

    Signed-off-by: David Handermann
---
 .../src/main/asciidoc/administration-guide.adoc    |   6 +-
 .../controller/EncryptedFileSystemSwapManager.java | 104 +++++++++++++++++++
 .../nifi/controller/FileSystemSwapManager.java     |  21 ++--
 ....nifi.controller.repository.FlowFileSwapManager |   3 +-
 .../TestEncryptedFileSystemSwapManager.java        | 115 +++++++++++++++++++++
 5 files changed, 239 insertions(+), 10 deletions(-)

diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 6b68282..52d8008 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -427,7 +427,7 @@ To enable authentication via SAML the following properties must be configured in
 |`nifi.security.user.saml.group.attribute.name`| The name of a SAML assertion attribute containing group names the user belongs to. This property is optional, but if populated the groups will be passed along to the authorization process.
 |`nifi.security.user.saml.metadata.signing.enabled`| Enables signing of the generated service provider metadata.
 |`nifi.security.user.saml.request.signing.enabled`| Controls the value of `AuthnRequestsSigned` in the generated service provider metadata from `nifi-api/access/saml/metadata`. This indicates that the service provider (i.e. NiFi) should not sign authentication requests sent to the identity provider, but the requests may still need to be signed if the identity provider indicates `WantAuthnRequestSigned=true`.
-|`nifi.security.user.saml.want.assertions.signed`| Controls the value of `WantAssertionsSigned` in the generated service provider metadata from `nifi-api/access/saml/metadata`. This indictaes that the identity provider should sign assertions, but some identity providers may provide their own configuration for controlling whether assertions are signed.
+|`nifi.security.user.saml.want.assertions.signed`| Controls the value of `WantAssertionsSigned` in the generated service provider metadata from `nifi-api/access/saml/metadata`. This indicates that the identity provider should sign assertions, but some identity providers may provide their own configuration for controlling whether assertions are signed.
 |`nifi.security.user.saml.signature.algorithm`| The algorithm to use when signing SAML messages. Reference the link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open SAML Signature Constants] for a list of valid values. If not specified, a default of SHA-256 will be used.
 |`nifi.security.user.saml.signature.digest.algorithm`| The digest algorithm to use when signing SAML messages. Reference the link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open SAML Signature Constants] for a list of valid values. If not specified, a default of SHA-256 will be used.
 |`nifi.security.user.saml.message.logging.enabled`| Enables logging of SAML messages for debugging purposes.
@@ -2961,7 +2961,9 @@ available again. These properties govern how that process occurs.
 
 |====
 |*Property*|*Description*
-|`nifi.swap.manager.implementation`|The Swap Manager implementation. The default value is `org.apache.nifi.controller.FileSystemSwapManager` and should not be changed.
+|`nifi.swap.manager.implementation`| The Swap Manager implementation. The default value is `org.apache.nifi.controller.FileSystemSwapManager`.
+There is an alternate implementation, `EncryptedFileSystemSwapManager`, that encrypts the swap file content on
+disk. The encryption key configured for the FlowFile repository is used to perform the encryption, using the AES-GCM algorithm.
 |`nifi.queue.swap.threshold`|The queue threshold at which NiFi starts to swap FlowFile information to disk. The default value is `20000`.
 |====
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java
new file mode 100644
index 0000000..6e064e6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.security.kms.CryptoUtils;
+import org.apache.nifi.security.kms.EncryptionException;
+import org.apache.nifi.security.kms.KeyProvider;
+import org.apache.nifi.security.repository.RepositoryEncryptorUtils;
+import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.GCMParameterSpec;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+
+/**
+ *
+ * An implementation of {@link FlowFileSwapManager} that swaps FlowFiles
+ * to/from local disk. The swap file is encrypted using AES/GCM, using the
+ * encryption key defined in nifi.properties for the FlowFile repository.
+ *
+ */
+public class EncryptedFileSystemSwapManager extends FileSystemSwapManager {
+
+    private static final String CIPHER_TRANSFORMATION = "AES/GCM/NoPadding";
+    private static final int SIZE_IV_AES_BYTES = 16;
+    private static final int SIZE_TAG_GCM_BITS = 128;
+
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedFileSystemSwapManager.class);
+    private static final SecureRandom secureRandom = new SecureRandom();
+
+    private final SecretKey secretKey;
+
+    public EncryptedFileSystemSwapManager(final NiFiProperties nifiProperties)
+            throws IOException, EncryptionException, GeneralSecurityException {
+        super(nifiProperties);
+        // acquire reference to FlowFileRepository key
+        final FlowFileRepositoryEncryptionConfiguration configuration = new FlowFileRepositoryEncryptionConfiguration(nifiProperties);
+        if (!CryptoUtils.isValidRepositoryEncryptionConfiguration(configuration)) {
+            logger.error("The flowfile repository encryption configuration is not valid (see above). Shutting down...");
+            throw new EncryptionException("The flowfile repository encryption configuration is not valid");
+        }
+        final KeyProvider keyProvider = RepositoryEncryptorUtils.validateAndBuildRepositoryKeyProvider(configuration);
+        this.secretKey = keyProvider.getKey(configuration.getEncryptionKeyId());
+    }
+
+    protected InputStream getInputStream(final File file) throws IOException {
+        final FileInputStream fis = new FileInputStream(file);
+        try {
+            final byte[] iv = new byte[SIZE_IV_AES_BYTES];
+            final int ivBytesRead = fis.read(iv);
+            if (ivBytesRead != SIZE_IV_AES_BYTES) {
+                throw new IOException(String.format(
+                        "problem reading IV [expected=%d, actual=%d]", SIZE_IV_AES_BYTES, ivBytesRead));
+            }
+            final Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION);
+            cipher.init(Cipher.DECRYPT_MODE, secretKey, new GCMParameterSpec(SIZE_TAG_GCM_BITS, iv));
+            return new CipherInputStream(fis, cipher);
+        } catch (GeneralSecurityException e) {
+            throw new IOException(String.format("Preparing Cipher Failed for File [%s]", file.getAbsolutePath()), e);
+        }
+    }
+
+    protected OutputStream getOutputStream(final File file) throws IOException {
+        final byte[] iv = new byte[SIZE_IV_AES_BYTES];
+        secureRandom.nextBytes(iv);
+        final FileOutputStream fos = new FileOutputStream(file);
+        fos.write(iv);
+        try {
+            final Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION);
+            cipher.init(Cipher.ENCRYPT_MODE, secretKey, new GCMParameterSpec(SIZE_TAG_GCM_BITS, iv));
+            return new CipherOutputStream(fos, cipher);
+        } catch (GeneralSecurityException e) {
+            throw new IOException(String.format("Preparing Cipher Failed for File [%s]", file.getAbsolutePath()), e);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 8885877..b01bdc9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -119,6 +119,13 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         this.flowFileRepository = initializationContext.getFlowFileRepository();
     }
 
+    protected InputStream getInputStream(final File file) throws IOException {
+        return new FileInputStream(file);
+    }
+
+    protected OutputStream getOutputStream(final File file) throws IOException {
+        return new FileOutputStream(file);
+    }
 
     @Override
     public String swapOut(final List toSwap, final FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
@@ -135,14 +142,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         final String swapLocation = swapFile.getAbsolutePath();
 
         final SwapSerializer serializer = new SchemaSwapSerializer();
-        try (final FileOutputStream fos = new FileOutputStream(swapTempFile);
-             final OutputStream out = new BufferedOutputStream(fos)) {
+        try (final OutputStream os = getOutputStream(swapTempFile);
+             final OutputStream out = new BufferedOutputStream(os)) {
             out.write(MAGIC_HEADER);
             final DataOutputStream dos = new DataOutputStream(out);
             dos.writeUTF(serializer.getSerializationName());
 
             serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
-            fos.getFD().sync();
+            out.flush();
         } catch (final IOException ioe) {
             // we failed to write out the entire swap file. Delete the temporary file, if we can.
             swapTempFile.delete();
@@ -188,8 +195,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found");
         }
 
-        try (final InputStream fis = new FileInputStream(swapFile);
-                final InputStream bis = new BufferedInputStream(fis);
+        try (final InputStream is = getInputStream(swapFile);
+                final InputStream bis = new BufferedInputStream(is);
                 final DataInputStream in = new DataInputStream(bis)) {
 
             final SwapDeserializer deserializer = createSwapDeserializer(in);
@@ -318,7 +325,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             }
 
             // Read the queue identifier from the swap file to check if the swap file is for this queue
-            try (final InputStream fis = new FileInputStream(swapFile);
+            try (final InputStream fis = getInputStream(swapFile);
                     final InputStream bufferedIn = new BufferedInputStream(fis);
                     final DataInputStream in = new DataInputStream(bufferedIn)) {
 
@@ -351,7 +358,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         final File swapFile = new File(swapLocation);
 
         // read record from disk via the swap file
-        try (final InputStream fis = new FileInputStream(swapFile);
+        try (final InputStream fis = getInputStream(swapFile);
                 final InputStream bufferedIn = new BufferedInputStream(fis);
                 final DataInputStream in = new DataInputStream(bufferedIn)) {
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
index e5c63ac..b0ba624 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.controller.FileSystemSwapManager
\ No newline at end of file
+org.apache.nifi.controller.FileSystemSwapManager
+org.apache.nifi.controller.EncryptedFileSystemSwapManager
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java
new file mode 100644
index 0000000..6ef5115
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.security.kms.EncryptionException;
+import org.apache.nifi.security.kms.StaticKeyProvider;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link EncryptedFileSystemSwapManager}.
+ */
+public class TestEncryptedFileSystemSwapManager {
+    private static final Logger logger = Logger.getLogger(TestEncryptedFileSystemSwapManager.class.getName());
+
+    /**
+     * Test a simple swap to disk / swap from disk operation. Configured to use {@link StaticKeyProvider}.
+     */
+    @Test
+    public void testSwapOutSwapIn() throws GeneralSecurityException, EncryptionException, IOException {
+        // use temp folder on filesystem to temporarily hold swap content (clean up after test)
+        final File folderRepository = Files.createTempDirectory(getClass().getSimpleName()).toFile();
+        logger.info(folderRepository.getPath());
+        folderRepository.deleteOnExit();
+        new File(folderRepository, "swap").deleteOnExit();
+
+        // configure a nifi properties for encrypted swap file
+        final Properties properties = new Properties();
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS, StaticKeyProvider.class.getName());
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID, NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY);
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY, StringUtils.repeat("00", 32));
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, folderRepository.getPath());
+        final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, properties);
+
+        // generate some flow file content to swap to disk
+        final List flowFiles = new ArrayList<>();
+        for (int i = 0; (i < 100); ++i) {
+            flowFiles.add(new StandardFlowFileRecord.Builder().id(i).build());
+        }
+
+        // setup for test case
+        final FlowFileSwapManager swapManager = createSwapManager(nifiProperties);
+        final String queueIdentifier = UUID.randomUUID().toString();
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        when(flowFileQueue.getIdentifier()).thenReturn(queueIdentifier);
+
+        // swap out to disk; pull content back from disk
+        final String swapPath = swapManager.swapOut(flowFiles, flowFileQueue, "partition-1");
+        final SwapContents swapContents = swapManager.swapIn(swapPath, flowFileQueue);
+
+        // verify recovery of original content
+        final List flowFilesRecovered = swapContents.getFlowFiles();
+        Assert.assertEquals(flowFiles.size(), flowFilesRecovered.size());
+        Assert.assertTrue(flowFilesRecovered.containsAll(flowFiles));
+        Assert.assertTrue(flowFiles.containsAll(flowFilesRecovered));
+    }
+
+    /**
+     * Borrowed from "nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java".
+     */
+    private FlowFileSwapManager createSwapManager(NiFiProperties nifiProperties)
+            throws IOException, GeneralSecurityException, EncryptionException {
+        final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+        when(flowFileRepo.isValidSwapLocationSuffix(any())).thenReturn(true);
+
+        final FileSystemSwapManager swapManager = new EncryptedFileSystemSwapManager(nifiProperties);
+        final ResourceClaimManager resourceClaimManager = Mockito.mock(ResourceClaimManager.class);
+        final SwapManagerInitializationContext context = Mockito.mock(SwapManagerInitializationContext.class);
+        when(context.getResourceClaimManager()).thenReturn(resourceClaimManager);
+        when(context.getFlowFileRepository()).thenReturn(flowFileRepo);
+        when(context.getEventReporter()).thenReturn(EventReporter.NO_OP);
+        swapManager.initialize(context);
+
+        return swapManager;
+    }
+}
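
Editor's note: the stream wrapping that `EncryptedFileSystemSwapManager` performs in `getOutputStream` and `getInputStream` (a fresh random IV written in the clear at the start of the file, followed by AES/GCM ciphertext) can be tried outside NiFi. The following is a minimal sketch using only the JDK, not code from the commit. The class name `GcmStreamSketch`, the `encrypt` and `decrypt` helpers, and the all-zero 32-byte key are hypothetical and chosen for illustration; the key mirrors the `StringUtils.repeat("00", 32)` test value above. In-memory byte arrays stand in for the swap file.

```java
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.CipherOutputStream;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;

// Hypothetical stand-alone sketch; not part of the NiFi code base.
public class GcmStreamSketch {
    // Same constants as the commit: 16-byte IV, 128-bit GCM tag.
    private static final String TRANSFORMATION = "AES/GCM/NoPadding";
    private static final int IV_BYTES = 16;
    private static final int TAG_BITS = 128;
    private static final SecureRandom RANDOM = new SecureRandom();

    // Writes the IV in the clear, then the ciphertext and GCM tag.
    static byte[] encrypt(SecretKey key, byte[] plaintext)
            throws IOException, GeneralSecurityException {
        byte[] iv = new byte[IV_BYTES];
        RANDOM.nextBytes(iv);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        sink.write(iv);
        Cipher cipher = Cipher.getInstance(TRANSFORMATION);
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        // Closing the stream finalizes the cipher and appends the tag.
        try (OutputStream out = new CipherOutputStream(sink, cipher)) {
            out.write(plaintext);
        }
        return sink.toByteArray();
    }

    // Reads the IV back from the first bytes, then decrypts the remainder.
    static byte[] decrypt(SecretKey key, byte[] stored)
            throws IOException, GeneralSecurityException {
        InputStream source = new ByteArrayInputStream(stored);
        byte[] iv = new byte[IV_BYTES];
        if (source.read(iv) != IV_BYTES) {
            throw new IOException("could not read the full IV");
        }
        Cipher cipher = Cipher.getInstance(TRANSFORMATION);
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        try (InputStream in = new CipherInputStream(source, cipher)) {
            byte[] buffer = new byte[4096];
            int count;
            while ((count = in.read(buffer)) != -1) {
                result.write(buffer, 0, count);
            }
        }
        return result.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        // Illustrative all-zero AES-256 key; never use such a key in practice.
        SecretKey key = new SecretKeySpec(new byte[32], "AES");
        byte[] plaintext = "swap file payload".getBytes(StandardCharsets.UTF_8);
        byte[] stored = encrypt(key, plaintext);
        byte[] recovered = decrypt(key, stored);
        System.out.println(Arrays.equals(plaintext, recovered));
    }
}
```

Storing the IV at the head of the file means a reader needs only the key to decrypt. Because GCM requires that an IV never repeat under the same key, the sketch, like the commit, draws a new random IV for every file written.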