Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 513DE10273 for ; Wed, 11 Dec 2013 18:31:28 +0000 (UTC) Received: (qmail 8444 invoked by uid 500); 11 Dec 2013 18:31:28 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 8426 invoked by uid 500); 11 Dec 2013 18:31:28 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 8418 invoked by uid 99); 11 Dec 2013 18:31:28 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Dec 2013 18:31:28 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Dec 2013 18:31:26 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E86CF23888A6; Wed, 11 Dec 2013 18:31:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: svn commit: r1550220 - in /zookeeper/branches/branch-3.4: ./ src/java/main/org/apache/zookeeper/server/ src/java/test/org/apache/zookeeper/ src/java/test/org/apache/zookeeper/server/ src/java/test/org/apache/zookeeper/server/quorum/ Date: Wed, 11 Dec 2013 18:31:05 -0000 To: commits@zookeeper.apache.org From: camille@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131211183105.E86CF23888A6@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: camille Date: Wed Dec 11 18:31:05 2013 New Revision: 1550220 URL: http://svn.apache.org/r1550220 Log: 
ZOOKEEPER-1382. Zookeeper server holds onto dead/expired session ids in the watch data structures (Germán Blanco and Michael Morello via camille) Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/MockPacket.java (with props) zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java (with props) zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java (with props) Modified: zookeeper/branches/branch-3.4/CHANGES.txt zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Modified: zookeeper/branches/branch-3.4/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1550220&r1=1550219&r2=1550220&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/CHANGES.txt (original) +++ zookeeper/branches/branch-3.4/CHANGES.txt Wed Dec 11 18:31:05 2013 @@ -179,6 +179,9 @@ BUGFIXES: (Raul Gutierrez Segales via michim) ZOOKEEPER-1834. Catch IOException in FileTxnLog (fpj via michim) + + ZOOKEEPER-1382. 
Zookeeper server holds onto dead/expired session ids in the watch data structures + (Germán Blanco and Michael Morello via camille) IMPROVEMENTS: Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=1550220&r1=1550219&r2=1550220&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original) +++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Wed Dec 11 18:31:05 2013 @@ -204,9 +204,19 @@ public class NIOServerCnxn extends Serve } } + /** + * Only used in order to allow testing + */ + protected boolean isSocketOpen() { + return sock.isOpen(); + } + + /** + * Handles read/write IO on connection. + */ void doIO(SelectionKey k) throws InterruptedException { try { - if (sock.isOpen() == false) { + if (isSocketOpen() == false) { LOG.warn("trying to do i/o on a null socket for session:0x" + Long.toHexString(sessionId)); Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1550220&r1=1550219&r2=1550220&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original) +++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Wed Dec 11 18:31:05 2013 @@ -619,9 +619,9 @@ public class ZooKeeperServer implements + " with negotiated timeout " + cnxn.getSessionTimeout() + " for client " + cnxn.getRemoteSocketAddress()); + cnxn.enableRecv(); } - cnxn.enableRecv(); } catch (Exception e) { 
LOG.warn("Exception while establishing session, closing", e); cnxn.close(); Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/MockPacket.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/MockPacket.java?rev=1550220&view=auto ============================================================================== --- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/MockPacket.java (added) +++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/MockPacket.java Wed Dec 11 18:31:05 2013 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper; + +import org.apache.zookeeper.proto.RequestHeader; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.jute.Record; +import org.apache.zookeeper.ZooKeeper.WatchRegistration; +import java.nio.ByteBuffer; + +public class MockPacket extends ClientCnxn.Packet { + + public MockPacket(RequestHeader requestHeader, ReplyHeader replyHeader, + Record request, Record response, + WatchRegistration watchRegistration) { + super(requestHeader, replyHeader, request, response, watchRegistration); + } + + public MockPacket(RequestHeader requestHeader, ReplyHeader replyHeader, + Record request, Record response, + WatchRegistration watchRegistration, boolean readOnly) { + super(requestHeader, replyHeader, request, response, watchRegistration, readOnly); + } + + public ByteBuffer createAndReturnBB() { + createBB(); + return this.bb; + } + +} Propchange: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/MockPacket.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java?rev=1550220&view=auto ============================================================================== --- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java (added) +++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java Wed Dec 11 18:31:05 2013 @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.io.IOException; + +public class MockNIOServerCnxn extends NIOServerCnxn { + + public MockNIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, + SelectionKey sk, NIOServerCnxnFactory factory) + throws IOException { + super(zk, sock, sk, factory); + } + + /** + * Handles read/write IO on connection. 
+ */ + public void doIO(SelectionKey k) throws InterruptedException { + super.doIO(k); + } + + @Override + protected boolean isSocketOpen() { + return true; + } + +} Propchange: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/MockNIOServerCnxn.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java?rev=1550220&view=auto ============================================================================== --- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java (added) +++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java Wed Dec 11 18:31:05 2013 @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.junit.Assert.*; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.zookeeper.MockPacket; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; +import org.apache.zookeeper.proto.SetWatches; +import org.apache.zookeeper.server.MockNIOServerCnxn; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Demonstrate ZOOKEEPER-1382 : Watches leak on expired session + */ +public class WatchLeakTest { + + protected static final Logger LOG = LoggerFactory + .getLogger(WatchLeakTest.class); + + final long SESSION_ID = 0xBABEL; + + /** + * ZOOKEEPR-1382 test class + */ + @Test + public void testWatchesWithClientSessionTimeout() throws Exception { + + NIOServerCnxnFactory serverCnxnFactory 
= new NIOServerCnxnFactory(); + + ZKDatabase database = new ZKDatabase(null); + database.setlastProcessedZxid(2L); + QuorumPeer quorumPeer = mock(QuorumPeer.class); + FileTxnSnapLog logfactory = mock(FileTxnSnapLog.class); + // Directories are not used, but we need them to avoid an NPE + when(logfactory.getDataDir()).thenReturn(new File("/tmp")); + when(logfactory.getSnapDir()).thenReturn(new File("/tmp")); + FollowerZooKeeperServer fzks = null; + try { + fzks = new FollowerZooKeeperServer(logfactory, quorumPeer, null, + database); + fzks.startup(); + fzks.setServerCnxnFactory(serverCnxnFactory); + quorumPeer.follower = new MyFollower(quorumPeer, fzks); + final SelectionKey sk = new FakeSK(); + // Simulate a socket channel between a client and a follower + final SocketChannel socketChannel = createClientSocketChannel(); + // Create the NIOServerCnxn that will handle the client requests + final MockNIOServerCnxn nioCnxn = new MockNIOServerCnxn(fzks, + socketChannel, sk, serverCnxnFactory); + // Send the connection request as a client does + nioCnxn.doIO(sk); + // Send the invalid session packet to the follower + QuorumPacket qp = createInvalidSessionPacket(); + quorumPeer.follower.processPacket(qp); + // OK, now the follower knows that the session is invalid, let's try + // to + // send it the watches + nioCnxn.doIO(sk); + // wait for the request processor to do its job + Thread.sleep(1000L); + // Session has not been re-validated ! 
+ // If session has not been validated, there must be NO watches + int watchCount = database.getDataTree().getWatchCount(); + LOG.info("watches = " + watchCount); + assertEquals(0, watchCount); + } finally { + if (fzks != null) { + fzks.shutdown(); + } + } + } + + /** + * A follower with no real leader connection + */ + public static class MyFollower extends Follower { + /** + * Create a follower with a mocked leader connection + * + * @param self + * @param zk + */ + MyFollower(QuorumPeer self, FollowerZooKeeperServer zk) { + super(self, zk); + leaderOs = mock(OutputArchive.class); + leaderIs = mock(InputArchive.class); + bufferedOutput = mock(BufferedOutputStream.class); + } + } + + /** + * Simulate the behavior of a real selection key + */ + private static class FakeSK extends SelectionKey { + + @Override + public SelectableChannel channel() { + return null; + } + + @Override + public Selector selector() { + return mock(Selector.class); + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public void cancel() { + } + + @Override + public int interestOps() { + return ops; + } + + private int ops = OP_WRITE + OP_READ; + + @Override + public SelectionKey interestOps(int ops) { + this.ops = ops; + return this; + } + + @Override + public int readyOps() { + return ops; + } + + } + + /** + * Create a watches message with a single watch on / + * + * @return + */ + private ByteBuffer createWatchesMessage() { + List dataWatches = new ArrayList(1); + dataWatches.add("/"); + List existWatches = Collections.emptyList(); + List childWatches = Collections.emptyList(); + SetWatches sw = new SetWatches(1L, dataWatches, existWatches, + childWatches); + RequestHeader h = new RequestHeader(); + h.setType(ZooDefs.OpCode.setWatches); + h.setXid(-8); + MockPacket p = new MockPacket(h, new ReplyHeader(), sw, null, null); + return p.createAndReturnBB(); + } + + /** + * This is the secret that we use to generate passwords, for the moment it + * is more of a 
sanity check. + */ + static final private long superSecret = 0XB3415C00L; + + /** + * Create a connection request + * + * @return + */ + private ByteBuffer createConnRequest() { + Random r = new Random(SESSION_ID ^ superSecret); + byte p[] = new byte[16]; + r.nextBytes(p); + ConnectRequest conReq = new ConnectRequest(0, 1L, 30000, SESSION_ID, p); + MockPacket packet = new MockPacket(null, null, conReq, null, null, false); + return packet.createAndReturnBB(); + } + + /** + * Mock a client channel with a connection request and a watches message + * inside. + * + * @return a socket channel + * @throws IOException + */ + private SocketChannel createClientSocketChannel() throws IOException { + + SocketChannel socketChannel = mock(SocketChannel.class); + Socket socket = mock(Socket.class); + InetSocketAddress socketAddress = new InetSocketAddress(1234); + when(socket.getRemoteSocketAddress()).thenReturn(socketAddress); + when(socketChannel.socket()).thenReturn(socket); + + // Send watches packet to server connection + final ByteBuffer connRequest = createConnRequest(); + final ByteBuffer watchesMessage = createWatchesMessage(); + final ByteBuffer request = ByteBuffer.allocate(connRequest.limit() + + watchesMessage.limit()); + request.put(connRequest); + request.put(watchesMessage); + + Answer answer = new Answer() { + int i = 0; + + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + ByteBuffer bb = (ByteBuffer) args[0]; + for (int k = 0; k < bb.limit(); k++) { + bb.put(request.get(i)); + i = i + 1; + } + return bb.limit(); + } + }; + when(socketChannel.read(any(ByteBuffer.class))).thenAnswer(answer); + return socketChannel; + } + + /** + * Forge an invalid session packet as a LEADER do + * + * @throws Exception + */ + private QuorumPacket createInvalidSessionPacket() throws Exception { + QuorumPacket qp = createValidateSessionQuorumPacket(); + ByteArrayInputStream bis = new 
ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + long id = dis.readLong(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + dos.writeLong(id); + // false means that the session has expired + dos.writeBoolean(false); + qp.setData(bos.toByteArray()); + return qp; + } + + /** + * Forge a validate-session packet as a LEARNER does + * + * @return + * @throws Exception + */ + private QuorumPacket createValidateSessionQuorumPacket() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeLong(SESSION_ID); + dos.writeInt(3000); + dos.close(); + QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, + baos.toByteArray(), null); + if (LOG.isTraceEnabled()) { + ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, + "To validate session 0x" + Long.toHexString(2L)); + } + return qp; + } + +} \ No newline at end of file Propchange: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/WatchLeakTest.java ------------------------------------------------------------------------------ svn:mime-type = text/plain