nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From turcsa...@apache.org
Subject [nifi] branch main updated: NIFI-7549 Adding Hazelcast based DistributedMapCacheClient support
Date Thu, 22 Oct 2020 17:33:02 GMT
This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b980a8e  NIFI-7549 Adding Hazelcast based DistributedMapCacheClient support
b980a8e is described below

commit b980a8ea8caf077be8046de3f7b5c1bd337864d8
Author: Bence Simon <simonbence.dev@gmail.com>
AuthorDate: Wed Sep 2 16:47:34 2020 +0200

    NIFI-7549 Adding Hazelcast based DistributedMapCacheClient support
    
    NIFI-7549 Refining documentation; Changing explicit HA mode; Smaller review comments
    NIFI-7549 Code review responses about license, documentation and dependencies
    NIFI-7549 Fixing issue when explicit HA; Some further review based adjustments
    NIFI-7549 Response to code review comments
    NIFI-7549 Adding extra serialization test
    NIFI-7549 Minor changes based on review comments
    NIFI-7549 Adding hook point to the shutdown
    
    This closes #4510.
    
    Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
---
 .../apache/nifi/controller/NodeTypeProvider.java   |  28 ++
 nifi-assembly/NOTICE                               |   5 +
 nifi-assembly/pom.xml                              |  12 +
 ...MockControllerServiceInitializationContext.java |   3 +-
 .../org/apache/nifi/controller/FlowController.java |  19 ++
 .../org/apache/nifi/mock/MockNodeTypeProvider.java |   1 -
 .../nifi-hazelcast-services-api-nar/pom.xml        |  42 +++
 .../src/main/resources/META-INF/LICENSE            | 202 +++++++++++
 .../src/main/resources/META-INF/NOTICE             |   9 +
 .../nifi-hazelcast-services-api/pom.xml            |  35 ++
 .../hazelcast/services/cache/HazelcastCache.java   | 120 +++++++
 .../cachemanager/HazelcastCacheManager.java        |  36 ++
 .../nifi-hazelcast-services-nar/pom.xml            |  42 +++
 .../src/main/resources/META-INF/LICENSE            | 202 +++++++++++
 .../src/main/resources/META-INF/NOTICE             |  21 ++
 .../nifi-hazelcast-services/pom.xml                |  73 ++++
 .../services/cache/IMapBasedHazelcastCache.java    | 128 +++++++
 .../cacheclient/HazelcastMapCacheClient.java       | 256 ++++++++++++++
 .../EmbeddedHazelcastCacheManager.java             | 243 +++++++++++++
 .../ExternalHazelcastCacheManager.java             | 138 ++++++++
 .../IMapBasedHazelcastCacheManager.java            | 112 ++++++
 .../org.apache.nifi.controller.ControllerService   |  17 +
 .../additionalDetails.html                         |  45 +++
 .../additionalDetails.html                         |  89 +++++
 .../additionalDetails.html                         |  48 +++
 .../hazelcast/services/DummyStringSerializer.java  |  41 +++
 .../services/cache/HashMapHazelcastCache.java      |  91 +++++
 .../cache/IMapBasedHazelcastCacheTest.java         | 138 ++++++++
 .../cacheclient/HazelcastMapCacheClientTest.java   | 380 +++++++++++++++++++++
 .../AbstractHazelcastCacheManagerTest.java         |  73 ++++
 .../EmbeddedHazelcastCacheManagerTest.java}        |  34 +-
 .../ExternalHazelcastCacheManagerTest.java         |  81 +++++
 .../cachemanager/TestHazelcastProcessor.java       | 104 ++++++
 nifi-nar-bundles/nifi-hazelcast-bundle/pom.xml     |  36 ++
 nifi-nar-bundles/pom.xml                           |   1 +
 35 files changed, 2885 insertions(+), 20 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java b/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java
index 2e201ef..ebabc68 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/NodeTypeProvider.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.controller;
 
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
 /**
  * <p>
  * This interface provides a set of methods for checking NiFi node type.
@@ -34,4 +38,28 @@ public interface NodeTypeProvider {
      * @return true if this instance is the primary node in the cluster; false otherwise
      */
     boolean isPrimary();
+
+    /**
+     * @return Returns with the hostname of the current node, if clustered. For For a standalone
+     * NiFi this returns an empty instead.
+     */
+    default Optional<String> getCurrentNode() {
+        if (isClustered()) {
+            throw new IllegalStateException("Clustered environment is not handled!");
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * @return Names/IP addresses of all expected hosts in the cluster (including the current one). For a standalone
+     * NiFi this returns an empty set instead.
+     */
+    default Set<String> getClusterMembers() {
+        if (isClustered()) {
+            throw new IllegalStateException("Clustered environment is not handled!");
+        } else {
+            return Collections.emptySet();
+        }
+    }
 }
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 177f0ea..2ebc8b0 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -1103,6 +1103,11 @@ The following binary components are provided under the Apache Software License v
         Couchbase Java SDK
         Copyright 2014 Couchbase, Inc.
 
+    (ASLv2) Hazelcast
+      The following NOTICE information applies:
+        Core Hazelcast Module 4.0.1
+        Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
+
     (ASLv2) RxJava
       The following NOTICE information applies:
         Couchbase Java SDK
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 2edb536..8f15299 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -698,6 +698,18 @@ language governing permissions and limitations under the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hazelcast-services-api-nar</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hazelcast-services-nar</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-metrics-reporter-service-api-nar</artifactId>
             <version>1.13.0-SNAPSHOT</version>
             <type>nar</type>
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
index 79cb961..021bdc2 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.util;
 
-import java.io.File;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
@@ -26,6 +25,8 @@ import org.apache.nifi.kerberos.KerberosContext;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.state.MockStateManager;
 
+import java.io.File;
+
 public class MockControllerServiceInitializationContext extends MockControllerServiceLookup implements ControllerServiceInitializationContext, ControllerServiceLookup, NodeTypeProvider {
 
     private final String identifier;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index fb9e8b1..f86ea8e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -2247,6 +2248,24 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         }
     }
 
+    @Override
+    public Set<String> getClusterMembers() {
+        if (isClustered()) {
+            return clusterCoordinator.getConnectionStatuses().stream().map(s -> s.getNodeIdentifier().getApiAddress()).collect(Collectors.toSet());
+        } else {
+            return Collections.emptySet();
+        }
+    }
+
+    @Override
+    public Optional<String> getCurrentNode() {
+        if (isClustered() && getNodeId() != null) {
+            return Optional.of(getNodeId().getApiAddress());
+        } else {
+            return Optional.empty();
+        }
+    }
+
     public boolean isConfiguredForClustering() {
         return configuredForClustering;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockNodeTypeProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockNodeTypeProvider.java
index 61390e1..7a976a4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockNodeTypeProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockNodeTypeProvider.java
@@ -36,5 +36,4 @@ public class MockNodeTypeProvider implements NodeTypeProvider {
     public boolean isPrimary() {
         return false;
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api-nar/pom.xml
new file mode 100644
index 0000000..0b2a4aa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api-nar/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-hazelcast-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-hazelcast-services-api-nar</artifactId>
+    <version>1.13.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hazelcast-services-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..7a4a3ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..f96505b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,9 @@
+nifi-hazelcast-services-nar
+Copyright 2015-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/pom.xml b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/pom.xml
new file mode 100644
index 0000000..22effb7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-hazelcast-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-hazelcast-services-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/src/main/java/org/apache/nifi/hazelcast/services/cache/HazelcastCache.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/src/main/java/org/apache/nifi/hazelcast/services/cache/HazelcastCache.java
new file mode 100644
index 0000000..34a086d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/src/main/java/org/apache/nifi/hazelcast/services/cache/HazelcastCache.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cache;
+
+import java.util.function.Predicate;
+
+/**
+ * Represents a cache storage. The API gives no restriction for the data structure used or to the Hazelcast setup. There
+ * can be multiple separate cache instances, sharing the same underlying Hazelcast. The cache instances with the same name
+ * are pointing to the same storage. It is recommended to use unique names for the different purposes.
+ */
+public interface HazelcastCache {
+    /**
+     * Serves as identifier for the cache. Defines the underlying storage.
+     *
+     * @return The name of the cache.
+     */
+    String name();
+
+    /**
+     * Returns the value of the cache entry defined by the the key.
+     *
+     * @param key Key of the entry, must not be null.
+     *
+     * @return The serialized value of the cache entry if it exits. The serialization and deserialization is handled by the client. In case the entry does not exist, the result is null.
+     */
+    byte[] get(String key);
+
+    /**
+     * Adds a new entry to the cache under the given key. If the entry already exists, it will be overwritten. In case the entry is locked by an other client, the method will wait or return
+     * with false depending on the implementation.
+     *
+     * @param key Key of the entry, must not be null.
+     * @param value The serialized value of the entry. In case the entry already exists, the new value. The value must not be null. The serialization and deserialization is handled by the client.
+     *
+     * @return True if the writing was successful.
+     */
+    boolean put(String key, byte[] value);
+
+    /**
+     * Adds a new entry to the cache under the given key. If the entry already exists, no changes will be applied. In case the entry is locked by an other client, the method will wait or return
+     * with false depending on the implementation.
+     *
+     * @param key Key of the entry, must not be null.
+     * @param value The serialized value of the entry. The value must not be null. The serialization and deserialization is handled by the client.
+     *
+     * @return The serialized value of the cache entry if exists already. Null otherwise. The serialization and deserialization is handled by the client.
+     */
+    byte[] putIfAbsent(String key, byte[] value);
+
+    /**
+     * Returns true if an entry with the given key exists in the cache. Returns false otherwise.
+     *
+     * @param key Key of the entry, must not be null.
+     *
+     * @return True if an entry with the given key exists.
+     */
+    boolean contains(String key);
+
+    /**
+     * Removes the entry from the cache with the given key.
+     *
+     * @param key Key of the entry, must not be null.
+     *
+     * @return True if the entry existed in the cache before the removal, false otherwise.
+     */
+    boolean remove(String key);
+
+    /**
+     * Removes all matching entries from the cache. An entry is considered matching if its key matches the provided predicate.
+     *
+     * @param keyMatcher The predicate determines if an entry is matching.
+     *
+     * @return The number of deleted entries.
+     *
+     * Note: the implementation of this method is not necessarily atomic. Because of this, in some cases the number of deleted entries might
+     * not be equal to the number of matching entries at the moment of calling. There is no guarantee for that, during the execution of the method a new matching entry is not added
+     * or an already existing is being not deleted.
+     */
+    int removeAll(Predicate<String> keyMatcher);
+
+    /**
+     * Locks an entry with the given key to prevent its modification by other clients. Closing the connection automatically releases the lock.
+     * Non-existing keys might be locked in this way as well. This operation is not transactional and other clients might read the value while the entry is locked. For further
+     * information please check Hazelcast documentation.
+     *
+     * Note: the current implementation of Hazelcast (4.X) also prevents modification by the same client on a different thread.
+     *
+     * @param key Key of the entry, must not be null.
+     *
+     * @return The entry lock instance.
+     */
+    HazelcastCacheEntryLock acquireLock(String key);
+
+    /**
+     * Represents a lock on a given entry based on key. The lock is bound to a cache and does not allow other caches to modify the entry in any manner until it is released. Calling close
+     * on the HazelcastCacheEntryLock instance will release the lock if not already released.
+     */
+    interface HazelcastCacheEntryLock extends AutoCloseable {
+        /**
+         * Note: Entry lock will not throw generic exception.
+         */
+        @Override
+        void close();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/HazelcastCacheManager.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/HazelcastCacheManager.java
new file mode 100644
index 0000000..e884026
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/HazelcastCacheManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+
+/**
+ * Controller service responsible for providing cache instances and managing connection with the Hazelcast server.
+ */
+public interface HazelcastCacheManager extends ControllerService {
+
+    /**
+     * Returns a cache instance maintaining a Hazelcast connection.
+     *
+     * @param name Name of the cache instance. Cache instances having the same name are depending on the same Hazelcast storage!
+     * @param ttlInMillis The guaranteed lifetime of a cache entry in milliseconds. In case of 0, the entry will exists until it's deletion.
+     *
+     * @return Cache instance. Depending on the implementation it is not guaranteed that it will be a new instance.
+     */
+    HazelcastCache getCache(String name, long ttlInMillis);
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-nar/pom.xml b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-nar/pom.xml
new file mode 100644
index 0000000..3746644
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-nar/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-hazelcast-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-hazelcast-services-nar</artifactId>
+    <version>1.13.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hazelcast-services-api-nar</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hazelcast-services</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..7a4a3ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..e12aefe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,21 @@
+nifi-hazelcast-services-nar
+Copyright 2015-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+(ASLv2) Hazelcast
+    The following NOTICE information applies:
+      Core Hazelcast Module 4.0.1
+      Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
+
+(ASLv2) Guava
+    The following NOTICE information applies:
+      Guava
+      Copyright 2015 The Guava Authors
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/pom.xml b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/pom.xml
new file mode 100644
index 0000000..15deb4a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-hazelcast-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-hazelcast-services</artifactId>
+    <version>1.13.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <!-- Internal dependencies -->
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hazelcast-services-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
+
+        <!-- External dependencies -->
+
+        <dependency>
+            <groupId>com.hazelcast</groupId>
+            <artifactId>hazelcast</artifactId>
+            <version>4.0.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>28.1-jre</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java
new file mode 100644
index 0000000..47d7151
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cache;
+
+import com.hazelcast.map.IMap;
+import com.hazelcast.map.ReachedMaxSizeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/**
+ * Implementation of {@link HazelcastCache} backed by Hazelcast's IMap data structure. It's purpose is to wrap Hazelcast implementation specific details in order to
+ * make it possible to easily change version or data structure.
+ */
+public class IMapBasedHazelcastCache implements HazelcastCache {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IMapBasedHazelcastCache.class);
+
+    private final long ttlInMillis;
+    private final IMap<String, byte[]> storage;
+
+    /**
+     * @param storage Reference to the actual storage. It should be the IMap with the same identifier as cache name.
+     * @param ttlInMillis The guaranteed lifetime of a cache entry in milliseconds.
+     */
+    public IMapBasedHazelcastCache(
+            final IMap<String, byte[]> storage,
+            final long ttlInMillis) {
+        this.ttlInMillis = ttlInMillis;
+        this.storage = storage;
+    }
+
+    @Override
+    public String name() {
+        return storage.getName();
+    }
+
+    @Override
+    public byte[] get(final String key) {
+        return storage.get(key);
+    }
+
+    @Override
+    public byte[] putIfAbsent(final String key, final byte[] value) {
+        return storage.putIfAbsent(key, value, ttlInMillis, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean put(final String key, final byte[] value) {
+        try {
+            storage.put(key, value, ttlInMillis, TimeUnit.MILLISECONDS);
+            return true;
+        } catch (final ReachedMaxSizeException e) {
+            LOGGER.error("Cache {} reached the maximum allowed size!", storage.getName());
+            return false;
+        }
+    }
+
+    @Override
+    public boolean contains(final String key) {
+        return storage.containsKey(key);
+    }
+
+    @Override
+    public boolean remove(final String key) {
+        return storage.remove(key) != null;
+    }
+
+    @Override
+    public int removeAll(final Predicate<String> keyMatcher) {
+        // Note: the Hazelcast IMap provides support for predicate based <code>removeAll</code> method, but it's neither atomic nor provides information about the number of deleted items.
+        final Set<String> keys = storage.keySet();
+        int result = 0;
+
+        for (final String key : keys) {
+            if (keyMatcher.test(key)) {
+                storage.delete(key);
+                result++;
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public HazelcastCacheEntryLock acquireLock(final String key) {
+        if (key == null) {
+            throw new IllegalArgumentException("The key of acquired lock cannot be null!");
+        }
+
+        final IMapAdapterEntryLock lock = new IMapAdapterEntryLock(key);
+        lock.lock();
+        return lock;
+    }
+
+    private final class IMapAdapterEntryLock implements HazelcastCacheEntryLock {
+        private final String key;
+
+        private IMapAdapterEntryLock(final String key) {
+            this.key = key;
+        }
+
+        void lock() {
+            storage.lock(key);
+        }
+
+        @Override
+        public void close() {
+            storage.unlock(key);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
new file mode 100644
index 0000000..2d24c86
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+import com.google.common.primitives.Longs;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache.
+ *
+ * Note: By design, the client should not directly depend on Hazelcast specific classes to allow easy version and implementation changes.
+ */
+@Tags({ "hazelcast", "cache", "map"})
+@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " +
+        "an other controller service, manages the actual Hazelcast calls, set in Hazelcast Cache Manager.")
+public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_MANAGER = new PropertyDescriptor.Builder()
+            .name("hazelcast-cache-manager")
+            .displayName("Hazelcast Cache Manager")
+            .description("A Hazelcast Cache Manager which manages connections to Hazelcast and provides cache instances.")
+            .identifiesControllerService(HazelcastCacheManager.class)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CACHE_NAME = new PropertyDescriptor.Builder()
+            .name("hazelcast-cache-name")
+            .displayName("Hazelcast Cache Name")
+            .description("The name of a given cache. A Hazelcast cluster may handle multiple independent caches, each identified by a name." +
+                    " Clients using caches with the same name are working on the same data structure within Hazelcast.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_ENTRY_TTL = new PropertyDescriptor.Builder()
+            .name("hazelcast-entry-ttl")
+            .displayName("Hazelcast Entry Lifetime")
+            .description("Indicates how long the written entries should exist in Hazelcast. Setting it to '0 secs' means that the data" +
+                    "will exists until its deletion or until the Hazelcast server is shut down.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 secs") // Note: in case of Hazelcast IMap, negative value would mean "map default" which might be overridden by a different client.
+            .build();
+
+    private static final long STARTING_REVISION = 1;
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HAZELCAST_CACHE_MANAGER);
+        properties.add(HAZELCAST_CACHE_NAME);
+        properties.add(HAZELCAST_ENTRY_TTL);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties);
+    }
+
+    private volatile HazelcastCache cache = null;
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final HazelcastCacheManager hazelcastCacheManager = context.getProperty(HAZELCAST_CACHE_MANAGER).asControllerService(HazelcastCacheManager.class);
+        cache = hazelcastCacheManager.getCache(
+                context.getProperty(HAZELCAST_CACHE_NAME).evaluateAttributeExpressions().getValue(),
+                context.getProperty(HAZELCAST_ENTRY_TTL).asTimePeriod(TimeUnit.MILLISECONDS));
+        getLogger().debug("Enable Hazelcast cache client for cache " + cache.name());
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        if (cache != null) {
+            // The cache state will be preserved until the Service is not stopped!
+            getLogger().debug("Disable Hazelcast cache client for cache " + cache.name());
+            cache = null;
+        }
+    }
+
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        final byte[] result = cache.get(getCacheEntryKey(key, keySerializer));
+        return (result == null) ? null : new AtomicCacheEntry<>(key, parsePayload(valueDeserializer, result), parseRevision(result));
+    }
+
+    @Override
+    public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        if (entry.getKey() == null) {
+            return false;
+        }
+
+        final String key = getCacheEntryKey(entry.getKey(), keySerializer);
+
+        try(final HazelcastCache.HazelcastCacheEntryLock lock = cache.acquireLock(key)) {
+            final byte[] oldValue = cache.get(key);
+
+            if (oldValue == null && (!entry.getRevision().isPresent() || entry.getRevision().get() < STARTING_REVISION)) {
+                cache.put(key, serialize(entry.getValue(), valueSerializer, STARTING_REVISION));
+                getLogger().debug("Entry with key " + key + " was added during replace");
+                return true;
+            } else if (oldValue != null && Objects.equals(entry.getRevision().get(), parseRevision(oldValue))) {
+                cache.put(key, serialize(entry.getValue(), valueSerializer, entry.getRevision().get() + 1));
+                getLogger().debug("Entry with key " + key + " was updated during replace, with revision " + entry.getRevision().get() + 1);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        return cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION)) == null;
+    }
+
+    @Override
+    public <K, V> V getAndPutIfAbsent(
+            final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer
+    ) throws IOException {
+        final byte[] result = cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION));
+        return (result == null) ? null : parsePayload(valueDeserializer, result);
+    }
+
+    @Override
+    public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+        return cache.contains(getCacheEntryKey(key, keySerializer));
+    }
+
+    @Override
+    public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        cache.put(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION));
+    }
+
+    @Override
+    public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        final byte[] result = cache.get(getCacheEntryKey(key, keySerializer));
+        return result == null ? null : parsePayload(valueDeserializer, result);
+    }
+
+    @Override
+    public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException {
+        return cache.remove(getCacheEntryKey(key, keySerializer));
+    }
+
+    @Override
+    public long removeByPattern(final String regex) throws IOException {
+        return cache.removeAll(new RegexPredicate(regex));
+    }
+
+    private static class RegexPredicate implements Predicate<String>, Serializable {
+        private final Pattern pattern;
+
+        private RegexPredicate(final String regex) {
+            this.pattern = Pattern.compile(regex);
+        }
+
+        @Override
+        public boolean test(final String string) {
+            return pattern.matcher(string).matches();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        getLogger().debug("Closing " + this.getClass().getSimpleName());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    private static long parseRevision(final byte[] value) {
+        return Longs.fromByteArray(Arrays.copyOfRange(value, 0, Long.BYTES));
+    }
+
+    private static <V> V parsePayload(final Deserializer<V> deserializer, final byte[] value) throws IOException {
+        return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, value.length));
+    }
+
+    private <S> String getCacheEntryKey(final S key, final Serializer<S> serializer) throws IOException {
+        final String result;
+
+        if (key instanceof String) {
+            result = (String) key;
+        } else {
+            final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+            serializer.serialize(key, stream);
+            result = stream.toString("UTF-8");
+        }
+
+        if (result.isEmpty()) {
+            throw new IOException("Cache record key cannot be empty!");
+        }
+
+        return result;
+    }
+
+    /**
+     * Serializes a value using the given serializer. The first eight bytes of the array contains the revision.
+     * The rest holds the actual serialized value.
+     *
+     * @param value The value to serialize.
+     * @param serializer The serializer to use in order to serialize the incoming value.
+     * @param version The version of the entry.
+     * @param <S> The type of the value to be serialized.
+     *
+     * @return Byte array containing both version and value of the cache entry.
+     *
+     * @throws IOException In case of any issue during working with intermediate byte stream.
+     */
+    private <S> byte[] serialize(final S value, final Serializer<S> serializer, final long version) throws IOException {
+        final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        stream.write(Longs.toByteArray(version));
+        serializer.serialize(value, stream);
+        return stream.toByteArray();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
new file mode 100644
index 0000000..d02ef07
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManager.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that runs embedded Hazelcast and provides cache instances backed by that." +
+        " The server does not ask for authentication, it is recommended to run it within secured network.")
+public class EmbeddedHazelcastCacheManager extends IMapBasedHazelcastCacheManager {
+
+    private static final int DEFAULT_HAZELCAST_PORT = 5701;
+    private static final String PORT_SEPARATOR = ":";
+    private static final String INSTANCE_CREATION_LOG = "Embedded Hazelcast server instance with instance name %s has been created successfully";
+    private static final String MEMBER_LIST_LOG = "Hazelcast cluster will be created based on the NiFi cluster with the following members: %s";
+
+    private static final AllowableValue CLUSTER_NONE = new AllowableValue("none", "None", "No high availability or data replication is provided," +
+            " every node has access only to the data stored locally.");
+    private static final AllowableValue CLUSTER_ALL_NODES = new AllowableValue("all_nodes", "All Nodes", "Creates Hazelcast cluster based on the NiFi cluster:" +
+            " It expects every NiFi nodes to have a running Hazelcast instance on the same port as specified in the Hazelcast Port property. No explicit listing of the" +
+            " instances is needed.");
+    private static final AllowableValue CLUSTER_EXPLICIT = new AllowableValue("explicit", "Explicit", "Works with an explicit list of Hazelcast instances," +
+            " creating a cluster using the listed instances. This provides greater control, making it possible to utilize only certain nodes as Hazelcast servers." +
+            " The list of Hazelcast instances can be set in the property \"Hazelcast Instances\". The list items must refer to hosts within the NiFi cluster, no external Hazelcast" +
+            " is allowed. NiFi nodes are not listed will be join to the Hazelcast cluster as clients.");
+
+    private static final PropertyDescriptor HAZELCAST_PORT = new PropertyDescriptor.Builder()
+            .name("hazelcast-port")
+            .displayName("Hazelcast Port")
+            .description("Port for the Hazelcast instance to use.")
+            .required(true)
+            .defaultValue(String.valueOf(DEFAULT_HAZELCAST_PORT))
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private static final PropertyDescriptor HAZELCAST_CLUSTERING_STRATEGY = new PropertyDescriptor.Builder()
+            .name("hazelcast-clustering-strategy")
+            .displayName("Hazelcast Clustering Strategy")
+            .description("Specifies with what strategy the Hazelcast cluster should be created.")
+            .required(true)
+            .allowableValues(CLUSTER_NONE, CLUSTER_ALL_NODES, CLUSTER_EXPLICIT)
+            .defaultValue(CLUSTER_NONE.getValue()) // None is used for default in order to be valid with standalone NiFi.
+            .build();
+
+    private static final PropertyDescriptor HAZELCAST_INSTANCES = new PropertyDescriptor.Builder()
+            .name("hazelcast-instances")
+            .displayName("Hazelcast Instances")
+            .description("Only used with \"Explicit\" Clustering Strategy!" +
+                    " List of NiFi instance host names which should be part of the Hazelcast cluster. Host names are separated by comma." +
+                    " The port specified in the \"Hazelcast Port\" property will be used as server port." +
+                    " The list must contain every instance that will be part of the cluster. Other instances will join the Hazelcast cluster as clients.")
+            .required(false)
+            // HOSTNAME_PORT_LIST_VALIDATOR would not work properly as we do not expect port here, only list of hosts. Custom validator provides further checks.
+            .addValidator(StandardValidators.URI_LIST_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    static {
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+                HAZELCAST_CLUSTER_NAME,
+                HAZELCAST_PORT,
+                HAZELCAST_CLUSTERING_STRATEGY,
+                HAZELCAST_INSTANCES
+        ));
+    }
+
+    @Override
+    protected HazelcastInstance getInstance(final ConfigurationContext context) {
+        final String instanceName = UUID.randomUUID().toString();
+        final Config config = new Config(instanceName);
+        final NetworkConfig networkConfig = config.getNetworkConfig();
+        final TcpIpConfig tcpIpConfig = networkConfig.getJoin().getTcpIpConfig();
+        final String clusteringStrategy = context.getProperty(HAZELCAST_CLUSTERING_STRATEGY).getValue();
+        final String clusterName = context.getProperty(HAZELCAST_CLUSTER_NAME).evaluateAttributeExpressions().getValue();
+        final int port = context.getProperty(HAZELCAST_PORT).evaluateAttributeExpressions().asInteger();
+
+        config.setClusterName(clusterName);
+
+        // If clustering is turned off, we turn off the capability of the Hazelcast instance to form a cluster.
+        tcpIpConfig.setEnabled(!clusteringStrategy.equals(CLUSTER_NONE.getValue()));
+
+        // Multicasting and automatic port increment are explicitly turned off.
+        networkConfig.setPort(port);
+        networkConfig.setPortCount(1);
+        networkConfig.setPortAutoIncrement(false);
+        networkConfig.getJoin().getMulticastConfig().setEnabled(false);
+
+        final HazelcastInstance result;
+
+        if (clusteringStrategy.equals(CLUSTER_ALL_NODES.getValue())) {
+            final List<String> hazelcastMembers = getNodeTypeProvider()
+                    .getClusterMembers()
+                    .stream()
+                    .map(m -> m + PORT_SEPARATOR + port)
+                    .collect(Collectors.toList());
+
+            getLogger().info(String.format(MEMBER_LIST_LOG, hazelcastMembers.stream().collect(Collectors.joining(", "))));
+            tcpIpConfig.setMembers(hazelcastMembers);
+            result = Hazelcast.newHazelcastInstance(config);
+            getLogger().info(String.format(INSTANCE_CREATION_LOG, instanceName));
+
+        } else if (clusteringStrategy.equals(CLUSTER_EXPLICIT.getValue())) {
+            final List<String> hazelcastMembers = getHazelcastMemberHosts(context);
+
+            if (hazelcastMembers.contains(getNodeTypeProvider().getCurrentNode().get())) {
+                tcpIpConfig.setMembers(hazelcastMembers.stream().map(m -> m + PORT_SEPARATOR + port).collect(Collectors.toList()));
+                result = Hazelcast.newHazelcastInstance(config);
+                getLogger().info(String.format(INSTANCE_CREATION_LOG, instanceName));
+            } else {
+                result = getClientInstance(
+                        clusterName,
+                        hazelcastMembers.stream().map(m -> m + PORT_SEPARATOR + port).collect(Collectors.toList()),
+                        TimeUnit.SECONDS.toMillis(DEFAULT_CLIENT_TIMEOUT_MAXIMUM_IN_SEC),
+                        Long.valueOf(TimeUnit.SECONDS.toMillis(DEFAULT_CLIENT_BACKOFF_INITIAL_IN_SEC)).intValue(),
+                        Long.valueOf(TimeUnit.SECONDS.toMillis(DEFAULT_CLIENT_BACKOFF_MAXIMUM_IN_SEC)).intValue(),
+                        DEFAULT_CLIENT_BACKOFF_MULTIPLIER);
+                getLogger().info("This host was not part of the expected Hazelcast instances. Hazelcast client has been started and joined to listed instances.");
+            }
+        } else if (clusteringStrategy.equals(CLUSTER_NONE.getValue())) {
+            result = Hazelcast.newHazelcastInstance(config);
+            getLogger().info(String.format(INSTANCE_CREATION_LOG, instanceName));
+        } else {
+            throw new ProcessException("Unknown Hazelcast Clustering Strategy!");
+        }
+
+        return result;
+    }
+
+    private List<String> getHazelcastMemberHosts(final PropertyContext context) {
+        return Arrays.asList(context.getProperty(HAZELCAST_INSTANCES).evaluateAttributeExpressions().getValue().split(ADDRESS_SEPARATOR))
+                .stream().map(i -> i.trim()).collect(Collectors.toList());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+        final List<ValidationResult> results = new LinkedList<>();
+
+        // This validation also prevents early activation: up to the point node is considered as clustered this validation step
+        // prevents automatic enabling when the flow stars (and the last known status of the controller service was enabled).
+        if (!getNodeTypeProvider().isClustered() && context.getProperty(HAZELCAST_CLUSTERING_STRATEGY).getValue().equals(CLUSTER_ALL_NODES.getValue())) {
+            results.add(new ValidationResult.Builder()
+                    .subject(HAZELCAST_CLUSTERING_STRATEGY.getDisplayName())
+                    .valid(false)
+                    .explanation("cannot use \"" + CLUSTER_ALL_NODES.getDisplayName() + "\" Clustering Strategy when NiFi is not part of a cluster!")
+                    .build());
+        }
+
+        if (!getNodeTypeProvider().isClustered() && context.getProperty(HAZELCAST_CLUSTERING_STRATEGY).getValue().equals(CLUSTER_EXPLICIT.getValue())) {
+            results.add(new ValidationResult.Builder()
+                    .subject(HAZELCAST_CLUSTERING_STRATEGY.getDisplayName())
+                    .valid(false)
+                    .explanation("cannot use \"" + CLUSTER_EXPLICIT.getDisplayName() + "\" Clustering Strategy when NiFi is not part of a cluster!")
+                    .build());
+        }
+
+        if (!context.getProperty(HAZELCAST_INSTANCES).isSet() && context.getProperty(HAZELCAST_CLUSTERING_STRATEGY).getValue().equals(CLUSTER_EXPLICIT.getValue())) {
+            results.add(new ValidationResult.Builder()
+                    .subject(HAZELCAST_INSTANCES.getDisplayName())
+                    .valid(false)
+                    .explanation("in case of \"" + CLUSTER_EXPLICIT.getDisplayName() + "\" Clustering Strategy, instances need to be specified!")
+                    .build());
+        }
+
+        if (context.getProperty(HAZELCAST_INSTANCES).isSet() && !context.getProperty(HAZELCAST_CLUSTERING_STRATEGY).getValue().equals(CLUSTER_EXPLICIT.getValue())) {
+            results.add(new ValidationResult.Builder()
+                    .subject(HAZELCAST_INSTANCES.getDisplayName())
+                    .valid(false)
+                    .explanation("in case of other Clustering Strategy than \"" + CLUSTER_EXPLICIT.getDisplayName() + "\", instances should not be specified!")
+                    .build());
+        }
+
+        if (context.getProperty(HAZELCAST_INSTANCES).isSet() && context.getProperty(HAZELCAST_CLUSTERING_STRATEGY).getValue().equals(CLUSTER_EXPLICIT.getValue())) {
+            final Collection<String> niFiHosts = getNodeTypeProvider().getClusterMembers();
+            final Collection<String> hazelcastHosts = getHazelcastMemberHosts(context);
+
+            for (final String hazelcastHost : hazelcastHosts) {
+                if (!niFiHosts.contains(hazelcastHost)) {
+                    results.add(new ValidationResult.Builder()
+                            .subject(HAZELCAST_INSTANCES.getDisplayName())
+                            .valid(false)
+                            .explanation("host \"" + hazelcastHost + "\" is not part of the NiFi cluster!")
+                            .build());
+                }
+            }
+        }
+
+        if (!getNodeTypeProvider().getCurrentNode().isPresent() && context.getProperty(HAZELCAST_CLUSTERING_STRATEGY).getValue().equals(CLUSTER_EXPLICIT.getValue())) {
+            results.add(new ValidationResult.Builder()
+                    .subject(HAZELCAST_CLUSTERING_STRATEGY.getDisplayName())
+                    .valid(false)
+                    .explanation("cannot determine current node's host!")
+                    .build());
+        }
+
+        return results;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/ExternalHazelcastCacheManager.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/ExternalHazelcastCacheManager.java
new file mode 100644
index 0000000..14f2547
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/ExternalHazelcastCacheManager.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"hazelcast", "cache"})
+@CapabilityDescription("A service that provides cache instances backed by Hazelcast running outside of NiFi.")
+public class ExternalHazelcastCacheManager extends IMapBasedHazelcastCacheManager {
+
+    public static final PropertyDescriptor HAZELCAST_SERVER_ADDRESS = new PropertyDescriptor.Builder()
+            .name("hazelcast-server-address")
+            .displayName("Hazelcast Server Address")
+            .description("Addresses of one or more the Hazelcast instances, using {host:port} format, separated by comma.")
+            .required(true)
+            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_RETRY_BACKOFF_INITIAL = new PropertyDescriptor.Builder()
+            .name("hazelcast-retry-backoff-initial")
+            .displayName("Hazelcast Initial Backoff")
+            .description("The amount of time the client waits before it tries to reestablish connection for the first time.")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .defaultValue(DEFAULT_CLIENT_BACKOFF_INITIAL_IN_SEC + " secs")
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_RETRY_BACKOFF_MAXIMUM = new PropertyDescriptor.Builder()
+            .name("hazelcast-retry-backoff-maximum")
+            .displayName("Hazelcast Maximum Backoff")
+            .description("The maximum amount of time the client waits before it tries to reestablish connection.")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .defaultValue(DEFAULT_CLIENT_BACKOFF_MAXIMUM_IN_SEC + " secs")
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_RETRY_BACKOFF_MULTIPLIER = new PropertyDescriptor.Builder()
+            .name("hazelcast-retry-backoff-multiplier")
+            .displayName("Hazelcast Backoff Multiplier")
+            .description("A multiplier by which the wait time is increased before each attempt to reestablish connection.")
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .required(true)
+            .defaultValue(String.valueOf(DEFAULT_CLIENT_BACKOFF_MULTIPLIER))
+            .build();
+
+    public static final PropertyDescriptor HAZELCAST_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("hazelcast-connection-timeout")
+            .displayName("Hazelcast Connection Timeout")
+            .description("The maximum amount of time the client tries to connect or reconnect before giving up.")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .defaultValue(DEFAULT_CLIENT_TIMEOUT_MAXIMUM_IN_SEC + " secs")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HAZELCAST_CLUSTER_NAME);
+        properties.add(HAZELCAST_SERVER_ADDRESS);
+        properties.add(HAZELCAST_RETRY_BACKOFF_INITIAL);
+        properties.add(HAZELCAST_RETRY_BACKOFF_MAXIMUM);
+        properties.add(HAZELCAST_RETRY_BACKOFF_MULTIPLIER);
+        properties.add(HAZELCAST_CONNECTION_TIMEOUT);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties);
+    }
+
+    @Override
+    protected HazelcastInstance getInstance(final ConfigurationContext context) {
+        return getClientInstance(
+                context.getProperty(HAZELCAST_CLUSTER_NAME).evaluateAttributeExpressions().getValue(),
+                Arrays.asList(context.getProperty(HAZELCAST_SERVER_ADDRESS).evaluateAttributeExpressions().getValue().split(ADDRESS_SEPARATOR)),
+                context.getProperty(HAZELCAST_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue(),
+                context.getProperty(HAZELCAST_RETRY_BACKOFF_INITIAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
+                context.getProperty(HAZELCAST_RETRY_BACKOFF_MAXIMUM).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
+                context.getProperty(HAZELCAST_RETRY_BACKOFF_MULTIPLIER).asDouble());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+        final Set<ValidationResult> results = new HashSet<>();
+
+        if (context.getProperty(HAZELCAST_RETRY_BACKOFF_INITIAL).asTimePeriod(TimeUnit.MILLISECONDS).compareTo((long) Integer.MAX_VALUE) > 0) {
+            results.add(new ValidationResult.Builder()
+                    .subject(HAZELCAST_RETRY_BACKOFF_INITIAL.getDisplayName())
+                    .valid(false)
+                    .explanation("millisecond representation of initial backoff time must not be above " + Integer.MAX_VALUE + "!")
+                    .build());
+        }
+
+        if (context.getProperty(HAZELCAST_RETRY_BACKOFF_MAXIMUM).asTimePeriod(TimeUnit.MILLISECONDS).compareTo((long) Integer.MAX_VALUE) > 0) {
+            results.add(new ValidationResult.Builder()
+                    .subject(HAZELCAST_RETRY_BACKOFF_MAXIMUM.getDisplayName())
+                    .valid(false)
+                    .explanation("millisecond representation of maximum backoff time must not be above " + Integer.MAX_VALUE + "!")
+                    .build());
+        }
+
+        return results;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/IMapBasedHazelcastCacheManager.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/IMapBasedHazelcastCacheManager.java
new file mode 100644
index 0000000..a56496f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cachemanager/IMapBasedHazelcastCacheManager.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.hazelcast.services.cache.HazelcastCache;
+import org.apache.nifi.hazelcast.services.cache.IMapBasedHazelcastCache;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.net.BindException;
+import java.util.List;
+
+abstract class IMapBasedHazelcastCacheManager extends AbstractControllerService implements HazelcastCacheManager {
+    protected static final String ADDRESS_SEPARATOR = ",";
+
+    /**
+     * Used to involve some fluctuation into the backoff time. For details, please see Hazelcast documentation.
+     */
+    protected static final double CLIENT_BACKOFF_JITTER = 0.2;
+    protected static final long DEFAULT_CLIENT_TIMEOUT_MAXIMUM_IN_SEC = 20;
+    protected static final long DEFAULT_CLIENT_BACKOFF_INITIAL_IN_SEC = 1;
+    protected static final long DEFAULT_CLIENT_BACKOFF_MAXIMUM_IN_SEC = 5;
+    protected static final double DEFAULT_CLIENT_BACKOFF_MULTIPLIER = 1.5;
+
+    public static final PropertyDescriptor HAZELCAST_CLUSTER_NAME = new PropertyDescriptor.Builder()
+            .name("hazelcast-cluster-name")
+            .displayName("Hazelcast Cluster Name")
+            .description("Name of the Hazelcast cluster.")
+            .defaultValue("nifi") // Hazelcast's default is "dev", "nifi" overwrites this.
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private volatile HazelcastInstance instance;
+
+    @Override
+    public HazelcastCache getCache(final String name, final long ttlInMillis) {
+        return new IMapBasedHazelcastCache(instance.getMap(name), ttlInMillis);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws InitializationException {
+        try {
+            instance = getInstance(context);
+        } catch (final Exception e) {
+            getLogger().error("Could not create Hazelcast instance. Reason: " + e.getMessage(), e);
+
+            // In case of bind exception, we provide a more specific error message to avoid ambiguity
+            if (e.getCause() instanceof BindException && e.getCause().getMessage().equals("Address already in use")) {
+                throw new InitializationException("The given port is already in use, probably by an externally running Hazelcast instance!");
+            } else {
+                throw new InitializationException(e);
+            }
+        }
+    }
+
+    @OnShutdown
+    @OnDisabled
+    public void shutdown() {
+        if (instance != null) {
+            instance.shutdown();
+            instance = null;
+        }
+    }
+
+    protected HazelcastInstance getClientInstance(
+            final String clusterName,
+            final List<String> serverAddresses,
+            final long maxTimeout,
+            final int initialBackoff,
+            final int maxBackoff,
+            final double backoffMultiplier) {
+        final ClientConfig clientConfig = new ClientConfig();
+        clientConfig.setClusterName(clusterName);
+        clientConfig.getNetworkConfig().setAddresses(serverAddresses);
+        clientConfig.getConnectionStrategyConfig().getConnectionRetryConfig()
+                .setClusterConnectTimeoutMillis(maxTimeout)
+                .setInitialBackoffMillis(initialBackoff)
+                .setMaxBackoffMillis(maxBackoff)
+                .setMultiplier(backoffMultiplier)
+                .setJitter(CLIENT_BACKOFF_JITTER);
+
+        return HazelcastClient.newHazelcastClient(clientConfig);
+    }
+
+    protected abstract HazelcastInstance getInstance(final ConfigurationContext context);
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..be495c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.hazelcast.services.cachemanager.EmbeddedHazelcastCacheManager
+org.apache.nifi.hazelcast.services.cachemanager.ExternalHazelcastCacheManager
+org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/docs/org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient/additionalDetails.html b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/docs/org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient/additionalDetails.html
new file mode 100644
index 0000000..c4b2411
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/docs/org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient/additionalDetails.html
@@ -0,0 +1,45 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>HazelcastMapCacheClient</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>HazelcastMapCacheClient</h2>
+
+<p>
+    This implementation of distributed map cache is backed by Hazelcast. The Hazelcast connection is provided and maintained
+    by an instance of HazelcastCacheManager. One HazelcastCacheManager might serve multiple cache clients. This implementation
+    uses the IMap data structure. The identifier of the Hazelcast IMap will be the same as the value of the property Hazelcast
+    Cache Name. It is recommended for all HazelcastMapCacheClient instances to use different cache names.
+</p>
+
+<p>
+    The implementation supports the atomic method family defined in AtomicDistributedMapCacheClient. This is achieved by maintaining
+    a revision number for every entry. The revision is a 8 byte long integer. It is increased when the entry is updated. The value is kept
+    during modifications not part of the atomic method family but this is mainly for regular management of the entries. It is not
+    recommended to work with elements by mixing the two method families.
+</p>
+
+<p>
+    The convention for all the entries is to reserve the first 8 bytes for the revision. The rest of the content is the serialized payload.
+</p>
+
+</body>
+</html>
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/docs/org.apache.nifi.hazelcast.services.cachemanager.EmbeddedHazelcastCacheManager/additionalDetails.html b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/docs/org.apache.nifi.hazelcast.services.cachemanager.EmbeddedHazelcastCacheManager/additionalDetails.html
new file mode 100644
index 0000000..2c78e66
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/docs/org.apache.nifi.hazelcast.services.cachemanager.EmbeddedHazelcastCacheManager/additionalDetails.html
@@ -0,0 +1,89 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>EmbeddedHazelcastCacheManager</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>EmbeddedHazelcastCacheManager</h2>
+
+<p>
+    This service starts and manages an embedded Hazelcast instance. The cache manager has direct accesses to the
+    instance - and the data stored in it. However, the instance sill opens a port for potential clients to join and
+    this cannot be prevented. Note that this might leave the instance open for rogue clients to join.
+</p>
+
+<p>
+    It is possible to have multiple independent Hazelcast instances on the same host (whether via EmbeddedHazelcastCacheManager
+    or externally) without any interference by setting the properties accordingly. If there are no other instances, the default
+    cluster name and port number can simply be used.
+</p>
+
+<p>
+    The service supports multiple ways to set up a Hazelcast cluster. This is controlled by the property, named "Hazelcast Clustering
+    Strategy". The following strategies may be used:
+</p>
+
+<h3>None</h3>
+
+<p>
+    This is the default value. Used when sharing data between nodes is not required. With this value, every NiFi node
+    in the cluster (if it is clustered) connects to its local Hazelcast server only. The Hazelcast servers do not form a cluster.
+</p>
+
+<h3>All Nodes</h3>
+
+<p>
+    Can be used only in clustered node. Using this strategy will result a single Hazelcast cluster consisting of the embedded instances
+    of all the NiFi nodes. This strategy requires all Hazelcast servers listening on the same port. Having different port numbers
+    (based on expression for example) would prevent the cluster from forming.
+</p>
+
+<p>
+    The controller service automatically gathers the host list from the NiFi cluster itself when it is enabled. It is
+    not required for all the nodes to have been successfully joined at this point, but the join must have been initiated. When
+    the controller service is enabled at the start of the NiFi instance, the enabling of the service will be prevented until the
+    node is considered clustered.
+</p>
+
+<p>
+    Hazelcast can accept nodes that join at a later time. As the new node has a comprehensive list of the expected instances - including the
+    already existing ones and itself - Hazelcast will be able to reach the expected state. Beware: this may take significant time.
+</p>
+
+<h3>Explicit</h3>
+
+<p>
+    Can be used only in clustered node. Explicit Clustering Strategy allows more control over the Hazelcast cluster members.
+    All Nodes Clustering Strategy, this strategy works with a list of Hazelcast servers, but instance discovery is not automatic.
+</p>
+
+<p>
+    This strategy uses property named "Hazelcast Instances" to determine the members of the Hazelcast clusters. This list of hosts must contain
+    all the instances expected to be part of the cluster. The instance list may contain only hosts which are part of the NiFi cluster. The
+    port specified in the "Hazelcast Port" will be used as Hazelcast server port.
+</p>
+
+<p>
+    In case the current node is not part of the instance list, the service will start a Hazelcast client. The client then will connect
+    to the Hazelcast addresses specified in the instance list. Users of the service should not perceive any difference in functionality.
+</p>
+
+</body>
+</html>
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/docs/org.apache.nifi.hazelcast.services.cachemanager.ExternalHazelcastCacheManager/additionalDetails.html b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/docs/org.apache.nifi.hazelcast.services.cachemanager.ExternalHazelcastCacheManager/additionalDetails.html
new file mode 100644
index 0000000..70d9555
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/resources/docs/org.apache.nifi.hazelcast.services.cachemanager.ExternalHazelcastCacheManager/additionalDetails.html
@@ -0,0 +1,48 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ExternalHazelcastCacheManager</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>ExternalHazelcastCacheManager</h2>
+
+<p>
+    This service connects to an external Hazelcast cluster (or standalone instance) as client. Hazelcast 4.0.0 or newer version is required.
+    The connection to the server is kept alive using Hazelcast's built in reconnection capability. This might be fine-tuned
+    by setting the following properties:
+</p>
+
+<ul>
+    <li>Hazelcast Initial Backoff</li>
+    <li>Hazelcast Maximum Backoff</li>
+    <li>Hazelcast Backoff Multiplier</li>
+    <li>Hazelcast Connection Timeout</li>
+</ul>
+
+<p>
+    If the service cannot connect or abruptly disconnected it tries to reconnect after a backoff time. The amount of time waiting
+    before the first attempt is defined by the Initial Backoff. If the connection is still not successful the client waits gradually
+    more between the attempts until the waiting time reaches the value set in the 'Hazelcast Maximum Backoff' property (or the connection timeout,
+    whichever is smaller). The backoff time after the first attempt is always based on the previous amount, multiplied by the Backoff Multiplier.
+    Note: the real backoff time might be slightly differ as some "jitter" is added to the calculation in order to avoid regularity.
+</p>
+
+</body>
+</html>
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/DummyStringSerializer.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/DummyStringSerializer.java
new file mode 100644
index 0000000..0e8a1df
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/DummyStringSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Simple serializer and deserializer for testing purposes.
+ */
+public final class DummyStringSerializer implements Serializer<String>, Deserializer<String> {
+    @Override
+    public void serialize(final String value, final OutputStream output) throws SerializationException, IOException {
+        output.write(value.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public String deserialize(final byte[] input) throws DeserializationException, IOException {
+        return new String(input, StandardCharsets.UTF_8);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/HashMapHazelcastCache.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/HashMapHazelcastCache.java
new file mode 100644
index 0000000..347eb93
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/HashMapHazelcastCache.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cache;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+final public class HashMapHazelcastCache implements HazelcastCache {
+    private final String name;
+    private final Map<String, byte[]> values = new HashMap<>();
+    private final Set<String> lockedEntries = new HashSet<>();
+
+    public HashMapHazelcastCache(final String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public byte[] get(final String key) {
+        return values.get(key);
+    }
+
+    @Override
+    public byte[] putIfAbsent(final String key, final byte[] value) {
+        return values.putIfAbsent(key, value);
+    }
+
+    @Override
+    public boolean put(final String key, final byte[] value) {
+        values.put(key, value);
+        return true;
+    }
+
+    @Override
+    public boolean contains(final String key) {
+        return values.containsKey(key);
+    }
+
+    @Override
+    public boolean remove(final String key) {
+        return values.remove(key) != null;
+    }
+
+    @Override
+    public int removeAll(final Predicate<String> keyMatcher) {
+        final Set<String> marked = new HashSet<>();
+
+        for (final String key : values.keySet()) {
+            if (keyMatcher.test(key)) {
+                marked.add(key);
+            }
+        }
+
+        for (final String key : marked) {
+            values.remove(key);
+        }
+
+        return marked.size();
+    }
+
+    @Override
+    public HazelcastCacheEntryLock acquireLock(final String key) {
+        lockedEntries.add(key);
+        return () -> lockedEntries.remove(key);
+    }
+
+    public Set<String> getLockedEntries() {
+        return lockedEntries;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCacheTest.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCacheTest.java
new file mode 100644
index 0000000..7950fbc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCacheTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cache;
+
+import com.hazelcast.map.IMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(MockitoJUnitRunner.class)
+public class IMapBasedHazelcastCacheTest {
+    private static final String KEY = "key";
+    private static final String KEY_2 = "key2";
+    private static final byte[] VALUE = "value".getBytes();
+    private static final byte[] VALUE_2 = "value2".getBytes();
+    private static final long TTL = 5;
+
+    @Mock
+    private IMap<String, byte[]> storage;
+
+    private IMapBasedHazelcastCache testSubject;
+
+    @Before
+    public void setUp() {
+        testSubject = new IMapBasedHazelcastCache(storage, TTL);
+    }
+
+    @Test
+    public void testGet() {
+        // given
+        Mockito.when(storage.get(Mockito.anyString())).thenReturn(VALUE);
+
+        // when
+        final byte[] result = testSubject.get(KEY);
+
+        // then
+        Mockito.verify(storage).get(KEY);
+        Assert.assertEquals(VALUE, result);
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        // given
+        Mockito.when(storage.putIfAbsent(Mockito.anyString(), Mockito.any(byte[].class), Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenReturn(VALUE_2);
+
+        // when
+        final byte[] result = testSubject.putIfAbsent(KEY, VALUE);
+
+        // then
+        Mockito.verify(storage).putIfAbsent(KEY, VALUE, TTL, TimeUnit.MILLISECONDS);
+        Assert.assertEquals(VALUE_2, result);
+    }
+
+    @Test
+    public void testPut() {
+        // when
+        testSubject.put(KEY, VALUE);
+
+        // then
+        Mockito.verify(storage).put(KEY, VALUE, TTL, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testContains() {
+        // given
+        Mockito.when(storage.containsKey(Mockito.anyString())).thenReturn(true);
+
+        // when
+        final boolean result = testSubject.contains(KEY);
+
+        // then
+        Mockito.verify(storage).containsKey(KEY);
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void testRemoveWhenExists() {
+        // given
+        Mockito.when(storage.remove(Mockito.anyString())).thenReturn(VALUE);
+
+        // when
+        final boolean result = testSubject.remove(KEY);
+
+        // then
+        Mockito.verify(storage).remove(KEY);
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void testRemoveWhenDoesNotExist() {
+        // given
+        Mockito.when(storage.remove(Mockito.anyString())).thenReturn(null);
+
+        // when
+        final boolean result = testSubject.remove(KEY);
+
+        // then
+        Mockito.verify(storage).remove(KEY);
+        Assert.assertFalse(result);
+    }
+
+
+    @Test
+    public void testRemoveAll() {
+        // given
+        Mockito.when(storage.keySet()).thenReturn(new HashSet<>(Arrays.asList(KEY, KEY_2)));
+
+        // when
+        final int result = testSubject.removeAll(s -> true);
+
+        // then
+        Mockito.verify(storage).delete(KEY);
+        Mockito.verify(storage).delete(KEY_2);
+        Assert.assertEquals(2, result);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java
new file mode 100644
index 0000000..156aca5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cacheclient;
+
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.hazelcast.services.DummyStringSerializer;
+import org.apache.nifi.hazelcast.services.cache.HashMapHazelcastCache;
+import org.apache.nifi.hazelcast.services.cachemanager.HazelcastCacheManager;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HazelcastMapCacheClientTest {
+
+    private final static long TTL = 0;
+    private final static String CACHE_NAME = "cache";
+    private final static String KEY = "key";
+    private final static String VALUE = "lorem ipsum";
+    private final static String VALUE_2 = "lorem ipsum dolor sit amet";
+    private final static String VALUE_3 = "cras ac felis tincidunt";
+
+    private final static DummyStringSerializer SERIALIZER = new DummyStringSerializer();
+
+    @Mock
+    private HazelcastCacheManager hazelcastCacheService;
+
+    @Mock
+    private ConfigurationContext configurationContext;
+
+    @Mock
+    private ControllerServiceInitializationContext initializationContext;
+
+    @Mock
+    private PropertyValue propertyValueForTTL;
+
+    @Mock
+    private PropertyValue propertyValueForCacheName;
+
+    @Mock
+    private PropertyValue propertyValueForCacheNameEvaluated;
+
+    @Mock
+    private PropertyValue propertyValueForConnectionService;
+
+    private HashMapHazelcastCache cache;
+    private HazelcastMapCacheClient testSubject;
+
+    @Before
+    public void setUp() throws Exception {
+        cache = new HashMapHazelcastCache(CACHE_NAME);
+
+        Mockito.when(propertyValueForTTL.asTimePeriod(TimeUnit.MILLISECONDS)).thenReturn(TTL);
+        Mockito.when(propertyValueForCacheName.evaluateAttributeExpressions()).thenReturn(propertyValueForCacheNameEvaluated);
+        Mockito.when(propertyValueForCacheNameEvaluated.getValue()).thenReturn(CACHE_NAME);
+        Mockito.when(propertyValueForConnectionService.asControllerService(HazelcastCacheManager.class)).thenReturn(hazelcastCacheService);
+
+        Mockito.when(configurationContext.getProperty(HazelcastMapCacheClient.HAZELCAST_CACHE_MANAGER)).thenReturn(propertyValueForConnectionService);
+        Mockito.when(configurationContext.getProperty(HazelcastMapCacheClient.HAZELCAST_CACHE_NAME)).thenReturn(propertyValueForCacheName);
+        Mockito.when(configurationContext.getProperty(HazelcastMapCacheClient.HAZELCAST_ENTRY_TTL)).thenReturn(propertyValueForTTL);
+
+        Mockito.when(hazelcastCacheService.getCache(CACHE_NAME, TTL)).thenReturn(cache);
+        Mockito.when(initializationContext.getLogger()).thenReturn(Mockito.mock(ComponentLog.class));
+
+        testSubject = new HazelcastMapCacheClient();
+        testSubject.initialize(initializationContext);
+        testSubject.onEnabled(configurationContext);
+    }
+
+    @Test
+    public void testWhenReadingDataBackItDoesNotChange() throws Exception {
+        // when
+        thenEntryIsNotInCache();
+        whenPutEntry(VALUE);
+
+        // then
+        thenEntryIsInCache();
+        thenGetEntryEquals(VALUE);
+    }
+
+    @Test
+    public void testRemoveEntry() throws Exception {
+        // given
+        whenRemoveEntryIsUnsuccessful();
+
+        // when
+        whenPutEntry(VALUE);
+
+        // then
+        thenEntryIsInCache();
+        whenRemoveEntryIsSuccessful();
+        thenEntryIsNotInCache();
+    }
+
+    @Test
+    public void testPutIfAbsent() throws Exception {
+        // given
+        thenEntryIsNotInCache();
+
+        // when
+        whenPutIfAbsentIsSuccessful(VALUE);
+
+        // then
+        thenGetEntryEquals(VALUE);
+
+        // when
+        whenPutIfAbsentIsFailed(VALUE_2);
+
+        // then
+        thenGetEntryEquals(VALUE);
+    }
+
+    @Test
+    public void testGetAndPutIfAbsent() throws Exception {
+        // given
+        thenEntryIsNotInCache();
+
+        // when
+        Assert.assertNull(whenGetAndPutIfAbsent(VALUE));
+
+        // then
+        thenGetEntryEquals(VALUE);
+
+        // when
+        Assert.assertEquals(VALUE, whenGetAndPutIfAbsent(VALUE_2));
+
+        // then
+        thenGetEntryEquals(VALUE);
+    }
+
+    @Test
+    public void testRemoveByPattern() throws Exception {
+        // given
+        whenPutEntry("key1", "a");
+        whenPutEntry("key2", "b");
+        whenPutEntry("key3", "c");
+        whenPutEntry("other", "d");
+
+        // when
+        testSubject.removeByPattern("key.*");
+
+        // then
+        thenEntryIsNotInCache("key1");
+        thenEntryIsNotInCache("key2");
+        thenEntryIsNotInCache("key3");
+        thenEntryIsInCache("other");
+    }
+
+    @Test
+    public void testWhenReplaceNonExistingAtomicEntry() throws Exception {
+        // given
+        thenFetchedEntryIsNull();
+
+        // when
+        whenReplaceEntryIsFailed(5L, VALUE);
+        thenEntryIsNotLocked();
+
+        // then
+        thenFetchedEntryIsNull();
+    }
+
+    @Test
+    public void testReplacingNonExistingAtomicEntry() throws Exception {
+        // given
+        whenReplaceEntryIsSuccessful(0L, VALUE);
+
+        // then
+        thenEntryIsInCache();
+        thenFetchedEntryEquals(1L, VALUE);
+
+        // then
+        thenGetEntryEquals(VALUE);
+    }
+
+    @Test
+    public void testReplacingExistingAtomicEntry() throws Exception {
+        // given
+        whenReplaceEntryIsSuccessful(0L, VALUE);
+        thenEntryIsNotLocked();
+
+        // when
+        whenReplaceEntryIsSuccessful(1L, VALUE_2);
+        thenEntryIsNotLocked();
+
+        // then
+        thenFetchedEntryEquals(2L, VALUE_2);
+
+        // when & then - replace with too low version fails
+        whenReplaceEntryIsFailed(1L, VALUE_3);
+        thenFetchedEntryEquals(2L, VALUE_2);
+
+        // when & then - replace with too high version fails
+        whenReplaceEntryIsFailed(5L, VALUE_3);
+        thenFetchedEntryEquals(2L, VALUE_2);
+    }
+
+    @Test
+    public void testStartVersionWithNull() throws Exception {
+        // given
+        whenReplaceEntryIsSuccessful(null, VALUE);
+
+        // when & then
+        whenReplaceEntryIsSuccessful(1L, VALUE_2);
+    }
+
+    @Test
+    public void testOverrideVersionedEntryWithPut() throws Exception {
+        // given
+        whenReplaceEntryIsSuccessful(0L, VALUE);
+        whenReplaceEntryIsSuccessful(1L, VALUE_2);
+        whenReplaceEntryIsSuccessful(2L, VALUE_3);
+
+        // when
+        whenPutEntry(KEY, VALUE);
+
+        // then - version number is increased
+        thenFetchedEntryEquals(1L, VALUE);
+    }
+
+    @Test
+    public void testReplacingWithTheSame() throws Exception {
+        // given
+        whenReplaceEntryIsSuccessful(0L, VALUE);
+
+        // when
+        whenReplaceEntryIsSuccessful(1L, VALUE);
+
+        // then - version number is increased
+        thenFetchedEntryEquals(2L, VALUE);
+    }
+
+    @Test
+    public void testReplacingExistingEntryAddedWithPut() throws Exception {
+        // given
+        whenPutEntry(VALUE);
+
+        // then
+        thenFetchedEntryEquals(1L, VALUE);
+
+        // when
+        whenReplaceEntryIsSuccessful(1L, VALUE_2);
+
+        // then
+        thenFetchedEntryEquals(2L, VALUE_2);
+    }
+
+    @Test
+    public void testMultiplePutsWillHaveNoAffectOnVersion() throws Exception {
+        // given
+        whenPutEntry(VALUE);
+
+        // when
+        whenPutEntry(VALUE_2);
+
+        // then
+        thenGetEntryEquals(VALUE_2);
+        thenFetchedEntryEquals(1L, VALUE_2);
+    }
+
+    @Test
+    public void testSerialization() throws Exception {
+        // given
+        final Long key = 1L;
+        final Double value = 1.2;
+
+        final Serializer<Long> keySerializer = (x, output) -> output.write(x.toString().getBytes(StandardCharsets.UTF_8));
+        final Serializer<Double> valueSerializer = (x, output) -> output.write(x.toString().getBytes(StandardCharsets.UTF_8));
+        final Deserializer<Double> valueDeserializer = input -> Double.valueOf(new String(input, StandardCharsets.UTF_8));
+
+        // when
+        testSubject.put(key, value, keySerializer, valueSerializer);
+        final Double result = testSubject.get(key, keySerializer, valueDeserializer);
+
+        // then
+        Assert.assertEquals(value, result);
+    }
+
+    private void whenRemoveEntryIsSuccessful() throws IOException {
+        Assert.assertTrue(testSubject.remove(KEY, SERIALIZER));
+    }
+
+    private void whenRemoveEntryIsUnsuccessful() throws IOException {
+        Assert.assertFalse(testSubject.remove(KEY, SERIALIZER));
+    }
+
+    private void whenPutEntry(final String value) throws IOException {
+        whenPutEntry(KEY, value);
+    }
+
+    private void whenPutEntry(final String key, final String value) throws IOException {
+        testSubject.put(key, value, SERIALIZER, SERIALIZER);
+    }
+
+    private void whenPutIfAbsentIsSuccessful(final String value) throws IOException {
+        Assert.assertTrue(testSubject.putIfAbsent(KEY, value, SERIALIZER, SERIALIZER));
+    }
+
+    private void whenPutIfAbsentIsFailed(final String value) throws IOException {
+        Assert.assertFalse(testSubject.putIfAbsent(KEY, value, SERIALIZER, SERIALIZER));
+    }
+
+    private String whenGetAndPutIfAbsent(final String value) throws IOException {
+        return testSubject.getAndPutIfAbsent(KEY, value, SERIALIZER, SERIALIZER, SERIALIZER);
+    }
+
+    private void whenReplaceEntryIsSuccessful(final Long version, final String newValue) throws IOException {
+        final AtomicCacheEntry<String, String, Long> cacheEntry = new AtomicCacheEntry<>(KEY, newValue, version);
+        Assert.assertTrue(testSubject.replace(cacheEntry, SERIALIZER, SERIALIZER));
+    }
+
+    private void whenReplaceEntryIsFailed(final Long version, final String newValue) throws IOException {
+        final AtomicCacheEntry<String, String, Long> cacheEntry = new AtomicCacheEntry<>(KEY, newValue, version);
+        Assert.assertFalse(testSubject.replace(cacheEntry, SERIALIZER, SERIALIZER));
+    }
+
+    private void thenEntryIsNotInCache(final String key) throws IOException {
+        Assert.assertFalse(testSubject.containsKey(key, SERIALIZER));
+    }
+
+    private void thenEntryIsNotInCache() throws IOException {
+        thenEntryIsNotInCache(KEY);
+    }
+
+    private void thenEntryIsInCache(final String key) throws IOException {
+        Assert.assertTrue(testSubject.containsKey(key, SERIALIZER));
+    }
+
+    private void thenEntryIsInCache() throws IOException {
+        thenEntryIsInCache(KEY);
+    }
+
+    private void thenFetchedEntryIsNull() throws Exception {
+        Assert.assertNull(testSubject.fetch(KEY, SERIALIZER, SERIALIZER));
+    }
+
+    private void thenFetchedEntryEquals(final long version, final String value) throws IOException {
+        final AtomicCacheEntry<String, String, Long> result = testSubject.fetch(KEY, SERIALIZER, SERIALIZER);
+        Assert.assertNotNull(result);
+        Assert.assertEquals(version, result.getRevision().get().longValue());
+        Assert.assertEquals(KEY, result.getKey());
+        Assert.assertEquals(value, result.getValue());
+    }
+
+    private void thenGetEntryEquals(final String value) throws IOException {
+        Assert.assertEquals(value, testSubject.get(KEY, SERIALIZER, SERIALIZER));
+    }
+
+    private void thenEntryIsNotLocked() {
+        Assert.assertFalse(cache.getLockedEntries().contains(KEY));
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/AbstractHazelcastCacheManagerTest.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/AbstractHazelcastCacheManagerTest.java
new file mode 100644
index 0000000..dfa0104
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/AbstractHazelcastCacheManagerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+public abstract class AbstractHazelcastCacheManagerTest {
+
+    protected TestHazelcastProcessor processor;
+    protected TestRunner testRunner;
+    protected  HazelcastMapCacheClient hazelcastMapCacheClient;
+    protected IMapBasedHazelcastCacheManager testSubject;
+
+    @Before
+    public void setUp() {
+        processor = new TestHazelcastProcessor();
+        testRunner = TestRunners.newTestRunner(processor);
+    }
+
+    @After
+    public void tearDown() {
+        testRunner.disableControllerService(hazelcastMapCacheClient);
+        testRunner.disableControllerService(testSubject);
+        testRunner.shutdown();
+    }
+
+    protected void givenHazelcastMapCacheClient() throws Exception {
+        hazelcastMapCacheClient = new HazelcastMapCacheClient();
+        testRunner.addControllerService("hazelcast-map-cache-client", hazelcastMapCacheClient);
+
+        testRunner.setProperty(hazelcastMapCacheClient, HazelcastMapCacheClient.HAZELCAST_ENTRY_TTL, "20 sec");
+        testRunner.setProperty(hazelcastMapCacheClient, HazelcastMapCacheClient.HAZELCAST_CACHE_NAME, "cache");
+        testRunner.setProperty(hazelcastMapCacheClient, HazelcastMapCacheClient.HAZELCAST_CACHE_MANAGER, "hazelcast-connection-service");
+
+        testRunner.setProperty(TestHazelcastProcessor.TEST_HAZELCAST_MAP_CACHE_CLIENT, "hazelcast-map-cache-client");
+    }
+
+    protected void givenServicesAreEnabled() {
+        testRunner.enableControllerService(testSubject);
+        Assert.assertTrue(testSubject.isEnabled());
+
+        testRunner.enableControllerService(hazelcastMapCacheClient);
+        Assert.assertTrue(hazelcastMapCacheClient.isEnabled());
+    }
+
+    protected void whenExecuting() {
+        testRunner.enqueue("trigger");
+        testRunner.run();
+    }
+
+    protected void thenProcessingIsSuccessful() {
+        testRunner.assertAllFlowFilesTransferred(TestHazelcastProcessor.REL_SUCCESS, 1);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockNodeTypeProvider.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManagerTest.java
similarity index 59%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockNodeTypeProvider.java
copy to nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManagerTest.java
index 61390e1..d27d150 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockNodeTypeProvider.java
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/EmbeddedHazelcastCacheManagerTest.java
@@ -14,27 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.mock;
+package org.apache.nifi.hazelcast.services.cachemanager;
 
-import org.apache.nifi.controller.NodeTypeProvider;
+import org.junit.Test;
 
-/**
- * A Mock NodeTypeProvider that can be used so that
- * ConfigurableComponents can be initialized for the purpose of generating
- * documentation
- *
- *
- */
-public class MockNodeTypeProvider implements NodeTypeProvider {
+public class EmbeddedHazelcastCacheManagerTest extends AbstractHazelcastCacheManagerTest {
 
-    @Override
-    public boolean isClustered() {
-        return false;
-    }
+    @Test
+    public void testExecution() throws Exception {
+        // given
+        testSubject = new EmbeddedHazelcastCacheManager();
+        testRunner.addControllerService("hazelcast-connection-service", testSubject);
 
-    @Override
-    public boolean isPrimary() {
-        return false;
-    }
+        givenHazelcastMapCacheClient();
+        givenServicesAreEnabled();
+
+        // when
+        whenExecuting();
 
+        // then
+        thenProcessingIsSuccessful();
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/ExternalHazelcastCacheManagerTest.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/ExternalHazelcastCacheManagerTest.java
new file mode 100644
index 0000000..cccf707
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/ExternalHazelcastCacheManagerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ExternalHazelcastCacheManagerTest extends AbstractHazelcastCacheManagerTest {
+    private Thread hazelcastServer;
+
+    @Before
+    public void setUp() {
+        hazelcastServer = new Thread(new Runnable() {
+            HazelcastInstance hazelcastInstance;
+
+            @Override
+            public void run() {
+                final Config config = new Config();
+                config.getNetworkConfig().setPort(5704);
+                config.setClusterName("nifi");
+                hazelcastInstance = Hazelcast.newHazelcastInstance(config);
+
+                while (!Thread.currentThread().isInterrupted()) {
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                        Thread.currentThread().interrupt();
+                    }
+                }
+
+                hazelcastInstance.shutdown();
+                hazelcastInstance = null;
+            }
+        });
+
+        hazelcastServer.start();
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() {
+        super.tearDown();
+        hazelcastServer.interrupt();
+    }
+
+    @Test
+    public void testExecution() throws Exception {
+        // given
+        testSubject = new ExternalHazelcastCacheManager();
+        testRunner.addControllerService("hazelcast-connection-service", testSubject);
+        testRunner.setProperty(testSubject, ExternalHazelcastCacheManager.HAZELCAST_SERVER_ADDRESS, "localhost:5704");
+
+        givenHazelcastMapCacheClient();
+        givenServicesAreEnabled();
+
+        // when
+        whenExecuting();
+
+        // then
+        thenProcessingIsSuccessful();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/TestHazelcastProcessor.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/TestHazelcastProcessor.java
new file mode 100644
index 0000000..1cbdf6a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/TestHazelcastProcessor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hazelcast.services.cachemanager;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hazelcast.services.DummyStringSerializer;
+import org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+class TestHazelcastProcessor extends AbstractProcessor {
+    private static final String KEY_1 = "key1";
+    private static final String KEY_2 = "key2";
+    private static final String VALUE_1 = "value1";
+    private static final String VALUE_2 = "value2";
+
+    private static final DummyStringSerializer SERIALIZER = new DummyStringSerializer();
+
+    public static final PropertyDescriptor TEST_HAZELCAST_MAP_CACHE_CLIENT = new PropertyDescriptor.Builder()
+            .name("test-hazelcast-map-cache-client")
+            .displayName("Test Hazelcast Map Cache Client")
+            .identifiesControllerService(HazelcastMapCacheClient.class)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.singletonList(TEST_HAZELCAST_MAP_CACHE_CLIENT);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
+    }
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+
+        if (flowFile == null) {
+            return;
+        }
+
+        final HazelcastMapCacheClient testSubject = context.getProperty(TEST_HAZELCAST_MAP_CACHE_CLIENT).asControllerService(HazelcastMapCacheClient.class);
+
+        try {
+            Assert.assertFalse(testSubject.containsKey(KEY_1, SERIALIZER));
+            testSubject.put(KEY_1, VALUE_1, SERIALIZER, SERIALIZER);
+            Assert.assertTrue(testSubject.containsKey(KEY_1, SERIALIZER));
+            Assert.assertEquals(VALUE_1, testSubject.get(KEY_1, SERIALIZER, SERIALIZER));
+            Assert.assertTrue(testSubject.remove(KEY_1, SERIALIZER));
+            Assert.assertFalse(testSubject.containsKey(KEY_1, SERIALIZER));
+
+            Assert.assertNull(testSubject.getAndPutIfAbsent(KEY_2, VALUE_2, SERIALIZER, SERIALIZER, SERIALIZER));
+            Assert.assertEquals(VALUE_2, testSubject.getAndPutIfAbsent(KEY_2, VALUE_2, SERIALIZER, SERIALIZER, SERIALIZER));
+            testSubject.put(KEY_1, VALUE_1, SERIALIZER, SERIALIZER);
+
+            Assert.assertTrue(testSubject.containsKey(KEY_1, SERIALIZER));
+            Assert.assertTrue(testSubject.containsKey(KEY_2, SERIALIZER));
+
+            Assert.assertEquals(2, testSubject.removeByPattern("key.*"));
+
+            Assert.assertTrue(testSubject.replace(new AtomicCacheEntry<>(KEY_1, VALUE_1, 0L), SERIALIZER, SERIALIZER));
+            Assert.assertEquals(VALUE_1, testSubject.fetch(KEY_1, SERIALIZER, SERIALIZER).getValue());
+
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final AssertionError| IOException e) {
+            session.transfer(flowFile, REL_FAILURE);
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/pom.xml b/nifi-nar-bundles/nifi-hazelcast-bundle/pom.xml
new file mode 100644
index 0000000..44a8eae
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hazelcast-bundle/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-hazelcast-bundle</artifactId>
+    <version>1.13.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-hazelcast-services-api</module>
+        <module>nifi-hazelcast-services-api-nar</module>
+
+        <module>nifi-hazelcast-services</module>
+        <module>nifi-hazelcast-services-nar</module>
+    </modules>
+</project>
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 12bbb28..0047114 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -102,6 +102,7 @@
         <module>nifi-sql-reporting-bundle</module>
 	<module>nifi-rules-action-handler-bundle</module>
 	<module>nifi-accumulo-bundle</module>
+        <module>nifi-hazelcast-bundle</module>
     </modules>
 
     <build>


Mime
View raw message