nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ijokaruma...@apache.org
Subject nifi git commit: NIFI-4504, NIFI-4505 added removeAndGet, removeByPatternAndGet, and keySet methods to MapCache API cleaned up some warnings on deprecated nifi.stream.io classes
Date Thu, 28 Dec 2017 15:24:19 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 2fbe922a2 -> c59a96762


NIFI-4504, NIFI-4505 added removeAndGet, removeByPatternAndGet, and keySet methods to MapCache API
  cleaned up some warnings on deprecated nifi.stream.io classes

This closes #2284.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c59a9676
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c59a9676
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c59a9676

Branch: refs/heads/master
Commit: c59a967623da5c1dede58e4b8ff21ddadab913fe
Parents: 2fbe922
Author: Mike Moser <mosermw@apache.org>
Authored: Wed Nov 15 21:54:46 2017 +0000
Committer: Koji Kawamura <ijokarumawak@apache.org>
Committed: Fri Dec 29 00:23:09 2017 +0900

----------------------------------------------------------------------
 .../standard/TestDetectDuplicate.java           |  7 +-
 .../cache/client/DistributedMapCacheClient.java | 45 +++++++++++++
 .../DistributedMapCacheClientService.java       | 71 ++++++++++++++++++++
 .../DistributedSetCacheClientService.java       |  4 +-
 .../cache/client/SSLCommsSession.java           |  4 +-
 .../cache/client/StandardCommsSession.java      |  4 +-
 .../cache/protocol/ProtocolHandshake.java       |  1 +
 .../cache/server/SetCacheServer.java            |  2 +-
 .../distributed/cache/server/map/MapCache.java  |  3 +
 .../cache/server/map/MapCacheServer.java        | 48 +++++++++++++
 .../cache/server/map/PersistentMapCache.java    |  6 ++
 .../cache/server/map/SimpleMapCache.java        | 11 +++
 .../cache/server/TestServerAndClient.java       | 48 +++++++++++++
 13 files changed, 246 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
index ef0bd59..99c8ef5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
@@ -244,7 +244,12 @@ public class TestDetectDuplicate {
 
         @Override
         public long removeByPattern(String regex) throws IOException {
-            return exists ? 1L : 0L;
+            if (exists) {
+                exists = false;
+                return 1L;
+            } else {
+                return 0L;
+            }
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
index d2e0085..7fa6a61 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@ -167,6 +167,23 @@ public interface DistributedMapCacheClient extends ControllerService {
     <K> boolean remove(K key, Serializer<K> serializer) throws IOException;
 
     /**
+     * Removes the entry with the given key from the cache, if it is present,
+     * and returns the value that was removed from the map.
+     *
+     * @param <K> type of key
+     * @param <V> type of value
+     * @param key key
+     * @param keySerializer key serializer
+     * @param valueDeserializer value deserializer
+     * @return the value previously associated with the key, or null if there was no mapping
+     * null can also indicate that the map previously associated null with the key
+     * @throws IOException ex
+     */
+    default <K, V> V removeAndGet(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
      * Removes entries whose keys match the specified pattern
      *
      * @param regex The regular expression / pattern on which to match the keys to be removed
@@ -174,4 +191,32 @@ public interface DistributedMapCacheClient extends ControllerService {
      * @throws IOException if any error occurred while removing an entry
      */
     long removeByPattern(String regex) throws IOException;
+
+    /**
+     * Removes entries whose keys match the specified pattern, and returns a map of entries that
+     * were removed.
+     *
+     * @param <K> type of key
+     * @param <V> type of value
+     * @param regex The regular expression / pattern on which to match the keys to be removed
+     * @param keyDeserializer key deserializer
+     * @param valueDeserializer value deserializer
+     * @return A map of key/value entries that were removed from the cache
+     * @throws IOException if any error occurred while removing an entry
+     */
+    default <K, V> Map<K, V> removeByPatternAndGet(String regex, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Returns a set of all keys currently in the cache
+     *
+     * @param <K> type of key
+     * @param keyDeserializer key deserializer
+     * @return a Set of all keys currently in the cache
+     * @throws IOException ex
+     */
+    default <K> Set<K> keySet(Deserializer<K> keyDeserializer) throws IOException {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index c454063..e655121 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -256,6 +257,27 @@ public class DistributedMapCacheClientService extends AbstractControllerService
     }
 
     @Override
+    public <K, V> V removeAndGet(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+        return withCommsSession(new CommsAction<V>() {
+            @Override
+            public V execute(final CommsSession session) throws IOException {
+                validateProtocolVersion(session, 3);
+
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("removeAndGet");
+
+                serialize(key, keySerializer, dos);
+                dos.flush();
+
+                // read response
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+                return valueDeserializer.deserialize(responseBuffer);
+            }
+        });
+    }
+
+    @Override
     public long removeByPattern(String regex) throws IOException {
         return withCommsSession(session -> {
             final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
@@ -270,6 +292,34 @@ public class DistributedMapCacheClientService extends AbstractControllerService
     }
 
     @Override
+    public <K, V> Map<K, V> removeByPatternAndGet(String regex, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) throws IOException {
+        return withCommsSession(new CommsAction<Map<K, V>>() {
+            @Override
+            public Map<K, V> execute(CommsSession session) throws IOException {
+                validateProtocolVersion(session, 3);
+
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("removeByPatternAndGet");
+                dos.writeUTF(regex);
+                dos.flush();
+
+                // read response
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                final int mapSize = dis.readInt();
+                HashMap<K, V> resultMap = new HashMap<>(mapSize);
+                for (int i=0; i<mapSize; i++) {
+                    final byte[] keyBuffer = readLengthDelimitedResponse(dis);
+                    K key = keyDeserializer.deserialize(keyBuffer);
+                    final byte[] valueBuffer = readLengthDelimitedResponse(dis);
+                    V value = valueDeserializer.deserialize(valueBuffer);
+                    resultMap.put(key, value);
+                }
+                return resultMap;
+            }
+        });
+    }
+
+    @Override
     @SuppressWarnings("unchecked")
     public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
         return withCommsSession(session -> {
@@ -321,6 +371,27 @@ public class DistributedMapCacheClientService extends AbstractControllerService
         });
     }
 
+    @Override
+    public <K> Set<K> keySet(Deserializer<K> keyDeserializer) throws IOException {
+        return withCommsSession(session -> {
+            validateProtocolVersion(session, 3);
+
+            final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+            dos.writeUTF("keySet");
+            dos.flush();
+
+            // read response
+            final DataInputStream dis = new DataInputStream(session.getInputStream());
+            final int setSize = dis.readInt();
+            HashSet<K> resultSet = new HashSet<>(setSize);
+            for (int i=0; i<setSize; i++) {
+                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+                resultSet.add(keyDeserializer.deserialize(responseBuffer));
+            }
+            return resultSet;
+        });
+    }
+
     private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
         final int responseLength = dis.readInt();
         final byte[] responseBuffer = new byte[responseLength];

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
index 34a0a7c..c35900e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.distributed.cache.client;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -40,8 +42,6 @@ import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.SSLContextService.ClientAuth;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.stream.io.DataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
index 18ca571..3089eeb 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.distributed.cache.client;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -25,8 +27,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
 
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
index 7545bef..d157161 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.distributed.cache.client;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -25,8 +27,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
 
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.remote.io.InterruptableInputStream;
 import org.apache.nifi.remote.io.InterruptableOutputStream;
 import org.apache.nifi.remote.io.socket.SocketChannelInputStream;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
index 3df2f09..3f2e26a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
@@ -38,6 +38,7 @@ public class ProtocolHandshake {
      * If the server doesn't support requested protocol version, HandshakeException will be thrown.</p>
      *
      * <p>DistributedMapCache version histories:<ul>
+     *     <li>3: Added subMap, keySet, removeAndGet, removeByPatternAndGet methods.</li>
      *     <li>2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.</li>
      *     <li>1: Initial version.</li>
      * </ul></p>

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
index 3dd224b..3e6e7a9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.distributed.cache.server;
 
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,7 +30,6 @@ import org.apache.nifi.distributed.cache.server.set.PersistentSetCache;
 import org.apache.nifi.distributed.cache.server.set.SetCache;
 import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
 import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;
-import org.apache.nifi.stream.io.DataOutputStream;
 
 public class SetCacheServer extends AbstractCacheServer {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
index bbffbf9..e007ff0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public interface MapCache {
 
@@ -41,5 +42,7 @@ public interface MapCache {
 
     MapPutResult replace(MapCacheRecord record) throws IOException;
 
+    Set<ByteBuffer> keySet() throws IOException;
+
     void shutdown() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index a0a01c1..57af28e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Set;
 
 import javax.net.ssl.SSLContext;
 
@@ -144,12 +145,47 @@ public class MapCacheServer extends AbstractCacheServer {
                 dos.writeBoolean(removed);
                 break;
             }
+            case "removeAndGet": {
+                final byte[] key = readValue(dis);
+                final ByteBuffer removed = cache.remove(ByteBuffer.wrap(key));
+                if (removed == null) {
+                    // there was no value removed
+                    dos.writeInt(0);
+                } else {
+                    // reply with the value that was removed
+                    final byte[] byteArray = removed.array();
+                    dos.writeInt(byteArray.length);
+                    dos.write(byteArray);
+                }
+                break;
+            }
             case "removeByPattern": {
                 final String pattern = dis.readUTF();
                 final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
                 dos.writeLong(removed == null ? 0 : removed.size());
                 break;
             }
+            case "removeByPatternAndGet": {
+                final String pattern = dis.readUTF();
+                final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
+                if (removed == null || removed.size() == 0) {
+                    dos.writeLong(0);
+                } else {
+                    // write the map size
+                    dos.writeInt(removed.size());
+                    for (Map.Entry<ByteBuffer, ByteBuffer> entry : removed.entrySet()) {
+                        // write map entry key
+                        final byte[] key = entry.getKey().array();
+                        dos.writeInt(key.length);
+                        dos.write(key);
+                        // write map entry value
+                        final byte[] value = entry.getValue().array();
+                        dos.writeInt(value.length);
+                        dos.write(value);
+                    }
+                }
+                break;
+            }
             case "fetch": {
                 final byte[] key = readValue(dis);
                 final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key));
@@ -175,6 +211,18 @@ public class MapCacheServer extends AbstractCacheServer {
                 dos.writeBoolean(result.isSuccessful());
                 break;
             }
+            case "keySet": {
+                final Set<ByteBuffer> result = cache.keySet();
+                // write the set size
+                dos.writeInt(result.size());
+                // write each key in the set
+                for (ByteBuffer bb : result) {
+                    final byte[] byteArray = bb.array();
+                    dos.writeInt(byteArray.length);
+                    dos.write(byteArray);
+                }
+                break;
+            }
             default: {
                 throw new IOException("Illegal Request");
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index 6bf6e5a..c1eebd6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -171,6 +172,11 @@ public class PersistentMapCache implements MapCache {
     }
 
     @Override
+    public Set<ByteBuffer> keySet() throws IOException {
+        return wrapped.keySet();
+    }
+
+    @Override
     public void shutdown() throws IOException {
         wali.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
index df78332..8571432 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.locks.Lock;
@@ -274,6 +275,16 @@ public class SimpleMapCache implements MapCache {
     }
 
     @Override
+    public Set<ByteBuffer> keySet() throws IOException {
+        readLock.lock();
+        try {
+            return cache.keySet();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
     public void shutdown() throws IOException {
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59a9676/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
index 419a471..e2a74d4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
@@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SystemUtils;
@@ -274,6 +275,13 @@ public class TestServerAndClient {
         assertTrue(contains);
         assertTrue(contains2);
 
+        final Deserializer<String> deserializer = new StringDeserializer();
+        final Set<String> keys = client.keySet(deserializer);
+        assertEquals(3, keys.size());
+        assertTrue(keys.contains("test"));
+        assertTrue(keys.contains("test2"));
+        assertTrue(keys.contains("test3"));
+
         final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer);
         assertFalse(addedAgain);
 
@@ -307,6 +315,19 @@ public class TestServerAndClient {
         assertFalse(client.containsKey("test.2", serializer));
         assertTrue(client.containsKey("test3", serializer));
 
+        // test removeByPatternAndGet
+        client.put("test.1", "1", serializer, serializer);
+        client.put("test.2", "2", serializer, serializer);
+        Map<String,String> removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer);
+        assertEquals(2, removed.size());
+        assertTrue(removed.containsKey("test.1"));
+        assertTrue(removed.containsKey("test.2"));
+        assertFalse(client.containsKey("test.1", serializer));
+        assertFalse(client.containsKey("test.2", serializer));
+        assertTrue(client.containsKey("test3", serializer));
+        removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer);
+        assertEquals(0, removed.size());
+
         newServer.shutdownServer();
         client.close();
     }
@@ -437,6 +458,16 @@ public class TestServerAndClient {
         assertTrue(removed);
         LOGGER.debug("end remove");
 
+        client.put("testKey", "testValue", keySerializer, valueSerializer);
+        assertTrue(client.containsKey("testKey", keySerializer));
+        String removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
+        assertEquals("testValue", removedValue);
+        removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
+        assertNull(removedValue);
+
+        final Set<String> keys = client.keySet(deserializer);
+        assertEquals(0, keys.size());
+
         // Test removeByPattern, the first two should be removed and the last should remain
         client.put("test.1", "1", keySerializer, keySerializer);
         client.put("test.2", "2", keySerializer, keySerializer);
@@ -687,6 +718,23 @@ public class TestServerAndClient {
         } catch (UnsupportedOperationException e) {
         }
 
+        try {
+            Set<String> keys = client.keySet(stringDeserializer);
+            fail("Version 3 operations should NOT work.");
+        } catch (UnsupportedOperationException e) {
+        }
+
+        try {
+            String removed = client.removeAndGet("v.*", stringSerializer, stringDeserializer);
+            fail("Version 3 operations should NOT work.");
+        } catch (UnsupportedOperationException e) {
+        }
+
+        try {
+            Map<String, String> removed = client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer);
+            fail("Version 3 operations should NOT work.");
+        } catch (UnsupportedOperationException e) {
+        }
         client.close();
         server.shutdownServer();
     }


Mime
View raw message