geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [geode] 04/07: GEODE-6626: Make GatewayReceiverCommand unit testable
Date Fri, 12 Apr 2019 22:14:11 GMT
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-6626-gatewayReceiver-metrics
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7bc3864a525284188e6ad59f192e90fa1538f80d
Author: Kirk Lund <klund@apache.org>
AuthorDate: Fri Apr 12 15:08:05 2019 -0700

    GEODE-6626: Make GatewayReceiverCommand unit testable
    
    Co-authored-by: Michael Oleske <moleske@pivotal.io>
---
 .../sockets/command/GatewayReceiverCommand.java    | 180 +++++++--------------
 1 file changed, 62 insertions(+), 118 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 4ba2721..cffa887 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -43,6 +43,7 @@ import org.apache.geode.internal.cache.tier.sockets.Part;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewayReceiverMetrics;
 import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
 import org.apache.geode.internal.security.AuthorizeRequest;
 import org.apache.geode.internal.security.SecurityService;
@@ -57,45 +58,37 @@ import org.apache.geode.pdx.internal.PeerTypeRegistration;
 public class GatewayReceiverCommand extends BaseCommand {
 
   @Immutable
-  private static final GatewayReceiverCommand singleton = new GatewayReceiverCommand();
+  private static final GatewayReceiverCommand SINGLETON = new GatewayReceiverCommand();
 
   public static Command getCommand() {
-    return singleton;
+    return SINGLETON;
   }
 
-  private GatewayReceiverCommand() {}
+  GatewayReceiverCommand() {
+    // nothing
+  }
 
   private void handleRegionNull(ServerConnection servConn, String regionName, int batchId) {
     InternalCache cache = servConn.getCachedRegionHelper().getCacheForGatewayCommand();
     if (cache != null && cache.isCacheAtShutdownAll()) {
       throw cache.getCacheClosedException("Shutdown occurred during message processing");
-    } else {
-      String reason = String.format("Region %s was not found during batch create request %s",
-          new Object[] {regionName, Integer.valueOf(batchId)});
-      throw new RegionDestroyedException(reason, regionName);
     }
+    String reason = String.format("Region %s was not found during batch create request %s",
+        regionName, batchId);
+    throw new RegionDestroyedException(reason, regionName);
   }
 
   @Override
   public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
       final SecurityService securityService, long start) throws IOException, InterruptedException {
-    Part regionNamePart = null, keyPart = null, valuePart = null, callbackArgPart = null;
-    String regionName = null;
-    Object callbackArg = null, key = null;
-    int partNumber = 0;
     CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
     GatewayReceiverStats stats = (GatewayReceiverStats) serverConnection.getCacheServerStats();
-    EventID eventId = null;
-    LocalRegion region = null;
-    List<BatchException70> exceptions = new ArrayList<BatchException70>();
-    Throwable fatalException = null;
-    // requiresResponse = true;// let PROCESS_BATCH deal with this itself
+
     {
       long oldStart = start;
       start = DistributionStats.getStatTime();
       stats.incReadProcessBatchRequestTime(start - oldStart);
     }
-    Part callbackArgExistsPart;
 
     stats.incBatchSize(clientMessage.getPayloadLength());
 
@@ -130,60 +123,46 @@ public class GatewayReceiverCommand extends BaseCommand {
     if (batchId != serverConnection.getLatestBatchIdReplied() + 1) {
       logger.warn(
           "Received process batch request {} out of order. The id of the last batch processed was {}. This batch request will be processed, but some messages may have been lost.",
-          new Object[] {batchId, serverConnection.getLatestBatchIdReplied()});
+          batchId, serverConnection.getLatestBatchIdReplied());
       stats.incOutoforderBatchesReceived();
     }
 
-
     if (logger.isDebugEnabled()) {
       logger.debug("Received process batch request {} that will be processed.", batchId);
     }
 
-
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: Received process batch request {} containing {} events ({} bytes) with {} acknowledgement on {}",
           serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(),
           "normal", serverConnection.getSocketString());
     }
-    // logger.warn("Received process batch request " + batchId + " containing
-    // " + numberOfEvents + " events (" + msg.getPayloadLength() + " bytes) with
-    // " + (earlyAck ? "early" : "normal") + " acknowledgement on " +
-    // getSocketString());
-    // if (earlyAck) {
-    // logger.warn("Sent process batch early response for batch " + batchId +
-    // " containing " + numberOfEvents + " events (" + msg.getPayloadLength() +
-    // " bytes) with " + (earlyAck ? "early" : "normal") + " acknowledgement on
-    // " + getSocketString());
-    // }
 
     // Retrieve the events from the message parts. The '2' below
     // represents the number of events (part0) and the batchId (part1)
-    partNumber = 2;
+    int partNumber = 2;
     int dsid = clientMessage.getPart(partNumber++).getInt();
 
-    boolean removeOnException =
-        clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1 ? true : false;
+    boolean removeOnException = clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1;
 
     // event received in batch also have PDX events at the start of the batch,to
     // represent correct index on which the exception occurred, number of PDX
     // events need to be subtracted.
     int indexWithoutPDXEvent = -1; //
+    Part valuePart = null;
+    Throwable fatalException = null;
+    List<BatchException70> exceptions = new ArrayList<>();
     for (int i = 0; i < numberOfEvents; i++) {
-      boolean retry = true;
-      boolean isPdxEvent = false;
       indexWithoutPDXEvent++;
-      // System.out.println("Processing event " + i + " in batch " + batchId + "
-      // starting with part number " + partNumber);
+
       Part actionTypePart = clientMessage.getPart(partNumber);
       int actionType = actionTypePart.getInt();
 
-      long versionTimeStamp = VersionTag.ILLEGAL_VERSION_TIMESTAMP;
-      EventIDHolder clientEvent = null;
-
       boolean callbackArgExists = false;
 
       try {
+        boolean isPdxEvent = false;
+        boolean retry = true;
         do {
           if (isPdxEvent) {
             // This is a retried event. Reset the PDX event index.
@@ -197,21 +176,19 @@ public class GatewayReceiverCommand extends BaseCommand {
           } catch (Exception e) {
             logger.warn(String.format(
                 "%s: Caught exception processing batch request %s containing %s events",
-                new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                    Integer.valueOf(numberOfEvents)}),
-                e);
+                serverConnection.getName(), batchId, numberOfEvents), e);
             handleException(removeOnException, stats, e);
             break;
           }
           boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;
 
           // Make sure instance variables are null before each iteration
-          regionName = null;
-          key = null;
-          callbackArg = null;
+          String regionName = null;
+          Object key = null;
+          Object callbackArg = null;
 
           // Retrieve the region name from the message parts
-          regionNamePart = clientMessage.getPart(partNumber + 2);
+          Part regionNamePart = clientMessage.getPart(partNumber + 2);
           regionName = regionNamePart.getString();
           if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
             indexWithoutPDXEvent--;
@@ -226,32 +203,34 @@ public class GatewayReceiverCommand extends BaseCommand {
           Part eventIdPart = clientMessage.getPart(partNumber + 3);
           eventIdPart.setVersion(serverConnection.getClientVersion());
           // String eventId = eventIdPart.getString();
+          EventID eventId;
           try {
             eventId = (EventID) eventIdPart.getObject();
           } catch (Exception e) {
             logger.warn(String.format(
                 "%s: Caught exception processing batch request %s containing %s events",
-                new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                    Integer.valueOf(numberOfEvents)}),
-                e);
+                serverConnection.getName(), batchId, numberOfEvents), e);
             handleException(removeOnException, stats, e);
             break;
           }
 
           // Retrieve the key from the message parts
-          keyPart = clientMessage.getPart(partNumber + 4);
+          Part keyPart = clientMessage.getPart(partNumber + 4);
           try {
             key = keyPart.getStringOrObject();
           } catch (Exception e) {
             logger.warn(String.format(
                 "%s: Caught exception processing batch request %s containing %s events",
-                new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                    Integer.valueOf(numberOfEvents)}),
-                e);
+                serverConnection.getName(), batchId, numberOfEvents), e);
             handleException(removeOnException, stats, e);
             break;
           }
-          int index = -1;
+          int index;
+          Part callbackArgPart;
+          EventIDHolder clientEvent;
+          long versionTimeStamp;
+          Part callbackArgExistsPart;
+          LocalRegion region;
           switch (actionType) {
             case 0: // Create
               try {
@@ -286,9 +265,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                     logger
                         .warn(String.format(
                             "%s: Caught exception processing batch create request %s for %s events",
-                            new Object[] {serverConnection.getName(),
-                                Integer.valueOf(batchId),
-                                Integer.valueOf(numberOfEvents)}),
+                            serverConnection.getName(), batchId, numberOfEvents),
                             e);
                     throw e;
                   }
@@ -303,17 +280,13 @@ public class GatewayReceiverCommand extends BaseCommand {
                 // Process the create request
                 if (key == null || regionName == null) {
                   String message = null;
-                  Object[] messageArgs =
-                      new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
                   if (key == null) {
-                    message =
-                        "%s: The input region name for the batch create request %s is null";
+                    message = "%s: The input key for the batch create request %s is null";
                   }
                   if (regionName == null) {
-                    message =
-                        "%s: The input region name for the batch create request %s is null";
+                    message = "%s: The input region name for the batch create request %s is null";
                   }
-                  String s = String.format(message, messageArgs);
+                  String s = String.format(message, serverConnection.getName(), batchId);
                   logger.warn(s);
                   throw new Exception(s);
                 }
@@ -366,29 +339,19 @@ public class GatewayReceiverCommand extends BaseCommand {
                     throw new Exception(
                         String.format(
                             "%s: Failed to create or update entry for region %s key %s value %s callbackArg %s",
-                            new Object[] {serverConnection.getName(), regionName,
-                                key, valuePart, callbackArg}));
+                            serverConnection.getName(), regionName, key, valuePart, callbackArg));
                   }
                 }
               } catch (Exception e) {
                 logger.warn(String.format(
                     "%s: Caught exception processing batch create request %s for %s events",
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
-                    e);
+                    serverConnection.getName(), batchId, numberOfEvents), e);
                 handleException(removeOnException, stats, e);
               }
               break;
+
             case 1: // Update
               try {
-                /*
-                 * CLIENT EXCEPTION HANDLING TESTING CODE keySt = (String) key;
-                 * System.out.println("Processing updated key: " + key); if
-                 * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings
-                 * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
-                 * )); }
-                 */
-
                 // Retrieve the value from the message parts (do not deserialize it)
                 valuePart = clientMessage.getPart(partNumber + 5);
                 // try {
@@ -412,9 +375,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                         .warn(
                             String.format(
                                 "%s: Caught exception processing batch update request %s containing %s events",
-                                new Object[] {serverConnection.getName(),
-                                    Integer.valueOf(batchId),
-                                    Integer.valueOf(numberOfEvents)}),
+                                serverConnection.getName(), batchId, numberOfEvents),
                             e);
                     throw e;
                   }
@@ -429,17 +390,13 @@ public class GatewayReceiverCommand extends BaseCommand {
                 // Process the update request
                 if (key == null || regionName == null) {
                   String message = null;
-                  Object[] messageArgs =
-                      new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
                   if (key == null) {
-                    message =
-                        "%s: The input key for the batch update request %s is null";
+                    message = "%s: The input key for the batch update request %s is null";
                   }
                   if (regionName == null) {
-                    message =
-                        "%s: The input region name for the batch update request %s is null";
+                    message = "%s: The input region name for the batch update request %s is null";
                   }
-                  String s = String.format(message, messageArgs);
+                  String s = String.format(message, serverConnection.getName(), batchId);
                   logger.warn(s);
                   throw new Exception(s);
                 }
@@ -478,11 +435,10 @@ public class GatewayReceiverCommand extends BaseCommand {
                     stats.incUpdateRequest();
                     retry = false;
                   } else {
-                    final Object[] msgArgs = new Object[] {serverConnection.getName(), regionName,
-                        key, valuePart, callbackArg};
                     final String message =
                         "%s: Failed to update entry for region %s, key %s, value %s, and callbackArg %s";
-                    String s = String.format(message, msgArgs);
+                    String s = String.format(message, serverConnection.getName(), regionName,
+                        key, valuePart, callbackArg);
                     logger.info(s);
                     throw new Exception(s);
                   }
@@ -491,12 +447,11 @@ public class GatewayReceiverCommand extends BaseCommand {
                 // Preserve the connection under all circumstances
                 logger.warn(String.format(
                     "%s: Caught exception processing batch update request %s containing %s events",
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
-                    e);
+                    serverConnection.getName(), batchId, numberOfEvents), e);
                 handleException(removeOnException, stats, e);
               }
               break;
+
             case 2: // Destroy
               try {
                 // Retrieve the callbackArg from the message parts if necessary
@@ -515,9 +470,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                         .warn(
                             String.format(
                                 "%s: Caught exception processing batch destroy request %s containing %s events",
-                                new Object[] {serverConnection.getName(),
-                                    Integer.valueOf(batchId),
-                                    Integer.valueOf(numberOfEvents)}),
+                                serverConnection.getName(), batchId, numberOfEvents),
                             e);
                     throw e;
                   }
@@ -541,9 +494,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                     message =
                         "%s: The input region name for the batch destroy request %s is null";
                   }
-                  Object[] messageArgs =
-                      new Object[] {serverConnection.getName(), Integer.valueOf(batchId)};
-                  String s = String.format(message, messageArgs);
+                  String s = String.format(message, serverConnection.getName(), batchId);
                   logger.warn(s);
                   throw new Exception(s);
                 }
@@ -573,8 +524,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                     serverConnection.setModificationInfo(true, regionName, key);
                   } catch (EntryNotFoundException e) {
                     logger.info("{}: during batch destroy no entry was found for key {}",
-                        new Object[] {serverConnection.getName(), key});
-                    // throw new Exception(e);
+                        serverConnection.getName(), key);
                   }
                   stats.incDestroyRequest();
                   retry = false;
@@ -582,14 +532,13 @@ public class GatewayReceiverCommand extends BaseCommand {
               } catch (Exception e) {
                 logger.warn(String.format(
                     "%s: Caught exception processing batch destroy request %s containing %s events",
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
+                    serverConnection.getName(), batchId, numberOfEvents),
                     e);
                 handleException(removeOnException, stats, e);
               }
               break;
-            case 3: // Update Time-stamp for a RegionEntry
 
+            case 3: // Update Time-stamp for a RegionEntry
               try {
                 // Region name
                 regionNamePart = clientMessage.getPart(partNumber + 2);
@@ -627,9 +576,8 @@ public class GatewayReceiverCommand extends BaseCommand {
                   String message =
                       "%s: Caught exception processing batch update version request request %s containing %s events";
 
-                  Object[] messageArgs = new Object[] {serverConnection.getName(),
-                      Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)};
-                  String s = String.format(message, messageArgs);
+                  String s = String.format(message, serverConnection.getName(),
+                      batchId, numberOfEvents);
                   logger.warn(s);
                   throw new Exception(s);
 
@@ -657,7 +605,7 @@ public class GatewayReceiverCommand extends BaseCommand {
                     } catch (EntryNotFoundException e) {
                       logger.info(
                           "Entry for key {} was not found in Region {} during ProcessBatch for Update Entry Version",
-                          new Object[] {serverConnection.getName(), key});
+                          serverConnection.getName(), key);
                     }
                     retry = false;
                   }
@@ -665,17 +613,14 @@ public class GatewayReceiverCommand extends BaseCommand {
               } catch (Exception e) {
                 logger.warn(String.format(
                     "%s: Caught exception processing batch update version request request %s containing %s events",
-                    new Object[] {serverConnection.getName(), Integer.valueOf(batchId),
-                        Integer.valueOf(numberOfEvents)}),
-                    e);
+                    serverConnection.getName(), batchId, numberOfEvents), e);
                 handleException(removeOnException, stats, e);
               }
 
               break;
             default:
               logger.fatal("{}: Unknown action type ({}) for batch from {}",
-                  new Object[] {serverConnection.getName(), Integer.valueOf(actionType),
-                      serverConnection.getSocketString()});
+                  serverConnection.getName(), actionType, serverConnection.getSocketString());
               stats.incUnknowsOperationsReceived();
           }
         } while (retry);
@@ -698,7 +643,7 @@ public class GatewayReceiverCommand extends BaseCommand {
           fatalException = e.getCause();
           logger.fatal(String.format(
               "This gateway receiver has received a PDX type from %s that does match the existing PDX type. This gateway receiver will not process any more events, in order to prevent receiving objects which may not be deserializable.",
-              new Object[] {serverConnection.getMembershipID()}), e.getCause());
+              serverConnection.getMembershipID()), e.getCause());
           break;
         }
 
@@ -707,9 +652,8 @@ public class GatewayReceiverCommand extends BaseCommand {
         DistributedSystem ds = crHelper.getCacheForGatewayCommand().getDistributedSystem();
         String exceptionMessage = String.format(
             "Exception occurred while processing a batch on the receiver running on DistributedSystem with Id: %s, DistributedMember on which the receiver is running: %s",
-            new Object[] {
-                ((InternalDistributedSystem) ds).getDistributionManager().getDistributedSystemId(),
-                ds.getDistributedMember()});
+            ((InternalDistributedSystem) ds).getDistributionManager().getDistributedSystemId(),
+            ds.getDistributedMember());
         BatchException70 be =
             new BatchException70(exceptionMessage, e, indexWithoutPDXEvent, batchId);
         exceptions.add(be);


Mime
View raw message