zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [28/51] [partial] zookeeper git commit: ZOOKEEPER-3032: MAVEN MIGRATION - branch-3.5 - move java server, client
Date Fri, 05 Oct 2018 13:36:56 GMT
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3ce1b18b/src/java/main/org/apache/zookeeper/server/quorum/AckRequestProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/AckRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/quorum/AckRequestProcessor.java
deleted file mode 100644
index 8e7e472..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/AckRequestProcessor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-
-
-/**
- * A minimal RequestProcessor: every request handed to it is reported to the
- * leader as an ACK coming from the local peer. It forwards nothing to any
- * further processor.
- */
-class AckRequestProcessor implements RequestProcessor {
-    private static final Logger LOG = LoggerFactory.getLogger(AckRequestProcessor.class);
-    Leader leader;
-
-    AckRequestProcessor(Leader leader) {
-        this.leader = leader;
-    }
-
-    /**
-     * Acknowledges {@code request} on behalf of the local peer by calling
-     * {@code leader.processAck} with the peer id and the request zxid. When
-     * the leader has no QuorumPeer attached, an error is logged and nothing
-     * else happens.
-     */
-    public void processRequest(Request request) {
-        final QuorumPeer localPeer = leader.self;
-        if (localPeer == null) {
-            LOG.error("Null QuorumPeer");
-            return;
-        }
-        leader.processAck(localPeer.getId(), request.zxid, null);
-    }
-
-    /** Intentionally a no-op. */
-    public void shutdown() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3ce1b18b/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
deleted file mode 100644
index 0f8c9c1..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
+++ /dev/null
@@ -1,983 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-
-import java.util.concurrent.TimeUnit;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.zookeeper.common.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.server.ZooKeeperThread;
-import org.apache.zookeeper.server.quorum.Election;
-import org.apache.zookeeper.server.quorum.Vote;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-
-/**
- * @deprecated This class has been deprecated as of release 3.4.0. 
- */
-@Deprecated
-public class AuthFastLeaderElection implements Election {
-    private static final Logger LOG = LoggerFactory.getLogger(AuthFastLeaderElection.class);
-
-    /* Sequence numbers for messages */
-    static int sequencer = 0;
-    static int maxTag = 0;
-
-    /*
-     * Determine how much time a process has to wait once it believes that it
-     * has reached the end of leader election. WorkerSender also copies this
-     * value into ackWait, where it is used as a base timeout in milliseconds.
-     */
-    static int finalizeWait = 100;
-
-    /*
-     * Challenge counter to avoid replay attacks. It supplies the upper half of
-     * every challenge built by WorkerSender.genChallenge(), which increments
-     * it without any synchronization.
-     */
-
-    static int challengeCounter = 0;
-
-    /*
-     * Flag to determine whether to authenticate or not. When set, a
-     * notification is only accepted if it carries the challenge previously
-     * issued to its sender.
-     */
-
-    private boolean authEnabled = false;
-
-    /**
-     * A decoded notification, as placed on recvqueue by the receiver thread.
-     */
-    public static class Notification {
-        /** Proposed leader. */
-        long leader;
-
-        /** zxid of the proposed leader. */
-        long zxid;
-
-        /** Epoch. */
-        long epoch;
-
-        /** Current state of the sender. */
-        QuorumPeer.ServerState state;
-
-        /** Address of the sender. */
-        InetSocketAddress addr;
-    }
-
-    /*
-     * Messages to send: challenge requests, challenges, notifications and acks
-     */
-    static public class ToSend {
-        /*
-         * Kind of message. The declaration order matches the numeric codes
-         * 0-3 stored in ToSend.type and written on the wire.
-         */
-        static enum mType {
-            crequest, challenge, notification, ack
-        }
-
-        /*
-         * Builds an outgoing message. All kinds carry the same fields; the
-         * only differences are the numeric type code and that a notification
-         * always advertises the LOOKING state, whatever state was passed in.
-         */
-        ToSend(mType type, long tag, long leader, long zxid, long epoch,
-                ServerState state, InetSocketAddress addr) {
-            this.tag = tag;
-            this.leader = leader;
-            this.zxid = zxid;
-            this.epoch = epoch;
-            this.state = state;
-            this.addr = addr;
-
-            switch (type) {
-            case crequest:
-                this.type = 0;
-                break;
-            case challenge:
-                this.type = 1;
-                break;
-            case notification:
-                this.type = 2;
-                this.state = QuorumPeer.ServerState.LOOKING;
-                break;
-            case ack:
-                this.type = 3;
-                break;
-            default:
-                // Not reachable with the four constants declared above.
-                break;
-            }
-        }
-
-        /*
-         * Message type: 0 challenge request, 1 challenge, 2 notification,
-         * 3 acknowledgement
-         */
-        int type;
-
-        /*
-         * Proposed leader
-         */
-        long leader;
-
-        /*
-         * zxid carried by the message
-         */
-        long zxid;
-
-        /*
-         * Epoch
-         */
-        long epoch;
-
-        /*
-         * Current state;
-         */
-        QuorumPeer.ServerState state;
-
-        /*
-         * Message tag
-         */
-        long tag;
-
-        /*
-         * Destination address of the message
-         */
-        InetSocketAddress addr;
-    }
-
-    /* Outgoing messages; the WorkerSender threads take from this queue. */
-    LinkedBlockingQueue<ToSend> sendqueue;
-
-    /* Notifications decoded by the WorkerReceiver thread are offered here. */
-    LinkedBlockingQueue<Notification> recvqueue;
-
-    private class Messenger {
-
-        /* Socket the sender workers transmit on. */
-        final DatagramSocket mySocket;
-        /*
-         * Leader, zxid and epoch of the last received notification that
-         * superseded the previously recorded one (see WorkerReceiver.run).
-         */
-        long lastProposedLeader;
-        long lastProposedZxid;
-        long lastEpoch;
-        /* Tags for which an ack has arrived and no sender has consumed it yet. */
-        final Set<Long> ackset;
-        /* tag -> challenge received for the message with that tag. */
-        final ConcurrentHashMap<Long, Long> challengeMap;
-        /* tag -> semaphore released once the challenge for that tag is stored. */
-        final ConcurrentHashMap<Long, Semaphore> challengeMutex;
-        /* tag -> semaphore released when an ack for that tag is received. */
-        final ConcurrentHashMap<Long, Semaphore> ackMutex;
-        /* address -> (tag -> challenge this process handed out to that address). */
-        final ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>> addrChallengeMap;
-
-        class WorkerReceiver extends ZooKeeperThread {
-
-            /* Socket that run() blocks on. */
-            DatagramSocket mySocket;
-            /* Messenger whose lastProposed and lastEpoch fields run() updates. */
-            Messenger myMsg;
-
-            /*
-             * s: socket to receive from; its remote address (as reported by
-             * getRemoteSocketAddress()) becomes part of the thread name.
-             * msg: messenger whose proposal-tracking fields are updated.
-             */
-            WorkerReceiver(DatagramSocket s, Messenger msg) {
-                super("WorkerReceiver-" + s.getRemoteSocketAddress());
-                mySocket = s;
-                myMsg = msg;
-            }
-
-            boolean saveChallenge(long tag, long challenge) {
-                Semaphore s = challengeMutex.get(tag);
-                if (s != null) {
-                        synchronized (Messenger.this) {
-                            challengeMap.put(tag, challenge);
-                            challengeMutex.remove(tag);
-                        }
-
-                
-                        s.release();
-                } else {
-                    LOG.error("No challenge mutex object");
-                }
-                
-
-                return true;
-            }
-
-            public void run() {
-                byte responseBytes[] = new byte[48];
-                ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
-                DatagramPacket responsePacket = new DatagramPacket(
-                        responseBytes, responseBytes.length);
-                while (true) {
-                    // Sleeps on receive
-                    try {
-                        responseBuffer.clear();
-                        mySocket.receive(responsePacket);
-                    } catch (IOException e) {
-                        LOG.warn("Ignoring exception receiving", e);
-                    }
-                    // Receive new message
-                    if (responsePacket.getLength() != responseBytes.length) {
-                        LOG.warn("Got a short response: "
-                                + responsePacket.getLength() + " "
-                                + responsePacket.toString());
-                        continue;
-                    }
-                    responseBuffer.clear();
-                    int type = responseBuffer.getInt();
-                    if ((type > 3) || (type < 0)) {
-                        LOG.warn("Got bad Msg type: " + type);
-                        continue;
-                    }
-                    long tag = responseBuffer.getLong();
-
-                    QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
-                    switch (responseBuffer.getInt()) {
-                    case 0:
-                        ackstate = QuorumPeer.ServerState.LOOKING;
-                        break;
-                    case 1:
-                        ackstate = QuorumPeer.ServerState.LEADING;
-                        break;
-                    case 2:
-                        ackstate = QuorumPeer.ServerState.FOLLOWING;
-                        break;
-                    default:
-                        LOG.warn("unknown type " + responseBuffer.getInt());
-                        break;
-                    }
-
-                    Vote current = self.getCurrentVote();
-
-                    switch (type) {
-                    case 0:
-                        // Receive challenge request
-                        ToSend c = new ToSend(ToSend.mType.challenge, tag,
-                                current.getId(), current.getZxid(),
-                                logicalclock.get(), self.getPeerState(),
-                                (InetSocketAddress) responsePacket
-                                        .getSocketAddress());
-                        sendqueue.offer(c);
-                        break;
-                    case 1:
-                        // Receive challenge and store somewhere else
-                        long challenge = responseBuffer.getLong();
-                        saveChallenge(tag, challenge);
-
-                        break;
-                    case 2:
-                        Notification n = new Notification();
-                        n.leader = responseBuffer.getLong();
-                        n.zxid = responseBuffer.getLong();
-                        n.epoch = responseBuffer.getLong();
-                        n.state = ackstate;
-                        n.addr = (InetSocketAddress) responsePacket
-                                .getSocketAddress();
-
-                        if ((myMsg.lastEpoch <= n.epoch)
-                                && ((n.zxid > myMsg.lastProposedZxid) 
-                                || ((n.zxid == myMsg.lastProposedZxid) 
-                                && (n.leader > myMsg.lastProposedLeader)))) {
-                            myMsg.lastProposedZxid = n.zxid;
-                            myMsg.lastProposedLeader = n.leader;
-                            myMsg.lastEpoch = n.epoch;
-                        }
-
-                        long recChallenge;
-                        InetSocketAddress addr = (InetSocketAddress) responsePacket
-                                .getSocketAddress();
-                        if (authEnabled) {
-                            ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(addr);
-                            if(tmpMap != null){
-                                if (tmpMap.get(tag) != null) {
-                                    recChallenge = responseBuffer.getLong();
-
-                                    if (tmpMap.get(tag) == recChallenge) {
-                                        recvqueue.offer(n);
-
-                                        ToSend a = new ToSend(ToSend.mType.ack,
-                                                tag, current.getId(),
-                                                current.getZxid(),
-                                                logicalclock.get(), self.getPeerState(),
-                                                addr);
-
-                                        sendqueue.offer(a);
-                                    } else {
-                                        LOG.warn("Incorrect challenge: "
-                                                + recChallenge + ", "
-                                                + addrChallengeMap.toString());
-                                    }
-                                } else {
-                                    LOG.warn("No challenge for host: " + addr
-                                            + " " + tag);
-                                }
-                            }
-                        } else {
-                            recvqueue.offer(n);
-
-                            ToSend a = new ToSend(ToSend.mType.ack, tag,
-                                    current.getId(), current.getZxid(),
-                                    logicalclock.get(), self.getPeerState(),
-                                    (InetSocketAddress) responsePacket
-                                            .getSocketAddress());
-
-                            sendqueue.offer(a);
-                        }
-                        break;
-
-                    // Upon reception of an ack message, remove it from the
-                    // queue
-                    case 3:
-                        Semaphore s = ackMutex.get(tag);
-                        
-                        if(s != null)
-                            s.release();
-                        else LOG.error("Empty ack semaphore");
-                        
-                        ackset.add(tag);
-
-                        if (authEnabled) {
-                            ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(responsePacket
-                                    .getSocketAddress());
-                            if(tmpMap != null) {
-                                tmpMap.remove(tag);
-                            } else {
-                                LOG.warn("No such address in the ensemble configuration " + responsePacket
-                                    .getSocketAddress());
-                            }
-                        }
-
-                        if (ackstate != QuorumPeer.ServerState.LOOKING) {
-                            Notification outofsync = new Notification();
-                            outofsync.leader = responseBuffer.getLong();
-                            outofsync.zxid = responseBuffer.getLong();
-                            outofsync.epoch = responseBuffer.getLong();
-                            outofsync.state = ackstate;
-                            outofsync.addr = (InetSocketAddress) responsePacket
-                                    .getSocketAddress();
-
-                            recvqueue.offer(outofsync);
-                        }
-
-                        break;
-                    // Default case
-                    default:
-                        LOG.warn("Received message of incorrect type " + type);
-                        break;
-                    }
-                }
-            }
-        }
-
-        class WorkerSender extends ZooKeeperThread {
-
-            /* Source of the random half of the challenges built by genChallenge(). */
-            Random rand;
-            /* Number of times a notification is sent before it is handed back to the queue. */
-            int maxAttempts;
-            /* Base timeout, in milliseconds, for the challenge and ack waits. */
-            int ackWait = finalizeWait;
-
-            /*
-             * Receives the max number of attempts as input. The random
-             * generator is seeded from the id of the constructing thread plus
-             * the elapsed-time clock.
-             */
-
-            WorkerSender(int attempts) {
-                super("WorkerSender");
-                maxAttempts = attempts;
-                rand = new Random(java.lang.Thread.currentThread().getId()
-                        + Time.currentElapsedTime());
-            }
-
-            long genChallenge() {
-                byte buf[] = new byte[8];
-
-                buf[0] = (byte) ((challengeCounter & 0xff000000) >>> 24);
-                buf[1] = (byte) ((challengeCounter & 0x00ff0000) >>> 16);
-                buf[2] = (byte) ((challengeCounter & 0x0000ff00) >>> 8);
-                buf[3] = (byte) ((challengeCounter & 0x000000ff));
-
-                challengeCounter++;
-                int secret = rand.nextInt(java.lang.Integer.MAX_VALUE);
-
-                buf[4] = (byte) ((secret & 0xff000000) >>> 24);
-                buf[5] = (byte) ((secret & 0x00ff0000) >>> 16);
-                buf[6] = (byte) ((secret & 0x0000ff00) >>> 8);
-                buf[7] = (byte) ((secret & 0x000000ff));
-
-                return (((long)(buf[0] & 0xFF)) << 56)  
-                        + (((long)(buf[1] & 0xFF)) << 48)
-                        + (((long)(buf[2] & 0xFF)) << 40) 
-                        + (((long)(buf[3] & 0xFF)) << 32)
-                        + (((long)(buf[4] & 0xFF)) << 24) 
-                        + (((long)(buf[5] & 0xFF)) << 16)
-                        + (((long)(buf[6] & 0xFF)) << 8) 
-                        + ((long)(buf[7] & 0xFF));
-            }
-
-            /*
-             * Takes messages off sendqueue one at a time and processes them
-             * until the thread is interrupted while waiting for the next one.
-             */
-            public void run() {
-                try {
-                    while (true) {
-                        process(sendqueue.take());
-                    }
-                } catch (InterruptedException e) {
-                    // Interrupted while waiting for work: leave the loop.
-                }
-            }
-
-            /*
-             * Serializes m into a fresh 48-byte datagram and sends it to
-             * m.addr. The handling depends on m.type:
-             *   0 challenge request, 1 challenge, 2 notification (sent with
-             *   retries until acknowledged), 3 ack.
-             * Throws IllegalArgumentException if m.addr cannot be set on the
-             * packet. I/O failures are logged, not propagated.
-             */
-            private void process(ToSend m) {
-                // The array is zero-filled on allocation, so bytes that a
-                // message kind does not write are sent as zero padding.
-                byte requestBytes[] = new byte[48];
-                DatagramPacket requestPacket = new DatagramPacket(requestBytes,
-                        requestBytes.length);
-                ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
-                switch (m.type) {
-                case 0:
-                    sendChallengeRequest(m, requestBuffer, requestPacket);
-                    break;
-                case 1:
-                    sendChallenge(m, requestBuffer, requestPacket);
-                    break;
-                case 2:
-                    sendNotification(m, requestBuffer, requestPacket);
-                    break;
-                case 3:
-                    sendAck(m, requestBuffer, requestPacket);
-                    break;
-                default:
-                    LOG.warn("unknown type " + m.type);
-                    break;
-                }
-            }
-
-            /* Writes the common header: int type, long tag, int state. */
-            private void putHeader(ByteBuffer requestBuffer, int type, ToSend m) {
-                requestBuffer.clear();
-                requestBuffer.putInt(type);
-                requestBuffer.putLong(m.tag);
-                requestBuffer.putInt(m.state.ordinal());
-            }
-
-            /* Sets the packet length to 48 bytes and addresses it to addr. */
-            private void addressPacket(DatagramPacket requestPacket,
-                    InetSocketAddress addr) {
-                requestPacket.setLength(48);
-                try {
-                    requestPacket.setSocketAddress(addr);
-                } catch (IllegalArgumentException e) {
-                    // Sun doesn't include the address that causes this
-                    // exception to be thrown, so we wrap the exception
-                    // in order to capture this critical detail.
-                    throw new IllegalArgumentException(
-                            "Unable to set socket address on packet, msg:"
-                            + e.getMessage() + " with addr:" + addr,
-                            e);
-                }
-            }
-
-            /* Type 0: asks the peer for a challenge unless one is already stored for the tag. */
-            private void sendChallengeRequest(ToSend m, ByteBuffer requestBuffer,
-                    DatagramPacket requestPacket) {
-                putHeader(requestBuffer, ToSend.mType.crequest.ordinal(), m);
-                addressPacket(requestPacket, m.addr);
-
-                try {
-                    if (challengeMap.get(m.tag) == null) {
-                        mySocket.send(requestPacket);
-                    }
-                } catch (IOException e) {
-                    LOG.warn("Exception while sending challenge: ", e);
-                }
-            }
-
-            /*
-             * Type 1: answers a challenge request. The challenge already
-             * issued for (address, tag) is reused if there is one, otherwise
-             * a new one is generated and remembered.
-             */
-            private void sendChallenge(ToSend m, ByteBuffer requestBuffer,
-                    DatagramPacket requestPacket) {
-                ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(m.addr);
-                if (tmpMap == null) {
-                    LOG.error("Address is not in the configuration: " + m.addr);
-                    return;
-                }
-
-                Long issued = tmpMap.get(m.tag);
-                long newChallenge = (issued != null) ? issued.longValue() : genChallenge();
-                tmpMap.put(m.tag, newChallenge);
-
-                putHeader(requestBuffer, ToSend.mType.challenge.ordinal(), m);
-                requestBuffer.putLong(newChallenge);
-                addressPacket(requestPacket, m.addr);
-
-                try {
-                    mySocket.send(requestPacket);
-                } catch (IOException e) {
-                    LOG.warn("Exception while sending challenge: ", e);
-                }
-            }
-
-            /*
-             * Type 2: sends a notification up to maxAttempts times, waiting
-             * for an ack after each send. With authentication enabled a
-             * challenge is obtained first and appended at offset 40. If no
-             * ack arrives and the epoch has not changed, the message is
-             * offered back to sendqueue for a later attempt.
-             */
-            private void sendNotification(ToSend m, ByteBuffer requestBuffer,
-                    DatagramPacket requestPacket) {
-                putHeader(requestBuffer, m.type, m);
-                requestBuffer.putLong(m.leader);
-                requestBuffer.putLong(m.zxid);
-                requestBuffer.putLong(m.epoch);
-                addressPacket(requestPacket, m.addr);
-
-                boolean myChallenge = false;
-                int attempts = 0;
-
-                while (attempts < maxAttempts) {
-                    /*
-                     * Try to obtain a challenge only if does not have one
-                     * yet; without one, skip sending the notification.
-                     */
-                    if (authEnabled && !myChallenge) {
-                        myChallenge = obtainChallenge(m, attempts);
-                        if (!myChallenge) {
-                            attempts++;
-                            continue;
-                        }
-                    }
-
-                    boolean myAck = false;
-                    try {
-                        if (authEnabled) {
-                            requestBuffer.position(40);
-                            Long tmpLong = challengeMap.get(m.tag);
-                            if (tmpLong != null) {
-                                requestBuffer.putLong(tmpLong);
-                            } else {
-                                LOG.warn("No challenge with tag: " + m.tag);
-                            }
-                        }
-
-                        // Register the semaphore before the packet leaves so
-                        // that a fast ack always finds it.
-                        Semaphore s = new Semaphore(0);
-                        ackMutex.put(m.tag, s);
-                        mySocket.send(requestPacket);
-                        try {
-                            double timeout = ackWait
-                                    * java.lang.Math.pow(10, attempts);
-                            s.tryAcquire((int) timeout, TimeUnit.MILLISECONDS);
-                        } catch (InterruptedException e) {
-                            LOG.warn("Ack exception: ", e);
-                        }
-
-                        myAck = ackset.remove(m.tag);
-                    } catch (IOException e) {
-                        LOG.warn("Sending exception: ", e);
-                        /*
-                         * Do nothing, just try again
-                         */
-                    }
-
-                    if (myAck) {
-                        /*
-                         * Received ack successfully, so return
-                         */
-                        challengeMap.remove(m.tag);
-                        return;
-                    }
-                    attempts++;
-                }
-
-                /*
-                 * Return message to queue for another attempt later if
-                 * epoch hasn't changed.
-                 */
-                if (m.epoch == logicalclock.get()) {
-                    challengeMap.remove(m.tag);
-                    sendqueue.offer(m);
-                }
-            }
-
-            /*
-             * Queues a challenge request for m and waits up to
-             * ackWait * 2^attempts milliseconds for the receiver to store the
-             * answer. Returns whether a challenge for m.tag is available.
-             *
-             * The wait happens without holding the Messenger monitor:
-             * saveChallenge() needs that monitor to publish the challenge, so
-             * waiting inside it could only ever end by timeout.
-             */
-            private boolean obtainChallenge(ToSend m, int attempts) {
-                if (challengeMap.containsKey(m.tag)) {
-                    // Stored by a reply that arrived after an earlier timeout.
-                    return true;
-                }
-
-                ToSend crequest = new ToSend(
-                        ToSend.mType.crequest, m.tag, m.leader,
-                        m.zxid, m.epoch,
-                        QuorumPeer.ServerState.LOOKING, m.addr);
-
-                // Register the semaphore before the request is queued so the
-                // reply cannot arrive before there is something to release.
-                Semaphore s = new Semaphore(0);
-                challengeMutex.put(m.tag, s);
-                sendqueue.offer(crequest);
-
-                try {
-                    double timeout = ackWait
-                            * java.lang.Math.pow(2, attempts);
-                    s.tryAcquire((long) timeout, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    LOG.warn("Challenge request exception: ", e);
-                }
-                return challengeMap.containsKey(m.tag);
-            }
-
-            /* Type 3: sends an ack carrying this peer's current leader, zxid and epoch. */
-            private void sendAck(ToSend m, ByteBuffer requestBuffer,
-                    DatagramPacket requestPacket) {
-                putHeader(requestBuffer, m.type, m);
-                requestBuffer.putLong(m.leader);
-                requestBuffer.putLong(m.zxid);
-                requestBuffer.putLong(m.epoch);
-                addressPacket(requestPacket, m.addr);
-
-                try {
-                    mySocket.send(requestPacket);
-                } catch (IOException e) {
-                    LOG.warn("Exception while sending ack: ", e);
-                }
-            }
-        }
-
-        /*
-         * Sets up the shared bookkeeping, creates one challenge table per
-         * voting peer, and then starts the worker threads: "threads" daemon
-         * senders (three attempts each) and one receiver on socket s.
-         */
-        Messenger(int threads, DatagramSocket s) {
-            mySocket = s;
-            ackset =  Collections.<Long>newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
-            challengeMap = new ConcurrentHashMap<Long, Long>();
-            challengeMutex = new ConcurrentHashMap<Long, Semaphore>();
-            ackMutex = new ConcurrentHashMap<Long, Semaphore>();
-            addrChallengeMap = new ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>>();
-            lastProposedLeader = 0;
-            lastProposedZxid = 0;
-            lastEpoch = 0;
-
-            // Populate the per-peer tables before any worker exists, so no
-            // worker can observe a partially filled map, and so a failure
-            // here does not leave sender threads running.
-            for (QuorumServer server : self.getVotingView().values()) {
-                InetSocketAddress saddr = new InetSocketAddress(server.addr
-                        .getAddress(), port);
-                addrChallengeMap.put(saddr, new ConcurrentHashMap<Long, Long>());
-            }
-
-            for (int i = 0; i < threads; ++i) {
-                Thread t = new Thread(new WorkerSender(3),
-                        "WorkerSender Thread: " + (i + 1));
-                t.setDaemon(true);
-                t.start();
-            }
-
-            Thread t = new Thread(new WorkerReceiver(s, this),
-                    "WorkerReceiver Thread");
-            t.start();
-        }
-
-    }
-
-    QuorumPeer self;
-    int port;
-    AtomicLong logicalclock = new AtomicLong(); /* Election instance */
-    DatagramSocket mySocket;
-    long proposedLeader;
-    long proposedZxid;
-
-    public AuthFastLeaderElection(QuorumPeer self,
-            boolean auth) {
-        this.authEnabled = auth;
-        starter(self);
-    }
-
-    public AuthFastLeaderElection(QuorumPeer self) {
-        starter(self);
-    }
-
-    private void starter(QuorumPeer self) {
-        this.self = self;
-        port = self.getVotingView().get(self.getId()).electionAddr.getPort();
-        proposedLeader = -1;
-        proposedZxid = -1;
-
-        try {
-            mySocket = new DatagramSocket(port);
-            // mySocket.setSoTimeout(20000);
-        } catch (SocketException e1) {
-            e1.printStackTrace();
-            throw new RuntimeException();
-        }
-        sendqueue = new LinkedBlockingQueue<ToSend>(2 * self.getVotingView().size());
-        recvqueue = new LinkedBlockingQueue<Notification>(2 * self.getVotingView()
-                .size());
-        new Messenger(self.getVotingView().size() * 2, mySocket);
-    }
-
-    private void leaveInstance() {
-        logicalclock.incrementAndGet();
-    }
-
-    private void sendNotifications() {
-        for (QuorumServer server : self.getView().values()) {
-
-            ToSend notmsg = new ToSend(ToSend.mType.notification,
-                    AuthFastLeaderElection.sequencer++, proposedLeader,
-                    proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING,
-                    self.getView().get(server.id).electionAddr);
-
-            sendqueue.offer(notmsg);
-        }
-    }
-
-    private boolean totalOrderPredicate(long id, long zxid) {
-        if ((zxid > proposedZxid)
-                || ((zxid == proposedZxid) && (id > proposedLeader)))
-            return true;
-        else
-            return false;
-
-    }
-
-    private boolean termPredicate(HashMap<InetSocketAddress, Vote> votes,
-            long l, long zxid) {
-
-
-        Collection<Vote> votesCast = votes.values();
-        int count = 0;
-        /*
-         * First make the views consistent. Sometimes peers will have different
-         * zxids for a server depending on timing.
-         */
-        for (Vote v : votesCast) {
-            if ((v.getId() == l) && (v.getZxid() == zxid))
-                count++;
-        }
-
-        if (count > (self.getVotingView().size() / 2))
-            return true;
-        else
-            return false;
-
-    }
-
-    /**
-     * There is nothing to shutdown in this implementation of
-     * leader election, so we simply have an empty method.
-     */
-    public void shutdown(){}
-    
-    /**
-     * Invoked in QuorumPeer to find or elect a new leader.
-     * 
-     * @throws InterruptedException
-     */
-    public Vote lookForLeader() throws InterruptedException {
-        try {
-            self.jmxLeaderElectionBean = new LeaderElectionBean();
-            MBeanRegistry.getInstance().register(
-                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);        
-        } catch (Exception e) {
-            LOG.warn("Failed to register with JMX", e);
-            self.jmxLeaderElectionBean = null;
-        }
-
-        try {
-            HashMap<InetSocketAddress, Vote> recvset = 
-                new HashMap<InetSocketAddress, Vote>();
-    
-            HashMap<InetSocketAddress, Vote> outofelection = 
-                new HashMap<InetSocketAddress, Vote>();
-    
-            logicalclock.incrementAndGet();
-    
-            proposedLeader = self.getId();
-            proposedZxid = self.getLastLoggedZxid();
-    
-            LOG.info("Election tally");
-            sendNotifications();
-    
-            /*
-             * Loop in which we exchange notifications until we find a leader
-             */
-    
-            while (self.getPeerState() == ServerState.LOOKING) {
-                /*
-                 * Remove next notification from queue, times out after 2 times
-                 * the termination time
-                 */
-                Notification n = recvqueue.poll(2 * finalizeWait,
-                        TimeUnit.MILLISECONDS);
-    
-                /*
-                 * Sends more notifications if haven't received enough.
-                 * Otherwise processes new notification.
-                 */
-                if (n == null) {
-                    if (((!outofelection.isEmpty()) || (recvset.size() > 1)))
-                        sendNotifications();
-                } else
-                    switch (n.state) {
-                    case LOOKING:
-                        if (n.epoch > logicalclock.get()) {
-                            logicalclock.set( n.epoch );
-                            recvset.clear();
-                            if (totalOrderPredicate(n.leader, n.zxid)) {
-                                proposedLeader = n.leader;
-                                proposedZxid = n.zxid;
-                            }
-                            sendNotifications();
-                        } else if (n.epoch < logicalclock.get()) {
-                            break;
-                        } else if (totalOrderPredicate(n.leader, n.zxid)) {
-                            proposedLeader = n.leader;
-                            proposedZxid = n.zxid;
-    
-                            sendNotifications();
-                        }
-    
-                        recvset.put(n.addr, new Vote(n.leader, n.zxid));
-    
-                        // If have received from all nodes, then terminate
-                        if (self.getVotingView().size() == recvset.size()) {
-                            self.setPeerState((proposedLeader == self.getId()) ? 
-                                    ServerState.LEADING: ServerState.FOLLOWING);
-                            // if (self.state == ServerState.FOLLOWING) {
-                            // Thread.sleep(100);
-                            // }
-                            leaveInstance();
-                            return new Vote(proposedLeader, proposedZxid);
-    
-                        } else if (termPredicate(recvset, proposedLeader,
-                                proposedZxid)) {
-                            // Otherwise, wait for a fixed amount of time
-                            LOG.info("Passed predicate");
-                            Thread.sleep(finalizeWait);
-    
-                            // Notification probe = recvqueue.peek();
-    
-                            // Verify if there is any change in the proposed leader
-                            while ((!recvqueue.isEmpty())
-                                    && !totalOrderPredicate(
-                                            recvqueue.peek().leader, recvqueue
-                                                    .peek().zxid)) {
-                                recvqueue.poll();
-                            }
-                            if (recvqueue.isEmpty()) {
-                                // LOG.warn("Proposed leader: " +
-                                // proposedLeader);
-                                self.setPeerState(
-                                        (proposedLeader == self.getId()) ? 
-                                         ServerState.LEADING :
-                                         ServerState.FOLLOWING);
-    
-                                leaveInstance();
-                                return new Vote(proposedLeader, proposedZxid);
-                            }
-                        }
-                        break;
-                    case LEADING:
-                        outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-    
-                        if (termPredicate(outofelection, n.leader, n.zxid)) {
-    
-                            self.setPeerState((n.leader == self.getId()) ? 
-                                    ServerState.LEADING: ServerState.FOLLOWING);
-    
-                            leaveInstance();
-                            return new Vote(n.leader, n.zxid);
-                        }
-                        break;
-                    case FOLLOWING:
-                        outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-    
-                        if (termPredicate(outofelection, n.leader, n.zxid)) {
-    
-                            self.setPeerState((n.leader == self.getId()) ? 
-                                    ServerState.LEADING: ServerState.FOLLOWING);
-    
-                            leaveInstance();
-                            return new Vote(n.leader, n.zxid);
-                        }
-                        break;
-                    default:
-                        break;
-                    }
-            }
-    
-            return null;
-        } finally {
-            try {
-                if(self.jmxLeaderElectionBean != null){
-                    MBeanRegistry.getInstance().unregister(
-                            self.jmxLeaderElectionBean);
-                }
-            } catch (Exception e) {
-                LOG.warn("Failed to unregister with JMX", e);
-            }
-            self.jmxLeaderElectionBean = null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3ce1b18b/src/java/main/org/apache/zookeeper/server/quorum/BufferStats.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/BufferStats.java b/src/java/main/org/apache/zookeeper/server/quorum/BufferStats.java
deleted file mode 100644
index a76d80f..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/BufferStats.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-/**
- * Provides live statistics about Jute buffer usage in term of proposal and client request size.
- */
-public class BufferStats {
-    public static final int INIT_VALUE = -1;
-
-    /**
-     * Size of the last buffer usage.
-     */
-    private int lastBufferSize = INIT_VALUE;
-
-    /**
-     * Size of the smallest buffer usage.
-     */
-    private int minBufferSize = INIT_VALUE;
-
-    /**
-     * Size of the largest buffer usage.
-     */
-    private int maxBufferSize = INIT_VALUE;
-
-    /**
-     * Size of the last buffer usage.
-     */
-    public synchronized int getLastBufferSize() {
-        return lastBufferSize;
-    }
-
-    /**
-     * Updates statistics by setting the last buffer usage size.
-     */
-    public synchronized void setLastBufferSize(int value) {
-        lastBufferSize = value;
-        if (minBufferSize == INIT_VALUE || value < minBufferSize) {
-            minBufferSize = value;
-        }
-        if (value > maxBufferSize) {
-            maxBufferSize = value;
-        }
-    }
-
-    /**
-     * Size of the smallest buffer usage.
-     */
-    public synchronized int getMinBufferSize() {
-        return minBufferSize;
-    }
-
-    /**
-     * Size of the largest buffer usage.
-     */
-    public synchronized int getMaxBufferSize() {
-        return maxBufferSize;
-    }
-
-    /**
-     * Reset statistics.
-     */
-    public synchronized void reset() {
-        lastBufferSize = INIT_VALUE;
-        minBufferSize = INIT_VALUE;
-        maxBufferSize = INIT_VALUE;
-    }
-
-    @Override
-    public synchronized String toString() {
-        return String.format("%d/%d/%d", lastBufferSize, minBufferSize, maxBufferSize);
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3ce1b18b/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java b/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
deleted file mode 100644
index e87f359..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.WorkerService;
-import org.apache.zookeeper.server.ZooKeeperCriticalThread;
-import org.apache.zookeeper.server.ZooKeeperServerListener;
-
-/**
- * This RequestProcessor matches the incoming committed requests with the
- * locally submitted requests. The trick is that locally submitted requests that
- * change the state of the system will come back as incoming committed requests,
- * so we need to match them up.
- *
- * The CommitProcessor is multi-threaded. Communication between threads is
- * handled via queues, atomics, and wait/notifyAll synchronized on the
- * processor. The CommitProcessor acts as a gateway for allowing requests to
- * continue with the remainder of the processing pipeline. It will allow many
- * read requests but only a single write request to be in flight simultaneously,
- * thus ensuring that write requests are processed in transaction id order.
- *
- *   - 1   commit processor main thread, which watches the request queues and
- *         assigns requests to worker threads based on their sessionId so that
- *         read and write requests for a particular session are always assigned
- *         to the same thread (and hence are guaranteed to run in order).
- *   - 0-N worker threads, which run the rest of the request processor pipeline
- *         on the requests. If configured with 0 worker threads, the primary
- *         commit processor thread runs the pipeline directly.
- *
- * Typical (default) thread counts are: on a 32 core machine, 1 commit
- * processor thread and 32 worker threads.
- *
- * Multi-threading constraints:
- *   - Each session's requests must be processed in order.
- *   - Write requests must be processed in zxid order
- *   - Must ensure no race condition between writes in one session that would
- *     trigger a watch being set by a read request in another session
- *
- * The current implementation solves the third constraint by simply allowing no
- * read requests to be processed in parallel with write requests.
- */
-public class CommitProcessor extends ZooKeeperCriticalThread implements
-        RequestProcessor {
-    private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);
-
-    /** Default: numCores */
-    public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS =
-        "zookeeper.commitProcessor.numWorkerThreads";
-    /** Default worker pool shutdown timeout in ms: 5000 (5s) */
-    public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT =
-        "zookeeper.commitProcessor.shutdownTimeout";
-
-    /**
-     * Requests that we are holding until the commit comes in.
-     */
-    protected final LinkedBlockingQueue<Request> queuedRequests =
-        new LinkedBlockingQueue<Request>();
-
-    /**
-     * Requests that have been committed.
-     */
-    protected final LinkedBlockingQueue<Request> committedRequests =
-        new LinkedBlockingQueue<Request>();
-
-    /** Request for which we are currently awaiting a commit */
-    protected final AtomicReference<Request> nextPending =
-        new AtomicReference<Request>();
-    /** Request currently being committed (ie, sent off to next processor) */
-    private final AtomicReference<Request> currentlyCommitting =
-        new AtomicReference<Request>();
-
-    /** The number of requests currently being processed */
-    protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
-
-    RequestProcessor nextProcessor;
-
-    protected volatile boolean stopped = true;
-    private long workerShutdownTimeoutMS;
-    protected WorkerService workerPool;
-
-    /**
-     * This flag indicates whether we need to wait for a response to come back from the
-     * leader or we just let the sync operation flow through like a read. The flag will
-     * be false if the CommitProcessor is in a Leader pipeline.
-     */
-    boolean matchSyncs;
-
-    public CommitProcessor(RequestProcessor nextProcessor, String id,
-                           boolean matchSyncs, ZooKeeperServerListener listener) {
-        super("CommitProcessor:" + id, listener);
-        this.nextProcessor = nextProcessor;
-        this.matchSyncs = matchSyncs;
-    }
-
-    private boolean isProcessingRequest() {
-        return numRequestsProcessing.get() != 0;
-    }
-
-    private boolean isWaitingForCommit() {
-        return nextPending.get() != null;
-    }
-
-    private boolean isProcessingCommit() {
-        return currentlyCommitting.get() != null;
-    }
-
-    protected boolean needCommit(Request request) {
-        switch (request.type) {
-            case OpCode.create:
-            case OpCode.create2:
-            case OpCode.createTTL:
-            case OpCode.createContainer:
-            case OpCode.delete:
-            case OpCode.deleteContainer:
-            case OpCode.setData:
-            case OpCode.reconfig:
-            case OpCode.multi:
-            case OpCode.setACL:
-                return true;
-            case OpCode.sync:
-                return matchSyncs;    
-            case OpCode.createSession:
-            case OpCode.closeSession:
-                return !request.isLocalSession();
-            default:
-                return false;
-        }
-    }
-
-    @Override
-    public void run() {
-        Request request;
-        try {
-            while (!stopped) {
-                synchronized(this) {
-                    while (
-                        !stopped &&
-                        ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
-                         (committedRequests.isEmpty() || isProcessingRequest()))) {
-                        wait();
-                    }
-                }
-
-                /*
-                 * Processing queuedRequests: Process the next requests until we
-                 * find one for which we need to wait for a commit. We cannot
-                 * process a read request while we are processing write request.
-                 */
-                while (!stopped && !isWaitingForCommit() &&
-                       !isProcessingCommit() &&
-                       (request = queuedRequests.poll()) != null) {
-                    if (needCommit(request)) {
-                        nextPending.set(request);
-                    } else {
-                        sendToNextProcessor(request);
-                    }
-                }
-
-                /*
-                 * Processing committedRequests: check and see if the commit
-                 * came in for the pending request. We can only commit a
-                 * request when there is no other request being processed.
-                 */
-                processCommitted();
-            }
-        } catch (Throwable e) {
-            handleException(this.getName(), e);
-        }
-        LOG.info("CommitProcessor exited loop!");
-    }
-
-    /*
-     * Separated this method from the main run loop
-     * for test purposes (ZOOKEEPER-1863)
-     */
-    protected void processCommitted() {
-        Request request;
-
-        if (!stopped && !isProcessingRequest() &&
-                (committedRequests.peek() != null)) {
-
-            /*
-             * ZOOKEEPER-1863: continue only if there is no new request
-             * waiting in queuedRequests or it is waiting for a
-             * commit. 
-             */
-            if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
-                return;
-            }
-            request = committedRequests.poll();
-
-            /*
-             * We match with nextPending so that we can move to the
-             * next request when it is committed. We also want to
-             * use nextPending because it has the cnxn member set
-             * properly.
-             */
-            Request pending = nextPending.get();
-            if (pending != null &&
-                pending.sessionId == request.sessionId &&
-                pending.cxid == request.cxid) {
-                // we want to send our version of the request.
-                // the pointer to the connection in the request
-                pending.setHdr(request.getHdr());
-                pending.setTxn(request.getTxn());
-                pending.zxid = request.zxid;
-                // Set currentlyCommitting so we will block until this
-                // completes. Cleared by CommitWorkRequest after
-                // nextProcessor returns.
-                currentlyCommitting.set(pending);
-                nextPending.set(null);
-                sendToNextProcessor(pending);
-            } else {
-                // this request came from someone else so just
-                // send the commit packet
-                currentlyCommitting.set(request);
-                sendToNextProcessor(request);
-            }
-        }      
-    }
-
-    @Override
-    public void start() {
-        int numCores = Runtime.getRuntime().availableProcessors();
-        int numWorkerThreads = Integer.getInteger(
-            ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, numCores);
-        workerShutdownTimeoutMS = Long.getLong(
-            ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000);
-
-        LOG.info("Configuring CommitProcessor with "
-                 + (numWorkerThreads > 0 ? numWorkerThreads : "no")
-                 + " worker threads.");
-        if (workerPool == null) {
-            workerPool = new WorkerService(
-                "CommitProcWork", numWorkerThreads, true);
-        }
-        stopped = false;
-        super.start();
-    }
-
-    /**
-     * Schedule final request processing; if a worker thread pool is not being
-     * used, processing is done directly by this thread.
-     */
-    private void sendToNextProcessor(Request request) {
-        numRequestsProcessing.incrementAndGet();
-        workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
-    }
-
-    /**
-     * CommitWorkRequest is a small wrapper class to allow
-     * downstream processing to be run using the WorkerService
-     */
-    private class CommitWorkRequest extends WorkerService.WorkRequest {
-        private final Request request;
-
-        CommitWorkRequest(Request request) {
-            this.request = request;
-        }
-
-        @Override
-        public void cleanup() {
-            if (!stopped) {
-                LOG.error("Exception thrown by downstream processor,"
-                          + " unable to continue.");
-                CommitProcessor.this.halt();
-            }
-        }
-
-        public void doWork() throws RequestProcessorException {
-            try {
-                nextProcessor.processRequest(request);
-            } finally {
-                // If this request is the commit request that was blocking
-                // the processor, clear.
-                currentlyCommitting.compareAndSet(request, null);
-
-                /*
-                 * Decrement outstanding request count. The processor may be
-                 * blocked at the moment because it is waiting for the pipeline
-                 * to drain. In that case, wake it up if there are pending
-                 * requests.
-                 */
-                if (numRequestsProcessing.decrementAndGet() == 0) {
-                    if (!queuedRequests.isEmpty() ||
-                        !committedRequests.isEmpty()) {
-                        wakeup();
-                    }
-                }
-            }
-        }
-    }
-
-    synchronized private void wakeup() {
-        notifyAll();
-    }
-
-    public void commit(Request request) {
-        if (stopped || request == null) {
-            return;
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Committing request:: " + request);
-        }
-        committedRequests.add(request);
-        if (!isProcessingCommit()) {
-            wakeup();
-        }
-    }
-
-    public void processRequest(Request request) {
-        if (stopped) {
-            return;
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Processing request:: " + request);
-        }
-        queuedRequests.add(request);
-        if (!isWaitingForCommit()) {
-            wakeup();
-        }
-    }
-
-    private void halt() {
-        stopped = true;
-        wakeup();
-        queuedRequests.clear();
-        if (workerPool != null) {
-            workerPool.stop();
-        }
-    }
-
-    public void shutdown() {
-        LOG.info("Shutting down");
-
-        halt();
-
-        if (workerPool != null) {
-            workerPool.join(workerShutdownTimeoutMS);
-        }
-
-        if (nextProcessor != null) {
-            nextProcessor.shutdown();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3ce1b18b/src/java/main/org/apache/zookeeper/server/quorum/Election.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Election.java b/src/java/main/org/apache/zookeeper/server/quorum/Election.java
deleted file mode 100644
index 8990638..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/Election.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-
-import org.apache.zookeeper.server.quorum.Vote;
-
-public interface Election {
-    public Vote lookForLeader() throws InterruptedException;
-    public void shutdown();
-}


Mime
View raw message