Author: camille
Date: Wed Dec 11 18:22:04 2013
New Revision: 1550213
URL: http://svn.apache.org/r1550213
Log:
ZOOKEEPER-1382. Zookeeper server holds onto dead/expired session ids in the watch data structures
(Germán Blanco and Michael Morello via camille)
Added:
zookeeper/trunk/src/java/test/org/apache/zookeeper/MockPacket.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockSelectorThread.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java (with props)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1550213&r1=1550212&r2=1550213&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Wed Dec 11 18:22:04 2013
@@ -492,6 +492,9 @@ BUGFIXES:
(Raul Gutierrez Segales via michim)
ZOOKEEPER-1834. Catch IOException in FileTxnLog (fpj via michim)
+
+ ZOOKEEPER-1382. Zookeeper server holds onto dead/expired session ids in the watch data structures
+ (Germán Blanco and Michael Morello via camille)
IMPROVEMENTS:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=1550213&r1=1550212&r2=1550213&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Wed Dec 11 18:22:04 2013
@@ -302,11 +302,18 @@ public class NIOServerCnxn extends Serve
}
/**
+ * Reports whether {@code sock} is open. Exists as an overridable hook so that tests can substitute the answer; doIO consults it before doing any work.
+ */
+ protected boolean isSocketOpen() {
+ return sock.isOpen(); // plain delegation to the sock field
+ }
+
+ /**
* Handles read/write IO on connection.
*/
void doIO(SelectionKey k) throws InterruptedException {
try {
- if (sock.isOpen() == false) {
+ if (isSocketOpen() == false) {
LOG.warn("trying to do i/o on a null socket for session:0x"
+ Long.toHexString(sessionId));
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1550213&r1=1550212&r2=1550213&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Wed Dec 11 18:22:04 2013
@@ -606,6 +606,7 @@ public class ZooKeeperServer implements
+ " with negotiated timeout " + cnxn.getSessionTimeout()
+ " for client "
+ cnxn.getRemoteSocketAddress());
+ cnxn.enableRecv();
} else {
LOG.info("Invalid session 0x"
@@ -616,7 +617,6 @@ public class ZooKeeperServer implements
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
}
- cnxn.enableRecv();
} catch (Exception e) {
LOG.warn("Exception while establishing session, closing", e);
cnxn.close();
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/MockPacket.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/MockPacket.java?rev=1550213&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/MockPacket.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/MockPacket.java Wed Dec 11 18:22:04 2013
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZooKeeper.WatchRegistration;
+import java.nio.ByteBuffer;
+
+public class MockPacket extends ClientCnxn.Packet {
+ // Test-side subclass of ClientCnxn.Packet: both constructors forward to super, and createAndReturnBB() hands back the bb field.
+ public MockPacket(RequestHeader requestHeader, ReplyHeader replyHeader,
+ Record request, Record response,
+ WatchRegistration watchRegistration) {
+ super(requestHeader, replyHeader, request, response, watchRegistration);
+ }
+ // Same as the constructor above, additionally forwarding the readOnly flag to the superclass constructor.
+ public MockPacket(RequestHeader requestHeader, ReplyHeader replyHeader,
+ Record request, Record response,
+ WatchRegistration watchRegistration, boolean readOnly) {
+ super(requestHeader, replyHeader, request, response, watchRegistration, readOnly);
+ }
+ // Calls the inherited createBB() and then returns the bb field (presumably the buffer createBB() populates - confirm in ClientCnxn.Packet).
+ public ByteBuffer createAndReturnBB() {
+ createBB();
+ return this.bb;
+ }
+
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/MockPacket.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java?rev=1550213&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java Wed Dec 11 18:22:04 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread;
+
+public class MockNIOServerCnxn extends NIOServerCnxn {
+ // Test subclass of NIOServerCnxn: exposes doIO publicly and makes isSocketOpen() always answer true.
+ public MockNIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
+ SelectionKey sk, NIOServerCnxnFactory factory,
+ SelectorThread selectorThread) throws IOException {
+ super(zk, sock, sk, factory, selectorThread);
+ }
+
+ /**
+ * Public pass-through to the package-private NIOServerCnxn.doIO, so that tests living in other packages can drive the connection's IO.
+ */
+ public void doIO(SelectionKey k) throws InterruptedException { // NOTE(review): overrides NIOServerCnxn.doIO but carries no @Override
+ super.doIO(k);
+ }
+ // Always reports the socket as open, so the isSocketOpen() guard at the top of doIO never trips.
+ @Override
+ protected boolean isSocketOpen() {
+ return true;
+ }
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockSelectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockSelectorThread.java?rev=1550213&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockSelectorThread.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockSelectorThread.java Wed Dec 11 18:22:04 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+
+public class MockSelectorThread extends NIOServerCnxnFactory.SelectorThread { // test subclass of the factory's inner SelectorThread
+ public MockSelectorThread(NIOServerCnxnFactory fact) throws IOException {
+ fact.super(0); // qualified super call: fact is the enclosing factory instance; the meaning of the 0 argument is not visible here
+ }
+ // Declared public and delegating to super - presumably to widen access for tests (TODO confirm the superclass method's visibility).
+ public boolean addInterestOpsUpdateRequest(SelectionKey sk) { // NOTE(review): looks like an override without @Override - confirm
+ return super.addInterestOpsUpdateRequest(sk);
+ }
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/MockSelectorThread.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java?rev=1550213&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java Wed Dec 11 18:22:04 2013
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.MockPacket;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.proto.ConnectRequest;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.proto.SetWatches;
+import org.apache.zookeeper.server.MockNIOServerCnxn;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.MockSelectorThread;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Demonstrate ZOOKEEPER-1382 : Watches leak on expired session
+ */
+@RunWith(Parameterized.class)
+public class WatchLeakTest {
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(WatchLeakTest.class);
+
+ final long SESSION_ID = 0xBABEL;
+
+ private final boolean sessionTimedout;
+
+ public WatchLeakTest(boolean sessionTimedout) { // one instance per row returned by configs()
+ this.sessionTimedout = sessionTimedout; // true = the forged leader reply marks the session as not valid
+ }
+
+ @Parameters
+ public static Collection<Object[]> configs() { // parameter rows for the Parameterized runner
+ return Arrays.asList(new Object[][] {
+ { false }, { true }, // sessionTimedout = false, then sessionTimedout = true
+ });
+ }
+
+ /**
+ * Feeds a connect request, a forged session-revalidation reply and a setWatches request to a follower, then asserts that a watch exists only if the session was reported valid.
+ */
+
+ @Test
+ public void testWatchesLeak() throws Exception {
+ // The factory and the selector thread are Mockito mocks; the selection key is the hand-written FakeSK declared in this class.
+ NIOServerCnxnFactory serverCnxnFactory = mock(NIOServerCnxnFactory.class);
+ final SelectionKey sk = new FakeSK();
+ MockSelectorThread selectorThread = mock(MockSelectorThread.class);
+ when(selectorThread.addInterestOpsUpdateRequest(any(SelectionKey.class))).thenAnswer(new
Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ SelectionKey sk = (SelectionKey)invocation.getArguments()[0];
+ NIOServerCnxn nioSrvCnx = (NIOServerCnxn)sk.attachment();
+ sk.interestOps(nioSrvCnx.getInterestOps());
+ return true;
+ }
+ });
+ // The stubbed answer above copies the attached connection's interest ops onto the key and reports success.
+ ZKDatabase database = new ZKDatabase(null);
+ database.setlastProcessedZxid(2L);
+ QuorumPeer quorumPeer = mock(QuorumPeer.class);
+ FileTxnSnapLog logfactory = mock(FileTxnSnapLog.class);
+ // The directories are not used, but they must be non-null to avoid an NPE
+ when(logfactory.getDataDir()).thenReturn(new File(""));
+ when(logfactory.getSnapDir()).thenReturn(new File(""));
+ FollowerZooKeeperServer fzks = null;
+
+ try {
+ // Create a new follower server and attach a follower with mocked leader streams to the mocked peer
+ fzks = new FollowerZooKeeperServer(logfactory, quorumPeer, database);
+ fzks.startup();
+ fzks.setServerCnxnFactory(serverCnxnFactory);
+ quorumPeer.follower = new MyFollower(quorumPeer, fzks);
+ LOG.info("Follower created");
+ // Simulate a socket channel between a client and the follower
+ final SocketChannel socketChannel = createClientSocketChannel();
+ // Create the NIOServerCnxn that will handle the client requests
+ final MockNIOServerCnxn nioCnxn = new MockNIOServerCnxn(fzks,
+ socketChannel, sk, serverCnxnFactory, selectorThread);
+ sk.attach(nioCnxn);
+ // First IO pass: send the connection request as a client does
+ nioCnxn.doIO(sk);
+ LOG.info("Client connection sent");
+ // Hand the forged revalidation reply (valid unless sessionTimedout) to the follower
+ QuorumPacket qp = createValidateSessionPacketResponse(!sessionTimedout);
+ quorumPeer.follower.processPacket(qp);
+ LOG.info("Session validation sent");
+ // OK, now the follower knows whether the session is valid or invalid, so let's try
+ // to send the watches
+ nioCnxn.doIO(sk);
+ // Wait for the request processor to do its job. NOTE(review): a fixed one-second sleep, not a completion signal
+ Thread.sleep(1000L);
+ LOG.info("Watches processed");
+ // If the session has not been validated, there must be NO watches
+ int watchCount = database.getDataTree().getWatchCount();
+ if (sessionTimedout) {
+ // The session has not been re-validated!
+ LOG.info("session is not valid, watches = {}", watchCount);
+ assertEquals("Session is not valid so there should be no watches", 0, watchCount);
+ } else {
+ // The session has been re-validated
+ LOG.info("session is valid, watches = {}", watchCount);
+ assertEquals("Session is valid so the watch should be there", 1, watchCount);
+ }
+ } finally {
+ if (fzks != null) {
+ fzks.shutdown();
+ }
+ }
+ }
+
+ /**
+ * A Follower whose leader-facing streams are replaced by Mockito mocks, so no real leader connection is involved
+ */
+ public static class MyFollower extends Follower {
+ /**
+ * Create the follower, then overwrite leaderOs, leaderIs and bufferedOutput with mocks
+ *
+ * @param self the quorum peer, passed straight to the Follower constructor
+ * @param zk the follower server, passed straight to the Follower constructor
+ */
+ MyFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
+ super(self, zk);
+ leaderOs = mock(OutputArchive.class);
+ leaderIs = mock(InputArchive.class);
+ bufferedOutput = mock(BufferedOutputStream.class);
+ }
+ }
+
+ /**
+ * Minimal in-memory SelectionKey: it only stores interest ops, and readyOps() reports exactly those ops as ready
+ */
+ private static class FakeSK extends SelectionKey {
+ // No channel is associated with this fake key.
+ @Override
+ public SelectableChannel channel() {
+ return null;
+ }
+ // Returns a fresh Mockito mock on every call.
+ @Override
+ public Selector selector() {
+ return mock(Selector.class);
+ }
+ // The key is always reported as valid.
+ @Override
+ public boolean isValid() {
+ return true;
+ }
+ // Cancelling is a no-op.
+ @Override
+ public void cancel() {
+ }
+ // Getter for the stored interest ops.
+ @Override
+ public int interestOps() {
+ return ops;
+ }
+ // Interest set backing both interestOps() overloads and readyOps(); starts with the read and write bits set.
+ private int ops = OP_WRITE + OP_READ;
+ // Setter: stores the new interest ops and returns this key.
+ @Override
+ public SelectionKey interestOps(int ops) {
+ this.ops = ops;
+ return this;
+ }
+ // Logs which of read/write is currently in the interest set, then returns the interest set itself as the ready set.
+ @Override
+ public int readyOps() {
+ boolean reading = (ops & OP_READ) != 0;
+ boolean writing = (ops & OP_WRITE) != 0;
+ if (reading && writing) {
+ LOG.info("Channel is ready for reading and writing");
+ } else if (reading) {
+ LOG.info("Channel is ready for reading only");
+ } else if (writing) {
+ LOG.info("Channel is ready for writing only");
+ }
+ return ops;
+ }
+
+ }
+
+ /**
+ * Build a setWatches request carrying a single data watch on / and no exist or child watches
+ *
+ * @return the buffer produced by MockPacket.createAndReturnBB() for that request
+ */
+ private ByteBuffer createWatchesMessage() {
+ List<String> dataWatches = new ArrayList<String>(1);
+ dataWatches.add("/");
+ List<String> existWatches = Collections.emptyList();
+ List<String> childWatches = Collections.emptyList();
+ SetWatches sw = new SetWatches(1L, dataWatches, existWatches, // first argument presumably a zxid - confirm against SetWatches
+ childWatches);
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.setWatches);
+ h.setXid(-8); // NOTE(review): presumably the xid reserved for setWatches requests - confirm
+ MockPacket p = new MockPacket(h, new ReplyHeader(), sw, null, null);
+ return p.createAndReturnBB();
+ }
+
+ /**
+ * Constant XOR-ed with SESSION_ID to seed the Random that generates the password bytes in createConnRequest().
+ * NOTE(review): presumably mirrors the secret the server uses to derive session passwords - confirm.
+ */
+ static final private long superSecret = 0XB3415C00L;
+
+ /**
+ * Build a connection request for SESSION_ID with a 16-byte password derived from SESSION_ID and superSecret
+ *
+ * @return the buffer produced by MockPacket.createAndReturnBB() for the request
+ */
+ private ByteBuffer createConnRequest() {
+ Random r = new Random(SESSION_ID ^ superSecret); // fixed seed, so the password bytes are the same on every run
+ byte p[] = new byte[16];
+ r.nextBytes(p);
+ ConnectRequest conReq = new ConnectRequest(0, 1L, 30000, SESSION_ID, p); // presumably (protocolVersion, lastZxidSeen, timeOut, sessionId, passwd) - confirm
+ MockPacket packet = new MockPacket(null, null, conReq, null, null, false); // no headers; readOnly = false
+ return packet.createAndReturnBB();
+ }
+
+ /**
+ * Mock a client channel whose read(ByteBuffer) serves the bytes of a connection
+ * request followed by a watches message.
+ *
+ * @return the mocked socket channel
+ * @throws IOException declared because the stubbed SocketChannel.read call declares it
+ */
+ private SocketChannel createClientSocketChannel() throws IOException {
+ // Mocked channel and socket; the socket reports a fixed remote address.
+ SocketChannel socketChannel = mock(SocketChannel.class);
+ Socket socket = mock(Socket.class);
+ InetSocketAddress socketAddress = new InetSocketAddress(1234);
+ when(socket.getRemoteSocketAddress()).thenReturn(socketAddress);
+ when(socketChannel.socket()).thenReturn(socket);
+
+ // Concatenate the connection request and the watches message into one buffer
+ final ByteBuffer connRequest = createConnRequest();
+ final ByteBuffer watchesMessage = createWatchesMessage();
+ final ByteBuffer request = ByteBuffer.allocate(connRequest.limit()
+ + watchesMessage.limit());
+ request.put(connRequest);
+ request.put(watchesMessage);
+ // request is never flipped: the answer below reads it with absolute get(i), which ignores the buffer position.
+ Answer<Integer> answer = new Answer<Integer>() {
+ int i = 0; // index of the next unread byte of request; persists across read() calls
+
+ @Override
+ public Integer answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ ByteBuffer bb = (ByteBuffer) args[0];
+ for (int k = 0; k < bb.limit(); k++) { // NOTE(review): loops limit() times, not remaining() - assumes bb starts at position 0
+ bb.put(request.get(i));
+ i = i + 1;
+ }
+ return bb.limit(); // reports the whole destination buffer as read
+ }
+ };
+ when(socketChannel.read(any(ByteBuffer.class))).thenAnswer(answer);
+ return socketChannel;
+ }
+
+ /**
+ * Forge the session revalidation reply as a LEADER would send it: the session id read back from the request packet, followed by the validity flag
+ *
+ * @param valid <code>true</code> to create a valid session message, <code>false</code> for an expired one
+ *
+ * @throws Exception
+ */
+ private QuorumPacket createValidateSessionPacketResponse(boolean valid) throws Exception
{
+ QuorumPacket qp = createValidateSessionPacket();
+ ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
+ DataInputStream dis = new DataInputStream(bis);
+ long id = dis.readLong(); // only the leading long (the session id) of the request payload is read
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeLong(id);
+ // false means that the session has expired
+ dos.writeBoolean(valid);
+ qp.setData(bos.toByteArray()); // the same packet object is reused, with its payload replaced
+ return qp;
+ }
+
+ /**
+ * Forge a validate session packet as a LEARNER would send it
+ *
+ * @return a Leader.REVALIDATE packet whose payload is SESSION_ID (long) followed by the int 3000
+ * @throws Exception
+ */
+ private QuorumPacket createValidateSessionPacket() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeLong(SESSION_ID);
+ dos.writeInt(3000); // presumably the session timeout - confirm against the learner's revalidate code
+ dos.close();
+ QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1,
+ baos.toByteArray(), null);
+ return qp;
+ }
+
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
|