Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F19B5200C7C for ; Mon, 5 Jun 2017 18:46:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F06C3160BBB; Mon, 5 Jun 2017 16:46:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1F7EF160BE7 for ; Mon, 5 Jun 2017 18:46:25 +0200 (CEST) Received: (qmail 34960 invoked by uid 500); 5 Jun 2017 16:46:25 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 34785 invoked by uid 99); 5 Jun 2017 16:46:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Jun 2017 16:46:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B9961E041D; Mon, 5 Jun 2017 16:46:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.apache.org Date: Mon, 05 Jun 2017 16:46:30 -0000 Message-Id: <71a24b684c2d4825a1f2a222bdc9aa64@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [7/9] geode git commit: Use immutable SecurityService archived-at: Mon, 05 Jun 2017 16:46:30 -0000 http://git-wip-us.apache.org/repos/asf/geode/blob/eab9e6e0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java index 23d5b18..167aa3b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java @@ -48,6 +48,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.security.AuthorizeRequest; +import org.apache.geode.internal.security.SecurityService; /** * @since GemFire 6.5 @@ -63,8 +64,8 @@ public class ExecuteFunction65 extends BaseCommand { ExecuteFunction65() {} @Override - public void cmdExecute(Message clientMessage, ServerConnection servConn, long start) - throws IOException { + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + final SecurityService securityService, long start) throws IOException { Object function = null; Object args = null; MemberMappedArgument memberMappedArg = null; @@ -88,8 +89,8 @@ public class ExecuteFunction65 extends BaseCommand { hasResult = functionState; } if (hasResult == 1) { - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); } function = clientMessage.getPart(1).getStringOrObject(); args = clientMessage.getPart(2).getObject(); @@ -103,8 +104,8 @@ public class ExecuteFunction65 extends BaseCommand { LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception); if (hasResult == 1) { - writeChunkedException(clientMessage, exception, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, exception, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -112,8 +113,8 @@ public class ExecuteFunction65 extends BaseCommand { final String message = LocalizedStrings.ExecuteFunction_THE_INPUT_FUNCTION_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL .toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } @@ -125,8 +126,8 @@ public class ExecuteFunction65 extends BaseCommand { if (functionObject == null) { final String message = LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } else { byte functionStateOnServerSide = AbstractExecution.getFunctionState(functionObject.isHA(), @@ -139,8 +140,8 @@ public class ExecuteFunction65 extends BaseCommand { String message = LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } } @@ -150,21 +151,21 @@ public class ExecuteFunction65 extends BaseCommand { FunctionStats stats = FunctionStats.getFunctionStats(functionObject.getId()); - this.securityService.authorizeDataWrite(); + securityService.authorizeDataWrite(); // check if the caller is authorized to do this operation on server - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); ExecuteFunctionOperationContext executeContext = null; if (authzRequest != null) { executeContext = authzRequest.executeFunctionAuthorize(functionObject.getId(), null, null, args, functionObject.optimizeForWrite()); } - ChunkedMessage m = servConn.getFunctionResponseMessage(); + ChunkedMessage m = serverConnection.getFunctionResponseMessage(); m.setTransactionId(clientMessage.getTransactionId()); ResultSender resultSender = new ServerToClientFunctionResultSender65(m, - MessageType.EXECUTE_FUNCTION_RESULT, servConn, functionObject, executeContext); + MessageType.EXECUTE_FUNCTION_RESULT, serverConnection, functionObject, executeContext); - InternalDistributedMember localVM = (InternalDistributedMember) servConn.getCache() + InternalDistributedMember localVM = (InternalDistributedMember) serverConnection.getCache() .getDistributedSystem().getDistributedMember(); FunctionContext context = null; @@ -174,16 +175,17 @@ public class ExecuteFunction65 extends BaseCommand { } else { context = new FunctionContextImpl(functionObject.getId(), args, resultSender, isReexecute); } - HandShake handShake = (HandShake) servConn.getHandshake(); + HandShake handShake = (HandShake) serverConnection.getHandshake(); int earlierClientReadTimeout = handShake.getClientReadTimeout(); handShake.setClientReadTimeout(0); try { long startExecution = stats.startTime(); stats.startFunctionExecution(functionObject.hasResult()); if (logger.isDebugEnabled()) { - logger.debug("Executing Function on Server: {} with context: {}", servConn, context); + logger.debug("Executing Function on Server: {} with context: {}", serverConnection, + context); } - InternalCache cache = servConn.getCache(); + InternalCache cache = serverConnection.getCache(); HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor(); if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical() @@ -194,7 +196,7 @@ public class ExecuteFunction65 extends BaseCommand { .toLocalizedString(new Object[] {functionObject.getId(), sm}), sm); - sendException(hasResult, clientMessage, e.getMessage(), servConn, e); + sendException(hasResult, clientMessage, e.getMessage(), serverConnection, e); return; } functionObject.execute(context); @@ -221,7 +223,7 @@ public class ExecuteFunction65 extends BaseCommand { function), ioException); String message = LocalizedStrings.ExecuteFunction_SERVER_COULD_NOT_SEND_THE_REPLY.toLocalizedString(); - sendException(hasResult, clientMessage, message, servConn, ioException); + sendException(hasResult, clientMessage, message, serverConnection, ioException); } catch (InternalFunctionInvocationTargetException internalfunctionException) { // Fix for #44709: User should not be aware of // InternalFunctionInvocationTargetException. No instance of @@ -237,29 +239,30 @@ public class ExecuteFunction65 extends BaseCommand { new Object[] {function}), internalfunctionException); } final String message = internalfunctionException.getMessage(); - sendException(hasResult, clientMessage, message, servConn, internalfunctionException); + sendException(hasResult, clientMessage, message, serverConnection, internalfunctionException); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e); final String message = e.getMessage(); - sendException(hasResult, clientMessage, message, servConn, e); + sendException(hasResult, clientMessage, message, serverConnection, e); } } - private void sendException(byte hasResult, Message msg, String message, ServerConnection servConn, - Throwable e) throws IOException { + private void sendException(byte hasResult, Message msg, String message, + ServerConnection serverConnection, Throwable e) throws IOException { if (hasResult == 1) { - writeFunctionResponseException(msg, MessageType.EXCEPTION, servConn, e); - servConn.setAsTrue(RESPONDED); + writeFunctionResponseException(msg, MessageType.EXCEPTION, serverConnection, e); + serverConnection.setAsTrue(RESPONDED); } } - private void sendError(byte hasResult, Message msg, String message, ServerConnection servConn) - throws IOException { + private void sendError(byte hasResult, Message msg, String message, + ServerConnection serverConnection) throws IOException { if (hasResult == 1) { - writeFunctionResponseError(msg, MessageType.EXECUTE_FUNCTION_ERROR, message, servConn); - servConn.setAsTrue(RESPONDED); + writeFunctionResponseError(msg, MessageType.EXECUTE_FUNCTION_ERROR, message, + serverConnection); + serverConnection.setAsTrue(RESPONDED); } } http://git-wip-us.apache.org/repos/asf/geode/blob/eab9e6e0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java index 46302bc..e212b50 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java @@ -60,6 +60,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.security.AuthorizeRequest; +import org.apache.geode.internal.security.SecurityService; /** * @since GemFire 6.6 @@ -87,8 +88,8 @@ public class ExecuteFunction66 extends BaseCommand { ExecuteFunction66() {} @Override - public void cmdExecute(Message clientMessage, ServerConnection servConn, long start) - throws IOException { + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + final SecurityService securityService, long start) throws IOException { Object function = null; Object args = null; MemberMappedArgument memberMappedArg = null; @@ -103,7 +104,7 @@ public class ExecuteFunction66 extends BaseCommand { byte[] bytes = clientMessage.getPart(0).getSerializedForm(); functionState = bytes[0]; if (bytes.length >= 5 - && servConn.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) { + && serverConnection.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) { functionTimeout = Part.decodeInt(bytes, 1); } @@ -121,8 +122,8 @@ public class ExecuteFunction66 extends BaseCommand { hasResult = functionState; } if (hasResult == 1) { - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); } function = clientMessage.getPart(1).getStringOrObject(); args = clientMessage.getPart(2).getObject(); @@ -140,11 +141,11 @@ public class ExecuteFunction66 extends BaseCommand { LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception); if (hasResult == 1) { - writeChunkedException(clientMessage, exception, servConn); + writeChunkedException(clientMessage, exception, serverConnection); } else { - writeException(clientMessage, exception, false, servConn); + writeException(clientMessage, exception, false, serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); return; } @@ -153,8 +154,8 @@ public class ExecuteFunction66 extends BaseCommand { LocalizedStrings.ExecuteFunction_THE_INPUT_FUNCTION_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL .toLocalizedString(); logger.warn(LocalizedMessage.create(LocalizedStrings.TWO_ARG_COLON, - new Object[] {servConn.getName(), message})); - sendError(hasResult, clientMessage, message, servConn); + new Object[] {serverConnection.getName(), message})); + sendError(hasResult, clientMessage, message, serverConnection); return; } @@ -166,8 +167,8 @@ public class ExecuteFunction66 extends BaseCommand { if (functionObject == null) { final String message = LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } else { byte functionStateOnServerSide = AbstractExecution.getFunctionState(functionObject.isHA(), @@ -180,8 +181,8 @@ public class ExecuteFunction66 extends BaseCommand { String message = LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } } @@ -191,21 +192,21 @@ public class ExecuteFunction66 extends BaseCommand { FunctionStats stats = FunctionStats.getFunctionStats(functionObject.getId()); - this.securityService.authorizeDataWrite(); + securityService.authorizeDataWrite(); // check if the caller is authorized to do this operation on server - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); ExecuteFunctionOperationContext executeContext = null; if (authzRequest != null) { executeContext = authzRequest.executeFunctionAuthorize(functionObject.getId(), null, null, args, functionObject.optimizeForWrite()); } - ChunkedMessage m = servConn.getFunctionResponseMessage(); + ChunkedMessage m = serverConnection.getFunctionResponseMessage(); m.setTransactionId(clientMessage.getTransactionId()); ServerToClientFunctionResultSender resultSender = new ServerToClientFunctionResultSender65(m, - MessageType.EXECUTE_FUNCTION_RESULT, servConn, functionObject, executeContext); + MessageType.EXECUTE_FUNCTION_RESULT, serverConnection, functionObject, executeContext); - InternalDistributedMember localVM = (InternalDistributedMember) servConn.getCache() + InternalDistributedMember localVM = (InternalDistributedMember) serverConnection.getCache() .getDistributedSystem().getDistributedMember(); FunctionContext context = null; @@ -215,14 +216,15 @@ public class ExecuteFunction66 extends BaseCommand { } else { context = new FunctionContextImpl(functionObject.getId(), args, resultSender, isReexecute); } - HandShake handShake = (HandShake) servConn.getHandshake(); + HandShake handShake = (HandShake) serverConnection.getHandshake(); int earlierClientReadTimeout = handShake.getClientReadTimeout(); handShake.setClientReadTimeout(functionTimeout); try { if (logger.isDebugEnabled()) { - logger.debug("Executing Function on Server: {} with context: {}", servConn, context); + logger.debug("Executing Function on Server: {} with context: {}", serverConnection, + context); } - InternalCache cache = servConn.getCache(); + InternalCache cache = serverConnection.getCache(); HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor(); if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical() @@ -233,10 +235,10 @@ public class ExecuteFunction66 extends BaseCommand { .toLocalizedString(new Object[] {functionObject.getId(), sm}), sm); - sendException(hasResult, clientMessage, e.getMessage(), servConn, e); + sendException(hasResult, clientMessage, e.getMessage(), serverConnection, e); return; } - /** + /* * if cache is null, then either cache has not yet been created on this node or it is a * shutdown scenario. */ @@ -253,7 +255,7 @@ public class ExecuteFunction66 extends BaseCommand { } if (!functionObject.hasResult()) { - writeReply(clientMessage, servConn); + writeReply(clientMessage, serverConnection); } } catch (FunctionException functionException) { stats.endFunctionExecutionWithException(functionObject.hasResult()); @@ -270,7 +272,7 @@ public class ExecuteFunction66 extends BaseCommand { function), ioException); String message = LocalizedStrings.ExecuteFunction_SERVER_COULD_NOT_SEND_THE_REPLY.toLocalizedString(); - sendException(hasResult, clientMessage, message, servConn, ioException); + sendException(hasResult, clientMessage, message, serverConnection, ioException); } catch (InternalFunctionInvocationTargetException internalfunctionException) { // Fix for #44709: User should not be aware of // InternalFunctionInvocationTargetException. No instance of @@ -288,13 +290,13 @@ public class ExecuteFunction66 extends BaseCommand { new Object[] {function}), internalfunctionException); } final String message = internalfunctionException.getMessage(); - sendException(hasResult, clientMessage, message, servConn, internalfunctionException); + sendException(hasResult, clientMessage, message, serverConnection, internalfunctionException); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e); final String message = e.getMessage(); - sendException(hasResult, clientMessage, message, servConn, e); + sendException(hasResult, clientMessage, message, serverConnection, e); } } @@ -384,9 +386,9 @@ public class ExecuteFunction66 extends BaseCommand { }; if (dm == null) { - /** + /* * Executing the function in its own thread pool as FunctionExecution Thread pool of - * DisributionManager is not yet available. + * DistributionManager is not yet available. */ execService.execute(functionExecution); } else { @@ -397,24 +399,25 @@ public class ExecuteFunction66 extends BaseCommand { stats.endFunctionExecution(startExecution, fn.hasResult()); } - private void sendException(byte hasResult, Message msg, String message, ServerConnection servConn, - Throwable e) throws IOException { + private void sendException(byte hasResult, Message msg, String message, + ServerConnection serverConnection, Throwable e) throws IOException { if (hasResult == 1) { - writeFunctionResponseException(msg, MessageType.EXCEPTION, servConn, e); + writeFunctionResponseException(msg, MessageType.EXCEPTION, serverConnection, e); } else { - writeException(msg, e, false, servConn); + writeException(msg, e, false, serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } - private void sendError(byte hasResult, Message msg, String message, ServerConnection servConn) - throws IOException { + private void sendError(byte hasResult, Message msg, String message, + ServerConnection serverConnection) throws IOException { if (hasResult == 1) { - writeFunctionResponseError(msg, MessageType.EXECUTE_FUNCTION_ERROR, message, servConn); + writeFunctionResponseError(msg, MessageType.EXECUTE_FUNCTION_ERROR, message, + serverConnection); } else { - writeErrorResponse(msg, MessageType.EXECUTE_FUNCTION_ERROR, message, servConn); + writeErrorResponse(msg, MessageType.EXECUTE_FUNCTION_ERROR, message, serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } } http://git-wip-us.apache.org/repos/asf/geode/blob/eab9e6e0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction70.java index 09fe20e..5299ce4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction70.java @@ -36,6 +36,7 @@ import org.apache.geode.internal.cache.tier.sockets.Message; import org.apache.geode.internal.cache.tier.sockets.Part; import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.i18n.LocalizedStrings; +import org.apache.geode.internal.security.SecurityService; /** * @@ -51,9 +52,9 @@ public class ExecuteFunction70 extends ExecuteFunction66 { private ExecuteFunction70() {} @Override - public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) - throws IOException { - super.cmdExecute(clientMessage, serverConnection, start); + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + final SecurityService securityService, long start) throws IOException { + super.cmdExecute(clientMessage, serverConnection, securityService, start); } @Override http://git-wip-us.apache.org/repos/asf/geode/blob/eab9e6e0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction.java index 82a378d..73eff40 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction.java @@ -43,6 +43,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.security.AuthorizeRequest; +import org.apache.geode.internal.security.SecurityService; /** * This is the base command which reads the parts for the MessageType.EXECUTE_REGION_FUNCTION and @@ -63,8 +64,8 @@ public class ExecuteRegionFunction extends BaseCommand { private ExecuteRegionFunction() {} @Override - public void cmdExecute(Message clientMessage, ServerConnection servConn, long start) - throws IOException { + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + final SecurityService securityService, long start) throws IOException { String regionName = null; Object function = null; Object args = null; @@ -72,12 +73,12 @@ public class ExecuteRegionFunction extends BaseCommand { Set filter = null; byte hasResult = 0; int filterSize = 0, partNumber = 0; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); try { hasResult = clientMessage.getPart(0).getSerializedForm()[0]; if (hasResult == 1) { - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); } regionName = clientMessage.getPart(1).getString(); function = clientMessage.getPart(2).getStringOrObject(); @@ -103,8 +104,8 @@ public class ExecuteRegionFunction extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception); if (hasResult == 1) { - writeChunkedException(clientMessage, exception, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, exception, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -120,8 +121,8 @@ public class ExecuteRegionFunction extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL .toLocalizedString("region"); } - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } @@ -130,12 +131,12 @@ public class ExecuteRegionFunction extends BaseCommand { String message = LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST .toLocalizedString(regionName); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } - HandShake handShake = (HandShake) servConn.getHandshake(); + HandShake handShake = (HandShake) serverConnection.getHandshake(); int earlierClientReadTimeout = handShake.getClientReadTimeout(); handShake.setClientReadTimeout(0); ServerToClientFunctionResultSender resultSender = null; @@ -147,18 +148,18 @@ public class ExecuteRegionFunction extends BaseCommand { String message = LocalizedStrings.ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } } else { functionObject = (Function) function; } - this.securityService.authorizeDataWrite(); + securityService.authorizeDataWrite(); // check if the caller is authorized to do this operation on server - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); final String functionName = functionObject.getId(); final String regionPath = region.getFullPath(); ExecuteFunctionOperationContext executeContext = null; @@ -169,10 +170,11 @@ public class ExecuteRegionFunction extends BaseCommand { // Construct execution AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(region); - ChunkedMessage m = servConn.getFunctionResponseMessage(); + ChunkedMessage m = serverConnection.getFunctionResponseMessage(); m.setTransactionId(clientMessage.getTransactionId()); - resultSender = new ServerToClientFunctionResultSender(m, - MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn, functionObject, executeContext); + resultSender = + new ServerToClientFunctionResultSender(m, MessageType.EXECUTE_REGION_FUNCTION_RESULT, + serverConnection, functionObject, executeContext); if (execution instanceof PartitionedRegionFunctionExecutor) { execution = new PartitionedRegionFunctionExecutor((PartitionedRegion) region, filter, args, @@ -184,7 +186,7 @@ public class ExecuteRegionFunction extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug("Executing Function: {} on Server: {} with Execution: {}", - functionObject.getId(), servConn, execution); + functionObject.getId(), serverConnection, execution); } if (hasResult == 1) { if (function instanceof String) { @@ -205,7 +207,7 @@ public class ExecuteRegionFunction extends BaseCommand { function), ioe); final String message = LocalizedStrings.ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY .toLocalizedString(); - sendException(hasResult, clientMessage, message, servConn, ioe); + sendException(hasResult, clientMessage, message, serverConnection, ioe); } catch (InternalFunctionInvocationTargetException internalfunctionException) { // Fix for #44709: User should not be aware of // InternalFunctionInvocationTargetException. No instance of @@ -223,42 +225,42 @@ public class ExecuteRegionFunction extends BaseCommand { new Object[] {function}), internalfunctionException); } final String message = internalfunctionException.getMessage(); - sendException(hasResult, clientMessage, message, servConn, internalfunctionException); + sendException(hasResult, clientMessage, message, serverConnection, internalfunctionException); } catch (FunctionException fe) { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe); String message = fe.getMessage(); - sendException(hasResult, clientMessage, message, servConn, fe); + sendException(hasResult, clientMessage, message, serverConnection, fe); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e); String message = e.getMessage(); - sendException(hasResult, clientMessage, message, servConn, e); + sendException(hasResult, clientMessage, message, serverConnection, e); } finally { handShake.setClientReadTimeout(earlierClientReadTimeout); } } - private void sendException(byte hasResult, Message msg, String message, ServerConnection servConn, - Throwable e) throws IOException { + private void sendException(byte hasResult, Message msg, String message, + ServerConnection serverConnection, Throwable e) throws IOException { synchronized (msg) { if (hasResult == 1) { - writeFunctionResponseException(msg, MessageType.EXCEPTION, servConn, e); - servConn.setAsTrue(RESPONDED); + writeFunctionResponseException(msg, MessageType.EXCEPTION, serverConnection, e); + serverConnection.setAsTrue(RESPONDED); } } } - private void sendError(byte hasResult, Message msg, String message, ServerConnection servConn) - throws IOException { + private void sendError(byte hasResult, Message msg, String message, + ServerConnection serverConnection) throws IOException { synchronized (msg) { if (hasResult == 1) { writeFunctionResponseError(msg, MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, - servConn); - servConn.setAsTrue(RESPONDED); + serverConnection); + serverConnection.setAsTrue(RESPONDED); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/eab9e6e0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java index 12919d0..8e1e01b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction61.java @@ -16,7 +16,6 @@ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; -import java.io.Serializable; import java.util.HashSet; import java.util.Set; @@ -46,6 +45,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.security.AuthorizeRequest; +import org.apache.geode.internal.security.SecurityService; /** * @@ -63,8 +63,8 @@ public class ExecuteRegionFunction61 extends BaseCommand { private ExecuteRegionFunction61() {} @Override - public void cmdExecute(Message clientMessage, ServerConnection servConn, long start) - throws IOException { + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + final SecurityService securityService, long start) throws IOException { String regionName = null; Object function = null; Object args = null; @@ -75,12 +75,12 @@ public class ExecuteRegionFunction61 extends BaseCommand { int removedNodesSize = 0; Set removedNodesSet = null; int filterSize = 0, partNumber = 0; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); try { hasResult = clientMessage.getPart(0).getSerializedForm()[0]; if (hasResult == 1) { - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); } regionName = clientMessage.getPart(1).getString(); function = clientMessage.getPart(2).getStringOrObject(); @@ -119,8 +119,8 @@ public class ExecuteRegionFunction61 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception); if (hasResult == 1) { - writeChunkedException(clientMessage, exception, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, exception, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -136,8 +136,8 @@ public class ExecuteRegionFunction61 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL .toLocalizedString("region"); } - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } else { Region region = crHelper.getRegion(regionName); @@ -145,11 +145,11 @@ public class ExecuteRegionFunction61 extends BaseCommand { String message = LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST .toLocalizedString(regionName); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } - HandShake handShake = (HandShake) servConn.getHandshake(); + HandShake handShake = (HandShake) serverConnection.getHandshake(); int earlierClientReadTimeout = handShake.getClientReadTimeout(); handShake.setClientReadTimeout(0); ServerToClientFunctionResultSender resultSender = null; @@ -161,15 +161,15 @@ public class ExecuteRegionFunction61 extends BaseCommand { String message = LocalizedStrings.ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } } else { functionObject = (Function) function; } // check if the caller is authorized to do this operation on server - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); final String functionName = functionObject.getId(); final String regionPath = region.getFullPath(); ExecuteFunctionOperationContext executeContext = null; @@ -180,10 +180,11 @@ public class ExecuteRegionFunction61 extends BaseCommand { // Construct execution AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(region); - ChunkedMessage m = servConn.getFunctionResponseMessage(); + ChunkedMessage m = serverConnection.getFunctionResponseMessage(); m.setTransactionId(clientMessage.getTransactionId()); - resultSender = new ServerToClientFunctionResultSender(m, - MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn, functionObject, executeContext); + resultSender = + new ServerToClientFunctionResultSender(m, MessageType.EXECUTE_REGION_FUNCTION_RESULT, + serverConnection, functionObject, executeContext); if (execution instanceof PartitionedRegionFunctionExecutor) { @@ -199,7 +200,7 @@ public class ExecuteRegionFunction61 extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug("Executing Function: {} on Server: {} with Execution: {}", - functionObject.getId(), servConn, execution); + functionObject.getId(), serverConnection, execution); } if (hasResult == 1) { if (function instanceof String) { @@ -221,7 +222,7 @@ public class ExecuteRegionFunction61 extends BaseCommand { final String message = LocalizedStrings.ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY .toLocalizedString(); - sendException(hasResult, clientMessage, message, servConn, ioe); + sendException(hasResult, clientMessage, message, serverConnection, ioe); } catch (FunctionException fe) { String message = fe.getMessage(); @@ -256,7 +257,7 @@ public class ExecuteRegionFunction61 extends BaseCommand { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe); - sendException(hasResult, clientMessage, message, servConn, fe); + sendException(hasResult, clientMessage, message, serverConnection, fe); } } catch (Exception e) { @@ -264,7 +265,7 @@ public class ExecuteRegionFunction61 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e); String message = e.getMessage(); - sendException(hasResult, clientMessage, message, servConn, e); + sendException(hasResult, clientMessage, message, serverConnection, e); } finally { @@ -273,19 +274,20 @@ public class ExecuteRegionFunction61 extends BaseCommand { } } - private void sendException(byte hasResult, Message msg, String message, ServerConnection servConn, - Throwable e) throws IOException { + private void sendException(byte hasResult, Message msg, String message, + ServerConnection serverConnection, Throwable e) throws IOException { if (hasResult == 1) { - writeFunctionResponseException(msg, MessageType.EXCEPTION, servConn, e); - servConn.setAsTrue(RESPONDED); + writeFunctionResponseException(msg, MessageType.EXCEPTION, serverConnection, e); + serverConnection.setAsTrue(RESPONDED); } } - private void sendError(byte hasResult, Message msg, String message, ServerConnection servConn) - throws IOException { + private void sendError(byte hasResult, Message msg, String message, + ServerConnection serverConnection) throws IOException { if (hasResult == 1) { - writeFunctionResponseError(msg, MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, servConn); - servConn.setAsTrue(RESPONDED); + writeFunctionResponseError(msg, MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, + serverConnection); + serverConnection.setAsTrue(RESPONDED); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/eab9e6e0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java index 3be907b..391d9d0 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction65.java @@ -45,6 +45,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.security.AuthorizeRequest; +import org.apache.geode.internal.security.SecurityService; /** * @since GemFire 6.5 @@ -60,8 +61,8 @@ public class ExecuteRegionFunction65 extends BaseCommand { private ExecuteRegionFunction65() {} @Override - public void cmdExecute(Message clientMessage, ServerConnection servConn, long start) - throws IOException { + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + final SecurityService securityService, long start) throws IOException { String regionName = null; Object function = null; Object args = null; @@ -72,7 +73,7 @@ public class ExecuteRegionFunction65 extends BaseCommand { int removedNodesSize = 0; Set removedNodesSet = null; int filterSize = 0, partNumber = 0; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); byte functionState = 0; try { functionState = clientMessage.getPart(0).getSerializedForm()[0]; @@ -82,8 +83,8 @@ public class ExecuteRegionFunction65 extends BaseCommand { hasResult = functionState; } if (hasResult == 1) { - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); } regionName = clientMessage.getPart(1).getString(); function = clientMessage.getPart(2).getStringOrObject(); @@ -122,8 +123,8 @@ public class ExecuteRegionFunction65 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception); if (hasResult == 1) { - writeChunkedException(clientMessage, exception, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, exception, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -139,8 +140,8 @@ public class ExecuteRegionFunction65 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL .toLocalizedString("region"); } - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } @@ -149,12 +150,12 @@ public class ExecuteRegionFunction65 extends BaseCommand { String message = LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST .toLocalizedString(regionName); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } - HandShake handShake = (HandShake) servConn.getHandshake(); + HandShake handShake = (HandShake) serverConnection.getHandshake(); int earlierClientReadTimeout = handShake.getClientReadTimeout(); handShake.setClientReadTimeout(0); ServerToClientFunctionResultSender resultSender = null; @@ -166,8 +167,8 @@ public class ExecuteRegionFunction65 extends BaseCommand { String message = LocalizedStrings.ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } else { byte functionStateOnServerSide = AbstractExecution.getFunctionState(functionObject.isHA(), @@ -180,8 +181,8 @@ public class ExecuteRegionFunction65 extends BaseCommand { String message = LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } } @@ -189,10 +190,10 @@ public class ExecuteRegionFunction65 extends BaseCommand { functionObject = (Function) function; } - this.securityService.authorizeDataWrite(); + securityService.authorizeDataWrite(); // check if the caller is authorized to do this operation on server - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); final String functionName = functionObject.getId(); final String regionPath = region.getFullPath(); ExecuteFunctionOperationContext executeContext = null; @@ -203,10 +204,11 @@ public class ExecuteRegionFunction65 extends BaseCommand { // Construct execution AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(region); - ChunkedMessage m = servConn.getFunctionResponseMessage(); + ChunkedMessage m = serverConnection.getFunctionResponseMessage(); m.setTransactionId(clientMessage.getTransactionId()); - resultSender = new ServerToClientFunctionResultSender65(m, - MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn, functionObject, executeContext); + resultSender = + new ServerToClientFunctionResultSender65(m, MessageType.EXECUTE_REGION_FUNCTION_RESULT, + serverConnection, functionObject, executeContext); if (execution instanceof PartitionedRegionFunctionExecutor) { @@ -222,7 +224,8 @@ public class ExecuteRegionFunction65 extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "Executing Function: {} on Server: {} with Execution: {} functionState={} reexecute={} hasResult={}", - functionObject.getId(), servConn, execution, functionState, isReExecute, hasResult); + functionObject.getId(), serverConnection, execution, functionState, isReExecute, + hasResult); } if (hasResult == 1) { if (function instanceof String) { @@ -263,7 +266,7 @@ public class ExecuteRegionFunction65 extends BaseCommand { function), ioe); final String message = LocalizedStrings.ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY .toLocalizedString(); - sendException(hasResult, clientMessage, message, servConn, ioe); + sendException(hasResult, clientMessage, message, serverConnection, ioe); } catch (FunctionException fe) { String message = fe.getMessage(); @@ -299,7 +302,7 @@ public class ExecuteRegionFunction65 extends BaseCommand { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe); - sendException(hasResult, clientMessage, message, servConn, fe); + sendException(hasResult, clientMessage, message, serverConnection, fe); } } catch (Exception e) { @@ -307,37 +310,37 @@ public class ExecuteRegionFunction65 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e); String message = e.getMessage(); - sendException(hasResult, clientMessage, message, servConn, e); + sendException(hasResult, clientMessage, message, serverConnection, e); } finally { handShake.setClientReadTimeout(earlierClientReadTimeout); } } - private void sendException(byte hasResult, Message msg, String message, ServerConnection servConn, - Throwable e) throws IOException { + private void sendException(byte hasResult, Message msg, String message, + ServerConnection serverConnection, Throwable e) throws IOException { synchronized (msg) { if (hasResult == 1) { - writeFunctionResponseException(msg, MessageType.EXCEPTION, message, servConn, e); - servConn.setAsTrue(RESPONDED); + writeFunctionResponseException(msg, MessageType.EXCEPTION, message, serverConnection, e); + serverConnection.setAsTrue(RESPONDED); } } } - private void sendError(byte hasResult, Message msg, String message, ServerConnection servConn) - throws IOException { + private void sendError(byte hasResult, Message msg, String message, + ServerConnection serverConnection) throws IOException { synchronized (msg) { if (hasResult == 1) { writeFunctionResponseError(msg, MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, - servConn); - servConn.setAsTrue(RESPONDED); + serverConnection); + serverConnection.setAsTrue(RESPONDED); } } } protected static void writeFunctionResponseException(Message origMsg, int messageType, - String message, ServerConnection servConn, Throwable e) throws IOException { - ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage(); - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + String message, ServerConnection serverConnection, Throwable e) throws IOException { + ChunkedMessage functionResponseMsg = serverConnection.getFunctionResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); int numParts = 0; if (functionResponseMsg.headerHasBeenSent()) { if (e instanceof FunctionException @@ -356,13 +359,13 @@ public class ExecuteRegionFunction65 extends BaseCommand { numParts = 2; } if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk while reply in progress: ", servConn.getName(), - e); + logger.debug("{}: Sending exception chunk while reply in progress: ", + serverConnection.getName(), e); } - functionResponseMsg.setServerConnection(servConn); + functionResponseMsg.setServerConnection(serverConnection); functionResponseMsg.setLastChunkAndNumParts(true, numParts); // functionResponseMsg.setLastChunk(true); - functionResponseMsg.sendChunk(servConn); + functionResponseMsg.sendChunk(serverConnection); } else { chunkedResponseMsg.setMessageType(messageType); chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); @@ -383,11 +386,11 @@ public class ExecuteRegionFunction65 extends BaseCommand { numParts = 2; } if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk: ", servConn.getName(), e); + logger.debug("{}: Sending exception chunk: ", serverConnection.getName(), e); } - chunkedResponseMsg.setServerConnection(servConn); + chunkedResponseMsg.setServerConnection(serverConnection); chunkedResponseMsg.setLastChunkAndNumParts(true, numParts); - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/eab9e6e0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java index 674082c..cf59ad5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java @@ -49,6 +49,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.security.AuthorizeRequest; +import org.apache.geode.internal.security.SecurityService; /** * @since GemFire 6.6 @@ -64,8 +65,8 @@ public class ExecuteRegionFunction66 extends BaseCommand { private ExecuteRegionFunction66() {} @Override - public void cmdExecute(Message clientMessage, ServerConnection servConn, long start) - throws IOException { + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + final SecurityService securityService, long start) throws IOException { String regionName = null; Object function = null; Object args = null; @@ -77,14 +78,14 @@ public class ExecuteRegionFunction66 extends BaseCommand { int removedNodesSize = 0; Set removedNodesSet = null; int filterSize = 0, partNumber = 0; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); byte functionState = 0; int functionTimeout = ConnectionImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT; try { byte[] bytes = clientMessage.getPart(0).getSerializedForm(); functionState = bytes[0]; if (bytes.length >= 5 - && servConn.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) { + && serverConnection.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) { functionTimeout = Part.decodeInt(bytes, 1); } if (functionState != 1) { @@ -93,8 +94,8 @@ public class ExecuteRegionFunction66 extends BaseCommand { hasResult = functionState; } if (hasResult == 1) { - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); } regionName = clientMessage.getPart(1).getString(); function = clientMessage.getPart(2).getStringOrObject(); @@ -107,7 +108,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { } } byte[] flags = clientMessage.getPart(5).getSerializedForm(); - if (servConn.getClientVersion().ordinal() > Version.GFE_81.ordinal()) { + if (serverConnection.getClientVersion().ordinal() > Version.GFE_81.ordinal()) { isBucketsAsFilter = (flags[0] & ExecuteFunctionHelper.BUCKETS_AS_FILTER_MASK) != 0; isReExecute = (flags[0] & ExecuteFunctionHelper.IS_REXECUTE_MASK) != 0 ? (byte) 1 : 0; } else { @@ -140,11 +141,11 @@ public class ExecuteRegionFunction66 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception); if (hasResult == 1) { - writeChunkedException(clientMessage, exception, servConn); + writeChunkedException(clientMessage, exception, serverConnection); } else { - writeException(clientMessage, exception, false, servConn); + writeException(clientMessage, exception, false, serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); return; } if (function == null || regionName == null) { @@ -159,8 +160,8 @@ public class ExecuteRegionFunction66 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL .toLocalizedString("region"); } - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } @@ -169,11 +170,11 @@ public class ExecuteRegionFunction66 extends BaseCommand { String message = LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST .toLocalizedString(regionName); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } - HandShake handShake = (HandShake) servConn.getHandshake(); + HandShake handShake = (HandShake) serverConnection.getHandshake(); int earlierClientReadTimeout = handShake.getClientReadTimeout(); handShake.setClientReadTimeout(functionTimeout); ServerToClientFunctionResultSender resultSender = null; @@ -185,8 +186,8 @@ public class ExecuteRegionFunction66 extends BaseCommand { String message = LocalizedStrings.ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } else { byte functionStateOnServerSide = AbstractExecution.getFunctionState(functionObject.isHA(), @@ -199,8 +200,8 @@ public class ExecuteRegionFunction66 extends BaseCommand { String message = LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } } @@ -208,10 +209,10 @@ public class ExecuteRegionFunction66 extends BaseCommand { functionObject = (Function) function; } - this.securityService.authorizeDataWrite(); + securityService.authorizeDataWrite(); // check if the caller is authorized to do this operation on server - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); final String functionName = functionObject.getId(); final String regionPath = region.getFullPath(); ExecuteFunctionOperationContext executeContext = null; @@ -222,10 +223,11 @@ public class ExecuteRegionFunction66 extends BaseCommand { // Construct execution AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(region); - ChunkedMessage m = servConn.getFunctionResponseMessage(); + ChunkedMessage m = serverConnection.getFunctionResponseMessage(); m.setTransactionId(clientMessage.getTransactionId()); - resultSender = new ServerToClientFunctionResultSender65(m, - MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn, functionObject, executeContext); + resultSender = + new ServerToClientFunctionResultSender65(m, MessageType.EXECUTE_REGION_FUNCTION_RESULT, + serverConnection, functionObject, executeContext); if (execution instanceof PartitionedRegionFunctionExecutor) { if ((hasResult == 1) && filter != null && filter.size() == 1) { @@ -243,7 +245,8 @@ public class ExecuteRegionFunction66 extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "Executing Function: {} on Server: {} with Execution: {} functionState={} reExecute={} hasResult={}", - functionObject.getId(), servConn, execution, functionState, isReExecute, hasResult); + functionObject.getId(), serverConnection, execution, functionState, isReExecute, + hasResult); } if (hasResult == 1) { if (function instanceof String) { @@ -277,7 +280,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { } else { execution.execute(functionObject); } - writeReply(clientMessage, servConn); + writeReply(clientMessage, serverConnection); } } catch (IOException ioe) { logger.warn(LocalizedMessage.create( @@ -285,7 +288,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { function), ioe); final String message = LocalizedStrings.ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY .toLocalizedString(); - sendException(hasResult, clientMessage, message, servConn, ioe); + sendException(hasResult, clientMessage, message, serverConnection, ioe); } catch (FunctionException fe) { String message = fe.getMessage(); Object cause = fe.getCause(); @@ -322,7 +325,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe); - sendException(hasResult, clientMessage, message, servConn, fe); + sendException(hasResult, clientMessage, message, serverConnection, fe); } } catch (Exception e) { @@ -330,42 +333,43 @@ public class ExecuteRegionFunction66 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e); String message = e.getMessage(); - sendException(hasResult, clientMessage, message, servConn, e); + sendException(hasResult, clientMessage, message, serverConnection, e); } finally { handShake.setClientReadTimeout(earlierClientReadTimeout); ServerConnection.executeFunctionOnLocalNodeOnly((byte) 0); } } - private void sendException(byte hasResult, Message msg, String message, ServerConnection servConn, - Throwable e) throws IOException { + private void sendException(byte hasResult, Message msg, String message, + ServerConnection serverConnection, Throwable e) throws IOException { synchronized (msg) { if (hasResult == 1) { - writeFunctionResponseException(msg, MessageType.EXCEPTION, message, servConn, e); + writeFunctionResponseException(msg, MessageType.EXCEPTION, message, serverConnection, e); } else { - writeException(msg, e, false, servConn); + writeException(msg, e, false, serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } } - private void sendError(byte hasResult, Message msg, String message, ServerConnection servConn) - throws IOException { + private void sendError(byte hasResult, Message msg, String message, + ServerConnection serverConnection) throws IOException { synchronized (msg) { if (hasResult == 1) { writeFunctionResponseError(msg, MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, - servConn); + serverConnection); } else { - writeErrorResponse(msg, MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, servConn); + writeErrorResponse(msg, MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, + serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } } protected static void writeFunctionResponseException(Message origMsg, int messageType, - String message, ServerConnection servConn, Throwable e) throws IOException { - ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage(); - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + String message, ServerConnection serverConnection, Throwable e) throws IOException { + ChunkedMessage functionResponseMsg = serverConnection.getFunctionResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); int numParts = 0; if (functionResponseMsg.headerHasBeenSent()) { if (e instanceof FunctionException @@ -384,13 +388,13 @@ public class ExecuteRegionFunction66 extends BaseCommand { numParts = 2; } if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk while reply in progress: ", servConn.getName(), - e); + logger.debug("{}: Sending exception chunk while reply in progress: ", + serverConnection.getName(), e); } - functionResponseMsg.setServerConnection(servConn); + functionResponseMsg.setServerConnection(serverConnection); functionResponseMsg.setLastChunkAndNumParts(true, numParts); // functionResponseMsg.setLastChunk(true); - functionResponseMsg.sendChunk(servConn); + functionResponseMsg.sendChunk(serverConnection); } else { chunkedResponseMsg.setMessageType(messageType); chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); @@ -411,11 +415,11 @@ public class ExecuteRegionFunction66 extends BaseCommand { numParts = 2; } if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk: ", servConn.getName(), e); + logger.debug("{}: Sending exception chunk: ", serverConnection.getName(), e); } - chunkedResponseMsg.setServerConnection(servConn); + chunkedResponseMsg.setServerConnection(serverConnection); chunkedResponseMsg.setLastChunkAndNumParts(true, numParts); - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/eab9e6e0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java index cf96137..a0594ad 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java @@ -46,6 +46,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.security.AuthorizeRequest; +import org.apache.geode.internal.security.SecurityService; /** * @since GemFire 6.5 @@ -62,8 +63,8 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { private ExecuteRegionFunctionSingleHop() {} @Override - public void cmdExecute(Message clientMessage, ServerConnection servConn, long start) - throws IOException { + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + final SecurityService securityService, long start) throws IOException { String regionName = null; Object function = null; @@ -77,13 +78,13 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { int removedNodesSize = 0; Set removedNodesSet = null; int filterSize = 0, bucketIdsSize = 0, partNumber = 0; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); int functionTimeout = ConnectionImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT; try { byte[] bytes = clientMessage.getPart(0).getSerializedForm(); functionState = bytes[0]; if (bytes.length >= 5 - && servConn.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) { + && serverConnection.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) { functionTimeout = Part.decodeInt(bytes, 1); } if (functionState != 1) { @@ -92,8 +93,8 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { hasResult = functionState; } if (hasResult == 1) { - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); } regionName = clientMessage.getPart(1).getString(); function = clientMessage.getPart(2).getStringOrObject(); @@ -146,8 +147,8 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception); if (hasResult == 1) { - writeChunkedException(clientMessage, exception, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, exception, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -163,8 +164,8 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL .toLocalizedString("region"); } - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } @@ -173,11 +174,11 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { String message = LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST .toLocalizedString(regionName); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } - HandShake handShake = (HandShake) servConn.getHandshake(); + HandShake handShake = (HandShake) serverConnection.getHandshake(); int earlierClientReadTimeout = handShake.getClientReadTimeout(); handShake.setClientReadTimeout(functionTimeout); ServerToClientFunctionResultSender resultSender = null; @@ -189,8 +190,8 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { String message = LocalizedStrings.ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } else { byte functionStateOnServer = AbstractExecution.getFunctionState(functionObject.isHA(), @@ -199,8 +200,8 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { String message = LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER .toLocalizedString(function); - logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, clientMessage, message, servConn); + logger.warn("{}: {}", serverConnection.getName(), message); + sendError(hasResult, clientMessage, message, serverConnection); return; } } @@ -208,10 +209,10 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { functionObject = (Function) function; } - this.securityService.authorizeDataWrite(); + securityService.authorizeDataWrite(); // check if the caller is authorized to do this operation on server - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); final String functionName = functionObject.getId(); final String regionPath = region.getFullPath(); ExecuteFunctionOperationContext executeContext = null; @@ -222,10 +223,11 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { // Construct execution AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(region); - ChunkedMessage m = servConn.getFunctionResponseMessage(); + ChunkedMessage m = serverConnection.getFunctionResponseMessage(); m.setTransactionId(clientMessage.getTransactionId()); - resultSender = new ServerToClientFunctionResultSender65(m, - MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn, functionObject, executeContext); + resultSender = + new ServerToClientFunctionResultSender65(m, MessageType.EXECUTE_REGION_FUNCTION_RESULT, + serverConnection, functionObject, executeContext); if (isExecuteOnAllBuckets == 1) { PartitionedRegion pr = (PartitionedRegion) region; @@ -250,7 +252,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug("Executing Function: {} on Server: {} with Execution: {}", - functionObject.getId(), servConn, execution); + functionObject.getId(), serverConnection, execution); } if (hasResult == 1) { if (function instanceof String) { @@ -291,7 +293,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { function), ioe); final String message = LocalizedStrings.ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY .toLocalizedString(); - sendException(hasResult, clientMessage, message, servConn, ioe); + sendException(hasResult, clientMessage, message, serverConnection, ioe); } catch (FunctionException fe) { String message = fe.getMessage(); @@ -309,45 +311,45 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe); - sendException(hasResult, clientMessage, message, servConn, fe); + sendException(hasResult, clientMessage, message, serverConnection, fe); } } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e); String message = e.getMessage(); - sendException(hasResult, clientMessage, message, servConn, e); + sendException(hasResult, clientMessage, message, serverConnection, e); } finally { handShake.setClientReadTimeout(earlierClientReadTimeout); ServerConnection.executeFunctionOnLocalNodeOnly((byte) 0); } } - private void sendException(byte hasResult, Message msg, String message, ServerConnection servConn, - Throwable e) throws IOException { + private void sendException(byte hasResult, Message msg, String message, + ServerConnection serverConnection, Throwable e) throws IOException { synchronized (msg) { if (hasResult == 1) { - writeFunctionResponseException(msg, MessageType.EXCEPTION, message, servConn, e); - servConn.setAsTrue(RESPONDED); + writeFunctionResponseException(msg, MessageType.EXCEPTION, message, serverConnection, e); + serverConnection.setAsTrue(RESPONDED); } } } - private void sendError(byte hasResult, Message msg, String message, ServerConnection servConn) - throws IOException { + private void sendError(byte hasResult, Message msg, String message, + ServerConnection serverConnection) throws IOException { synchronized (msg) { if (hasResult == 1) { writeFunctionResponseError(msg, MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, - servConn); - servConn.setAsTrue(RESPONDED); + serverConnection); + serverConnection.setAsTrue(RESPONDED); } } } protected static void writeFunctionResponseException(Message origMsg, int messageType, - String message, ServerConnection servConn, Throwable e) throws IOException { - ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage(); - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + String message, ServerConnection serverConnection, Throwable e) throws IOException { + ChunkedMessage functionResponseMsg = serverConnection.getFunctionResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); int numParts = 0; if (functionResponseMsg.headerHasBeenSent()) { if (e instanceof FunctionException @@ -366,12 +368,12 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { numParts = 2; } if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk while reply in progress: ", servConn.getName(), - e); + logger.debug("{}: Sending exception chunk while reply in progress: ", + serverConnection.getName(), e); } - functionResponseMsg.setServerConnection(servConn); + functionResponseMsg.setServerConnection(serverConnection); functionResponseMsg.setLastChunkAndNumParts(true, numParts); - functionResponseMsg.sendChunk(servConn); + functionResponseMsg.sendChunk(serverConnection); } else { chunkedResponseMsg.setMessageType(messageType); chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); @@ -392,11 +394,11 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { numParts = 2; } if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk: ", servConn.getName(), e); + logger.debug("{}: Sending exception chunk: ", serverConnection.getName(), e); } - chunkedResponseMsg.setServerConnection(servConn); + chunkedResponseMsg.setServerConnection(serverConnection); chunkedResponseMsg.setLastChunkAndNumParts(true, numParts); - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/eab9e6e0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java index d489b88..12e494a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java @@ -33,7 +33,6 @@ import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.EventIDHolder; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.tier.CachedRegionHelper; @@ -49,6 +48,7 @@ import org.apache.geode.internal.cache.wan.GatewayReceiverStats; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.security.AuthorizeRequest; +import org.apache.geode.internal.security.SecurityService; import org.apache.geode.internal.util.BlobHelper; import org.apache.geode.pdx.PdxConfigurationException; import org.apache.geode.pdx.PdxRegistryMismatchException; @@ -79,8 +79,8 @@ public class GatewayReceiverCommand extends BaseCommand { } @Override - public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) - throws IOException, InterruptedException { + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + final SecurityService securityService, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null, valuePart = null, callbackArgPart = null; String regionName = null; Object callbackArg = null, key = null; http://git-wip-us.apache.org/repos/asf/geode/blob/eab9e6e0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java index 2ca8804..62644eb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java @@ -43,6 +43,7 @@ import org.apache.geode.internal.offheap.annotations.Retained; import org.apache.geode.internal.offheap.annotations.Unretained; import org.apache.geode.internal.security.AuthorizeRequest; import org.apache.geode.internal.security.AuthorizeRequestPP; +import org.apache.geode.internal.security.SecurityService; import org.apache.geode.security.NotAuthorizedException; public class Get70 extends BaseCommand { @@ -54,8 +55,8 @@ public class Get70 extends BaseCommand { } @Override - public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long startparam) - throws IOException { + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + final SecurityService securityService, long startparam) throws IOException { long start = startparam; Part regionNamePart = null, keyPart = null, valuePart = null; String regionName = null; @@ -131,7 +132,7 @@ public class Get70 extends BaseCommand { GetOperationContext getContext = null; try { // for integrated security - this.securityService.authorizeRegionRead(regionName, key.toString()); + securityService.authorizeRegionRead(regionName, key.toString()); AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { @@ -188,7 +189,7 @@ public class Get70 extends BaseCommand { } // post process - data = this.securityService.postProcess(regionName, key, data, entry.isObject); + data = securityService.postProcess(regionName, key, data, entry.isObject); long oldStart = start; start = DistributionStats.getStatTime();