geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hiteshkhame...@apache.org
Subject geode git commit: GEODE-2277 Now pdxEvent from gateway added through pdx Type registry
Date Wed, 18 Jan 2017 17:40:23 GMT
Repository: geode
Updated Branches:
  refs/heads/develop 9f62ffc88 -> dc2588bd9


GEODE-2277 Now pdxEvent from gateway added through pdx Type registry

This make sure that we have one mechanism to add pdx type into pdx
registry. Added test for it.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/dc2588bd
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/dc2588bd
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/dc2588bd

Branch: refs/heads/develop
Commit: dc2588bd9ff079ecd74a5190c75442a42bf3b771
Parents: 9f62ffc
Author: Hitesh Khamesra <hkhamesra@pivotal.io>
Authored: Tue Jan 17 10:03:13 2017 -0800
Committer: Hitesh Khamesra <hkhamesra@pivotal.io>
Committed: Wed Jan 18 09:22:07 2017 -0800

----------------------------------------------------------------------
 .../cache/DistributedCacheOperation.java        |   5 +-
 .../sockets/command/GatewayReceiverCommand.java |  48 +++-
 .../geode/pdx/PdxSerializableDUnitTest.java     | 219 +++++++++++++++++--
 3 files changed, 248 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/dc2588bd/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 7a7669f..e4658b4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -956,6 +956,10 @@ public abstract class DistributedCacheOperation {
       this.inhibitAllNotifications = inhibit;
     }
 
+    public String getRegionPath() {
+      return this.regionPath;
+    }
+
     /**
      * process a reply
      * 
@@ -1610,5 +1614,4 @@ public abstract class DistributedCacheOperation {
     }
 
   }
-
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/dc2588bd/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 54140bd..2ccca76 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -51,8 +51,12 @@ import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.security.AuthorizeRequest;
+import org.apache.geode.internal.util.BlobHelper;
 import org.apache.geode.pdx.PdxConfigurationException;
 import org.apache.geode.pdx.PdxRegistryMismatchException;
+import org.apache.geode.pdx.internal.EnumId;
+import org.apache.geode.pdx.internal.EnumInfo;
+import org.apache.geode.pdx.internal.PdxType;
 import org.apache.geode.pdx.internal.PeerTypeRegistration;
 import org.apache.geode.i18n.StringId;
 
@@ -197,6 +201,7 @@ public class GatewayReceiverCommand extends BaseCommand {
     // events need to be subtratced.
     int indexWithoutPDXEvent = -1; //
     for (int i = 0; i < numberOfEvents; i++) {
+      boolean isPdxEvent = false;
       indexWithoutPDXEvent++;
       // System.out.println("Processing event " + i + " in batch " + batchId + "
       // starting with part number " + partNumber);
@@ -233,6 +238,7 @@ public class GatewayReceiverCommand extends BaseCommand {
         regionName = regionNamePart.getString();
         if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
           indexWithoutPDXEvent--;
+          isPdxEvent = true;
         }
 
         // Retrieve the event id from the message parts
@@ -354,13 +360,17 @@ public class GatewayReceiverCommand extends BaseCommand {
                 }
                 // Attempt to create the entry
                 boolean result = false;
-                result = region.basicBridgeCreate(key, value, isObject, callbackArg,
-                    servConn.getProxyID(), false, clientEvent, false);
-                // If the create fails (presumably because it already exists),
-                // attempt to update the entry
-                if (!result) {
-                  result = region.basicBridgePut(key, value, null, isObject, callbackArg,
-                      servConn.getProxyID(), false, clientEvent);
+                if (isPdxEvent) {
+                  result = addPdxType(crHelper, key, value);
+                } else {
+                  result = region.basicBridgeCreate(key, value, isObject, callbackArg,
+                      servConn.getProxyID(), false, clientEvent, false);
+                  // If the create fails (presumably because it already exists),
+                  // attempt to update the entry
+                  if (!result) {
+                    result = region.basicBridgePut(key, value, null, isObject, callbackArg,
+                        servConn.getProxyID(), false, clientEvent);
+                  }
                 }
 
                 if (result || clientEvent.isConcurrencyConflict()) {
@@ -465,8 +475,13 @@ public class GatewayReceiverCommand extends BaseCommand {
                   value = putContext.getSerializedValue();
                   isObject = putContext.isObject();
                 }
-                boolean result = region.basicBridgePut(key, value, null, isObject, callbackArg,
-                    servConn.getProxyID(), false, clientEvent);
+                boolean result = false;
+                if (isPdxEvent) {
+                  result = addPdxType(crHelper, key, value);
+                } else {
+                  result = region.basicBridgePut(key, value, null, isObject, callbackArg,
+                      servConn.getProxyID(), false, clientEvent);
+                }
                 if (result || clientEvent.isConcurrencyConflict()) {
                   servConn.setModificationInfo(true, regionName, key);
                   stats.incUpdateRequest();
@@ -788,6 +803,21 @@ public class GatewayReceiverCommand extends BaseCommand {
     }
   }
 
+  private boolean addPdxType(CachedRegionHelper crHelper, Object key, Object value)
+      throws Exception {
+    if (key instanceof EnumId) {
+      EnumId enumId = (EnumId) key;
+      value = BlobHelper.deserializeBlob((byte[]) value);
+      ((GemFireCacheImpl) crHelper.getCache()).getPdxRegistry().addRemoteEnum(enumId.intValue(),
+          (EnumInfo) value);
+    } else {
+      value = BlobHelper.deserializeBlob((byte[]) value);
+      ((GemFireCacheImpl) crHelper.getCache()).getPdxRegistry().addRemoteType((int) key,
+          (PdxType) value);
+    }
+    return true;
+  }
+
   private void handleMessageRetry(LocalRegion region, EntryEventImpl clientEvent) {
     if (clientEvent.isPossibleDuplicate()) {
       if (region.getAttributes().getConcurrencyChecksEnabled()) {

http://git-wip-us.apache.org/repos/asf/geode/blob/dc2588bd/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializableDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializableDUnitTest.java b/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializableDUnitTest.java
index 9ebf5b2..daa2eca 100644
--- a/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializableDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/pdx/PdxSerializableDUnitTest.java
@@ -14,17 +14,10 @@
  */
 package org.apache.geode.pdx;
 
-import org.apache.geode.test.junit.categories.SerializationTest;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
-
-import java.util.List;
+import static org.apache.geode.internal.Assert.assertTrue;
+import static org.apache.geode.internal.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
@@ -32,24 +25,47 @@ import org.apache.geode.cache.CacheEvent;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.TransactionEvent;
 import org.apache.geode.cache.TransactionListener;
 import org.apache.geode.cache.TransactionWriter;
 import org.apache.geode.cache.TransactionWriterException;
-import org.apache.geode.cache30.CacheTestCase;
-import org.apache.geode.cache30.TestTransactionListener;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.internal.cache.DistributedCacheOperation;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.TXEvent;
+import org.apache.geode.pdx.internal.EnumId;
+import org.apache.geode.pdx.internal.EnumInfo;
 import org.apache.geode.pdx.internal.PeerTypeRegistration;
+import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.SerializableCallable;
 import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.SerializationTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 @Category({DistributedTest.class, SerializationTest.class})
 public class PdxSerializableDUnitTest extends JUnit4CacheTestCase {
 
+  public Cache getCache(Properties properties) {
+    getSystem(properties);
+    return getCache();
+  }
+
+
   public PdxSerializableDUnitTest() {
     super();
   }
@@ -250,6 +266,181 @@ public class PdxSerializableDUnitTest extends JUnit4CacheTestCase {
     vm3.invoke(checkForObject);
   }
 
+
+  @Test
+  public void testVmWaitsForPdxType() throws Throwable {
+    VM vm0 = Host.getHost(0).getVM(0);
+    VM vm1 = Host.getHost(0).getVM(1);
+    getBlackboard().initBlackboard();
+    final Properties properties = getDistributedSystemProperties();
+    properties.put("conserve-sockets", "false");
+
+    // steps:
+    // 1 create two caches and define a PdxType
+    // 2 install a block in VM1 that delays receipt of new PDX types
+    // 3 update the value of the PdxInstance in VM0 using a new Enum type
+    // 4 get the value in VM0
+    // The result should be that step 4 hangs unless the bug is fixed
+
+    vm0.invoke("create cache", () -> {
+      Cache cache = getCache(properties);
+      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create("testRegion");
+      region.put("TestObject", new TestPdxObject("aString", 1, 1.0, TestPdxObject.AnEnum.ONE));
+    });
+
+    vm1.invoke("create cache and region", () -> {
+      Cache cache = getCache(properties);
+      // note that initial image transfer in testRegion will cause the object to be serialized
in
+      // vm0
+      // and populate the PdxRegion in this vm
+      cache.createRegionFactory(RegionShortcut.REPLICATE).create("testRegion");
+
+      // this message observer will ensure that a new PDX registration doesn't occur
+      final DUnitBlackboard bb = getBlackboard();
+      DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
+        @Override
+        public void beforeProcessMessage(DistributionManager dm, DistributionMessage msg)
{
+          if (msg instanceof DistributedCacheOperation.CacheOperationMessage) {
+            try {
+              DistributedCacheOperation.CacheOperationMessage cmsg =
+                  (DistributedCacheOperation.CacheOperationMessage) msg;
+              String path = cmsg.getRegionPath();
+              if (path.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
+                System.out
+                    .println("message observer found a PDX update message and is stalling:
" + msg);
+                try {
+                  bb.signalGate("listenerWasInvoked");
+                  bb.waitForGate("pdxObjectGetStarting", 3, TimeUnit.SECONDS);
+                  // let the get() commence and block
+                  Thread.sleep(30000);
+                  System.out.println("message observer after sleep ");
+                } catch (InterruptedException e) {
+                  System.out.println("message observer is done stalling1 ");
+                  bb.setMailbox("listenerProblem", e);
+                } catch (TimeoutException e) {
+                  System.out.println("message observer is done stalling2");
+                  bb.setMailbox("listenerProblem", e);
+                } finally {
+                  System.out.println("message observer is done stalling");
+                }
+              }
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      });
+    });
+
+    AsyncInvocation async0 = vm0.invokeAsync("propagate value with new pdx enum type", ()
-> {
+      Cache cache = getCache(properties);
+      final Region pdxRegion = cache.getRegion(PeerTypeRegistration.REGION_FULL_PATH);
+      final DUnitBlackboard bb = getBlackboard();
+      // now we register a new Id for our enum in a different thread. This will
+      // block in vm1 due to its message observer
+      Thread t = new Thread("PdxSerializableDUnitTest async thread") {
+        public void run() {
+          bb.signalGate("asyncThreadReady");
+          // pdxRegion.put(new EnumId(0x3010101), new EnumInfo(TestPdxObject.AnEnum.TWO));
+          ((GemFireCacheImpl) cache).getPdxRegistry().addRemoteEnum(0x3010101,
+              new EnumInfo(TestPdxObject.AnEnum.TWO));
+        }
+      };
+      t.setDaemon(true);
+      t.start();
+      try {
+        bb.waitForGate("asyncThreadReady", 20, TimeUnit.SECONDS);
+      } catch (TimeoutException e) {
+        fail("timed out");
+      }
+      // reserialization will use the new Enumeration PDX type
+      Region region = cache.getRegion("testRegion");
+      bb.waitForGate("listenerWasInvoked", 20, TimeUnit.SECONDS);
+      region.put("TestObject", new TestPdxObject("TestObject'", 2, 2.0, TestPdxObject.AnEnum.TWO));
+      System.out.println("TestObject added put");
+      bb.signalGate("pdxObjectPut");
+    });
+
+    // vm0 has sent a new TestObject but vm1 does not have the enum type needed to
+    // deserialize it.
+    AsyncInvocation async1 = vm1.invokeAsync("try to read object w/o enum type", () ->
{
+      DUnitBlackboard bb = getBlackboard();
+      bb.waitForGate("pdxObjectPut", 10, TimeUnit.SECONDS);
+      Region region = getCache(properties).getRegion("testRegion");
+      bb.signalGate("pdxObjectGetStarting");
+      Object testObject = region.get("TestObject");
+      System.out.println("found " + testObject);
+    });
+
+
+    DUnitBlackboard bb = getBlackboard();
+
+    try {
+      async0.join(20000);
+      async1.join(10000);
+      if (async0.exceptionOccurred()) {
+        throw async0.getException();
+      }
+      if (async1.exceptionOccurred()) {
+        throw async1.getException();
+      }
+      assertTrue(bb.isGateSignaled("listenerWasInvoked"));
+      /*
+       * Throwable throwable = (Throwable)bb.getMailbox("listenerProblem"); if (throwable
!= null) {
+       * RuntimeException rte = new RuntimeException("message observer had a problem", throwable);
+       * throw rte; }
+       */
+    } finally {
+      bb.signalGate("pdxObjectGetStarting");
+      bb.signalGate("pdxObjectPut");
+      bb.initBlackboard();
+    }
+  }
+
+  public static class TestPdxObject implements org.apache.geode.pdx.PdxSerializable {
+
+    public String stringVar;
+    public int intVar;
+    public double floatVar;
+
+    public static enum AnEnum {
+      ONE, TWO
+    }
+
+    ;
+
+    public AnEnum enumVar;
+
+    public TestPdxObject() {}
+
+    public TestPdxObject(String stringVar, int intVar, double floatVar, AnEnum enumVar) {
+      this.stringVar = stringVar;
+      this.intVar = intVar;
+      this.floatVar = floatVar;
+      this.enumVar = enumVar;
+    }
+
+    @Override
+    public void toData(final PdxWriter writer) {
+      writer.writeString("stringVar", stringVar).writeInt("intVar", intVar)
+          .writeDouble("floatVar", floatVar).writeObject("enumVar", this.enumVar);
+    }
+
+    @Override
+    public void fromData(final PdxReader reader) {
+      stringVar = reader.readString("stringVar");
+      intVar = reader.readInt("intVar");
+      floatVar = reader.readDouble("floatVar");
+      enumVar = (AnEnum) reader.readObject("enumVar");
+    }
+
+    @Override
+    public String toString() {
+      return "TestPdxObject [stringVar=" + stringVar + ", intVar=" + intVar + ", floatVar="
+          + floatVar + ", enumVar=" + enumVar + "]";
+    }
+  }
+
   /**
    * add a listener and writer that will throw an exception when invoked if events are for
internal
    * regions


Mime
View raw message