Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A8214200BF0 for ; Fri, 30 Dec 2016 17:20:57 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A69F0160B32; Fri, 30 Dec 2016 16:20:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7CDFA160B24 for ; Fri, 30 Dec 2016 17:20:56 +0100 (CET) Received: (qmail 44978 invoked by uid 500); 30 Dec 2016 16:20:55 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 44969 invoked by uid 99); 30 Dec 2016 16:20:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Dec 2016 16:20:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4B629DFC15; Fri, 30 Dec 2016 16:20:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bschuchardt@apache.org To: commits@geode.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: geode git commit: GEODE-2257 Client configured to use locator with addPoolServer fails to connect Date: Fri, 30 Dec 2016 16:20:55 +0000 (UTC) archived-at: Fri, 30 Dec 2016 16:20:57 -0000 Repository: geode Updated Branches: refs/heads/develop 17577a6f5 -> e3cb1b747 GEODE-2257 Client configured to use locator with addPoolServer fails to connect The first byte that a client sends is a connection-type that is >= 100. The first bytes expected by a locator are the 4 bytes of an integer indicating the protocol version. I've changed the locator to read the first byte and, if it's >= 100 send a reply byte back to the client telling it that it's trying to contact a locator using a client/server handshake. The client has a new reply code that the locator is now using. If the client sees this reply code it will throw a GemFireConfigException. Some of these exceptions will be thrown in the background and get logged but the thread initiating cache creation will also get this exception when it invokes ClientCacheFactory.create(). The client-side error message will be in this form: _Improperly configured client detected. Server at 10.154.30.28 is actually a locator. Use addPoolLocator to configure locators_. The locator will also log a warning in this form so that alerts will be raised: _Unable to process request from 10.118.33.195 exception=Improperly configured client detected - use addPoolLocator to configure its locators instead of addPoolServer_. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/e3cb1b74 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/e3cb1b74 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/e3cb1b74 Branch: refs/heads/develop Commit: e3cb1b74710fc94b8e100f6031e0ea8f2c48b5f4 Parents: 17577a6 Author: Bruce Schuchardt Authored: Fri Dec 30 08:17:26 2016 -0800 Committer: Bruce Schuchardt Committed: Fri Dec 30 08:18:41 2016 -0800 ---------------------------------------------------------------------- .../client/internal/ConnectionFactoryImpl.java | 3 + .../cache/client/internal/ConnectionImpl.java | 2 +- .../cache/client/internal/QueueManagerImpl.java | 3 + .../geode/internal/cache/tier/Acceptor.java | 5 ++ .../cache/tier/sockets/CacheClientUpdater.java | 2 +- .../internal/cache/tier/sockets/HandShake.java | 92 ++++++++------------ .../tier/sockets/ClientServerMiscDUnitTest.java | 29 +++--- 7 files changed, 60 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java index eceabcb..a419d57 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java @@ -20,6 +20,7 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; +import org.apache.geode.GemFireConfigException; import org.apache.geode.cache.GatewayConfigurationException; import org.apache.geode.cache.client.ServerRefusedConnectionException; import org.apache.geode.cache.client.internal.ServerBlackList.FailureTracker; @@ -139,6 +140,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory { connection.setHandShake(connHandShake); authenticateIfRequired(connection); initialized = true; + } catch (GemFireConfigException e) { + throw e; } catch (CancelException e) { // propagate this up, don't retry throw e; http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java index 6bc68ae..a494138 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java @@ -103,7 +103,7 @@ public class ConnectionImpl implements Connection { theSocket.setSoTimeout(handShakeTimeout); out = theSocket.getOutputStream(); in = theSocket.getInputStream(); - this.status = handShake.greet(this, location, communicationMode); + this.status = handShake.handshakeWithServer(this, location, communicationMode); commBuffer = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket); if (sender != null) { commBufferForAsyncRead = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket); http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java index 4712268..965ee57 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java @@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.geode.GemFireConfigException; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -466,6 +467,8 @@ public class QueueManagerImpl implements QueueManager { connection = factory.createClientToServerConnection(server, true); } catch (GemFireSecurityException e) { throw e; + } catch (GemFireConfigException e) { + throw e; } catch (Exception e) { if (isDebugEnabled) { logger.debug("SubscriptionManager - Error connected to server: {}", server, e); http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java index 0454f53..9a3241b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java @@ -26,6 +26,10 @@ import org.apache.geode.internal.Version; * @since GemFire 2.0.2 */ public abstract class Acceptor { + + // The following are communications "mode" bytes sent as the first byte of a + // client/server handshake. They must not be larger than 1 byte + /** * Byte meaning that the Socket is being used for 'client to server' communication. */ @@ -67,6 +71,7 @@ public abstract class Acceptor { */ public static final byte CLIENT_TO_SERVER_FOR_QUEUE = (byte) 107; + /** * The GFE version of the server. * http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java index b4a6bed..f85ecb4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java @@ -300,7 +300,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn mySock.getInetAddress().getHostAddress(), mySock.getLocalPort(), mySock.getPort()); } - ServerQueueStatus sqs = handshake.greetNotifier(mySock, this.isPrimary); + ServerQueueStatus sqs = handshake.handshakeWithSubscriptionFeed(mySock, this.isPrimary); if (sqs.isPrimary() || sqs.isNonRedundant()) { PoolImpl pool = (PoolImpl) this.qManager.getPool(); if (!pool.getReadyForEventsCalled()) { http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java index 9a5c6c6..6e119c0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java @@ -60,6 +60,7 @@ import javax.net.ssl.SSLSocket; import org.apache.geode.CancelCriterion; import org.apache.geode.DataSerializer; +import org.apache.geode.GemFireConfigException; import org.apache.geode.InternalGemFireException; import org.apache.geode.cache.GatewayConfigurationException; import org.apache.geode.cache.client.PoolFactory; @@ -116,6 +117,8 @@ public class HandShake implements ClientHandShake { protected static final byte REPLY_AUTH_NOT_REQUIRED = (byte) 66; + public static final byte REPLY_SERVER_IS_LOCATOR = (byte) 67; + private static SecurityService securityService = IntegratedSecurityService.getSecurityService(); private byte code; @@ -124,11 +127,6 @@ public class HandShake implements ClientHandShake { private boolean isRead = false; protected final DistributedSystem system; - /** Singleton for client side */ - // Client has no more a singleton handShake instance. Now each connection will - // have its own handShake instance. - private static HandShake handshake; - final protected ClientProxyMembershipID id; private Properties credentials; @@ -251,7 +249,9 @@ public class HandShake implements ClientHandShake { id = null; } - /** Constructor used by server side connection */ + /** + * HandShake Constructor used by server side connection + */ public HandShake(Socket sock, int timeout, DistributedSystem sys, Version clientVersion, byte communicationMode) throws IOException, AuthenticationRequiredException { this.clientVersion = clientVersion; @@ -325,12 +325,9 @@ public class HandShake implements ClientHandShake { return this.clientVersion; } - // private void initSingleton(DistributedSystem sys) { - // id = ClientProxyMembershipID.getNewProxyMembership(); - // this.system = sys; - // this.code = REPLY_OK; - // } - + /** + * Client-side handshake. This form of HandShake can communicate with a server + */ public HandShake(ClientProxyMembershipID id, DistributedSystem sys) { this.id = id; this.code = REPLY_OK; @@ -343,6 +340,9 @@ public class HandShake implements ClientHandShake { this.id.updateID(idm); } + /** + * Clone a HandShake to be used in creating other connections + */ public HandShake(HandShake handShake) { this.appSecureMode = handShake.appSecureMode; this.clientConflation = handShake.clientConflation; @@ -362,19 +362,6 @@ public class HandShake implements ClientHandShake { this._encrypt = null; } - /* - * private void read(DataInputStream dis,byte[] toFill) throws IOException { /* this.code = (byte) - * in.read(); if (this.code == -1) { throw new - * IOException(LocalizedStrings.HandShake_HANDSHAKE_READ_AT_END_OF_STREAM.toLocalizedString()); } - * if (this.code != REPLY_OK) { throw new - * IOException(LocalizedStrings.HandShake_HANDSHAKE_REPLY_CODE_IS_NOT_OK.toLocalizedString()); } - * try { ObjectInput in = new ObjectInputStream(is); this.id = - * ClientProxyMembershipID.readCanonicalized(in); } catch(IOException ioe) { this.code = -2; throw - * ioe; } catch(ClassNotFoundException cnfe) { this.code = -3; IOException e = new - * IOException(LocalizedStrings.HandShake_ERROR_DESERIALIZING_HANDSHAKE.toLocalizedString()); - * e.initCause(cnfe); throw e; } } finally { synchronized (this) { this.isRead = true; } } } - */ - // used by the client side private byte setClientConflation() { byte result = CONFLATION_DEFAULT; @@ -455,7 +442,10 @@ public class HandShake implements ClientHandShake { } } - public byte write(DataOutputStream dos, DataInputStream dis, byte communicationMode, + /** + * client-to-server handshake. Nothing is sent to the server prior to invoking this method. + */ + private byte write(DataOutputStream dos, DataInputStream dis, byte communicationMode, int replyCode, int readTimeout, List ports, Properties p_credentials, DistributedMember member, boolean isCallbackConnection) throws IOException { HeapDataOutputStream hdos = new HeapDataOutputStream(32, Version.CURRENT); @@ -529,17 +519,7 @@ public class HandShake implements ClientHandShake { } /** - * * This assumes that authentication is the last piece of info in handshake - * - * @param dos - * @param dis - * @param p_credentials - * @param isNotification - * @param member - * @param heapdos stream to append data to. - * @throws IOException - * @throws GemFireSecurityException */ public void writeCredentials(DataOutputStream dos, DataInputStream dis, Properties p_credentials, boolean isNotification, DistributedMember member, HeapDataOutputStream heapdos) @@ -666,15 +646,6 @@ public class HandShake implements ClientHandShake { * This method writes what readCredential() method expects to read. (Note the use of singular * credential). It is similar to writeCredentials(), except that it doesn't write * credential-properties. - * - * @param dos - * @param dis - * @param authInit - * @param isNotification - * @param member - * @param heapdos - * @throws IOException - * @throws GemFireSecurityException */ public byte writeCredential(DataOutputStream dos, DataInputStream dis, String authInit, boolean isNotification, DistributedMember member, HeapDataOutputStream heapdos) @@ -1201,20 +1172,23 @@ public class HandShake implements ClientHandShake { * @param sock the socket this handshake is operating on * @return temporary id to reprent the other vm */ - private DistributedMember getDistributedMember(Socket sock) { + private DistributedMember getIDForSocket(Socket sock) { return new InternalDistributedMember(sock.getInetAddress(), sock.getPort(), false); } - public ServerQueueStatus greet(Connection conn, ServerLocation location, byte communicationMode) - throws IOException, AuthenticationRequiredException, AuthenticationFailedException, - ServerRefusedConnectionException { + /** + * Client-side handshake with a Server + */ + public ServerQueueStatus handshakeWithServer(Connection conn, ServerLocation location, + byte communicationMode) throws IOException, AuthenticationRequiredException, + AuthenticationFailedException, ServerRefusedConnectionException { try { ServerQueueStatus serverQStatus = null; Socket sock = conn.getSocket(); DataOutputStream dos = new DataOutputStream(sock.getOutputStream()); final InputStream in = sock.getInputStream(); DataInputStream dis = new DataInputStream(in); - DistributedMember member = getDistributedMember(sock); + DistributedMember member = getIDForSocket(sock); // if running in a loner system, use the new port number in the ID to // help differentiate from other clients DM dm = ((InternalDistributedSystem) this.system).getDistributionManager(); @@ -1246,6 +1220,10 @@ public class HandShake implements ClientHandShake { throw new AuthenticationRequiredException( LocalizedStrings.HandShake_SERVER_EXPECTING_SSL_CONNECTION.toLocalizedString()); } + if (acceptanceCode == REPLY_SERVER_IS_LOCATOR) { + throw new GemFireConfigException("Improperly configured client detected. " + "Server at " + + location + " is actually a locator. Use addPoolLocator to configure locators."); + } // Successful handshake for GATEWAY_TO_GATEWAY mode sets the peer version in connection if (communicationMode == Acceptor.GATEWAY_TO_GATEWAY @@ -1309,7 +1287,11 @@ public class HandShake implements ClientHandShake { } } - public ServerQueueStatus greetNotifier(Socket sock, boolean isPrimary) + /** + * Used by client-side CacheClientUpdater to handshake with a server in order to receive messages + * generated by subscriptions (register-interest, continuous query) + */ + public ServerQueueStatus handshakeWithSubscriptionFeed(Socket sock, boolean isPrimary) throws IOException, AuthenticationRequiredException, AuthenticationFailedException, ServerRefusedConnectionException, ClassNotFoundException { ServerQueueStatus sqs = null; @@ -1317,7 +1299,7 @@ public class HandShake implements ClientHandShake { DataOutputStream dos = new DataOutputStream(sock.getOutputStream()); final InputStream in = sock.getInputStream(); DataInputStream dis = new DataInputStream(in); - DistributedMember member = getDistributedMember(sock); + DistributedMember member = getIDForSocket(sock); if (!this.multiuserSecureMode) { this.credentials = getCredentials(member); } @@ -1466,12 +1448,6 @@ public class HandShake implements ClientHandShake { return false; final HandShake that = (HandShake) other; - /* - * if (identity != null && identity.length > 0) { for (int i = 0; i < identity.length; i++) { if - * (this.identity[i] != that.identity[i]) return false; } } if (this.code != that.code) return - * false; return true; - */ - if (this.id.isSameDSMember(that.id) && this.code == that.code) { return true; } else { http://git-wip-us.apache.org/repos/asf/geode/blob/e3cb1b74/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java index 82f30bf..391653c 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java @@ -21,6 +21,10 @@ import java.util.Iterator; import java.util.Properties; import java.util.Set; +import org.apache.geode.GemFireConfigException; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.junit.categories.ClientServerTest; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -743,6 +747,15 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase { } } + @Test(expected = GemFireConfigException.class) + public void clientIsPreventedFromConnectingToLocatorAsServer() throws Exception { + IgnoredException.addIgnoredException("Improperly configured client detected"); + ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); + clientCacheFactory.addPoolServer("localhost", DistributedTestUtils.getDUnitLocatorPort()); + clientCacheFactory.setPoolSubscriptionEnabled(true); + getClientCache(clientCacheFactory); + } + private void createCache(Properties props) throws Exception { createCacheV(props); @@ -1317,22 +1330,6 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase { } } - @Override - public final void postTearDownCacheTestCase() throws Exception { - // close the clients first - closeCacheAndDisconnect(); - // then close the servers - server1.invoke(() -> ClientServerMiscDUnitTest.closeCacheAndDisconnect()); - } - - public static void closeCacheAndDisconnect() { - Cache cache = new ClientServerMiscDUnitTest().getCache(); - if (cache != null && !cache.isClosed()) { - cache.close(); - cache.getDistributedSystem().disconnect(); - } - } - /** * set the boolean for starting the dispatcher thread a bit later to FALSE. This is just a * precaution in case any test set it to true and did not unset it on completion.