geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bogle...@apache.org
Subject [2/2] incubator-geode git commit: GEODE-967: Added xml support for GatewayEventSubstitutionFilter
Date Thu, 18 Feb 2016 19:13:53 GMT
GEODE-967: Added xml support for GatewayEventSubstitutionFilter

- added configuration to AsyncEventQueueCreation
- added support to generate xml for GatewayEventSubstitutionFilter for
both GatewaySender and AsyncEventQueue


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/609e2395
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/609e2395
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/609e2395

Branch: refs/heads/develop
Commit: 609e2395d6cdd3e22abffdfe04aab511c4169c91
Parents: e685fd8
Author: Barry Oglesby <boglesby@pivotal.io>
Authored: Wed Feb 17 10:10:59 2016 -0800
Committer: Barry Oglesby <boglesby@pivotal.io>
Committed: Thu Feb 18 11:13:42 2016 -0800

----------------------------------------------------------------------
 .../cache/xmlcache/AsyncEventQueueCreation.java |   1 +
 .../cache/xmlcache/CacheXmlGenerator.java       |  37 +++-
 .../cache/wan/AsyncEventQueueTestBase.java      | 217 ++++++++++---------
 .../asyncqueue/AsyncEventListenerDUnitTest.java |  38 +++-
 .../cache/CacheXml80GatewayDUnitTest.java       |  72 +++++-
 5 files changed, 240 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/609e2395/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
index 60afc14..77f9596 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
@@ -63,6 +63,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
     this.asyncEventListener = eventListener;
     this.isBucketSorted = senderAttrs.isBucketSorted; 
     this.isHDFSQueue = senderAttrs.isHDFSQueue;
+    this.gatewayEventSubstitutionFilter = senderAttrs.eventSubstitutionFilter;
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/609e2395/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
index 47c341c..4ba1409 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
@@ -44,6 +44,7 @@ import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.sax.SAXSource;
 import javax.xml.transform.stream.StreamResult;
 
+import com.gemstone.gemfire.cache.wan.*;
 import org.xml.sax.Attributes;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.DTDHandler;
@@ -104,10 +105,6 @@ import com.gemstone.gemfire.cache.query.internal.index.PrimaryKeyIndex;
 import com.gemstone.gemfire.cache.server.CacheServer;
 import com.gemstone.gemfire.cache.server.ServerLoadProbe;
 import com.gemstone.gemfire.cache.util.ObjectSizer;
-import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
 import com.gemstone.gemfire.distributed.Role;
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.cache.AbstractRegion;
@@ -1468,6 +1465,12 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
          generateGatewayEventFilter(gef);
       }
 
+      if (this.version.compareTo(CacheXmlVersion.VERSION_8_0) >= 0) {
+        if (sender.getGatewayEventSubstitutionFilter() != null) {
+          generateGatewayEventSubstitutionFilter(sender.getGatewayEventSubstitutionFilter());
+        }
+      }
+
       for (GatewayTransportFilter gsf : sender.getGatewayTransportFilters()) {
         generateGatewayTransportFilter(gsf);
      }
@@ -1532,7 +1535,13 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
     		generateGatewayEventFilter(eventFilter);
     	  }
       }
-      
+
+      if (this.version.compareTo(CacheXmlVersion.VERSION_8_0) >= 0) {
+        if (asyncEventQueue.getGatewayEventSubstitutionFilter() != null) {
+          generateGatewayEventSubstitutionFilter(asyncEventQueue.getGatewayEventSubstitutionFilter());
+        }
+      }
+
       AsyncEventListener asyncListener = asyncEventQueue.getAsyncEventListener();
       if (asyncListener != null) {
         generate(ASYNC_EVENT_LISTENER, asyncListener);
@@ -1628,6 +1637,24 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
     }
     handler.endElement("", GATEWAY_TRANSPORT_FILTER, GATEWAY_TRANSPORT_FILTER);
   }
+
+  private void generateGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter)
+      throws SAXException {
+
+    handler.startElement("", GATEWAY_EVENT_SUBSTITUTION_FILTER, GATEWAY_EVENT_SUBSTITUTION_FILTER,
+        EMPTY);
+    String className = filter.getClass().getName();
+
+    handler.startElement("", CLASS_NAME, CLASS_NAME, EMPTY);
+    handler.characters(className.toCharArray(), 0, className.length());
+    handler.endElement("", CLASS_NAME, CLASS_NAME);
+    Properties props = null;
+    if (filter instanceof Declarable2) {
+      props = ((Declarable2)filter).getConfig();
+      generate(props, null);
+    }
+    handler.endElement("", GATEWAY_EVENT_SUBSTITUTION_FILTER, GATEWAY_EVENT_SUBSTITUTION_FILTER);
+  }
 //
 //  private void generateGatewayEventListener(GatewayEventListener gef)
 //      throws SAXException {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/609e2395/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
index ff918b8..e6efcb2 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -32,6 +32,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
@@ -61,12 +62,8 @@ import com.gemstone.gemfire.cache.control.RebalanceResults;
 import com.gemstone.gemfire.cache.control.ResourceManager;
 import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
 import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
-import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.*;
 import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
-import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
 import com.gemstone.gemfire.distributed.Locator;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
@@ -190,13 +187,7 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
         .getName());
     try {
       AttributesFactory fact = new AttributesFactory();
-      if (asyncQueueIds != null) {
-        StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
-        while (tokenizer.hasMoreTokens()) {
-          String asyncQueueId = tokenizer.nextToken();
-          fact.addAsyncEventQueueId(asyncQueueId);
-        }
-      }
+      addAsyncEventQueueIds(fact, asyncQueueIds);
       fact.setDataPolicy(DataPolicy.REPLICATE);
       fact.setOffHeap(offHeap);
       RegionFactory regionFactory = cache.createRegionFactory(fact.create());
@@ -212,6 +203,16 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
       String regionName, String asyncQueueIds) {
 
     AttributesFactory fact = new AttributesFactory();
+    addAsyncEventQueueIds(fact, asyncQueueIds);
+    fact.setDataPolicy(DataPolicy.REPLICATE);
+    // set the CacheLoader
+    fact.setCacheLoader(new MyCacheLoader());
+    RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+    Region r = regionFactory.create(regionName);
+    assertNotNull(r);
+  }
+
+  private static void addAsyncEventQueueIds(AttributesFactory fact, String asyncQueueIds) {
     if (asyncQueueIds != null) {
       StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
       while (tokenizer.hasMoreTokens()) {
@@ -219,12 +220,6 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
         fact.addAsyncEventQueueId(asyncQueueId);
       }
     }
-    fact.setDataPolicy(DataPolicy.REPLICATE);
-    // set the CacheLoader
-    fact.setCacheLoader(new MyCacheLoader());
-    RegionFactory regionFactory = cache.createRegionFactory(fact.create());
-    Region r = regionFactory.create(regionName);
-    assertNotNull(r);
   }
 
   public static void createReplicatedRegionWithSenderAndAsyncEventQueue(
@@ -258,37 +253,21 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
       boolean isParallel, Integer maxMemory, Integer batchSize,
       boolean isConflation, boolean isPersistent, String diskStoreName,
       boolean isDiskSynchronous) {
-
-    if (diskStoreName != null) {
-      File directory = new File(asyncChannelId + "_disk_"
-          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
-      directory.mkdir();
-      File[] dirs1 = new File[] { directory };
-      DiskStoreFactory dsf = cache.createDiskStoreFactory();
-      dsf.setDiskDirs(dirs1);
-      DiskStore ds = dsf.create(diskStoreName);
-    }
+    createDiskStore(asyncChannelId, diskStoreName);
 
     AsyncEventListener asyncEventListener = new MyAsyncEventListener();
 
-    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-    factory.setBatchSize(batchSize);
-    factory.setPersistent(isPersistent);
-    factory.setDiskStoreName(diskStoreName);
+    AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
     factory.setDiskSynchronous(isDiskSynchronous);
     factory.setBatchConflationEnabled(isConflation);
-    factory.setMaximumQueueMemory(maxMemory);
-    factory.setParallel(isParallel);
     // set dispatcher threads
     factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    // Set GatewayEventSubstitutionFilter
     AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
         asyncEventListener);
   }
 
-  public static void createAsyncEventQueueWithListener2(String asyncChannelId,
-      boolean isParallel, Integer maxMemory, Integer batchSize,
-      boolean isPersistent, String diskStoreName) {
-
+  private static void createDiskStore(String asyncChannelId, String diskStoreName) {
     if (diskStoreName != null) {
       File directory = new File(asyncChannelId + "_disk_"
           + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
@@ -298,15 +277,17 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
       dsf.setDiskDirs(dirs1);
       DiskStore ds = dsf.create(diskStoreName);
     }
+  }
+
+  public static void createAsyncEventQueueWithListener2(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isPersistent, String diskStoreName) {
+
+    createDiskStore(asyncChannelId, diskStoreName);
 
     AsyncEventListener asyncEventListener = new MyAsyncEventListener2();
 
-    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-    factory.setBatchSize(batchSize);
-    factory.setPersistent(isPersistent);
-    factory.setDiskStoreName(diskStoreName);
-    factory.setMaximumQueueMemory(maxMemory);
-    factory.setParallel(isParallel);
+    AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
     // set dispatcher threads
     factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
     AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
@@ -317,46 +298,40 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
       boolean isParallel, Integer maxMemory, Integer batchSize,
       boolean isConflation, boolean isPersistent, String diskStoreName,
       boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
+    createAsyncEventQueue(asyncChannelId, isParallel, maxMemory, batchSize, isConflation, isPersistent,
+        diskStoreName, isDiskSynchronous, asyncListenerClass, null);
+  }
 
-    if (diskStoreName != null) {
-      File directory = new File(asyncChannelId + "_disk_"
-          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
-      directory.mkdir();
-      File[] dirs1 = new File[] { directory };
-      DiskStoreFactory dsf = cache.createDiskStoreFactory();
-      dsf.setDiskDirs(dirs1);
-      DiskStore ds = dsf.create(diskStoreName);
+  public static void createAsyncEventQueue(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, String diskStoreName,
+      boolean isDiskSynchronous, String asyncListenerClass,
+      String substitutionFilterClass) throws Exception {
+
+    createDiskStore(asyncChannelId, diskStoreName);
+
+    AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    if (substitutionFilterClass != null) {
+      factory.setGatewayEventSubstitutionListener((GatewayEventSubstitutionFilter)getClass(substitutionFilterClass).newInstance());
     }
+    // set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId, (AsyncEventListener)getClass(asyncListenerClass).newInstance());
+  }
 
+  private static Class getClass(String simpleClassName) throws Exception {
     String packagePrefix = "com.gemstone.gemfire.internal.cache.wan.";
-    String className = packagePrefix + asyncListenerClass;
-    AsyncEventListener asyncEventListener = null;
+    String className = packagePrefix + simpleClassName;
+    Class clazz = null;
     try {
-      Class clazz = Class.forName(className);
-      asyncEventListener = (AsyncEventListener)clazz.newInstance();
+      clazz = Class.forName(className);
     }
-    catch (ClassNotFoundException e) {
+    catch (Exception e) {
       throw e;
     }
-    catch (InstantiationException e) {
-      throw e;
-    }
-    catch (IllegalAccessException e) {
-      throw e;
-    }
-
-    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-    factory.setBatchSize(batchSize);
-    factory.setPersistent(isPersistent);
-    factory.setDiskStoreName(diskStoreName);
-    factory.setDiskSynchronous(isDiskSynchronous);
-    factory.setBatchConflationEnabled(isConflation);
-    factory.setMaximumQueueMemory(maxMemory);
-    factory.setParallel(isParallel);
-    // set dispatcher threads
-    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
-    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
-        asyncEventListener);
+    return clazz;
   }
 
   public static void createAsyncEventQueueWithCustomListener(
@@ -377,24 +352,11 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
         .getName());
 
     try {
-      if (diskStoreName != null) {
-        File directory = new File(asyncChannelId + "_disk_"
-            + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
-        directory.mkdir();
-        File[] dirs1 = new File[] { directory };
-        DiskStoreFactory dsf = cache.createDiskStoreFactory();
-        dsf.setDiskDirs(dirs1);
-        DiskStore ds = dsf.create(diskStoreName);
-      }
+      createDiskStore(asyncChannelId, diskStoreName);
 
       AsyncEventListener asyncEventListener = new CustomAsyncEventListener();
 
-      AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-      factory.setBatchSize(batchSize);
-      factory.setPersistent(isPersistent);
-      factory.setDiskStoreName(diskStoreName);
-      factory.setMaximumQueueMemory(maxMemory);
-      factory.setParallel(isParallel);
+      AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
       factory.setDispatcherThreads(nDispatchers);
       AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
           asyncEventListener);
@@ -404,32 +366,29 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
     }
   }
 
+  private static AsyncEventQueueFactory getInitialAsyncEventQueueFactory(boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isPersistent, String diskStoreName) {
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    return factory;
+  }
+
   public static void createConcurrentAsyncEventQueue(String asyncChannelId,
       boolean isParallel, Integer maxMemory, Integer batchSize,
       boolean isConflation, boolean isPersistent, String diskStoreName,
       boolean isDiskSynchronous, int dispatcherThreads, OrderPolicy policy) {
 
-    if (diskStoreName != null) {
-      File directory = new File(asyncChannelId + "_disk_"
-          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
-      directory.mkdir();
-      File[] dirs1 = new File[] { directory };
-      DiskStoreFactory dsf = cache.createDiskStoreFactory();
-      dsf.setDiskDirs(dirs1);
-      DiskStore ds = dsf.create(diskStoreName);
-    }
+    createDiskStore(asyncChannelId, diskStoreName);
 
     AsyncEventListener asyncEventListener = new MyAsyncEventListener();
 
-    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-    factory.setBatchSize(batchSize);
-    factory.setPersistent(isPersistent);
-    factory.setDiskStoreName(diskStoreName);
+    AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
     factory.setDiskSynchronous(isDiskSynchronous);
     factory.setBatchConflationEnabled(isConflation);
-    factory.setMaximumQueueMemory(maxMemory);
-    factory.setParallel(isParallel);
-    factory.setDispatcherThreads(dispatcherThreads);
     factory.setOrderPolicy(policy);
     AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
         asyncEventListener);
@@ -1416,6 +1375,27 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
     }
   }
 
+  public static void verifySubstitutionFilterInvocations(String asyncEventQueueId, int numInvocations) {
+    AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId);
+    assertNotNull(queue);
+
+    // Verify the GatewayEventSubstitutionFilter has been invoked the appropriate number of times
+    MyGatewayEventSubstitutionFilter filter = (MyGatewayEventSubstitutionFilter) queue.getGatewayEventSubstitutionFilter();
+    assertNotNull(filter);
+    assertEquals(numInvocations, filter.getNumInvocations());
+
+    // Verify the AsyncEventListener has received the substituted values
+    MyAsyncEventListener listener = (MyAsyncEventListener) queue.getAsyncEventListener();
+    final Map eventsMap = listener.getEventsMap();
+    assertNotNull(eventsMap);
+    assertEquals(numInvocations, eventsMap.size());
+
+    for (Iterator i = eventsMap.entrySet().iterator(); i.hasNext();) {
+      Map.Entry<Integer,String> entry = (Map.Entry<Integer,String>) i.next();
+      assertEquals(MyGatewayEventSubstitutionFilter.SUBSTITUTION_PREFIX + entry.getKey(), entry.getValue());
+    }
+  }
+
   public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
     AsyncEventListener theListener = null;
 
@@ -1630,7 +1610,6 @@ public class AsyncEventQueueTestBase extends DistributedTestCase {
   public boolean isOffHeap() {
     return false;
   }
-
 }
 
 class MyAsyncEventListener_CacheLoader implements AsyncEventListener {
@@ -1669,3 +1648,25 @@ class MyCacheLoader implements CacheLoader, Declarable {
   }
 
 }
+
+class MyGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {
+
+  private AtomicInteger numInvocations = new AtomicInteger();
+
+  protected static final String SUBSTITUTION_PREFIX = "substituted_";
+
+  public Object getSubstituteValue(EntryEvent event) {
+    this.numInvocations.incrementAndGet();
+    return SUBSTITUTION_PREFIX + event.getKey();
+  }
+
+  public void close() {
+  }
+
+  public void init(Properties properties) {
+  }
+
+  protected int getNumInvocations() {
+    return this.numInvocations.get();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/609e2395/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 02ed4ef..978f4af 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -16,11 +16,15 @@
  */
 package com.gemstone.gemfire.internal.cache.wan.asyncqueue;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import com.gemstone.gemfire.cache.Declarable;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
 import org.junit.Ignore;
 
 import com.gemstone.gemfire.cache.CacheFactory;
@@ -1068,7 +1072,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
       }
     }
   }
-  
+
   public void testParallelAsyncEventQueue() {
     Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
         "createFirstLocatorWithDSId", new Object[] { 1 });
@@ -1098,7 +1102,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
 
     vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { getTestMethodName() + "_PR",
         256 });
-    
+
     vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
         new Object[] { "ln" });
     vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
@@ -1107,7 +1111,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
         new Object[] { "ln" });
     vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
         new Object[] { "ln" });
-    
+
     int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
         new Object[] { "ln"});
     int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
@@ -1116,10 +1120,33 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
         new Object[] { "ln"});
     int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
         new Object[] { "ln"});
-    
+
     assertEquals(vm4size + vm5size + vm6size + vm7size, 256);
   }
-  
+
+  public void testParallelAsyncEventQueueWithSubstitutionFilter() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false, "MyAsyncEventListener", "MyGatewayEventSubstitutionFilter" });
+
+    String regionName = getTestMethodName() + "_PR";
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { regionName, "ln", isOffHeap() });
+
+    int numPuts = 10;
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { regionName, numPuts });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "verifySubstitutionFilterInvocations",
+        new Object[] { "ln" , numPuts });
+  }
+
   /**
    * Verify that the events reaching the AsyncEventListener have correct operation detail.
    * (added for defect #50237).
@@ -1918,5 +1945,4 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
         "getAsyncEventListenerMapSize", new Object[] { "ln" });
     assertEquals(vm3size, 1000);
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/609e2395/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
index 9495171..d04e916 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
@@ -17,14 +17,13 @@
 package com.gemstone.gemfire.cache;
 
 import java.io.IOException;
+import java.util.Properties;
 import java.util.Set;
 
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.cache30.CacheXmlTestCase;
-import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
-import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.wan.*;
+import com.gemstone.gemfire.cache30.*;
 import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
 import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
 
@@ -69,9 +68,70 @@ public class CacheXml80GatewayDUnitTest extends CacheXmlTestCase {
     }
   }
 
+  public void testAsyncEventQueueWithSubstitutionFilter() {
+    getSystem();
+    CacheCreation cache = new CacheCreation();
+
+    // Create an AsyncEventQueue with GatewayEventSubstitutionFilter.
+    String id = getName();
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setGatewayEventSubstitutionListener(new MyGatewayEventSubstitutionFilter());
+    AsyncEventQueue queue = factory.create(id, new CacheXml70DUnitTest.MyAsyncEventListener());
+
+    // Verify the GatewayEventSubstitutionFilter is set on the AsyncEventQueue.
+    assertNotNull(queue.getGatewayEventSubstitutionFilter());
+
+    testXml(cache);
+    Cache c = getCache();
+    assertNotNull(c);
+
+    // Get the AsyncEventQueue. Verify the GatewayEventSubstitutionFilter is not null.
+    AsyncEventQueue queueOnCache = c.getAsyncEventQueue(id);
+    assertNotNull(queueOnCache);
+    assertNotNull(queueOnCache.getGatewayEventSubstitutionFilter());
+  }
+
+  public void testGatewaySenderWithSubstitutionFilter() {
+    getSystem();
+    CacheCreation cache = new CacheCreation();
+
+    // Create a GatewaySender with GatewayEventSubstitutionFilter.
+    // Don't start the sender to avoid 'Locators must be configured before starting gateway-sender' exception.
+    String id = getName();
+    GatewaySenderFactory factory = cache.createGatewaySenderFactory();
+    factory.setManualStart(true);
+    factory.setGatewayEventSubstitutionFilter(new MyGatewayEventSubstitutionFilter());
+    GatewaySender sender = factory.create(id, 2);
+
+    // Verify the GatewayEventSubstitutionFilter is set on the GatewaySender.
+    assertNotNull(sender.getGatewayEventSubstitutionFilter());
+
+    testXml(cache);
+    Cache c = getCache();
+    assertNotNull(c);
+
+    // Get the GatewaySender. Verify the GatewayEventSubstitutionFilter is not null.
+    GatewaySender senderOnCache = c.getGatewaySender(id);
+    assertNotNull(senderOnCache);
+    assertNotNull(senderOnCache.getGatewayEventSubstitutionFilter());
+  }
+
   protected void validateGatewayReceiver(GatewayReceiver receiver1,
       GatewayReceiver gatewayReceiver){
     CacheXml70GatewayDUnitTest.validateGatewayReceiver(receiver1, gatewayReceiver);
     assertEquals(receiver1.isManualStart(), gatewayReceiver.isManualStart());
   }
+
+  public static class MyGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {
+
+    public Object getSubstituteValue(EntryEvent event) {
+      return event.getKey();
+    }
+
+    public void close() {
+    }
+
+    public void init(Properties properties) {
+    }
+  }
 }


Mime
View raw message