Return-Path: Delivered-To: apmail-hadoop-zookeeper-commits-archive@locus.apache.org Received: (qmail 34604 invoked from network); 24 Jun 2008 19:06:02 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 24 Jun 2008 19:06:02 -0000 Received: (qmail 2989 invoked by uid 500); 24 Jun 2008 19:06:03 -0000 Delivered-To: apmail-hadoop-zookeeper-commits-archive@hadoop.apache.org Received: (qmail 2968 invoked by uid 500); 24 Jun 2008 19:06:03 -0000 Mailing-List: contact zookeeper-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: zookeeper-dev@ Delivered-To: mailing list zookeeper-commits@hadoop.apache.org Received: (qmail 2957 invoked by uid 99); 24 Jun 2008 19:06:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jun 2008 12:06:03 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jun 2008 19:05:11 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id AE7922388A54; Tue, 24 Jun 2008 12:05:00 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r671303 [2/6] - in /hadoop/zookeeper/trunk/src/java: lib/cobertura/lib/ lib/svnant/ main/org/apache/zookeeper/ main/org/apache/zookeeper/server/ main/org/apache/zookeeper/server/auth/ main/org/apache/zookeeper/server/quorum/ main/org/apache... Date: Tue, 24 Jun 2008 19:04:59 -0000 To: zookeeper-commits@hadoop.apache.org From: phunt@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080624190500.AE7922388A54@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Tue Jun 24 12:04:58 2008 @@ -1,850 +1,850 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.Channel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.log4j.Logger; - -import org.apache.jute.BinaryInputArchive; -import org.apache.jute.BinaryOutputArchive; -import org.apache.jute.Record; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Version; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs.OpCode; -import org.apache.zookeeper.data.Id; -import org.apache.zookeeper.proto.AuthPacket; -import org.apache.zookeeper.proto.ConnectRequest; -import org.apache.zookeeper.proto.ConnectResponse; -import org.apache.zookeeper.proto.ReplyHeader; -import org.apache.zookeeper.proto.RequestHeader; -import org.apache.zookeeper.proto.WatcherEvent; -import org.apache.zookeeper.server.auth.AuthenticationProvider; -import org.apache.zookeeper.server.auth.ProviderRegistry; - -/** - * This class handles communication with clients using NIO. There is one per - * client, but only one thread doing the communication. - */ -public class NIOServerCnxn implements Watcher, ServerCnxn { - private static final Logger LOG = Logger.getLogger(NIOServerCnxn.class); - - static public class Factory extends Thread { - ZooKeeperServer zks; - - ServerSocketChannel ss; - - Selector selector = Selector.open(); - - /** - * We use this buffer to do efficient socket I/O. Since there is a single - * sender thread per NIOServerCnxn instance, we can use a member variable to - * only allocate it once. - */ - ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024); - - HashSet cnxns = new HashSet(); - - int outstandingLimit = 1; - - public Factory(int port) throws IOException { - super("NIOServerCxn.Factory"); - setDaemon(true); - this.ss = ServerSocketChannel.open(); - ss.socket().bind(new InetSocketAddress(port)); - ss.configureBlocking(false); - ss.register(selector, SelectionKey.OP_ACCEPT); - start(); - } - - public void startup(ZooKeeperServer zks) throws IOException, - InterruptedException { - zks.startup(); - setZooKeeperServer(zks); - } - - public void setZooKeeperServer(ZooKeeperServer zks) { - this.zks = zks; - if (zks != null) { - this.outstandingLimit = zks.getGlobalOutstandingLimit(); - zks.setServerCnxnFactory(this); - } else { - this.outstandingLimit = 1; - } - } - - public InetSocketAddress getLocalAddress(){ - return (InetSocketAddress)ss.socket().getLocalSocketAddress(); - } - - private void addCnxn(NIOServerCnxn cnxn) { - synchronized (cnxns) { - cnxns.add(cnxn); - } - } - - protected NIOServerCnxn createConnection(SocketChannel sock, - SelectionKey sk) throws IOException { - return new NIOServerCnxn(zks, sock, sk, this); - } - - public void run() { - while (!ss.socket().isClosed()) { - try { - selector.select(1000); - Set selected; - synchronized (this) { - selected = selector.selectedKeys(); - } - ArrayList selectedList = new ArrayList( - selected); - Collections.shuffle(selectedList); - for (SelectionKey k : selectedList) { - if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { - SocketChannel sc = ((ServerSocketChannel) k - .channel()).accept(); - sc.configureBlocking(false); - SelectionKey sk = sc.register(selector, - SelectionKey.OP_READ); - NIOServerCnxn cnxn = createConnection(sc, sk); - sk.attach(cnxn); - addCnxn(cnxn); - } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { - NIOServerCnxn c = (NIOServerCnxn) k.attachment(); - c.doIO(k); - } - } - selected.clear(); - } catch (Exception e) { - LOG.error("FIXMSG",e); - } - } - ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), - "NIOServerCnxn factory exitedloop."); - clear(); - LOG.error("=====> Goodbye cruel world <======"); - // System.exit(0); - } - - /** - * clear all the connections in the selector - * - */ - synchronized public void clear() { - selector.wakeup(); - synchronized (cnxns) { - // got to clear all the connections that we have in the selector - for (Iterator it = cnxns.iterator(); it - .hasNext();) { - NIOServerCnxn cnxn = it.next(); - it.remove(); - try { - cnxn.close(); - } catch (Exception e) { - // Do nothing. - } - } - } - - } - - public void shutdown() { - try { - ss.close(); - clear(); - this.interrupt(); - this.join(); - } catch (InterruptedException e) { - LOG.warn("Interrupted",e); - } catch (Exception e) { - LOG.error("Unexpected exception", e); - } - if (zks != null) { - zks.shutdown(); - } - } - - synchronized void closeSession(long sessionId) { - selector.wakeup(); - synchronized (cnxns) { - for (Iterator it = cnxns.iterator(); it - .hasNext();) { - NIOServerCnxn cnxn = it.next(); - if (cnxn.sessionId == sessionId) { - it.remove(); - try { - cnxn.close(); - } catch (Exception e) { - } - break; - } - } - } - } - } - - /** - * The buffer will cause the connection to be close when we do a send. - */ - static final ByteBuffer closeConn = ByteBuffer.allocate(0); - - Factory factory; - - ZooKeeperServer zk; - - private SocketChannel sock; - - private SelectionKey sk; - - boolean initialized; - - ByteBuffer lenBuffer = ByteBuffer.allocate(4); - - ByteBuffer incomingBuffer = lenBuffer; - - LinkedBlockingQueue outgoingBuffers = new LinkedBlockingQueue(); - - int sessionTimeout; - - ArrayList authInfo = new ArrayList(); - - LinkedList outstanding = new LinkedList(); - - void sendBuffer(ByteBuffer bb) { - synchronized (factory) { - try { - sk.selector().wakeup(); - // ZooLog.logTraceMessage(LOG, - // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, - // "Add a buffer to outgoingBuffers"); - // ZooLog.logTraceMessage(LOG, - // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, - //"sk " + sk + " is valid: " + - // sk.isValid(), ); - outgoingBuffers.add(bb); - if (sk.isValid()) { - sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); - } - } catch (RuntimeException e) { - LOG.error("FIXMSG",e); - throw e; - } - } - } - - void doIO(SelectionKey k) throws InterruptedException { - try { - if (sock == null) { - return; - } - if (k.isReadable()) { - int rc = sock.read(incomingBuffer); - if (rc < 0) { - throw new IOException("Read error"); - } - if (incomingBuffer.remaining() == 0) { - incomingBuffer.flip(); - if (incomingBuffer == lenBuffer) { - readLength(k); - } else if (!initialized) { - stats.packetsReceived++; - ServerStats.getInstance().incrementPacketsReceived(); - readConnectRequest(); - lenBuffer.clear(); - incomingBuffer = lenBuffer; - } else { - stats.packetsReceived++; - ServerStats.getInstance().incrementPacketsReceived(); - readRequest(); - lenBuffer.clear(); - incomingBuffer = lenBuffer; - } - } - } - if (k.isWritable()) { - // ZooLog.logTraceMessage(LOG, - // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK - // "outgoingBuffers.size() = " + - // outgoingBuffers.size()); - if (outgoingBuffers.size() > 0) { - // ZooLog.logTraceMessage(LOG, - // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, - // "sk " + k + " is valid: " + - // k.isValid()); - - /* - * This is going to reset the buffer position to 0 and the - * limit to the size of the buffer, so that we can fill it - * with data from the non-direct buffers that we need to - * send. - */ - ByteBuffer directBuffer = factory.directBuffer; - directBuffer.clear(); - - for (ByteBuffer b : outgoingBuffers) { - if (directBuffer.remaining() < b.remaining()) { - /* - * When we call put later, if the directBuffer is to - * small to hold everything, nothing will be copied, - * so we've got to slice the buffer if it's too big. - */ - b = (ByteBuffer) b.slice().limit( - directBuffer.remaining()); - } - /* - * put() is going to modify the positions of both - * buffers, put we don't want to change the position of - * the source buffers (we'll do that after the send, if - * needed), so we save and reset the position after the - * copy - */ - int p = b.position(); - directBuffer.put(b); - b.position(p); - if (directBuffer.remaining() == 0) { - break; - } - } - /* - * Do the flip: limit becomes position, position gets set to - * 0. This sets us up for the write. - */ - directBuffer.flip(); - - int sent = sock.write(directBuffer); - ByteBuffer bb; - - // Remove the buffers that we have sent - while (outgoingBuffers.size() > 0) { - bb = outgoingBuffers.peek(); - if (bb == closeConn) { - throw new IOException("closing"); - } - int left = bb.remaining() - sent; - if (left > 0) { - /* - * We only partially sent this buffer, so we update - * the position and exit the loop. - */ - bb.position(bb.position() + sent); - break; - } - stats.packetsSent++; - /* We've sent the whole buffer, so drop the buffer */ - sent -= bb.remaining(); - ServerStats.getInstance().incrementPacketsSent(); - outgoingBuffers.remove(); - } - // ZooLog.logTraceMessage(LOG, - // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send, - // outgoingBuffers.size() = " + outgoingBuffers.size()); - } - synchronized (this) { - if (outgoingBuffers.size() == 0) { - if (!initialized - && (sk.interestOps() & SelectionKey.OP_READ) == 0) { - throw new IOException("Responded to info probe"); - } - sk.interestOps(sk.interestOps() - & (~SelectionKey.OP_WRITE)); - } else { - sk.interestOps(sk.interestOps() - | SelectionKey.OP_WRITE); - } - } - } - } catch (CancelledKeyException e) { - close(); - } catch (IOException e) { - // LOG.error("FIXMSG",e); - close(); - } - } - - private void readRequest() throws IOException { - // We have the request, now process and setup for next - InputStream bais = new ByteBufferInputStream(incomingBuffer); - BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); - RequestHeader h = new RequestHeader(); - h.deserialize(bia, "header"); - // Through the magic of byte buffers, txn will not be - // pointing - // to the start of the txn - incomingBuffer = incomingBuffer.slice(); - if (h.getType() == OpCode.auth) { - AuthPacket authPacket = new AuthPacket(); - ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket); - String scheme = authPacket.getScheme(); - AuthenticationProvider ap = ProviderRegistry.getProvider(scheme); - if (ap == null - || ap.handleAuthentication(this, authPacket.getAuth()) != KeeperException.Code.Ok) { - if (ap == null) - LOG.error("No authentication provider for scheme: " - + scheme); - else - LOG.error("Authentication failed for scheme: " - + scheme); - // send a response... - ReplyHeader rh = new ReplyHeader(h.getXid(), 0, - KeeperException.Code.AuthFailed); - sendResponse(rh, null, null); - // ... and close connection - sendBuffer(NIOServerCnxn.closeConn); - disableRecv(); - } else { - LOG.error("Authentication succeeded for scheme: " - + scheme); - ReplyHeader rh = new ReplyHeader(h.getXid(), 0, - KeeperException.Code.Ok); - sendResponse(rh, null, null); - } - return; - } else { - zk.submitRequest(this, sessionId, h.getType(), h.getXid(), - incomingBuffer, authInfo); - } - if (h.getXid() >= 0) { - synchronized (this) { - outstandingRequests++; - // check throttling - if (zk.getInProcess() > factory.outstandingLimit) { - disableRecv(); - // following lines should not be needed since we are already - // reading - // } else { - // enableRecv(); - } - } - } - } - - public void disableRecv() { - sk.interestOps(sk.interestOps() & (~SelectionKey.OP_READ)); - } - - public void enableRecv() { - if (sk.isValid()) { - int interest = sk.interestOps(); - if ((interest & SelectionKey.OP_READ) == 0) { - sk.interestOps(interest | SelectionKey.OP_READ); - } - } - } - - private void readConnectRequest() throws IOException, InterruptedException { - BinaryInputArchive bia = BinaryInputArchive - .getArchive(new ByteBufferInputStream(incomingBuffer)); - ConnectRequest connReq = new ConnectRequest(); - connReq.deserialize(bia, "connect"); - LOG.warn("Connected to " + sock.socket().getRemoteSocketAddress() - + " lastZxid " + connReq.getLastZxidSeen()); - if (zk == null) { - throw new IOException("ZooKeeperServer not running"); - } - if (connReq.getLastZxidSeen() > zk.dataTree.lastProcessedZxid) { - LOG.error("Client has seen " - + Long.toHexString(connReq.getLastZxidSeen()) - + " our last zxid is " - + Long.toHexString(zk.dataTree.lastProcessedZxid)); - throw new IOException("We are out of date"); - } - sessionTimeout = connReq.getTimeOut(); - byte passwd[] = connReq.getPasswd(); - if (sessionTimeout < zk.tickTime * 2) { - sessionTimeout = zk.tickTime * 2; - } - if (sessionTimeout > zk.tickTime * 20) { - sessionTimeout = zk.tickTime * 20; - } - // We don't want to receive any packets until we are sure that the - // session is setup - disableRecv(); - if (connReq.getSessionId() != 0) { - setSessionId(connReq.getSessionId()); - zk.reopenSession(this, sessionId, passwd, sessionTimeout); - LOG.warn("Renewing session " + Long.toHexString(sessionId)); - } else { - zk.createSession(this, passwd, sessionTimeout); - LOG.warn("Creating new session " - + Long.toHexString(sessionId)); - } - initialized = true; - } - - private void readLength(SelectionKey k) throws IOException { - // Read the length, now get the buffer - int len = lenBuffer.getInt(); - if (!initialized) { - // We take advantage of the limited size of the length to look - // for cmds. They are all 4-bytes which fits inside of an int - if (len == ruokCmd) { - sendBuffer(imok.duplicate()); - sendBuffer(NIOServerCnxn.closeConn); - k.interestOps(SelectionKey.OP_WRITE); - return; - } else if (len == killCmd) { - System.exit(0); - } else if (len == getTraceMaskCmd) { - long traceMask = ZooTrace.getTextTraceLevel(); - ByteBuffer resp = ByteBuffer.allocate(8); - resp.putLong(traceMask); - resp.flip(); - sendBuffer(resp); - sendBuffer(NIOServerCnxn.closeConn); - k.interestOps(SelectionKey.OP_WRITE); - return; - } else if (len == setTraceMaskCmd) { - incomingBuffer = ByteBuffer.allocate(8); - - int rc = sock.read(incomingBuffer); - if (rc < 0) { - throw new IOException("Read error"); - } - System.out.println("rc=" + rc); - incomingBuffer.flip(); - long traceMask = incomingBuffer.getLong(); - ZooTrace.setTextTraceLevel(traceMask); - ByteBuffer resp = ByteBuffer.allocate(8); - resp.putLong(traceMask); - resp.flip(); - sendBuffer(resp); - sendBuffer(NIOServerCnxn.closeConn); - k.interestOps(SelectionKey.OP_WRITE); - return; - } else if (len == dumpCmd) { - if (zk == null) { - sendBuffer(ByteBuffer.wrap("ZooKeeper not active \n" - .getBytes())); - } else { - StringBuffer sb = new StringBuffer(); - sb.append("SessionTracker dump: \n"); - sb.append(zk.sessionTracker.toString()).append("\n"); - sb.append("ephemeral nodes dump:\n"); - sb.append(zk.dataTree.dumpEphemerals()).append("\n"); - sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); - } - k.interestOps(SelectionKey.OP_WRITE); - return; - } else if (len == reqsCmd) { - StringBuffer sb = new StringBuffer(); - sb.append("Requests:\n"); - synchronized (outstanding) { - for (Request r : outstanding) { - sb.append(r.toString()); - sb.append('\n'); - } - } - sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); - k.interestOps(SelectionKey.OP_WRITE); - return; - } else if (len == statCmd) { - StringBuffer sb = new StringBuffer(); - if(zk!=null){ - sb.append("Zookeeper version: ").append(Version.getFullVersion()) - .append("\n"); - sb.append("Clients:\n"); - synchronized(factory.cnxns){ - for(NIOServerCnxn c : factory.cnxns){ - sb.append(c.getStats().toString()); - } - } - sb.append("\n"); - sb.append(ServerStats.getInstance().toString()); - sb.append("Node count: ").append(zk.dataTree.getNodeCount()). - append("\n"); - }else - sb.append("ZooKeeperServer not running\n"); - - sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); - k.interestOps(SelectionKey.OP_WRITE); - return; - } - } - if (len < 0 || len > BinaryInputArchive.maxBuffer) { - throw new IOException("Len error " + len); - } - if (zk == null) { - throw new IOException("ZooKeeperServer not running"); - } - incomingBuffer = ByteBuffer.allocate(len); - } - - /** - * The number of requests that have been submitted but not yet responded to. - */ - int outstandingRequests; - - /* - * (non-Javadoc) - * - * @see org.apache.zookeeper.server.ServerCnxnIface#getSessionTimeout() - */ - public int getSessionTimeout() { - return sessionTimeout; - } - - /** - * This is the id that uniquely identifies the session of a client. Once - * this session is no longer active, the ephemeral nodes will go away. - */ - long sessionId; - - static long nextSessionId = 1; - - public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, - SelectionKey sk, Factory factory) throws IOException { - this.zk = zk; - this.sock = sock; - this.sk = sk; - this.factory = factory; - sock.socket().setTcpNoDelay(true); - sock.socket().setSoLinger(true, 2); - InetAddress addr = ((InetSocketAddress) sock.socket() - .getRemoteSocketAddress()).getAddress(); - authInfo.add(new Id("ip", addr.getHostAddress())); - authInfo.add(new Id("host", addr.getCanonicalHostName())); - sk.interestOps(SelectionKey.OP_READ); - } - - public String toString() { - return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk; - } - - boolean closed; - - /* - * (non-Javadoc) - * - * @see org.apache.zookeeper.server.ServerCnxnIface#close() - */ - public void close() { - if (closed) { - return; - } - closed = true; - synchronized (factory.cnxns) { - factory.cnxns.remove(this); - } - if (zk != null) { - zk.removeCnxn(this); - } - - ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, - "close NIOServerCnxn: " + sock); - try { - /* - * The following sequence of code is stupid! You would think that - * only sock.close() is needed, but alas, it doesn't work that way. - * If you just do sock.close() there are cases where the socket - * doesn't actually close... - */ - sock.socket().shutdownOutput(); - } catch (IOException e) { - // This is a relatively common exception that we can't avoid - } - try { - sock.socket().shutdownInput(); - } catch (IOException e) { - } - try { - sock.socket().close(); - } catch (IOException e) { - LOG.error("FIXMSG",e); - } - try { - sock.close(); - // XXX The next line doesn't seem to be needed, but some posts - // to forums suggest that it is needed. Keep in mind if errors in - // this section arise. - // factory.selector.wakeup(); - } catch (IOException e) { - LOG.error("FIXMSG",e); - } - sock = null; - if (sk != null) { - try { - // need to cancel this selection key from the selector - sk.cancel(); - } catch (Exception e) { - } - } - } - - private final static byte fourBytes[] = new byte[4]; - - /* - * (non-Javadoc) - * - * @see org.apache.zookeeper.server.ServerCnxnIface#sendResponse(org.apache.zookeeper.proto.ReplyHeader, - * org.apache.jute.Record, java.lang.String) - */ - synchronized public void sendResponse(ReplyHeader h, Record r, String tag) { - if (closed) { - return; - } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - // Make space for length - BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); - try { - baos.write(fourBytes); - bos.writeRecord(h, "header"); - if (r != null) { - bos.writeRecord(r, tag); - } - baos.close(); - } catch (IOException e) { - LOG.error("Error serializing response"); - } - byte b[] = baos.toByteArray(); - ByteBuffer bb = ByteBuffer.wrap(b); - bb.putInt(b.length - 4).rewind(); - sendBuffer(bb); - if (h.getXid() > 0) { - synchronized (this.factory) { - outstandingRequests--; - // check throttling - if (zk.getInProcess() < factory.outstandingLimit - || outstandingRequests < 1) { - sk.selector().wakeup(); - enableRecv(); - } - } - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.zookeeper.server.ServerCnxnIface#process(org.apache.zookeeper.proto.WatcherEvent) - */ - synchronized public void process(WatcherEvent event) { - ReplyHeader h = new ReplyHeader(-1, -1L, 0); - ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, - "Deliver event " + event + " to " - + this.sessionId + " through " + this); - sendResponse(h, event, "notification"); - } - - public void finishSessionInit(boolean valid) { - try { - ConnectResponse rsp = new ConnectResponse(0, valid ? sessionTimeout - : 0, valid ? sessionId : 0, // send 0 if session is no - // longer valid - valid ? zk.generatePasswd(sessionId) : new byte[16]); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); - bos.writeInt(-1, "len"); - rsp.serialize(bos, "connect"); - baos.close(); - ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); - bb.putInt(bb.remaining() - 4).rewind(); - sendBuffer(bb); - LOG.warn("Finished init of " + Long.toHexString(sessionId) - + ": " + valid); - if (!valid) { - sendBuffer(closeConn); - } - // Now that the session is ready we can start receiving packets - synchronized (this.factory) { - sk.selector().wakeup(); - enableRecv(); - } - } catch (Exception e) { - LOG.error("FIXMSG",e); - close(); - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.zookeeper.server.ServerCnxnIface#getSessionId() - */ - public long getSessionId() { - return sessionId; - } - - public void setSessionId(long sessionId) { - this.sessionId = sessionId; - } - - public ArrayList getAuthInfo() { - return authInfo; - } - - public InetSocketAddress getRemoteAddress() { - return (InetSocketAddress) sock.socket().getRemoteSocketAddress(); - } - - private class CnxnStats implements ServerCnxn.Stats{ - long packetsReceived; - long packetsSent; - - /** - * The number of requests that have been submitted but not yet responded to. - */ - public long getOutstandingRequests() { - return outstandingRequests; - } - public long getPacketsReceived() { - return packetsReceived; - } - public long getPacketsSent() { - return packetsSent; - } - public String toString(){ - StringBuilder sb=new StringBuilder(); - Channel channel = sk.channel(); - if (channel instanceof SocketChannel) { - sb.append(" ").append(((SocketChannel)channel).socket() - .getRemoteSocketAddress()) - .append("[").append(Integer.toHexString(sk.interestOps())) - .append("](queued=").append(getOutstandingRequests()) - .append(",recved=").append(getPacketsReceived()) - .append(",sent=").append(getPacketsSent()).append(")\n"); - } - return sb.toString(); - } - } - - private CnxnStats stats=new CnxnStats(); - public Stats getStats() { - return stats; - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.Channel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; + +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Version; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.proto.AuthPacket; +import org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.ConnectResponse; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; +import org.apache.zookeeper.proto.WatcherEvent; +import org.apache.zookeeper.server.auth.AuthenticationProvider; +import org.apache.zookeeper.server.auth.ProviderRegistry; + +/** + * This class handles communication with clients using NIO. There is one per + * client, but only one thread doing the communication. + */ +public class NIOServerCnxn implements Watcher, ServerCnxn { + private static final Logger LOG = Logger.getLogger(NIOServerCnxn.class); + + static public class Factory extends Thread { + ZooKeeperServer zks; + + ServerSocketChannel ss; + + Selector selector = Selector.open(); + + /** + * We use this buffer to do efficient socket I/O. Since there is a single + * sender thread per NIOServerCnxn instance, we can use a member variable to + * only allocate it once. + */ + ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024); + + HashSet cnxns = new HashSet(); + + int outstandingLimit = 1; + + public Factory(int port) throws IOException { + super("NIOServerCxn.Factory"); + setDaemon(true); + this.ss = ServerSocketChannel.open(); + ss.socket().bind(new InetSocketAddress(port)); + ss.configureBlocking(false); + ss.register(selector, SelectionKey.OP_ACCEPT); + start(); + } + + public void startup(ZooKeeperServer zks) throws IOException, + InterruptedException { + zks.startup(); + setZooKeeperServer(zks); + } + + public void setZooKeeperServer(ZooKeeperServer zks) { + this.zks = zks; + if (zks != null) { + this.outstandingLimit = zks.getGlobalOutstandingLimit(); + zks.setServerCnxnFactory(this); + } else { + this.outstandingLimit = 1; + } + } + + public InetSocketAddress getLocalAddress(){ + return (InetSocketAddress)ss.socket().getLocalSocketAddress(); + } + + private void addCnxn(NIOServerCnxn cnxn) { + synchronized (cnxns) { + cnxns.add(cnxn); + } + } + + protected NIOServerCnxn createConnection(SocketChannel sock, + SelectionKey sk) throws IOException { + return new NIOServerCnxn(zks, sock, sk, this); + } + + public void run() { + while (!ss.socket().isClosed()) { + try { + selector.select(1000); + Set selected; + synchronized (this) { + selected = selector.selectedKeys(); + } + ArrayList selectedList = new ArrayList( + selected); + Collections.shuffle(selectedList); + for (SelectionKey k : selectedList) { + if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { + SocketChannel sc = ((ServerSocketChannel) k + .channel()).accept(); + sc.configureBlocking(false); + SelectionKey sk = sc.register(selector, + SelectionKey.OP_READ); + NIOServerCnxn cnxn = createConnection(sc, sk); + sk.attach(cnxn); + addCnxn(cnxn); + } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { + NIOServerCnxn c = (NIOServerCnxn) k.attachment(); + c.doIO(k); + } + } + selected.clear(); + } catch (Exception e) { + LOG.error("FIXMSG",e); + } + } + ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), + "NIOServerCnxn factory exitedloop."); + clear(); + LOG.error("=====> Goodbye cruel world <======"); + // System.exit(0); + } + + /** + * clear all the connections in the selector + * + */ + synchronized public void clear() { + selector.wakeup(); + synchronized (cnxns) { + // got to clear all the connections that we have in the selector + for (Iterator it = cnxns.iterator(); it + .hasNext();) { + NIOServerCnxn cnxn = it.next(); + it.remove(); + try { + cnxn.close(); + } catch (Exception e) { + // Do nothing. + } + } + } + + } + + public void shutdown() { + try { + ss.close(); + clear(); + this.interrupt(); + this.join(); + } catch (InterruptedException e) { + LOG.warn("Interrupted",e); + } catch (Exception e) { + LOG.error("Unexpected exception", e); + } + if (zks != null) { + zks.shutdown(); + } + } + + synchronized void closeSession(long sessionId) { + selector.wakeup(); + synchronized (cnxns) { + for (Iterator it = cnxns.iterator(); it + .hasNext();) { + NIOServerCnxn cnxn = it.next(); + if (cnxn.sessionId == sessionId) { + it.remove(); + try { + cnxn.close(); + } catch (Exception e) { + } + break; + } + } + } + } + } + + /** + * The buffer will cause the connection to be close when we do a send. + */ + static final ByteBuffer closeConn = ByteBuffer.allocate(0); + + Factory factory; + + ZooKeeperServer zk; + + private SocketChannel sock; + + private SelectionKey sk; + + boolean initialized; + + ByteBuffer lenBuffer = ByteBuffer.allocate(4); + + ByteBuffer incomingBuffer = lenBuffer; + + LinkedBlockingQueue outgoingBuffers = new LinkedBlockingQueue(); + + int sessionTimeout; + + ArrayList authInfo = new ArrayList(); + + LinkedList outstanding = new LinkedList(); + + void sendBuffer(ByteBuffer bb) { + synchronized (factory) { + try { + sk.selector().wakeup(); + // ZooLog.logTraceMessage(LOG, + // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, + // "Add a buffer to outgoingBuffers"); + // ZooLog.logTraceMessage(LOG, + // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, + //"sk " + sk + " is valid: " + + // sk.isValid(), ); + outgoingBuffers.add(bb); + if (sk.isValid()) { + sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); + } + } catch (RuntimeException e) { + LOG.error("FIXMSG",e); + throw e; + } + } + } + + void doIO(SelectionKey k) throws InterruptedException { + try { + if (sock == null) { + return; + } + if (k.isReadable()) { + int rc = sock.read(incomingBuffer); + if (rc < 0) { + throw new IOException("Read error"); + } + if (incomingBuffer.remaining() == 0) { + incomingBuffer.flip(); + if (incomingBuffer == lenBuffer) { + readLength(k); + } else if (!initialized) { + stats.packetsReceived++; + ServerStats.getInstance().incrementPacketsReceived(); + readConnectRequest(); + lenBuffer.clear(); + incomingBuffer = lenBuffer; + } else { + stats.packetsReceived++; + ServerStats.getInstance().incrementPacketsReceived(); + readRequest(); + lenBuffer.clear(); + incomingBuffer = lenBuffer; + } + } + } + if (k.isWritable()) { + // ZooLog.logTraceMessage(LOG, + // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK + // "outgoingBuffers.size() = " + + // outgoingBuffers.size()); + if (outgoingBuffers.size() > 0) { + // ZooLog.logTraceMessage(LOG, + // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, + // "sk " + k + " is valid: " + + // k.isValid()); + + /* + * This is going to reset the buffer position to 0 and the + * limit to the size of the buffer, so that we can fill it + * with data from the non-direct buffers that we need to + * send. + */ + ByteBuffer directBuffer = factory.directBuffer; + directBuffer.clear(); + + for (ByteBuffer b : outgoingBuffers) { + if (directBuffer.remaining() < b.remaining()) { + /* + * When we call put later, if the directBuffer is to + * small to hold everything, nothing will be copied, + * so we've got to slice the buffer if it's too big. + */ + b = (ByteBuffer) b.slice().limit( + directBuffer.remaining()); + } + /* + * put() is going to modify the positions of both + * buffers, put we don't want to change the position of + * the source buffers (we'll do that after the send, if + * needed), so we save and reset the position after the + * copy + */ + int p = b.position(); + directBuffer.put(b); + b.position(p); + if (directBuffer.remaining() == 0) { + break; + } + } + /* + * Do the flip: limit becomes position, position gets set to + * 0. This sets us up for the write. + */ + directBuffer.flip(); + + int sent = sock.write(directBuffer); + ByteBuffer bb; + + // Remove the buffers that we have sent + while (outgoingBuffers.size() > 0) { + bb = outgoingBuffers.peek(); + if (bb == closeConn) { + throw new IOException("closing"); + } + int left = bb.remaining() - sent; + if (left > 0) { + /* + * We only partially sent this buffer, so we update + * the position and exit the loop. + */ + bb.position(bb.position() + sent); + break; + } + stats.packetsSent++; + /* We've sent the whole buffer, so drop the buffer */ + sent -= bb.remaining(); + ServerStats.getInstance().incrementPacketsSent(); + outgoingBuffers.remove(); + } + // ZooLog.logTraceMessage(LOG, + // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send, + // outgoingBuffers.size() = " + outgoingBuffers.size()); + } + synchronized (this) { + if (outgoingBuffers.size() == 0) { + if (!initialized + && (sk.interestOps() & SelectionKey.OP_READ) == 0) { + throw new IOException("Responded to info probe"); + } + sk.interestOps(sk.interestOps() + & (~SelectionKey.OP_WRITE)); + } else { + sk.interestOps(sk.interestOps() + | SelectionKey.OP_WRITE); + } + } + } + } catch (CancelledKeyException e) { + close(); + } catch (IOException e) { + // LOG.error("FIXMSG",e); + close(); + } + } + + private void readRequest() throws IOException { + // We have the request, now process and setup for next + InputStream bais = new ByteBufferInputStream(incomingBuffer); + BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); + RequestHeader h = new RequestHeader(); + h.deserialize(bia, "header"); + // Through the magic of byte buffers, txn will not be + // pointing + // to the start of the txn + incomingBuffer = incomingBuffer.slice(); + if (h.getType() == OpCode.auth) { + AuthPacket authPacket = new AuthPacket(); + ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket); + String scheme = authPacket.getScheme(); + AuthenticationProvider ap = ProviderRegistry.getProvider(scheme); + if (ap == null + || ap.handleAuthentication(this, authPacket.getAuth()) != KeeperException.Code.Ok) { + if (ap == null) + LOG.error("No authentication provider for scheme: " + + scheme); + else + LOG.error("Authentication failed for scheme: " + + scheme); + // send a response... + ReplyHeader rh = new ReplyHeader(h.getXid(), 0, + KeeperException.Code.AuthFailed); + sendResponse(rh, null, null); + // ... and close connection + sendBuffer(NIOServerCnxn.closeConn); + disableRecv(); + } else { + LOG.error("Authentication succeeded for scheme: " + + scheme); + ReplyHeader rh = new ReplyHeader(h.getXid(), 0, + KeeperException.Code.Ok); + sendResponse(rh, null, null); + } + return; + } else { + zk.submitRequest(this, sessionId, h.getType(), h.getXid(), + incomingBuffer, authInfo); + } + if (h.getXid() >= 0) { + synchronized (this) { + outstandingRequests++; + // check throttling + if (zk.getInProcess() > factory.outstandingLimit) { + disableRecv(); + // following lines should not be needed since we are already + // reading + // } else { + // enableRecv(); + } + } + } + } + + public void disableRecv() { + sk.interestOps(sk.interestOps() & (~SelectionKey.OP_READ)); + } + + public void enableRecv() { + if (sk.isValid()) { + int interest = sk.interestOps(); + if ((interest & SelectionKey.OP_READ) == 0) { + sk.interestOps(interest | SelectionKey.OP_READ); + } + } + } + + private void readConnectRequest() throws IOException, InterruptedException { + BinaryInputArchive bia = BinaryInputArchive + .getArchive(new ByteBufferInputStream(incomingBuffer)); + ConnectRequest connReq = new ConnectRequest(); + connReq.deserialize(bia, "connect"); + LOG.warn("Connected to " + sock.socket().getRemoteSocketAddress() + + " lastZxid " + connReq.getLastZxidSeen()); + if (zk == null) { + throw new IOException("ZooKeeperServer not running"); + } + if (connReq.getLastZxidSeen() > zk.dataTree.lastProcessedZxid) { + LOG.error("Client has seen " + + Long.toHexString(connReq.getLastZxidSeen()) + + " our last zxid is " + + Long.toHexString(zk.dataTree.lastProcessedZxid)); + throw new IOException("We are out of date"); + } + sessionTimeout = connReq.getTimeOut(); + byte passwd[] = connReq.getPasswd(); + if (sessionTimeout < zk.tickTime * 2) { + sessionTimeout = zk.tickTime * 2; + } + if (sessionTimeout > zk.tickTime * 20) { + sessionTimeout = zk.tickTime * 20; + } + // We don't want to receive any packets until we are sure that the + // session is setup + disableRecv(); + if (connReq.getSessionId() != 0) { + setSessionId(connReq.getSessionId()); + zk.reopenSession(this, sessionId, passwd, sessionTimeout); + LOG.warn("Renewing session " + Long.toHexString(sessionId)); + } else { + zk.createSession(this, passwd, sessionTimeout); + LOG.warn("Creating new session " + + Long.toHexString(sessionId)); + } + initialized = true; + } + + private void readLength(SelectionKey k) throws IOException { + // Read the length, now get the buffer + int len = lenBuffer.getInt(); + if (!initialized) { + // We take advantage of the limited size of the length to look + // for cmds. They are all 4-bytes which fits inside of an int + if (len == ruokCmd) { + sendBuffer(imok.duplicate()); + sendBuffer(NIOServerCnxn.closeConn); + k.interestOps(SelectionKey.OP_WRITE); + return; + } else if (len == killCmd) { + System.exit(0); + } else if (len == getTraceMaskCmd) { + long traceMask = ZooTrace.getTextTraceLevel(); + ByteBuffer resp = ByteBuffer.allocate(8); + resp.putLong(traceMask); + resp.flip(); + sendBuffer(resp); + sendBuffer(NIOServerCnxn.closeConn); + k.interestOps(SelectionKey.OP_WRITE); + return; + } else if (len == setTraceMaskCmd) { + incomingBuffer = ByteBuffer.allocate(8); + + int rc = sock.read(incomingBuffer); + if (rc < 0) { + throw new IOException("Read error"); + } + System.out.println("rc=" + rc); + incomingBuffer.flip(); + long traceMask = incomingBuffer.getLong(); + ZooTrace.setTextTraceLevel(traceMask); + ByteBuffer resp = ByteBuffer.allocate(8); + resp.putLong(traceMask); + resp.flip(); + sendBuffer(resp); + sendBuffer(NIOServerCnxn.closeConn); + k.interestOps(SelectionKey.OP_WRITE); + return; + } else if (len == dumpCmd) { + if (zk == null) { + sendBuffer(ByteBuffer.wrap("ZooKeeper not active \n" + .getBytes())); + } else { + StringBuffer sb = new StringBuffer(); + sb.append("SessionTracker dump: \n"); + sb.append(zk.sessionTracker.toString()).append("\n"); + sb.append("ephemeral nodes dump:\n"); + sb.append(zk.dataTree.dumpEphemerals()).append("\n"); + sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); + } + k.interestOps(SelectionKey.OP_WRITE); + return; + } else if (len == reqsCmd) { + StringBuffer sb = new StringBuffer(); + sb.append("Requests:\n"); + synchronized (outstanding) { + for (Request r : outstanding) { + sb.append(r.toString()); + sb.append('\n'); + } + } + sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); + k.interestOps(SelectionKey.OP_WRITE); + return; + } else if (len == statCmd) { + StringBuffer sb = new StringBuffer(); + if(zk!=null){ + sb.append("Zookeeper version: ").append(Version.getFullVersion()) + .append("\n"); + sb.append("Clients:\n"); + synchronized(factory.cnxns){ + for(NIOServerCnxn c : factory.cnxns){ + sb.append(c.getStats().toString()); + } + } + sb.append("\n"); + sb.append(ServerStats.getInstance().toString()); + sb.append("Node count: ").append(zk.dataTree.getNodeCount()). + append("\n"); + }else + sb.append("ZooKeeperServer not running\n"); + + sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); + k.interestOps(SelectionKey.OP_WRITE); + return; + } + } + if (len < 0 || len > BinaryInputArchive.maxBuffer) { + throw new IOException("Len error " + len); + } + if (zk == null) { + throw new IOException("ZooKeeperServer not running"); + } + incomingBuffer = ByteBuffer.allocate(len); + } + + /** + * The number of requests that have been submitted but not yet responded to. + */ + int outstandingRequests; + + /* + * (non-Javadoc) + * + * @see org.apache.zookeeper.server.ServerCnxnIface#getSessionTimeout() + */ + public int getSessionTimeout() { + return sessionTimeout; + } + + /** + * This is the id that uniquely identifies the session of a client. Once + * this session is no longer active, the ephemeral nodes will go away. + */ + long sessionId; + + static long nextSessionId = 1; + + public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, + SelectionKey sk, Factory factory) throws IOException { + this.zk = zk; + this.sock = sock; + this.sk = sk; + this.factory = factory; + sock.socket().setTcpNoDelay(true); + sock.socket().setSoLinger(true, 2); + InetAddress addr = ((InetSocketAddress) sock.socket() + .getRemoteSocketAddress()).getAddress(); + authInfo.add(new Id("ip", addr.getHostAddress())); + authInfo.add(new Id("host", addr.getCanonicalHostName())); + sk.interestOps(SelectionKey.OP_READ); + } + + public String toString() { + return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk; + } + + boolean closed; + + /* + * (non-Javadoc) + * + * @see org.apache.zookeeper.server.ServerCnxnIface#close() + */ + public void close() { + if (closed) { + return; + } + closed = true; + synchronized (factory.cnxns) { + factory.cnxns.remove(this); + } + if (zk != null) { + zk.removeCnxn(this); + } + + ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, + "close NIOServerCnxn: " + sock); + try { + /* + * The following sequence of code is stupid! You would think that + * only sock.close() is needed, but alas, it doesn't work that way. + * If you just do sock.close() there are cases where the socket + * doesn't actually close... + */ + sock.socket().shutdownOutput(); + } catch (IOException e) { + // This is a relatively common exception that we can't avoid + } + try { + sock.socket().shutdownInput(); + } catch (IOException e) { + } + try { + sock.socket().close(); + } catch (IOException e) { + LOG.error("FIXMSG",e); + } + try { + sock.close(); + // XXX The next line doesn't seem to be needed, but some posts + // to forums suggest that it is needed. Keep in mind if errors in + // this section arise. + // factory.selector.wakeup(); + } catch (IOException e) { + LOG.error("FIXMSG",e); + } + sock = null; + if (sk != null) { + try { + // need to cancel this selection key from the selector + sk.cancel(); + } catch (Exception e) { + } + } + } + + private final static byte fourBytes[] = new byte[4]; + + /* + * (non-Javadoc) + * + * @see org.apache.zookeeper.server.ServerCnxnIface#sendResponse(org.apache.zookeeper.proto.ReplyHeader, + * org.apache.jute.Record, java.lang.String) + */ + synchronized public void sendResponse(ReplyHeader h, Record r, String tag) { + if (closed) { + return; + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + // Make space for length + BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); + try { + baos.write(fourBytes); + bos.writeRecord(h, "header"); + if (r != null) { + bos.writeRecord(r, tag); + } + baos.close(); + } catch (IOException e) { + LOG.error("Error serializing response"); + } + byte b[] = baos.toByteArray(); + ByteBuffer bb = ByteBuffer.wrap(b); + bb.putInt(b.length - 4).rewind(); + sendBuffer(bb); + if (h.getXid() > 0) { + synchronized (this.factory) { + outstandingRequests--; + // check throttling + if (zk.getInProcess() < factory.outstandingLimit + || outstandingRequests < 1) { + sk.selector().wakeup(); + enableRecv(); + } + } + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.zookeeper.server.ServerCnxnIface#process(org.apache.zookeeper.proto.WatcherEvent) + */ + synchronized public void process(WatcherEvent event) { + ReplyHeader h = new ReplyHeader(-1, -1L, 0); + ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, + "Deliver event " + event + " to " + + this.sessionId + " through " + this); + sendResponse(h, event, "notification"); + } + + public void finishSessionInit(boolean valid) { + try { + ConnectResponse rsp = new ConnectResponse(0, valid ? sessionTimeout + : 0, valid ? sessionId : 0, // send 0 if session is no + // longer valid + valid ? zk.generatePasswd(sessionId) : new byte[16]); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); + bos.writeInt(-1, "len"); + rsp.serialize(bos, "connect"); + baos.close(); + ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); + bb.putInt(bb.remaining() - 4).rewind(); + sendBuffer(bb); + LOG.warn("Finished init of " + Long.toHexString(sessionId) + + ": " + valid); + if (!valid) { + sendBuffer(closeConn); + } + // Now that the session is ready we can start receiving packets + synchronized (this.factory) { + sk.selector().wakeup(); + enableRecv(); + } + } catch (Exception e) { + LOG.error("FIXMSG",e); + close(); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.zookeeper.server.ServerCnxnIface#getSessionId() + */ + public long getSessionId() { + return sessionId; + } + + public void setSessionId(long sessionId) { + this.sessionId = sessionId; + } + + public ArrayList getAuthInfo() { + return authInfo; + } + + public InetSocketAddress getRemoteAddress() { + return (InetSocketAddress) sock.socket().getRemoteSocketAddress(); + } + + private class CnxnStats implements ServerCnxn.Stats{ + long packetsReceived; + long packetsSent; + + /** + * The number of requests that have been submitted but not yet responded to. + */ + public long getOutstandingRequests() { + return outstandingRequests; + } + public long getPacketsReceived() { + return packetsReceived; + } + public long getPacketsSent() { + return packetsSent; + } + public String toString(){ + StringBuilder sb=new StringBuilder(); + Channel channel = sk.channel(); + if (channel instanceof SocketChannel) { + sb.append(" ").append(((SocketChannel)channel).socket() + .getRemoteSocketAddress()) + .append("[").append(Integer.toHexString(sk.interestOps())) + .append("](queued=").append(getOutstandingRequests()) + .append(",recved=").append(getPacketsReceived()) + .append(",sent=").append(getPacketsSent()).append(")\n"); + } + return sb.toString(); + } + } + + private CnxnStats stats=new CnxnStats(); + public Stats getStats() { + return stats; + } + +} Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java Tue Jun 24 12:04:58 2008 @@ -1,85 +1,85 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.text.DateFormat; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -public class PurgeTxnLog { - - static void printUsage(){ - System.out.println("PurgeTxnLog dataLogDir "); - System.out.println("\tdataLogDir -- path to the txn log directory"); - System.exit(1); - } - /** - * @param args PurgeTxnLog dataLogDir - * dataLogDir -- txn log directory - */ - public static void main(String[] args) throws IOException { - if(args.length!=1) - printUsage(); - - File dataDir=new File(args[0]); - - // find the most recent valid snapshot - long highestZxid = -1; - for (File f : dataDir.listFiles()) { - long zxid = ZooKeeperServer.isValidSnapshot(f); - if (zxid > highestZxid) { - highestZxid = zxid; - } - } - // found any valid snapshots? - if(highestZxid==-1) - return; // no snapshots - - // files to exclude from deletion - Set exc=new HashSet(); - exc.add(new File(dataDir, "snapshot."+Long.toHexString(highestZxid))); - exc.addAll(Arrays.asList(ZooKeeperServer.getLogFiles(dataDir.listFiles(),highestZxid))); - - final Set exclude=exc; - List files=Arrays.asList(dataDir.listFiles(new FileFilter(){ - public boolean accept(File f){ - if(!f.getName().startsWith("log.") && - !f.getName().startsWith("snapshot.")) - return false; - if(exclude.contains(f)) - return false; - return true; - }})); - // remove the old files - for(File f: files) - { - System.out.println("Removing file: "+ - DateFormat.getDateTimeInstance().format(f.lastModified())+ - "\t"+f.getPath()); - if(!f.delete()){ - System.err.println("Failed to remove "+f.getPath()); - } - } - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.text.DateFormat; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class PurgeTxnLog { + + static void printUsage(){ + System.out.println("PurgeTxnLog dataLogDir "); + System.out.println("\tdataLogDir -- path to the txn log directory"); + System.exit(1); + } + /** + * @param args PurgeTxnLog dataLogDir + * dataLogDir -- txn log directory + */ + public static void main(String[] args) throws IOException { + if(args.length!=1) + printUsage(); + + File dataDir=new File(args[0]); + + // find the most recent valid snapshot + long highestZxid = -1; + for (File f : dataDir.listFiles()) { + long zxid = ZooKeeperServer.isValidSnapshot(f); + if (zxid > highestZxid) { + highestZxid = zxid; + } + } + // found any valid snapshots? + if(highestZxid==-1) + return; // no snapshots + + // files to exclude from deletion + Set exc=new HashSet(); + exc.add(new File(dataDir, "snapshot."+Long.toHexString(highestZxid))); + exc.addAll(Arrays.asList(ZooKeeperServer.getLogFiles(dataDir.listFiles(),highestZxid))); + + final Set exclude=exc; + List files=Arrays.asList(dataDir.listFiles(new FileFilter(){ + public boolean accept(File f){ + if(!f.getName().startsWith("log.") && + !f.getName().startsWith("snapshot.")) + return false; + if(exclude.contains(f)) + return false; + return true; + }})); + // remove the old files + for(File f: files) + { + System.out.println("Removing file: "+ + DateFormat.getDateTimeInstance().format(f.lastModified())+ + "\t"+f.getPath()); + if(!f.delete()){ + System.err.println("Failed to remove "+f.getPath()); + } + } + } +} Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerConfig.java Tue Jun 24 12:04:58 2008 @@ -1,32 +1,32 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zookeeper.server; public class ServerConfig { private int clientPort; private String dataDir; - private String dataLogDir; + private String dataLogDir; protected ServerConfig(int port, String dataDir,String dataLogDir) { this.clientPort = port; this.dataDir = dataDir; - this.dataLogDir=dataLogDir; + this.dataLogDir=dataLogDir; } protected boolean isStandaloneServer(){ return true;