From commits-return-7228-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Fri Oct 19 14:40:08 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 7A764180771 for ; Fri, 19 Oct 2018 14:40:03 +0200 (CEST) Received: (qmail 87191 invoked by uid 500); 19 Oct 2018 12:40:01 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 86158 invoked by uid 99); 19 Oct 2018 12:40:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Oct 2018 12:40:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 891C4E11BB; Fri, 19 Oct 2018 12:39:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andor@apache.org To: commits@zookeeper.apache.org Date: Fri, 19 Oct 2018 12:40:21 -0000 Message-Id: In-Reply-To: <5db4e085f50c4c09bdc831f415d9301c@git.apache.org> References: <5db4e085f50c4c09bdc831f415d9301c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [23/51] [partial] zookeeper git commit: ZOOKEEPER-3032: MAVEN MIGRATION - zookeeper-server http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java new file mode 100644 index 0000000..f02142f --- /dev/null +++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -0,0 +1,1660 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.security.auth.login.LoginException; +import javax.security.sasl.SaslException; + +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.AsyncCallback.ACLCallback; +import org.apache.zookeeper.AsyncCallback.Children2Callback; +import 
org.apache.zookeeper.AsyncCallback.ChildrenCallback; +import org.apache.zookeeper.AsyncCallback.Create2Callback; +import org.apache.zookeeper.AsyncCallback.DataCallback; +import org.apache.zookeeper.AsyncCallback.MultiCallback; +import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.AsyncCallback.StringCallback; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.OpResult.ErrorResult; +import org.apache.zookeeper.Watcher.Event; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.ZooKeeper.WatchRegistration; +import org.apache.zookeeper.client.ZKClientConfig; +import org.apache.zookeeper.client.HostProvider; +import org.apache.zookeeper.client.ZooKeeperSaslClient; +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.proto.AuthPacket; +import org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.Create2Response; +import org.apache.zookeeper.proto.CreateResponse; +import org.apache.zookeeper.proto.ExistsResponse; +import org.apache.zookeeper.proto.GetACLResponse; +import org.apache.zookeeper.proto.GetChildren2Response; +import org.apache.zookeeper.proto.GetChildrenResponse; +import org.apache.zookeeper.proto.GetDataResponse; +import org.apache.zookeeper.proto.GetSASLRequest; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; +import org.apache.zookeeper.proto.SetACLResponse; +import org.apache.zookeeper.proto.SetDataResponse; +import org.apache.zookeeper.proto.SetWatches; +import org.apache.zookeeper.proto.WatcherEvent; +import org.apache.zookeeper.server.ByteBufferInputStream; +import org.apache.zookeeper.server.ZooKeeperThread; +import org.apache.zookeeper.server.ZooTrace; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * This class manages the socket i/o for the client. ClientCnxn maintains a list + * of available servers to connect to and "transparently" switches servers it is + * connected to as needed. + * + */ +public class ClientCnxn { + private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class); + + /* ZOOKEEPER-706: If a session has a large number of watches set then + * attempting to re-establish those watches after a connection loss may + * fail due to the SetWatches request exceeding the server's configured + * jute.maxBuffer value. To avoid this we instead split the watch + * re-establishement across multiple SetWatches calls. This constant + * controls the size of each call. It is set to 128kB to be conservative + * with respect to the server's 1MB default for jute.maxBuffer. + */ + private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024; + + static class AuthData { + AuthData(String scheme, byte data[]) { + this.scheme = scheme; + this.data = data; + } + + String scheme; + + byte data[]; + } + + private final CopyOnWriteArraySet authInfo = new CopyOnWriteArraySet(); + + /** + * These are the packets that have been sent and are waiting for a response. + */ + private final LinkedList pendingQueue = new LinkedList(); + + /** + * These are the packets that need to be sent. + */ + private final LinkedBlockingDeque outgoingQueue = new LinkedBlockingDeque(); + + private int connectTimeout; + + /** + * The timeout in ms the client negotiated with the server. This is the + * "real" timeout, not the timeout request by the client (which may have + * been increased/decreased by the server which applies bounds to this + * value. 
+ */ + private volatile int negotiatedSessionTimeout; + + private int readTimeout; + + private final int sessionTimeout; + + private final ZooKeeper zooKeeper; + + private final ClientWatchManager watcher; + + private long sessionId; + + private byte sessionPasswd[] = new byte[16]; + + /** + * If true, the connection is allowed to go to r-o mode. This field's value + * is sent, besides other data, during session creation handshake. If the + * server on the other side of the wire is partitioned it'll accept + * read-only clients only. + */ + private boolean readOnly; + + final String chrootPath; + + final SendThread sendThread; + + final EventThread eventThread; + + /** + * Set to true when close is called. Latches the connection such that we + * don't attempt to re-connect to the server if in the middle of closing the + * connection (client sends session disconnect to server as part of close + * operation) + */ + private volatile boolean closing = false; + + /** + * A set of ZooKeeper hosts this client could connect to. + */ + private final HostProvider hostProvider; + + /** + * Is set to true when a connection to a r/w server is established for the + * first time; never changed afterwards. + *

+ * Is used to handle situations when a client without a sessionId connects to a + * read-only server. Such a client receives a "fake" sessionId from the read-only + * server, but this sessionId is invalid for other servers. So when such a + * client finds a r/w server, it sends 0 instead of the fake sessionId during + * the connection handshake and establishes a new, valid session. + *

+ * If this field is false (which implies we haven't seen r/w server before) + * then non-zero sessionId is fake, otherwise it is valid. + */ + volatile boolean seenRwServerBefore = false; + + + public ZooKeeperSaslClient zooKeeperSaslClient; + + private final ZKClientConfig clientConfig; + /** + * If any request's response in not received in configured requestTimeout + * then it is assumed that the response packet is lost. + */ + private long requestTimeout; + + public long getSessionId() { + return sessionId; + } + + public byte[] getSessionPasswd() { + return sessionPasswd; + } + + public int getSessionTimeout() { + return negotiatedSessionTimeout; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress(); + SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress(); + sb + .append("sessionid:0x").append(Long.toHexString(getSessionId())) + .append(" local:").append(local) + .append(" remoteserver:").append(remote) + .append(" lastZxid:").append(lastZxid) + .append(" xid:").append(xid) + .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount()) + .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount()) + .append(" queuedpkts:").append(outgoingQueue.size()) + .append(" pendingresp:").append(pendingQueue.size()) + .append(" queuedevents:").append(eventThread.waitingEvents.size()); + + return sb.toString(); + } + + /** + * This class allows us to pass the headers and the relevant records around. 
+ */ + static class Packet { + RequestHeader requestHeader; + + ReplyHeader replyHeader; + + Record request; + + Record response; + + ByteBuffer bb; + + /** Client's view of the path (may differ due to chroot) **/ + String clientPath; + /** Servers's view of the path (may differ due to chroot) **/ + String serverPath; + + boolean finished; + + AsyncCallback cb; + + Object ctx; + + WatchRegistration watchRegistration; + + public boolean readOnly; + + WatchDeregistration watchDeregistration; + + /** Convenience ctor */ + Packet(RequestHeader requestHeader, ReplyHeader replyHeader, + Record request, Record response, + WatchRegistration watchRegistration) { + this(requestHeader, replyHeader, request, response, + watchRegistration, false); + } + + Packet(RequestHeader requestHeader, ReplyHeader replyHeader, + Record request, Record response, + WatchRegistration watchRegistration, boolean readOnly) { + + this.requestHeader = requestHeader; + this.replyHeader = replyHeader; + this.request = request; + this.response = response; + this.readOnly = readOnly; + this.watchRegistration = watchRegistration; + } + + public void createBB() { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + boa.writeInt(-1, "len"); // We'll fill this in later + if (requestHeader != null) { + requestHeader.serialize(boa, "header"); + } + if (request instanceof ConnectRequest) { + request.serialize(boa, "connect"); + // append "am-I-allowed-to-be-readonly" flag + boa.writeBool(readOnly, "readOnly"); + } else if (request != null) { + request.serialize(boa, "request"); + } + baos.close(); + this.bb = ByteBuffer.wrap(baos.toByteArray()); + this.bb.putInt(this.bb.capacity() - 4); + this.bb.rewind(); + } catch (IOException e) { + LOG.warn("Ignoring unexpected exception", e); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append("clientPath:" + clientPath); + 
sb.append(" serverPath:" + serverPath); + sb.append(" finished:" + finished); + + sb.append(" header:: " + requestHeader); + sb.append(" replyHeader:: " + replyHeader); + sb.append(" request:: " + request); + sb.append(" response:: " + response); + + // jute toString is horrible, remove unnecessary newlines + return sb.toString().replaceAll("\r*\n+", " "); + } + } + + /** + * Creates a connection object. The actual network connect doesn't get + * established until needed. The start() instance method must be called + * subsequent to construction. + * + * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 + * @param hostProvider + * the list of ZooKeeper servers to connect to + * @param sessionTimeout + * the timeout for connections. + * @param zooKeeper + * the zookeeper object that this connection is related to. + * @param watcher watcher for this connection + * @param clientCnxnSocket + * the socket implementation used (e.g. NIO/Netty) + * @param canBeReadOnly + * whether the connection is allowed to go to read-only + * mode in case of partitioning + * @throws IOException + */ + public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, + ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) + throws IOException { + this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, + clientCnxnSocket, 0, new byte[16], canBeReadOnly); + } + + /** + * Creates a connection object. The actual network connect doesn't get + * established until needed. The start() instance method must be called + * subsequent to construction. + * + * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 + * @param hostProvider + * the list of ZooKeeper servers to connect to + * @param sessionTimeout + * the timeout for connections. + * @param zooKeeper + * the zookeeper object that this connection is related to. 
+ * @param watcher watcher for this connection + * @param clientCnxnSocket + * the socket implementation used (e.g. NIO/Netty) + * @param sessionId session id if re-establishing session + * @param sessionPasswd session passwd if re-establishing session + * @param canBeReadOnly + * whether the connection is allowed to go to read-only + * mode in case of partitioning + * @throws IOException + */ + public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, + ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, + long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { + this.zooKeeper = zooKeeper; + this.watcher = watcher; + this.sessionId = sessionId; + this.sessionPasswd = sessionPasswd; + this.sessionTimeout = sessionTimeout; + this.hostProvider = hostProvider; + this.chrootPath = chrootPath; + + connectTimeout = sessionTimeout / hostProvider.size(); + readTimeout = sessionTimeout * 2 / 3; + readOnly = canBeReadOnly; + + sendThread = new SendThread(clientCnxnSocket); + eventThread = new EventThread(); + this.clientConfig=zooKeeper.getClientConfig(); + initRequestTimeout(); + } + + public void start() { + sendThread.start(); + eventThread.start(); + } + + private Object eventOfDeath = new Object(); + + private static class WatcherSetEventPair { + private final Set watchers; + private final WatchedEvent event; + + public WatcherSetEventPair(Set watchers, WatchedEvent event) { + this.watchers = watchers; + this.event = event; + } + } + + /** + * Guard against creating "-EventThread-EventThread-EventThread-..." thread + * names when ZooKeeper object is being created from within a watcher. + * See ZOOKEEPER-795 for details. + */ + private static String makeThreadName(String suffix) { + String name = Thread.currentThread().getName(). 
+ replaceAll("-EventThread", ""); + return name + suffix; + } + + class EventThread extends ZooKeeperThread { + private final LinkedBlockingQueue waitingEvents = + new LinkedBlockingQueue(); + + /** This is really the queued session state until the event + * thread actually processes the event and hands it to the watcher. + * But for all intents and purposes this is the state. + */ + private volatile KeeperState sessionState = KeeperState.Disconnected; + + private volatile boolean wasKilled = false; + private volatile boolean isRunning = false; + + EventThread() { + super(makeThreadName("-EventThread")); + setDaemon(true); + } + + public void queueEvent(WatchedEvent event) { + queueEvent(event, null); + } + + private void queueEvent(WatchedEvent event, + Set materializedWatchers) { + if (event.getType() == EventType.None + && sessionState == event.getState()) { + return; + } + sessionState = event.getState(); + final Set watchers; + if (materializedWatchers == null) { + // materialize the watchers based on the event + watchers = watcher.materialize(event.getState(), + event.getType(), event.getPath()); + } else { + watchers = new HashSet(); + watchers.addAll(materializedWatchers); + } + WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event); + // queue the pair (watch set & event) for later processing + waitingEvents.add(pair); + } + + public void queueCallback(AsyncCallback cb, int rc, String path, + Object ctx) { + waitingEvents.add(new LocalCallback(cb, rc, path, ctx)); + } + + public void queuePacket(Packet packet) { + if (wasKilled) { + synchronized (waitingEvents) { + if (isRunning) waitingEvents.add(packet); + else processEvent(packet); + } + } else { + waitingEvents.add(packet); + } + } + + public void queueEventOfDeath() { + waitingEvents.add(eventOfDeath); + } + + @Override + public void run() { + try { + isRunning = true; + while (true) { + Object event = waitingEvents.take(); + if (event == eventOfDeath) { + wasKilled = true; + } else { + 
processEvent(event); + } + if (wasKilled) + synchronized (waitingEvents) { + if (waitingEvents.isEmpty()) { + isRunning = false; + break; + } + } + } + } catch (InterruptedException e) { + LOG.error("Event thread exiting due to interruption", e); + } + + LOG.info("EventThread shut down for session: 0x{}", + Long.toHexString(getSessionId())); + } + + private void processEvent(Object event) { + try { + if (event instanceof WatcherSetEventPair) { + // each watcher will process the event + WatcherSetEventPair pair = (WatcherSetEventPair) event; + for (Watcher watcher : pair.watchers) { + try { + watcher.process(pair.event); + } catch (Throwable t) { + LOG.error("Error while calling watcher ", t); + } + } + } else if (event instanceof LocalCallback) { + LocalCallback lcb = (LocalCallback) event; + if (lcb.cb instanceof StatCallback) { + ((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path, + lcb.ctx, null); + } else if (lcb.cb instanceof DataCallback) { + ((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path, + lcb.ctx, null, null); + } else if (lcb.cb instanceof ACLCallback) { + ((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path, + lcb.ctx, null, null); + } else if (lcb.cb instanceof ChildrenCallback) { + ((ChildrenCallback) lcb.cb).processResult(lcb.rc, + lcb.path, lcb.ctx, null); + } else if (lcb.cb instanceof Children2Callback) { + ((Children2Callback) lcb.cb).processResult(lcb.rc, + lcb.path, lcb.ctx, null, null); + } else if (lcb.cb instanceof StringCallback) { + ((StringCallback) lcb.cb).processResult(lcb.rc, + lcb.path, lcb.ctx, null); + } else { + ((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path, + lcb.ctx); + } + } else { + Packet p = (Packet) event; + int rc = 0; + String clientPath = p.clientPath; + if (p.replyHeader.getErr() != 0) { + rc = p.replyHeader.getErr(); + } + if (p.cb == null) { + LOG.warn("Somehow a null cb got to EventThread!"); + } else if (p.response instanceof ExistsResponse + || p.response instanceof SetDataResponse + || 
p.response instanceof SetACLResponse) { + StatCallback cb = (StatCallback) p.cb; + if (rc == 0) { + if (p.response instanceof ExistsResponse) { + cb.processResult(rc, clientPath, p.ctx, + ((ExistsResponse) p.response) + .getStat()); + } else if (p.response instanceof SetDataResponse) { + cb.processResult(rc, clientPath, p.ctx, + ((SetDataResponse) p.response) + .getStat()); + } else if (p.response instanceof SetACLResponse) { + cb.processResult(rc, clientPath, p.ctx, + ((SetACLResponse) p.response) + .getStat()); + } + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.response instanceof GetDataResponse) { + DataCallback cb = (DataCallback) p.cb; + GetDataResponse rsp = (GetDataResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getData(), rsp.getStat()); + } else { + cb.processResult(rc, clientPath, p.ctx, null, + null); + } + } else if (p.response instanceof GetACLResponse) { + ACLCallback cb = (ACLCallback) p.cb; + GetACLResponse rsp = (GetACLResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getAcl(), rsp.getStat()); + } else { + cb.processResult(rc, clientPath, p.ctx, null, + null); + } + } else if (p.response instanceof GetChildrenResponse) { + ChildrenCallback cb = (ChildrenCallback) p.cb; + GetChildrenResponse rsp = (GetChildrenResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getChildren()); + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.response instanceof GetChildren2Response) { + Children2Callback cb = (Children2Callback) p.cb; + GetChildren2Response rsp = (GetChildren2Response) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getChildren(), rsp.getStat()); + } else { + cb.processResult(rc, clientPath, p.ctx, null, null); + } + } else if (p.response instanceof CreateResponse) { + StringCallback cb = (StringCallback) p.cb; + CreateResponse rsp = 
(CreateResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, + (chrootPath == null + ? rsp.getPath() + : rsp.getPath() + .substring(chrootPath.length()))); + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.response instanceof Create2Response) { + Create2Callback cb = (Create2Callback) p.cb; + Create2Response rsp = (Create2Response) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, + (chrootPath == null + ? rsp.getPath() + : rsp.getPath() + .substring(chrootPath.length())), rsp.getStat()); + } else { + cb.processResult(rc, clientPath, p.ctx, null, null); + } + } else if (p.response instanceof MultiResponse) { + MultiCallback cb = (MultiCallback) p.cb; + MultiResponse rsp = (MultiResponse) p.response; + if (rc == 0) { + List results = rsp.getResultList(); + int newRc = rc; + for (OpResult result : results) { + if (result instanceof ErrorResult + && KeeperException.Code.OK.intValue() != (newRc = ((ErrorResult) result) + .getErr())) { + break; + } + } + cb.processResult(newRc, clientPath, p.ctx, results); + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.cb instanceof VoidCallback) { + VoidCallback cb = (VoidCallback) p.cb; + cb.processResult(rc, clientPath, p.ctx); + } + } + } catch (Throwable t) { + LOG.error("Caught unexpected throwable", t); + } + } + } + + // @VisibleForTesting + protected void finishPacket(Packet p) { + int err = p.replyHeader.getErr(); + if (p.watchRegistration != null) { + p.watchRegistration.register(err); + } + // Add all the removed watch events to the event queue, so that the + // clients will be notified with 'Data/Child WatchRemoved' event type. 
+ if (p.watchDeregistration != null) { + Map> materializedWatchers = null; + try { + materializedWatchers = p.watchDeregistration.unregister(err); + for (Entry> entry : materializedWatchers + .entrySet()) { + Set watchers = entry.getValue(); + if (watchers.size() > 0) { + queueEvent(p.watchDeregistration.getClientPath(), err, + watchers, entry.getKey()); + // ignore connectionloss when removing from local + // session + p.replyHeader.setErr(Code.OK.intValue()); + } + } + } catch (KeeperException.NoWatcherException nwe) { + p.replyHeader.setErr(nwe.code().intValue()); + } catch (KeeperException ke) { + p.replyHeader.setErr(ke.code().intValue()); + } + } + + if (p.cb == null) { + synchronized (p) { + p.finished = true; + p.notifyAll(); + } + } else { + p.finished = true; + eventThread.queuePacket(p); + } + } + + void queueEvent(String clientPath, int err, + Set materializedWatchers, EventType eventType) { + KeeperState sessionState = KeeperState.SyncConnected; + if (KeeperException.Code.SESSIONEXPIRED.intValue() == err + || KeeperException.Code.CONNECTIONLOSS.intValue() == err) { + sessionState = Event.KeeperState.Disconnected; + } + WatchedEvent event = new WatchedEvent(eventType, sessionState, + clientPath); + eventThread.queueEvent(event, materializedWatchers); + } + + void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) { + eventThread.queueCallback(cb, rc, path, ctx); + } + + private void conLossPacket(Packet p) { + if (p.replyHeader == null) { + return; + } + switch (state) { + case AUTH_FAILED: + p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue()); + break; + case CLOSED: + p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue()); + break; + default: + p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue()); + } + finishPacket(p); + } + + private volatile long lastZxid; + + public long getLastZxid() { + return lastZxid; + } + + static class EndOfStreamException extends IOException { + private static final 
long serialVersionUID = -5438877188796231422L; + + public EndOfStreamException(String msg) { + super(msg); + } + + @Override + public String toString() { + return "EndOfStreamException: " + getMessage(); + } + } + + private static class SessionTimeoutException extends IOException { + private static final long serialVersionUID = 824482094072071178L; + + public SessionTimeoutException(String msg) { + super(msg); + } + } + + private static class SessionExpiredException extends IOException { + private static final long serialVersionUID = -1388816932076193249L; + + public SessionExpiredException(String msg) { + super(msg); + } + } + + private static class RWServerFoundException extends IOException { + private static final long serialVersionUID = 90431199887158758L; + + public RWServerFoundException(String msg) { + super(msg); + } + } + + /** + * This class services the outgoing request queue and generates the heart + * beats. It also spawns the ReadThread. + */ + class SendThread extends ZooKeeperThread { + private long lastPingSentNs; + private final ClientCnxnSocket clientCnxnSocket; + private Random r = new Random(); + private boolean isFirstConnect = true; + + void readResponse(ByteBuffer incomingBuffer) throws IOException { + ByteBufferInputStream bbis = new ByteBufferInputStream( + incomingBuffer); + BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); + ReplyHeader replyHdr = new ReplyHeader(); + + replyHdr.deserialize(bbia, "header"); + if (replyHdr.getXid() == -2) { + // -2 is the xid for pings + if (LOG.isDebugEnabled()) { + LOG.debug("Got ping response for sessionid: 0x" + + Long.toHexString(sessionId) + + " after " + + ((System.nanoTime() - lastPingSentNs) / 1000000) + + "ms"); + } + return; + } + if (replyHdr.getXid() == -4) { + // -4 is the xid for AuthPacket + if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { + state = States.AUTH_FAILED; + eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, + 
Watcher.Event.KeeperState.AuthFailed, null) ); + eventThread.queueEventOfDeath(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Got auth sessionid:0x" + + Long.toHexString(sessionId)); + } + return; + } + if (replyHdr.getXid() == -1) { + // -1 means notification + if (LOG.isDebugEnabled()) { + LOG.debug("Got notification sessionid:0x" + + Long.toHexString(sessionId)); + } + WatcherEvent event = new WatcherEvent(); + event.deserialize(bbia, "response"); + + // convert from a server path to a client path + if (chrootPath != null) { + String serverPath = event.getPath(); + if(serverPath.compareTo(chrootPath)==0) + event.setPath("/"); + else if (serverPath.length() > chrootPath.length()) + event.setPath(serverPath.substring(chrootPath.length())); + else { + LOG.warn("Got server path " + event.getPath() + + " which is too short for chroot path " + + chrootPath); + } + } + + WatchedEvent we = new WatchedEvent(event); + if (LOG.isDebugEnabled()) { + LOG.debug("Got " + we + " for sessionid 0x" + + Long.toHexString(sessionId)); + } + + eventThread.queueEvent( we ); + return; + } + + // If SASL authentication is currently in progress, construct and + // send a response packet immediately, rather than queuing a + // response as with other packets. + if (tunnelAuthInProgress()) { + GetSASLRequest request = new GetSASLRequest(); + request.deserialize(bbia,"token"); + zooKeeperSaslClient.respondToServer(request.getToken(), + ClientCnxn.this); + return; + } + + Packet packet; + synchronized (pendingQueue) { + if (pendingQueue.size() == 0) { + throw new IOException("Nothing in the queue, but got " + + replyHdr.getXid()); + } + packet = pendingQueue.remove(); + } + /* + * Since requests are processed in order, we better get a response + * to the first request! + */ + try { + if (packet.requestHeader.getXid() != replyHdr.getXid()) { + packet.replyHeader.setErr( + KeeperException.Code.CONNECTIONLOSS.intValue()); + throw new IOException("Xid out of order. 
Got Xid " + + replyHdr.getXid() + " with err " + + + replyHdr.getErr() + + " expected Xid " + + packet.requestHeader.getXid() + + " for a packet with details: " + + packet ); + } + + packet.replyHeader.setXid(replyHdr.getXid()); + packet.replyHeader.setErr(replyHdr.getErr()); + packet.replyHeader.setZxid(replyHdr.getZxid()); + if (replyHdr.getZxid() > 0) { + lastZxid = replyHdr.getZxid(); + } + if (packet.response != null && replyHdr.getErr() == 0) { + packet.response.deserialize(bbia, "response"); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Reading reply sessionid:0x" + + Long.toHexString(sessionId) + ", packet:: " + packet); + } + } finally { + finishPacket(packet); + } + } + + SendThread(ClientCnxnSocket clientCnxnSocket) { + super(makeThreadName("-SendThread()")); + state = States.CONNECTING; + this.clientCnxnSocket = clientCnxnSocket; + setDaemon(true); + } + + // TODO: can not name this method getState since Thread.getState() + // already exists + // It would be cleaner to make class SendThread an implementation of + // Runnable + /** + * Used by ClientCnxnSocket + * + * @return + */ + ZooKeeper.States getZkState() { + return state; + } + + ClientCnxnSocket getClientCnxnSocket() { + return clientCnxnSocket; + } + + /** + * Setup session, previous watches, authentication. + */ + void primeConnection() throws IOException { + LOG.info("Socket connection established, initiating session, client: {}, server: {}", + clientCnxnSocket.getLocalSocketAddress(), + clientCnxnSocket.getRemoteSocketAddress()); + isFirstConnect = false; + long sessId = (seenRwServerBefore) ? sessionId : 0; + ConnectRequest conReq = new ConnectRequest(0, lastZxid, + sessionTimeout, sessId, sessionPasswd); + // We add backwards since we are pushing into the front + // Only send if there's a pending watch + // TODO: here we have the only remaining use of zooKeeper in + // this class. It's to be eliminated! 
+ if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) { + List dataWatches = zooKeeper.getDataWatches(); + List existWatches = zooKeeper.getExistWatches(); + List childWatches = zooKeeper.getChildWatches(); + if (!dataWatches.isEmpty() + || !existWatches.isEmpty() || !childWatches.isEmpty()) { + Iterator dataWatchesIter = prependChroot(dataWatches).iterator(); + Iterator existWatchesIter = prependChroot(existWatches).iterator(); + Iterator childWatchesIter = prependChroot(childWatches).iterator(); + long setWatchesLastZxid = lastZxid; + + while (dataWatchesIter.hasNext() + || existWatchesIter.hasNext() || childWatchesIter.hasNext()) { + List dataWatchesBatch = new ArrayList(); + List existWatchesBatch = new ArrayList(); + List childWatchesBatch = new ArrayList(); + int batchLength = 0; + + // Note, we may exceed our max length by a bit when we add the last + // watch in the batch. This isn't ideal, but it makes the code simpler. + while (batchLength < SET_WATCHES_MAX_LENGTH) { + final String watch; + if (dataWatchesIter.hasNext()) { + watch = dataWatchesIter.next(); + dataWatchesBatch.add(watch); + } else if (existWatchesIter.hasNext()) { + watch = existWatchesIter.next(); + existWatchesBatch.add(watch); + } else if (childWatchesIter.hasNext()) { + watch = childWatchesIter.next(); + childWatchesBatch.add(watch); + } else { + break; + } + batchLength += watch.length(); + } + + SetWatches sw = new SetWatches(setWatchesLastZxid, + dataWatchesBatch, + existWatchesBatch, + childWatchesBatch); + RequestHeader header = new RequestHeader(-8, OpCode.setWatches); + Packet packet = new Packet(header, new ReplyHeader(), sw, null, null); + outgoingQueue.addFirst(packet); + } + } + } + + for (AuthData id : authInfo) { + outgoingQueue.addFirst(new Packet(new RequestHeader(-4, + OpCode.auth), null, new AuthPacket(0, id.scheme, + id.data), null, null)); + } + outgoingQueue.addFirst(new Packet(null, null, conReq, + null, null, readOnly)); + 
clientCnxnSocket.connectionPrimed(); + if (LOG.isDebugEnabled()) { + LOG.debug("Session establishment request sent on " + + clientCnxnSocket.getRemoteSocketAddress()); + } + } + + private List prependChroot(List paths) { + if (chrootPath != null && !paths.isEmpty()) { + for (int i = 0; i < paths.size(); ++i) { + String clientPath = paths.get(i); + String serverPath; + // handle clientPath = "/" + if (clientPath.length() == 1) { + serverPath = chrootPath; + } else { + serverPath = chrootPath + clientPath; + } + paths.set(i, serverPath); + } + } + return paths; + } + + private void sendPing() { + lastPingSentNs = System.nanoTime(); + RequestHeader h = new RequestHeader(-2, OpCode.ping); + queuePacket(h, null, null, null, null, null, null, null, null); + } + + private InetSocketAddress rwServerAddress = null; + + private final static int minPingRwTimeout = 100; + + private final static int maxPingRwTimeout = 60000; + + private int pingRwTimeout = minPingRwTimeout; + + // Set to true if and only if constructor of ZooKeeperSaslClient + // throws a LoginException: see startConnect() below. 
+ private boolean saslLoginFailed = false; + + private void startConnect(InetSocketAddress addr) throws IOException { + // initializing it for new connection + saslLoginFailed = false; + if(!isFirstConnect){ + try { + Thread.sleep(r.nextInt(1000)); + } catch (InterruptedException e) { + LOG.warn("Unexpected exception", e); + } + } + state = States.CONNECTING; + + String hostPort = addr.getHostString() + ":" + addr.getPort(); + MDC.put("myid", hostPort); + setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")")); + if (clientConfig.isSaslClientEnabled()) { + try { + if (zooKeeperSaslClient != null) { + zooKeeperSaslClient.shutdown(); + } + zooKeeperSaslClient = new ZooKeeperSaslClient(getServerPrincipal(addr), clientConfig); + } catch (LoginException e) { + // An authentication error occurred when the SASL client tried to initialize: + // for Kerberos this means that the client failed to authenticate with the KDC. + // This is different from an authentication error that occurs during communication + // with the Zookeeper server, which is handled below. + LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + + "SASL authentication, if Zookeeper server allows it."); + eventThread.queueEvent(new WatchedEvent( + Watcher.Event.EventType.None, + Watcher.Event.KeeperState.AuthFailed, null)); + saslLoginFailed = true; + } + } + logStartConnect(addr); + + clientCnxnSocket.connect(addr); + } + + private String getServerPrincipal(InetSocketAddress addr) { + String principalUserName = clientConfig.getProperty(ZKClientConfig.ZK_SASL_CLIENT_USERNAME, + ZKClientConfig.ZK_SASL_CLIENT_USERNAME_DEFAULT); + String serverPrincipal = principalUserName + "/" + addr.getHostString(); + return serverPrincipal; + } + + private void logStartConnect(InetSocketAddress addr) { + String msg = "Opening socket connection to server " + addr; + if (zooKeeperSaslClient != null) { + msg += ". 
" + zooKeeperSaslClient.getConfigStatus(); + } + LOG.info(msg); + } + + private static final String RETRY_CONN_MSG = + ", closing socket connection and attempting reconnect"; + @Override + public void run() { + clientCnxnSocket.introduce(this, sessionId, outgoingQueue); + clientCnxnSocket.updateNow(); + clientCnxnSocket.updateLastSendAndHeard(); + int to; + long lastPingRwServer = Time.currentElapsedTime(); + final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds + InetSocketAddress serverAddress = null; + while (state.isAlive()) { + try { + if (!clientCnxnSocket.isConnected()) { + // don't re-establish connection if we are closing + if (closing) { + break; + } + if (rwServerAddress != null) { + serverAddress = rwServerAddress; + rwServerAddress = null; + } else { + serverAddress = hostProvider.next(1000); + } + startConnect(serverAddress); + clientCnxnSocket.updateLastSendAndHeard(); + } + + if (state.isConnected()) { + // determine whether we need to send an AuthFailed event. + if (zooKeeperSaslClient != null) { + boolean sendAuthEvent = false; + if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { + try { + zooKeeperSaslClient.initialize(ClientCnxn.this); + } catch (SaslException e) { + LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e); + state = States.AUTH_FAILED; + sendAuthEvent = true; + } + } + KeeperState authState = zooKeeperSaslClient.getKeeperState(); + if (authState != null) { + if (authState == KeeperState.AuthFailed) { + // An authentication error occurred during authentication with the Zookeeper Server. 
+ state = States.AUTH_FAILED; + sendAuthEvent = true; + } else { + if (authState == KeeperState.SaslAuthenticated) { + sendAuthEvent = true; + } + } + } + + if (sendAuthEvent == true) { + eventThread.queueEvent(new WatchedEvent( + Watcher.Event.EventType.None, + authState,null)); + if (state == States.AUTH_FAILED) { + eventThread.queueEventOfDeath(); + } + } + } + to = readTimeout - clientCnxnSocket.getIdleRecv(); + } else { + to = connectTimeout - clientCnxnSocket.getIdleRecv(); + } + + if (to <= 0) { + String warnInfo; + warnInfo = "Client session timed out, have not heard from server in " + + clientCnxnSocket.getIdleRecv() + + "ms" + + " for sessionid 0x" + + Long.toHexString(sessionId); + LOG.warn(warnInfo); + throw new SessionTimeoutException(warnInfo); + } + if (state.isConnected()) { + //1000(1 second) is to prevent race condition missing to send the second ping + //also make sure not to send too many pings when readTimeout is small + int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - + ((clientCnxnSocket.getIdleSend() > 1000) ? 
1000 : 0); + //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL + if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { + sendPing(); + clientCnxnSocket.updateLastSend(); + } else { + if (timeToNextPing < to) { + to = timeToNextPing; + } + } + } + + // If we are in read-only mode, seek for read/write server + if (state == States.CONNECTEDREADONLY) { + long now = Time.currentElapsedTime(); + int idlePingRwServer = (int) (now - lastPingRwServer); + if (idlePingRwServer >= pingRwTimeout) { + lastPingRwServer = now; + idlePingRwServer = 0; + pingRwTimeout = + Math.min(2*pingRwTimeout, maxPingRwTimeout); + pingRwServer(); + } + to = Math.min(to, pingRwTimeout - idlePingRwServer); + } + + clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); + } catch (Throwable e) { + if (closing) { + if (LOG.isDebugEnabled()) { + // closing so this is expected + LOG.debug("An exception was thrown while closing send thread for session 0x" + + Long.toHexString(getSessionId()) + + " : " + e.getMessage()); + } + break; + } else { + // this is ugly, you have a better way speak up + if (e instanceof SessionExpiredException) { + LOG.info(e.getMessage() + ", closing socket connection"); + } else if (e instanceof SessionTimeoutException) { + LOG.info(e.getMessage() + RETRY_CONN_MSG); + } else if (e instanceof EndOfStreamException) { + LOG.info(e.getMessage() + RETRY_CONN_MSG); + } else if (e instanceof RWServerFoundException) { + LOG.info(e.getMessage()); + } else if (e instanceof SocketException) { + LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage()); + } else { + LOG.warn("Session 0x{} for server {}, unexpected error{}", + Long.toHexString(getSessionId()), + serverAddress, + RETRY_CONN_MSG, + e); + } + // At this point, there might still be new packets appended to outgoingQueue. + // they will be handled in next connection or cleared up if closed. 
+ cleanAndNotifyState(); + } + } + } + synchronized (state) { + // When it comes to this point, it guarantees that later queued + // packet to outgoingQueue will be notified of death. + cleanup(); + } + clientCnxnSocket.close(); + if (state.isAlive()) { + eventThread.queueEvent(new WatchedEvent(Event.EventType.None, + Event.KeeperState.Disconnected, null)); + } + eventThread.queueEvent(new WatchedEvent(Event.EventType.None, + Event.KeeperState.Closed, null)); + ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), + "SendThread exited loop for session: 0x" + + Long.toHexString(getSessionId())); + } + + private void cleanAndNotifyState() { + cleanup(); + if (state.isAlive()) { + eventThread.queueEvent(new WatchedEvent(Event.EventType.None, + Event.KeeperState.Disconnected, null)); + } + clientCnxnSocket.updateNow(); + clientCnxnSocket.updateLastSendAndHeard(); + } + + private void pingRwServer() throws RWServerFoundException { + String result = null; + InetSocketAddress addr = hostProvider.next(0); + LOG.info("Checking server " + addr + " for being r/w." 
+ + " Timeout " + pingRwTimeout); + + Socket sock = null; + BufferedReader br = null; + try { + sock = new Socket(addr.getHostString(), addr.getPort()); + sock.setSoLinger(false, -1); + sock.setSoTimeout(1000); + sock.setTcpNoDelay(true); + sock.getOutputStream().write("isro".getBytes()); + sock.getOutputStream().flush(); + sock.shutdownOutput(); + br = new BufferedReader( + new InputStreamReader(sock.getInputStream())); + result = br.readLine(); + } catch (ConnectException e) { + // ignore, this just means server is not up + } catch (IOException e) { + // some unexpected error, warn about it + LOG.warn("Exception while seeking for r/w server " + + e.getMessage(), e); + } finally { + if (sock != null) { + try { + sock.close(); + } catch (IOException e) { + LOG.warn("Unexpected exception", e); + } + } + if (br != null) { + try { + br.close(); + } catch (IOException e) { + LOG.warn("Unexpected exception", e); + } + } + } + + if ("rw".equals(result)) { + pingRwTimeout = minPingRwTimeout; + // save the found address so that it's used during the next + // connection attempt + rwServerAddress = addr; + throw new RWServerFoundException("Majority server found at " + + addr.getHostString() + ":" + addr.getPort()); + } + } + + private void cleanup() { + clientCnxnSocket.cleanup(); + synchronized (pendingQueue) { + for (Packet p : pendingQueue) { + conLossPacket(p); + } + pendingQueue.clear(); + } + // We can't call outgoingQueue.clear() here because + // between iterating and clear up there might be new + // packets added in queuePacket(). + Iterator iter = outgoingQueue.iterator(); + while (iter.hasNext()) { + Packet p = iter.next(); + conLossPacket(p); + iter.remove(); + } + } + + /** + * Callback invoked by the ClientCnxnSocket once a connection has been + * established. 
+ * + * @param _negotiatedSessionTimeout + * @param _sessionId + * @param _sessionPasswd + * @param isRO + * @throws IOException + */ + void onConnected(int _negotiatedSessionTimeout, long _sessionId, + byte[] _sessionPasswd, boolean isRO) throws IOException { + negotiatedSessionTimeout = _negotiatedSessionTimeout; + if (negotiatedSessionTimeout <= 0) { + state = States.CLOSED; + + eventThread.queueEvent(new WatchedEvent( + Watcher.Event.EventType.None, + Watcher.Event.KeeperState.Expired, null)); + eventThread.queueEventOfDeath(); + + String warnInfo; + warnInfo = "Unable to reconnect to ZooKeeper service, session 0x" + + Long.toHexString(sessionId) + " has expired"; + LOG.warn(warnInfo); + throw new SessionExpiredException(warnInfo); + } + if (!readOnly && isRO) { + LOG.error("Read/write client got connected to read-only server"); + } + readTimeout = negotiatedSessionTimeout * 2 / 3; + connectTimeout = negotiatedSessionTimeout / hostProvider.size(); + hostProvider.onConnected(); + sessionId = _sessionId; + sessionPasswd = _sessionPasswd; + state = (isRO) ? + States.CONNECTEDREADONLY : States.CONNECTED; + seenRwServerBefore |= !isRO; + LOG.info("Session establishment complete on server " + + clientCnxnSocket.getRemoteSocketAddress() + + ", sessionid = 0x" + Long.toHexString(sessionId) + + ", negotiated timeout = " + negotiatedSessionTimeout + + (isRO ? " (READ-ONLY mode)" : "")); + KeeperState eventState = (isRO) ? + KeeperState.ConnectedReadOnly : KeeperState.SyncConnected; + eventThread.queueEvent(new WatchedEvent( + Watcher.Event.EventType.None, + eventState, null)); + } + + void close() { + state = States.CLOSED; + clientCnxnSocket.onClosing(); + } + + void testableCloseSocket() throws IOException { + clientCnxnSocket.testableCloseSocket(); + } + + public boolean tunnelAuthInProgress() { + // 1. SASL client is disabled. + if (!clientConfig.isSaslClientEnabled()) { + return false; + } + + // 2. SASL login failed. 
+ if (saslLoginFailed == true) { + return false; + } + + // 3. SendThread has not created the authenticating object yet, + // therefore authentication is (at the earliest stage of being) in progress. + if (zooKeeperSaslClient == null) { + return true; + } + + // 4. authenticating object exists, so ask it for its progress. + return zooKeeperSaslClient.clientTunneledAuthenticationInProgress(); + } + + public void sendPacket(Packet p) throws IOException { + clientCnxnSocket.sendPacket(p); + } + } + + /** + * Shutdown the send/event threads. This method should not be called + * directly - rather it should be called as part of close operation. This + * method is primarily here to allow the tests to verify disconnection + * behavior. + */ + public void disconnect() { + if (LOG.isDebugEnabled()) { + LOG.debug("Disconnecting client for session: 0x" + + Long.toHexString(getSessionId())); + } + + sendThread.close(); + try { + sendThread.join(); + } catch (InterruptedException ex) { + LOG.warn("Got interrupted while waiting for the sender thread to close", ex); + } + eventThread.queueEventOfDeath(); + if (zooKeeperSaslClient != null) { + zooKeeperSaslClient.shutdown(); + } + } + + /** + * Close the connection, which includes; send session disconnect to the + * server, shutdown the send/event threads. + * + * @throws IOException + */ + public void close() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Closing client for session: 0x" + + Long.toHexString(getSessionId())); + } + + try { + RequestHeader h = new RequestHeader(); + h.setType(ZooDefs.OpCode.closeSession); + + submitRequest(h, null, null, null); + } catch (InterruptedException e) { + // ignore, close the send/event threads + } finally { + disconnect(); + } + } + + private int xid = 1; + + // @VisibleForTesting + volatile States state = States.NOT_CONNECTED; + + /* + * getXid() is called externally by ClientCnxnNIO::doIO() when packets are sent from the outgoingQueue to + * the server. 
Thus, getXid() must be public. + */ + synchronized public int getXid() { + return xid++; + } + + public ReplyHeader submitRequest(RequestHeader h, Record request, + Record response, WatchRegistration watchRegistration) + throws InterruptedException { + return submitRequest(h, request, response, watchRegistration, null); + } + + public ReplyHeader submitRequest(RequestHeader h, Record request, + Record response, WatchRegistration watchRegistration, + WatchDeregistration watchDeregistration) + throws InterruptedException { + ReplyHeader r = new ReplyHeader(); + Packet packet = queuePacket(h, r, request, response, null, null, null, + null, watchRegistration, watchDeregistration); + synchronized (packet) { + if (requestTimeout > 0) { + // Wait for request completion with timeout + waitForPacketFinish(r, packet); + } else { + // Wait for request completion infinitely + while (!packet.finished) { + packet.wait(); + } + } + } + if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) { + sendThread.cleanAndNotifyState(); + } + return r; + } + + /** + * Wait for request completion with timeout. + */ + private void waitForPacketFinish(ReplyHeader r, Packet packet) + throws InterruptedException { + long waitStartTime = Time.currentElapsedTime(); + while (!packet.finished) { + packet.wait(requestTimeout); + if (!packet.finished && ((Time.currentElapsedTime() + - waitStartTime) >= requestTimeout)) { + LOG.error("Timeout error occurred for the packet '{}'.", + packet); + r.setErr(Code.REQUESTTIMEOUT.intValue()); + break; + } + } + } + + public void saslCompleted() { + sendThread.getClientCnxnSocket().saslCompleted(); + } + + public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode) + throws IOException { + // Generate Xid now because it will be sent immediately, + // by call to sendThread.sendPacket() below. 
+ int xid = getXid(); + RequestHeader h = new RequestHeader(); + h.setXid(xid); + h.setType(opCode); + + ReplyHeader r = new ReplyHeader(); + r.setXid(xid); + + Packet p = new Packet(h, r, request, response, null, false); + p.cb = cb; + sendThread.sendPacket(p); + } + + public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, + Record response, AsyncCallback cb, String clientPath, + String serverPath, Object ctx, WatchRegistration watchRegistration) { + return queuePacket(h, r, request, response, cb, clientPath, serverPath, + ctx, watchRegistration, null); + } + + public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, + Record response, AsyncCallback cb, String clientPath, + String serverPath, Object ctx, WatchRegistration watchRegistration, + WatchDeregistration watchDeregistration) { + Packet packet = null; + + // Note that we do not generate the Xid for the packet yet. It is + // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(), + // where the packet is actually sent. + packet = new Packet(h, r, request, response, watchRegistration); + packet.cb = cb; + packet.ctx = ctx; + packet.clientPath = clientPath; + packet.serverPath = serverPath; + packet.watchDeregistration = watchDeregistration; + // The synchronized block here is for two purpose: + // 1. synchronize with the final cleanup() in SendThread.run() to avoid race + // 2. synchronized against each packet. So if a closeSession packet is added, + // later packet will be notified. 
+ synchronized (state) { + if (!state.isAlive() || closing) { + conLossPacket(packet); + } else { + // If the client is asking to close the session then + // mark as closing + if (h.getType() == OpCode.closeSession) { + closing = true; + } + outgoingQueue.add(packet); + } + } + sendThread.getClientCnxnSocket().packetAdded(); + return packet; + } + + public void addAuthInfo(String scheme, byte auth[]) { + if (!state.isAlive()) { + return; + } + authInfo.add(new AuthData(scheme, auth)); + queuePacket(new RequestHeader(-4, OpCode.auth), null, + new AuthPacket(0, scheme, auth), null, null, null, null, + null, null); + } + + States getState() { + return state; + } + + private static class LocalCallback { + private final AsyncCallback cb; + private final int rc; + private final String path; + private final Object ctx; + + public LocalCallback(AsyncCallback cb, int rc, String path, Object ctx) { + this.cb = cb; + this.rc = rc; + this.path = path; + this.ctx = ctx; + } + } + + private void initRequestTimeout() { + try { + requestTimeout = clientConfig.getLong( + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT); + LOG.info("{} value is {}. 
feature enabled=", + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, + requestTimeout, requestTimeout > 0); + } catch (NumberFormatException e) { + LOG.error( + "Configured value {} for property {} can not be parsed to long.", + clientConfig.getProperty( + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT), + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT); + throw e; + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cb9f303b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java new file mode 100644 index 0000000..51ae8bf --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.zookeeper;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

import org.apache.jute.BinaryInputArchive;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.common.ZKConfig;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.proto.ConnectResponse;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A ClientCnxnSocket does the lower level communication with a socket
 * implementation.
 *
 * This code has been moved out of ClientCnxn so that a Netty implementation can
 * be provided as an alternative to the NIO socket code.
 *
 */
abstract class ClientCnxnSocket {
    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class);

    protected boolean initialized;

    /**
     * This buffer is only used to read the length of the incoming message.
     */
    protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);

    /**
     * After the length is read, a new incomingBuffer is allocated in
     * readLength() to receive the full message.
     */
    protected ByteBuffer incomingBuffer = lenBuffer;
    protected long sentCount = 0;
    protected long recvCount = 0;
    /** Value of {@link #now} when something was last heard from the server. */
    protected long lastHeard;
    /** Value of {@link #now} when something was last sent to the server. */
    protected long lastSend;
    /** Cached elapsed-time reading, refreshed by {@link #updateNow()}. */
    protected long now;
    protected ClientCnxn.SendThread sendThread;
    protected LinkedBlockingDeque<Packet> outgoingQueue;
    protected ZKClientConfig clientConfig;
    /** Upper bound (exclusive) accepted by {@link #readLength()}. */
    private int packetLen = ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT;

    /**
     * The sessionId is only available here for Log and Exception messages.
     * Otherwise the socket doesn't need to know it.
     */
    protected long sessionId;

    /**
     * Wires this socket to its send thread and outgoing queue.
     *
     * @param sendThread the thread driving this socket
     * @param sessionId session id, used for messages only
     * @param outgoingQueue queue of packets waiting to be written
     */
    void introduce(ClientCnxn.SendThread sendThread, long sessionId,
                   LinkedBlockingDeque<Packet> outgoingQueue) {
        this.sendThread = sendThread;
        this.sessionId = sessionId;
        this.outgoingQueue = outgoingQueue;
    }

    /** Refreshes the cached {@link #now} timestamp. */
    void updateNow() {
        now = Time.currentElapsedTime();
    }

    /** @return time between the cached {@code now} and the last receive */
    int getIdleRecv() {
        return (int) (now - lastHeard);
    }

    /** @return time between the cached {@code now} and the last send */
    int getIdleSend() {
        return (int) (now - lastSend);
    }

    long getSentCount() {
        return sentCount;
    }

    long getRecvCount() {
        return recvCount;
    }

    void updateLastHeard() {
        this.lastHeard = now;
    }

    void updateLastSend() {
        this.lastSend = now;
    }

    void updateLastSendAndHeard() {
        this.lastSend = now;
        this.lastHeard = now;
    }

    /**
     * Reads the 4-byte length prefix from {@code incomingBuffer} and
     * replaces the buffer with a fresh one of exactly that size.
     *
     * @throws IOException if the length is negative or not below the
     *         configured maximum packet length
     */
    void readLength() throws IOException {
        int len = incomingBuffer.getInt();
        if (len < 0 || len >= packetLen) {
            throw new IOException("Packet len " + len + " is out of range!");
        }
        incomingBuffer = ByteBuffer.allocate(len);
    }

    /**
     * Deserializes the connect response held in {@code incomingBuffer},
     * records the session id and notifies the send thread.
     *
     * @throws IOException if the response cannot be deserialized, or if
     *         the send thread rejects the session
     */
    void readConnectResult() throws IOException {
        if (LOG.isTraceEnabled()) {
            StringBuilder buf = new StringBuilder("0x[");
            for (byte b : incomingBuffer.array()) {
                // Mask to 0..255 so negative bytes are not sign-extended
                // to eight hex digits.
                buf.append(Integer.toHexString(b & 0xff)).append(',');
            }
            buf.append("]");
            LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
                    + buf.toString());
        }
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }

        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }

    abstract boolean isConnected();

    abstract void connect(InetSocketAddress addr) throws IOException;

    /**
     * Returns the address to which the socket is connected.
     */
    abstract SocketAddress getRemoteSocketAddress();

    /**
     * Returns the address to which the socket is bound.
     */
    abstract SocketAddress getLocalSocketAddress();

    /**
     * Clean up resources for a fresh new socket.
     * It's called before reconnect or close.
     */
    abstract void cleanup();

    /**
     * new packets are added to outgoingQueue.
     */
    abstract void packetAdded();

    /**
     * connState is marked CLOSED and notify ClientCnxnSocket to react.
     */
    abstract void onClosing();

    /**
     * Sasl completes. Allows non-priming packets to be sent.
     * Note that this method will only be called if Sasl starts and completes.
     */
    abstract void saslCompleted();

    /**
     * being called after ClientCnxn finish PrimeConnection
     */
    abstract void connectionPrimed();

    /**
     * Do transportation work:
     * - read packets into incomingBuffer.
     * - write outgoing queue packets.
     * - update relevant timestamp.
     *
     * @param waitTimeOut timeout in blocking wait. Unit in MilliSecond.
     * @param pendingQueue These are the packets that have been sent and
     *                     are waiting for a response.
     * @param cnxn the client connection on whose behalf the work is done
     * @throws IOException
     * @throws InterruptedException
     */
    abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
            ClientCnxn cnxn)
            throws IOException, InterruptedException;

    /**
     * Close the socket.
     */
    abstract void testableCloseSocket() throws IOException;

    /**
     * Close this client.
     */
    abstract void close();

    /**
     * Send Sasl packets directly.
     * The Sasl process will send the first (requestHeader == null) packet,
     * and then block the doTransport write,
     * finally unblock it when finished.
     */
    abstract void sendPacket(Packet p) throws IOException;

    /**
     * Reads the maximum packet length from the client configuration.
     *
     * @throws IOException if the configured value cannot be parsed as an
     *         int; the parse failure is attached as the cause
     */
    protected void initProperties() throws IOException {
        try {
            packetLen = clientConfig.getInt(ZKConfig.JUTE_MAXBUFFER,
                    ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT);
            LOG.info("{} value is {} Bytes", ZKConfig.JUTE_MAXBUFFER,
                    packetLen);
        } catch (NumberFormatException e) {
            String msg = MessageFormat.format(
                    "Configured value {0} for property {1} can not be parsed to int",
                    clientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER),
                    ZKConfig.JUTE_MAXBUFFER);
            LOG.error(msg);
            // Keep the original failure in the chain instead of dropping it.
            throw new IOException(msg, e);
        }
    }

}