Return-Path: Delivered-To: apmail-hadoop-zookeeper-commits-archive@locus.apache.org Received: (qmail 34638 invoked from network); 24 Jun 2008 19:06:14 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 24 Jun 2008 19:06:14 -0000 Received: (qmail 3107 invoked by uid 500); 24 Jun 2008 19:06:15 -0000 Delivered-To: apmail-hadoop-zookeeper-commits-archive@hadoop.apache.org Received: (qmail 3089 invoked by uid 500); 24 Jun 2008 19:06:15 -0000 Mailing-List: contact zookeeper-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: zookeeper-dev@ Delivered-To: mailing list zookeeper-commits@hadoop.apache.org Received: (qmail 3078 invoked by uid 99); 24 Jun 2008 19:06:15 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jun 2008 12:06:15 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jun 2008 19:05:33 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D70F72388AB3; Tue, 24 Jun 2008 12:05:00 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r671303 [6/6] - in /hadoop/zookeeper/trunk/src/java: lib/cobertura/lib/ lib/svnant/ main/org/apache/zookeeper/ main/org/apache/zookeeper/server/ main/org/apache/zookeeper/server/auth/ main/org/apache/zookeeper/server/quorum/ main/org/apache... Date: Tue, 24 Jun 2008 19:04:59 -0000 To: zookeeper-commits@hadoop.apache.org From: phunt@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080624190500.D70F72388AB3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Jun 24 12:04:58 2008 @@ -1,573 +1,573 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.quorum; - - -import static org.apache.zookeeper.server.ServerConfig.getClientPort; -import static org.apache.zookeeper.server.ServerConfig.getDataDir; -import static org.apache.zookeeper.server.ServerConfig.getDataLogDir; -import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionAlg; -import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionPort; -import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getInitLimit; -import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServerId; -import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServers; -import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getSyncLimit; -import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getTickTime; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.apache.log4j.Logger; - -import org.apache.jute.BinaryInputArchive; -import org.apache.jute.InputArchive; -import org.apache.zookeeper.server.NIOServerCnxn; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.txn.TxnHeader; - -/** - * This class manages the quorum protocol. There are three states this server - * can be in: - *
    - *
  1. Leader election - each server will elect a leader (proposing itself as a - * leader initially).
  2. - *
  3. Follower - the server will synchronize with the leader and replicate any - * transactions.
  4. - *
  5. Leader - the server will process requests and forward them to followers. - * A majority of followers must log the request before it can be accepted. - *
- * - * This class will setup a datagram socket that will always respond with its - * view of the current leader. The response will take the form of: - * - *
- * int xid;
- *
- * long myid;
- *
- * long leader_id;
- *
- * long leader_zxid;
- * 
- * - * The request for the current leader will consist solely of an xid: int xid; - * - *

Configuration file

- * - * When the main() method of this class is used to start the program, the file - * "zoo.cfg" in the current directory will be used to obtain configuration - * information. zoo.cfg is a Properties file, so keys and values are separated - * by equals (=) and the key/value pairs are separated by new lines. The - * following keys are used in the configuration file: - *
    - *
  1. dataDir - The directory where the zookeeper data is stored.
  2. - *
  3. clientPort - The port used to communicate with clients.
  4. - *
  5. tickTime - The duration of a tick in milliseconds. This is the basic - * unit of time in zookeeper.
  6. - *
  7. initLimit - The maximum number of ticks that a follower will wait to - * initially synchronize with a leader.
  8. - *
  9. syncLimit - The maximum number of ticks that a follower will wait for a - * message (including heartbeats) from the leader.
  10. - *
  11. server.id - This is the host:port that the server with the - * given id will use for the quorum protocol.
  12. - *
- * In addition to the zoo.cfg file. There is a file in the data directory called - * "myid" that contains the server id as an ASCII decimal value. - */ -public class QuorumPeer extends Thread implements QuorumStats.Provider { - private static final Logger LOG = Logger.getLogger(QuorumPeer.class); - - /** - * Create an instance of a quorum peer - */ - public interface Factory{ - public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory) throws IOException; - public NIOServerCnxn.Factory createConnectionFactory() throws IOException; - } - - public static class QuorumServer { - public QuorumServer(long id, InetSocketAddress addr) { - this.id = id; - this.addr = addr; - } - - public InetSocketAddress addr; - - public long id; - } - - public enum ServerState { - LOOKING, FOLLOWING, LEADING; - } - /** - * The servers that make up the cluster - */ - ArrayList quorumPeers; - public int getQuorumSize(){ - return quorumPeers.size(); - } - /** - * My id - */ - private long myid; - - - /** - * get the id of this quorum peer. - */ - public long getId() { - return myid; - } - - /** - * This is who I think the leader currently is. - */ - volatile Vote currentVote; - - volatile boolean running = true; - - /** - * The number of milliseconds of each tick - */ - int tickTime; - - /** - * The number of ticks that the initial synchronization phase can take - */ - int initLimit; - - /** - * The number of ticks that can pass between sending a request and getting - * an acknowledgement - */ - int syncLimit; - - /** - * The current tick - */ - int tick; - - /** - * This class simply responds to requests for the current leader of this - * node. - *

- * The request contains just an xid generated by the requestor. - *

- * The response has the xid, the id of this server, the id of the leader, - * and the zxid of the leader. - * - * @author breed - * - */ - class ResponderThread extends Thread { - ResponderThread() { - super("ResponderThread"); - } - - public void run() { - try { - byte b[] = new byte[36]; - ByteBuffer responseBuffer = ByteBuffer.wrap(b); - DatagramPacket packet = new DatagramPacket(b, b.length); - while (true) { - udpSocket.receive(packet); - if (packet.getLength() != 4) { - LOG.warn("Got more than just an xid! Len = " - + packet.getLength()); - } else { - responseBuffer.clear(); - responseBuffer.getInt(); // Skip the xid - responseBuffer.putLong(myid); - switch (state) { - case LOOKING: - responseBuffer.putLong(currentVote.id); - responseBuffer.putLong(currentVote.zxid); - break; - case LEADING: - responseBuffer.putLong(myid); - try { - responseBuffer.putLong(leader.lastProposed); - } catch (NullPointerException npe) { - // This can happen in state transitions, - // just ignore the request - } - break; - case FOLLOWING: - responseBuffer.putLong(currentVote.id); - try { - responseBuffer.putLong(follower.getZxid()); - } catch (NullPointerException npe) { - // This can happen in state transitions, - // just ignore the request - } - } - packet.setData(b); - udpSocket.send(packet); - } - packet.setLength(b.length); - } - } catch (Exception e) { - LOG.warn("Unexpected exception",e); - } finally { - LOG.warn("QuorumPeer responder thread exited"); - } - } - } - - private ServerState state = ServerState.LOOKING; - - public void setPeerState(ServerState newState){ - state=newState; - } - - public ServerState getPeerState(){ - return state; - } - - DatagramSocket udpSocket; - - private InetSocketAddress myQuorumAddr; - - public InetSocketAddress getQuorumAddress(){ - return myQuorumAddr; - } - - /** - * the directory where the snapshot is stored. - */ - private File dataDir; - - /** - * the directory where the logs are stored. - */ - private File dataLogDir; - - Election electionAlg; - - int electionPort; - - NIOServerCnxn.Factory cnxnFactory; - - public QuorumPeer(ArrayList quorumPeers, File dataDir, - File dataLogDir, int electionAlg, int electionPort,long myid, int tickTime, - int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException { - super("QuorumPeer"); - this.cnxnFactory = cnxnFactory; - this.quorumPeers = quorumPeers; - this.dataDir = dataDir; - this.electionPort = electionPort; - this.dataLogDir = dataLogDir; - this.myid = myid; - this.tickTime = tickTime; - this.initLimit = initLimit; - this.syncLimit = syncLimit; - currentVote = new Vote(myid, getLastLoggedZxid()); - for (QuorumServer p : quorumPeers) { - if (p.id == myid) { - myQuorumAddr = p.addr; - break; - } - } - if (myQuorumAddr == null) { - throw new SocketException("My id " + myid + " not in the peer list"); - } - if (electionAlg == 0) { - udpSocket = new DatagramSocket(myQuorumAddr.getPort()); - new ResponderThread().start(); - } - this.electionAlg = createElectionAlgorithm(electionAlg); - QuorumStats.getInstance().setStatsProvider(this); - } - - /** - * This constructor is only used by the existing unit test code. - */ - public QuorumPeer(ArrayList quorumPeers, File dataDir, - File dataLogDir, int clientPort, int electionAlg, int electionPort, - long myid, int tickTime, int initLimit, int syncLimit) throws IOException { - this(quorumPeers,dataDir,dataLogDir,electionAlg,electionPort,myid,tickTime, - initLimit,syncLimit,new NIOServerCnxn.Factory(clientPort)); - } - /** - * The constructor uses the quorum peer config to instantiate the class - */ - public QuorumPeer(NIOServerCnxn.Factory cnxnFactory) throws IOException { - this(getServers(), new File(getDataDir()), new File(getDataLogDir()), - getElectionAlg(), getElectionPort(),getServerId(),getTickTime(), - getInitLimit(), getSyncLimit(),cnxnFactory); - } - - public Follower follower; - public Leader leader; - - protected Follower makeFollower(File dataDir,File dataLogDir) throws IOException { - return new Follower(this, new FollowerZooKeeperServer(dataDir, - dataLogDir, this,new ZooKeeperServer.BasicDataTreeBuilder())); - } - - protected Leader makeLeader(File dataDir,File dataLogDir) throws IOException { - return new Leader(this, new LeaderZooKeeperServer(dataDir, dataLogDir, - this,new ZooKeeperServer.BasicDataTreeBuilder())); - } - - private Election createElectionAlgorithm(int electionAlgorithm){ - Election le=null; - //TODO: use a factory rather than a switch - switch (electionAlgorithm) { - case 0: - // will create a new instance for each run of the protocol - break; - case 1: - le = new AuthFastLeaderElection(this, this.electionPort); - break; - case 2: - le = new AuthFastLeaderElection(this, this.electionPort, true); - break; - case 3: - le = new FastLeaderElection(this, - new QuorumCnxManager(this.electionPort)); - default: - assert false; - } - return le; - } - - protected Election makeLEStrategy(){ - if(electionAlg==null) - return new LeaderElection(this); - return electionAlg; - } - - synchronized protected void setLeader(Leader newLeader){ - leader=newLeader; - } - - synchronized protected void setFollower(Follower newFollower){ - follower=newFollower; - } - - synchronized public ZooKeeperServer getActiveServer(){ - if(leader!=null) - return leader.zk; - else if(follower!=null) - return follower.zk; - return null; - } - - public void run() { - /* - * Main loop - */ - while (running) { - switch (state) { - case LOOKING: - try { - LOG.info("LOOKING"); - currentVote = makeLEStrategy().lookForLeader(); - } catch (Exception e) { - LOG.warn("Unexpected exception",e); - state = ServerState.LOOKING; - } - break; - case FOLLOWING: - try { - LOG.info("FOLLOWING"); - setFollower(makeFollower(dataDir,dataLogDir)); - follower.followLeader(); - } catch (Exception e) { - LOG.warn("Unexpected exception",e); - } finally { - follower.shutdown(); - setFollower(null); - state = ServerState.LOOKING; - } - break; - case LEADING: - LOG.info("LEADING"); - try { - setLeader(makeLeader(dataDir,dataLogDir)); - leader.lead(); - setLeader(null); - } catch (Exception e) { - LOG.warn("Unexpected exception",e); - } finally { - if (leader != null) { - leader.shutdown("Forcing shutdown"); - setLeader(null); - } - state = ServerState.LOOKING; - } - break; - } - } - LOG.warn("QuorumPeer main thread exited"); - } - - public void shutdown() { - running = false; - if (leader != null) { - leader.shutdown("quorum Peer shutdown"); - } - if (follower != null) { - follower.shutdown(); - } - cnxnFactory.shutdown(); - udpSocket.close(); - } - - long getLastLoggedZxid() { - File[] list = dataLogDir.listFiles(); - if (list == null) { - return 0; - } - long maxLog = -1; - long maxSnapShot = 0; - for (File f : list) { - String name = f.getName(); - if (name.startsWith("log.")) { - long zxid = ZooKeeperServer.getZxidFromName(f.getName(), "log"); - if (zxid > maxLog) { - maxLog = zxid; - } - } else if (name.startsWith("snapshot.")) { - long zxid = ZooKeeperServer.getZxidFromName(f.getName(), - "snapshot"); - if (zxid > maxLog) { - maxSnapShot = zxid; - } - } - } - if (maxSnapShot > maxLog) { - return maxSnapShot; - } - long zxid = maxLog; - FileInputStream logStream = null; - try { - logStream = new FileInputStream(new File(dataLogDir, "log." - + Long.toHexString(maxLog))); - BinaryInputArchive ia = BinaryInputArchive.getArchive(logStream); - while (true) { - byte[] bytes = ia.readBuffer("txnEntry"); - if (bytes.length == 0) { - // Since we preallocate, we define EOF to be an - // empty transaction - break; - } - int B = ia.readByte("EOR"); - if (B != 'B') { - break; - } - InputArchive bia = BinaryInputArchive - .getArchive(new ByteArrayInputStream(bytes)); - TxnHeader hdr = new TxnHeader(); - hdr.deserialize(bia, "hdr"); - zxid = hdr.getZxid(); - } - } catch (IOException e) { - LOG.warn("Unexpected exception", e); - } finally { - try { - if (logStream != null) { - logStream.close(); - } - } catch (IOException e) { - LOG.warn("Unexpected exception",e); - } - } - return zxid; - } - - public static void runPeer(QuorumPeer.Factory qpFactory) { - try { - QuorumStats.registerAsConcrete(); - QuorumPeer self = qpFactory.create(qpFactory.createConnectionFactory()); - self.start(); - self.join(); - } catch (Exception e) { - LOG.fatal("Unexpected exception",e); - } - System.exit(2); - } - - public String[] getQuorumPeers() { - List l = new ArrayList(); - synchronized (this) { - if (leader != null) { - synchronized (leader.followers) { - for (FollowerHandler fh : leader.followers) { - if (fh.s == null) - continue; - String s = fh.s.getRemoteSocketAddress().toString(); - if (leader.isFollowerSynced(fh)) - s += "*"; - l.add(s); - } - } - } else if (follower != null) { - l.add(follower.sock.getRemoteSocketAddress().toString()); - } - } - return l.toArray(new String[0]); - } - - public String getServerState() { - switch (state) { - case LOOKING: - return QuorumStats.Provider.LOOKING_STATE; - case LEADING: - return QuorumStats.Provider.LEADING_STATE; - case FOLLOWING: - return QuorumStats.Provider.FOLLOWING_STATE; - } - return QuorumStats.Provider.UNKNOWN_STATE; - } - - public static void main(String args[]) { - if (args.length == 2) { - ZooKeeperServer.main(args); - return; - } - QuorumPeerConfig.parse(args); - - if (!QuorumPeerConfig.isStandalone()) { - runPeer(new QuorumPeer.Factory() { - public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory) - throws IOException { - return new QuorumPeer(cnxnFactory); - } - public NIOServerCnxn.Factory createConnectionFactory() - throws IOException { - return new NIOServerCnxn.Factory(getClientPort()); - } - }); - }else{ - // there is only server in the quorum -- run as standalone - ZooKeeperServer.main(args); - } - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + + +import static org.apache.zookeeper.server.ServerConfig.getClientPort; +import static org.apache.zookeeper.server.ServerConfig.getDataDir; +import static org.apache.zookeeper.server.ServerConfig.getDataLogDir; +import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionAlg; +import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionPort; +import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getInitLimit; +import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServerId; +import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServers; +import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getSyncLimit; +import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getTickTime; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.Logger; + +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.InputArchive; +import org.apache.zookeeper.server.NIOServerCnxn; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.txn.TxnHeader; + +/** + * This class manages the quorum protocol. There are three states this server + * can be in: + *

    + *
  1. Leader election - each server will elect a leader (proposing itself as a + * leader initially).
  2. + *
  3. Follower - the server will synchronize with the leader and replicate any + * transactions.
  4. + *
  5. Leader - the server will process requests and forward them to followers. + * A majority of followers must log the request before it can be accepted. + *
+ * + * This class will setup a datagram socket that will always respond with its + * view of the current leader. The response will take the form of: + * + *
+ * int xid;
+ *
+ * long myid;
+ *
+ * long leader_id;
+ *
+ * long leader_zxid;
+ * 
+ * + * The request for the current leader will consist solely of an xid: int xid; + * + *

Configuration file

+ * + * When the main() method of this class is used to start the program, the file + * "zoo.cfg" in the current directory will be used to obtain configuration + * information. zoo.cfg is a Properties file, so keys and values are separated + * by equals (=) and the key/value pairs are separated by new lines. The + * following keys are used in the configuration file: + *
    + *
  1. dataDir - The directory where the zookeeper data is stored.
  2. + *
  3. clientPort - The port used to communicate with clients.
  4. + *
  5. tickTime - The duration of a tick in milliseconds. This is the basic + * unit of time in zookeeper.
  6. + *
  7. initLimit - The maximum number of ticks that a follower will wait to + * initially synchronize with a leader.
  8. + *
  9. syncLimit - The maximum number of ticks that a follower will wait for a + * message (including heartbeats) from the leader.
  10. + *
  11. server.id - This is the host:port that the server with the + * given id will use for the quorum protocol.
  12. + *
+ * In addition to the zoo.cfg file. There is a file in the data directory called + * "myid" that contains the server id as an ASCII decimal value. + */ +public class QuorumPeer extends Thread implements QuorumStats.Provider { + private static final Logger LOG = Logger.getLogger(QuorumPeer.class); + + /** + * Create an instance of a quorum peer + */ + public interface Factory{ + public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory) throws IOException; + public NIOServerCnxn.Factory createConnectionFactory() throws IOException; + } + + public static class QuorumServer { + public QuorumServer(long id, InetSocketAddress addr) { + this.id = id; + this.addr = addr; + } + + public InetSocketAddress addr; + + public long id; + } + + public enum ServerState { + LOOKING, FOLLOWING, LEADING; + } + /** + * The servers that make up the cluster + */ + ArrayList quorumPeers; + public int getQuorumSize(){ + return quorumPeers.size(); + } + /** + * My id + */ + private long myid; + + + /** + * get the id of this quorum peer. + */ + public long getId() { + return myid; + } + + /** + * This is who I think the leader currently is. + */ + volatile Vote currentVote; + + volatile boolean running = true; + + /** + * The number of milliseconds of each tick + */ + int tickTime; + + /** + * The number of ticks that the initial synchronization phase can take + */ + int initLimit; + + /** + * The number of ticks that can pass between sending a request and getting + * an acknowledgement + */ + int syncLimit; + + /** + * The current tick + */ + int tick; + + /** + * This class simply responds to requests for the current leader of this + * node. + *

+ * The request contains just an xid generated by the requestor. + *

+ * The response has the xid, the id of this server, the id of the leader, + * and the zxid of the leader. + * + * @author breed + * + */ + class ResponderThread extends Thread { + ResponderThread() { + super("ResponderThread"); + } + + public void run() { + try { + byte b[] = new byte[36]; + ByteBuffer responseBuffer = ByteBuffer.wrap(b); + DatagramPacket packet = new DatagramPacket(b, b.length); + while (true) { + udpSocket.receive(packet); + if (packet.getLength() != 4) { + LOG.warn("Got more than just an xid! Len = " + + packet.getLength()); + } else { + responseBuffer.clear(); + responseBuffer.getInt(); // Skip the xid + responseBuffer.putLong(myid); + switch (state) { + case LOOKING: + responseBuffer.putLong(currentVote.id); + responseBuffer.putLong(currentVote.zxid); + break; + case LEADING: + responseBuffer.putLong(myid); + try { + responseBuffer.putLong(leader.lastProposed); + } catch (NullPointerException npe) { + // This can happen in state transitions, + // just ignore the request + } + break; + case FOLLOWING: + responseBuffer.putLong(currentVote.id); + try { + responseBuffer.putLong(follower.getZxid()); + } catch (NullPointerException npe) { + // This can happen in state transitions, + // just ignore the request + } + } + packet.setData(b); + udpSocket.send(packet); + } + packet.setLength(b.length); + } + } catch (Exception e) { + LOG.warn("Unexpected exception",e); + } finally { + LOG.warn("QuorumPeer responder thread exited"); + } + } + } + + private ServerState state = ServerState.LOOKING; + + public void setPeerState(ServerState newState){ + state=newState; + } + + public ServerState getPeerState(){ + return state; + } + + DatagramSocket udpSocket; + + private InetSocketAddress myQuorumAddr; + + public InetSocketAddress getQuorumAddress(){ + return myQuorumAddr; + } + + /** + * the directory where the snapshot is stored. + */ + private File dataDir; + + /** + * the directory where the logs are stored. + */ + private File dataLogDir; + + Election electionAlg; + + int electionPort; + + NIOServerCnxn.Factory cnxnFactory; + + public QuorumPeer(ArrayList quorumPeers, File dataDir, + File dataLogDir, int electionAlg, int electionPort,long myid, int tickTime, + int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException { + super("QuorumPeer"); + this.cnxnFactory = cnxnFactory; + this.quorumPeers = quorumPeers; + this.dataDir = dataDir; + this.electionPort = electionPort; + this.dataLogDir = dataLogDir; + this.myid = myid; + this.tickTime = tickTime; + this.initLimit = initLimit; + this.syncLimit = syncLimit; + currentVote = new Vote(myid, getLastLoggedZxid()); + for (QuorumServer p : quorumPeers) { + if (p.id == myid) { + myQuorumAddr = p.addr; + break; + } + } + if (myQuorumAddr == null) { + throw new SocketException("My id " + myid + " not in the peer list"); + } + if (electionAlg == 0) { + udpSocket = new DatagramSocket(myQuorumAddr.getPort()); + new ResponderThread().start(); + } + this.electionAlg = createElectionAlgorithm(electionAlg); + QuorumStats.getInstance().setStatsProvider(this); + } + + /** + * This constructor is only used by the existing unit test code. + */ + public QuorumPeer(ArrayList quorumPeers, File dataDir, + File dataLogDir, int clientPort, int electionAlg, int electionPort, + long myid, int tickTime, int initLimit, int syncLimit) throws IOException { + this(quorumPeers,dataDir,dataLogDir,electionAlg,electionPort,myid,tickTime, + initLimit,syncLimit,new NIOServerCnxn.Factory(clientPort)); + } + /** + * The constructor uses the quorum peer config to instantiate the class + */ + public QuorumPeer(NIOServerCnxn.Factory cnxnFactory) throws IOException { + this(getServers(), new File(getDataDir()), new File(getDataLogDir()), + getElectionAlg(), getElectionPort(),getServerId(),getTickTime(), + getInitLimit(), getSyncLimit(),cnxnFactory); + } + + public Follower follower; + public Leader leader; + + protected Follower makeFollower(File dataDir,File dataLogDir) throws IOException { + return new Follower(this, new FollowerZooKeeperServer(dataDir, + dataLogDir, this,new ZooKeeperServer.BasicDataTreeBuilder())); + } + + protected Leader makeLeader(File dataDir,File dataLogDir) throws IOException { + return new Leader(this, new LeaderZooKeeperServer(dataDir, dataLogDir, + this,new ZooKeeperServer.BasicDataTreeBuilder())); + } + + private Election createElectionAlgorithm(int electionAlgorithm){ + Election le=null; + //TODO: use a factory rather than a switch + switch (electionAlgorithm) { + case 0: + // will create a new instance for each run of the protocol + break; + case 1: + le = new AuthFastLeaderElection(this, this.electionPort); + break; + case 2: + le = new AuthFastLeaderElection(this, this.electionPort, true); + break; + case 3: + le = new FastLeaderElection(this, + new QuorumCnxManager(this.electionPort)); + default: + assert false; + } + return le; + } + + protected Election makeLEStrategy(){ + if(electionAlg==null) + return new LeaderElection(this); + return electionAlg; + } + + synchronized protected void setLeader(Leader newLeader){ + leader=newLeader; + } + + synchronized protected void setFollower(Follower newFollower){ + follower=newFollower; + } + + synchronized public ZooKeeperServer getActiveServer(){ + if(leader!=null) + return leader.zk; + else if(follower!=null) + return follower.zk; + return null; + } + + public void run() { + /* + * Main loop + */ + while (running) { + switch (state) { + case LOOKING: + try { + LOG.info("LOOKING"); + currentVote = makeLEStrategy().lookForLeader(); + } catch (Exception e) { + LOG.warn("Unexpected exception",e); + state = ServerState.LOOKING; + } + break; + case FOLLOWING: + try { + LOG.info("FOLLOWING"); + setFollower(makeFollower(dataDir,dataLogDir)); + follower.followLeader(); + } catch (Exception e) { + LOG.warn("Unexpected exception",e); + } finally { + follower.shutdown(); + setFollower(null); + state = ServerState.LOOKING; + } + break; + case LEADING: + LOG.info("LEADING"); + try { + setLeader(makeLeader(dataDir,dataLogDir)); + leader.lead(); + setLeader(null); + } catch (Exception e) { + LOG.warn("Unexpected exception",e); + } finally { + if (leader != null) { + leader.shutdown("Forcing shutdown"); + setLeader(null); + } + state = ServerState.LOOKING; + } + break; + } + } + LOG.warn("QuorumPeer main thread exited"); + } + + public void shutdown() { + running = false; + if (leader != null) { + leader.shutdown("quorum Peer shutdown"); + } + if (follower != null) { + follower.shutdown(); + } + cnxnFactory.shutdown(); + udpSocket.close(); + } + + long getLastLoggedZxid() { + File[] list = dataLogDir.listFiles(); + if (list == null) { + return 0; + } + long maxLog = -1; + long maxSnapShot = 0; + for (File f : list) { + String name = f.getName(); + if (name.startsWith("log.")) { + long zxid = ZooKeeperServer.getZxidFromName(f.getName(), "log"); + if (zxid > maxLog) { + maxLog = zxid; + } + } else if (name.startsWith("snapshot.")) { + long zxid = ZooKeeperServer.getZxidFromName(f.getName(), + "snapshot"); + if (zxid > maxLog) { + maxSnapShot = zxid; + } + } + } + if (maxSnapShot > maxLog) { + return maxSnapShot; + } + long zxid = maxLog; + FileInputStream logStream = null; + try { + logStream = new FileInputStream(new File(dataLogDir, "log." + + Long.toHexString(maxLog))); + BinaryInputArchive ia = BinaryInputArchive.getArchive(logStream); + while (true) { + byte[] bytes = ia.readBuffer("txnEntry"); + if (bytes.length == 0) { + // Since we preallocate, we define EOF to be an + // empty transaction + break; + } + int B = ia.readByte("EOR"); + if (B != 'B') { + break; + } + InputArchive bia = BinaryInputArchive + .getArchive(new ByteArrayInputStream(bytes)); + TxnHeader hdr = new TxnHeader(); + hdr.deserialize(bia, "hdr"); + zxid = hdr.getZxid(); + } + } catch (IOException e) { + LOG.warn("Unexpected exception", e); + } finally { + try { + if (logStream != null) { + logStream.close(); + } + } catch (IOException e) { + LOG.warn("Unexpected exception",e); + } + } + return zxid; + } + + public static void runPeer(QuorumPeer.Factory qpFactory) { + try { + QuorumStats.registerAsConcrete(); + QuorumPeer self = qpFactory.create(qpFactory.createConnectionFactory()); + self.start(); + self.join(); + } catch (Exception e) { + LOG.fatal("Unexpected exception",e); + } + System.exit(2); + } + + public String[] getQuorumPeers() { + List l = new ArrayList(); + synchronized (this) { + if (leader != null) { + synchronized (leader.followers) { + for (FollowerHandler fh : leader.followers) { + if (fh.s == null) + continue; + String s = fh.s.getRemoteSocketAddress().toString(); + if (leader.isFollowerSynced(fh)) + s += "*"; + l.add(s); + } + } + } else if (follower != null) { + l.add(follower.sock.getRemoteSocketAddress().toString()); + } + } + return l.toArray(new String[0]); + } + + public String getServerState() { + switch (state) { + case LOOKING: + return QuorumStats.Provider.LOOKING_STATE; + case LEADING: + return QuorumStats.Provider.LEADING_STATE; + case FOLLOWING: + return QuorumStats.Provider.FOLLOWING_STATE; + } + return QuorumStats.Provider.UNKNOWN_STATE; + } + + public static void main(String args[]) { + if (args.length == 2) { + ZooKeeperServer.main(args); + return; + } + QuorumPeerConfig.parse(args); + + if (!QuorumPeerConfig.isStandalone()) { + runPeer(new QuorumPeer.Factory() { + public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory) + throws IOException { + return new QuorumPeer(cnxnFactory); + } + public NIOServerCnxn.Factory createConnectionFactory() + throws IOException { + return new NIOServerCnxn.Factory(getClientPort()); + } + }); + }else{ + // there is only server in the quorum -- run as standalone + ZooKeeperServer.main(args); + } + } +} Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Tue Jun 24 12:04:58 2008 @@ -1,210 +1,210 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.quorum; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileReader; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Properties; -import java.util.Map.Entry; - -import org.apache.log4j.Logger; - -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; - -public class QuorumPeerConfig extends ServerConfig { - private static final Logger LOG = Logger.getLogger(QuorumPeerConfig.class); - - private int tickTime; - private int initLimit; - private int syncLimit; - private int electionAlg; - private int electionPort; - private ArrayList servers = null; - private long serverId; - - private QuorumPeerConfig(int port, String dataDir, String dataLogDir) { - super(port, dataDir, dataLogDir); - } - - public static void parse(String[] args) { - if(instance!=null) - return; - - try { - if (args.length != 1) { - System.err.println("USAGE: configFile"); - System.exit(2); - } - File zooCfgFile = new File(args[0]); - if (!zooCfgFile.exists()) { - LOG.error(zooCfgFile.toString() + " file is missing"); - System.exit(2); - } - Properties cfg = new Properties(); - cfg.load(new FileInputStream(zooCfgFile)); - ArrayList servers = new ArrayList(); - String dataDir = null; - String dataLogDir = null; - int clientPort = 0; - int tickTime = 0; - int initLimit = 0; - int syncLimit = 0; - int electionAlg = 0; - int electionPort = 0; - for (Entry entry : cfg.entrySet()) { - String key = entry.getKey().toString(); - String value = entry.getValue().toString(); - if (key.equals("dataDir")) { - dataDir = value; - } else if (key.equals("dataLogDir")) { - dataLogDir = value; - } else if (key.equals("clientPort")) { - clientPort = Integer.parseInt(value); - } else if (key.equals("tickTime")) { - tickTime = Integer.parseInt(value); - } else if (key.equals("initLimit")) { - initLimit = Integer.parseInt(value); - } else if (key.equals("syncLimit")) { - syncLimit = Integer.parseInt(value); - } else if (key.equals("electionAlg")) { - electionAlg = Integer.parseInt(value); - } else if (key.equals("electionPort")) { - electionPort = Integer.parseInt(value); - } else if (key.startsWith("server.")) { - int dot = key.indexOf('.'); - long sid = Long.parseLong(key.substring(dot + 1)); - String parts[] = value.split(":"); - if (parts.length != 2) { - LOG.error(value - + " does not have the form host:port"); - } - InetSocketAddress addr = new InetSocketAddress(parts[0], - Integer.parseInt(parts[1])); - servers.add(new QuorumServer(sid, addr)); - } else { - System.setProperty("zookeeper." + key, value); - } - } - if (dataDir == null) { - LOG.error("dataDir is not set"); - System.exit(2); - } - if (dataLogDir == null) { - dataLogDir = dataDir; - } else { - if (!new File(dataLogDir).isDirectory()) { - LOG.error("dataLogDir " + dataLogDir+ " is missing."); - System.exit(2); - } - } - if (clientPort == 0) { - LOG.error("clientPort is not set"); - System.exit(2); - } - if (tickTime == 0) { - LOG.error("tickTime is not set"); - System.exit(2); - } - if (servers.size() > 1 && initLimit == 0) { - LOG.error("initLimit is not set"); - System.exit(2); - } - if (servers.size() > 1 && syncLimit == 0) { - LOG.error("syncLimit is not set"); - System.exit(2); - } - QuorumPeerConfig conf = new QuorumPeerConfig(clientPort, dataDir, - dataLogDir); - conf.tickTime = tickTime; - conf.initLimit = initLimit; - conf.syncLimit = syncLimit; - conf.electionAlg = electionAlg; - conf.electionPort = electionPort; - conf.servers = servers; - if (servers.size() > 1) { - File myIdFile = new File(dataDir, "myid"); - if (!myIdFile.exists()) { - LOG.error(myIdFile.toString() + " file is missing"); - System.exit(2); - } - BufferedReader br = new BufferedReader(new FileReader(myIdFile)); - String myIdString = br.readLine(); - try { - conf.serverId = Long.parseLong(myIdString); - } catch (NumberFormatException e) { - LOG.error(myIdString + " is not a number"); - System.exit(2); - } - } - instance=conf; - } catch (Exception e) { - LOG.error("FIXMSG",e); - System.exit(2); - } - } - - protected boolean isStandaloneServer(){ - return QuorumPeerConfig.getServers().size() <= 1; - } - - public static int getTickTime() { - assert instance instanceof QuorumPeerConfig; - return ((QuorumPeerConfig)instance).tickTime; - } - - public static int getInitLimit() { - assert instance instanceof QuorumPeerConfig; - return ((QuorumPeerConfig)instance).initLimit; - } - - public static int getSyncLimit() { - assert instance instanceof QuorumPeerConfig; - return ((QuorumPeerConfig)instance).syncLimit; - } - - public static int getElectionAlg() { - assert instance instanceof QuorumPeerConfig; - return ((QuorumPeerConfig)instance).electionAlg; - } - - public static int getElectionPort() { - assert instance instanceof QuorumPeerConfig; - return ((QuorumPeerConfig)instance).electionPort; - } - - public static ArrayList getServers() { - assert instance instanceof QuorumPeerConfig; - return ((QuorumPeerConfig)instance).servers; - } - - public static int getQuorumSize(){ - assert instance instanceof QuorumPeerConfig; - return ((QuorumPeerConfig)instance).servers.size(); - } - - public static long getServerId() { - assert instance instanceof QuorumPeerConfig; - return ((QuorumPeerConfig)instance).serverId; - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Properties; +import java.util.Map.Entry; + +import org.apache.log4j.Logger; + +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; + +public class QuorumPeerConfig extends ServerConfig { + private static final Logger LOG = Logger.getLogger(QuorumPeerConfig.class); + + private int tickTime; + private int initLimit; + private int syncLimit; + private int electionAlg; + private int electionPort; + private ArrayList servers = null; + private long serverId; + + private QuorumPeerConfig(int port, String dataDir, String dataLogDir) { + super(port, dataDir, dataLogDir); + } + + public static void parse(String[] args) { + if(instance!=null) + return; + + try { + if (args.length != 1) { + System.err.println("USAGE: configFile"); + System.exit(2); + } + File zooCfgFile = new File(args[0]); + if (!zooCfgFile.exists()) { + LOG.error(zooCfgFile.toString() + " file is missing"); + System.exit(2); + } + Properties cfg = new Properties(); + cfg.load(new FileInputStream(zooCfgFile)); + ArrayList servers = new ArrayList(); + String dataDir = null; + String dataLogDir = null; + int clientPort = 0; + int tickTime = 0; + int initLimit = 0; + int syncLimit = 0; + int electionAlg = 0; + int electionPort = 0; + for (Entry entry : cfg.entrySet()) { + String key = entry.getKey().toString(); + String value = entry.getValue().toString(); + if (key.equals("dataDir")) { + dataDir = value; + } else if (key.equals("dataLogDir")) { + dataLogDir = value; + } else if (key.equals("clientPort")) { + clientPort = Integer.parseInt(value); + } else if (key.equals("tickTime")) { + tickTime = Integer.parseInt(value); + } else if (key.equals("initLimit")) { + initLimit = Integer.parseInt(value); + } else if (key.equals("syncLimit")) { + syncLimit = Integer.parseInt(value); + } else if (key.equals("electionAlg")) { + electionAlg = Integer.parseInt(value); + } else if (key.equals("electionPort")) { + electionPort = Integer.parseInt(value); + } else if (key.startsWith("server.")) { + int dot = key.indexOf('.'); + long sid = Long.parseLong(key.substring(dot + 1)); + String parts[] = value.split(":"); + if (parts.length != 2) { + LOG.error(value + + " does not have the form host:port"); + } + InetSocketAddress addr = new InetSocketAddress(parts[0], + Integer.parseInt(parts[1])); + servers.add(new QuorumServer(sid, addr)); + } else { + System.setProperty("zookeeper." + key, value); + } + } + if (dataDir == null) { + LOG.error("dataDir is not set"); + System.exit(2); + } + if (dataLogDir == null) { + dataLogDir = dataDir; + } else { + if (!new File(dataLogDir).isDirectory()) { + LOG.error("dataLogDir " + dataLogDir+ " is missing."); + System.exit(2); + } + } + if (clientPort == 0) { + LOG.error("clientPort is not set"); + System.exit(2); + } + if (tickTime == 0) { + LOG.error("tickTime is not set"); + System.exit(2); + } + if (servers.size() > 1 && initLimit == 0) { + LOG.error("initLimit is not set"); + System.exit(2); + } + if (servers.size() > 1 && syncLimit == 0) { + LOG.error("syncLimit is not set"); + System.exit(2); + } + QuorumPeerConfig conf = new QuorumPeerConfig(clientPort, dataDir, + dataLogDir); + conf.tickTime = tickTime; + conf.initLimit = initLimit; + conf.syncLimit = syncLimit; + conf.electionAlg = electionAlg; + conf.electionPort = electionPort; + conf.servers = servers; + if (servers.size() > 1) { + File myIdFile = new File(dataDir, "myid"); + if (!myIdFile.exists()) { + LOG.error(myIdFile.toString() + " file is missing"); + System.exit(2); + } + BufferedReader br = new BufferedReader(new FileReader(myIdFile)); + String myIdString = br.readLine(); + try { + conf.serverId = Long.parseLong(myIdString); + } catch (NumberFormatException e) { + LOG.error(myIdString + " is not a number"); + System.exit(2); + } + } + instance=conf; + } catch (Exception e) { + LOG.error("FIXMSG",e); + System.exit(2); + } + } + + protected boolean isStandaloneServer(){ + return QuorumPeerConfig.getServers().size() <= 1; + } + + public static int getTickTime() { + assert instance instanceof QuorumPeerConfig; + return ((QuorumPeerConfig)instance).tickTime; + } + + public static int getInitLimit() { + assert instance instanceof QuorumPeerConfig; + return ((QuorumPeerConfig)instance).initLimit; + } + + public static int getSyncLimit() { + assert instance instanceof QuorumPeerConfig; + return ((QuorumPeerConfig)instance).syncLimit; + } + + public static int getElectionAlg() { + assert instance instanceof QuorumPeerConfig; + return ((QuorumPeerConfig)instance).electionAlg; + } + + public static int getElectionPort() { + assert instance instanceof QuorumPeerConfig; + return ((QuorumPeerConfig)instance).electionPort; + } + + public static ArrayList getServers() { + assert instance instanceof QuorumPeerConfig; + return ((QuorumPeerConfig)instance).servers; + } + + public static int getQuorumSize(){ + assert instance instanceof QuorumPeerConfig; + return ((QuorumPeerConfig)instance).servers.size(); + } + + public static long getServerId() { + assert instance instanceof QuorumPeerConfig; + return ((QuorumPeerConfig)instance).serverId; + } +} Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java Tue Jun 24 12:04:58 2008 @@ -1,20 +1,20 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zookeeper.server.quorum; Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/Profiler.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/Profiler.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/Profiler.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/Profiler.java Tue Jun 24 12:04:58 2008 @@ -1,40 +1,40 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.util; - -import org.apache.log4j.Logger; - -public class Profiler { - private static final Logger LOG = Logger.getLogger(Profiler.class); - - public interface Operation { - public T execute() throws Exception; - } - - public static T profile(Operation op, long timeout, String message) - throws Exception { - long start = System.currentTimeMillis(); - T res = op.execute(); - long end = System.currentTimeMillis(); - if (end - start > timeout) { - LOG.warn("Elapsed "+(end - start) + " ms: " + message); - } - return res; - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import org.apache.log4j.Logger; + +public class Profiler { + private static final Logger LOG = Logger.getLogger(Profiler.class); + + public interface Operation { + public T execute() throws Exception; + } + + public static T profile(Operation op, long timeout, String message) + throws Exception { + long start = System.currentTimeMillis(); + T res = op.execute(); + long end = System.currentTimeMillis(); + if (end - start > timeout) { + LOG.warn("Elapsed "+(end - start) + " ms: " + message); + } + return res; + } +} Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/version/util/VerGen.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/version/util/VerGen.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/version/util/VerGen.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/version/util/VerGen.java Tue Jun 24 12:04:58 2008 @@ -1,20 +1,20 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zookeeper.version.util; Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java?rev=671303&r1=671302&r2=671303&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java Tue Jun 24 12:04:58 2008 @@ -1,70 +1,70 @@ -package org.apache.zookeeper.server; - -import java.io.File; -import java.util.List; - -import junit.framework.TestCase; - -public class ZooKeeperServerTest extends TestCase { - public void testSortDataDirAscending() { - File[] files = new File[5]; - - files[0] = new File("foo.10027c6de"); - files[1] = new File("foo.10027c6df"); - files[2] = new File("bar.10027c6dd"); - files[3] = new File("foo.10027c6dc"); - files[4] = new File("foo.20027c6dc"); - - File[] orig = files.clone(); - - List filelist = ZooKeeperServer.sortDataDir(files, "foo", true); - - assertEquals(orig[2], filelist.get(0)); - assertEquals(orig[3], filelist.get(1)); - assertEquals(orig[0], filelist.get(2)); - assertEquals(orig[1], filelist.get(3)); - assertEquals(orig[4], filelist.get(4)); - } - - public void testSortDataDirDescending() { - File[] files = new File[5]; - - files[0] = new File("foo.10027c6de"); - files[1] = new File("foo.10027c6df"); - files[2] = new File("bar.10027c6dd"); - files[3] = new File("foo.10027c6dc"); - files[4] = new File("foo.20027c6dc"); - - File[] orig = files.clone(); - - List filelist = ZooKeeperServer.sortDataDir(files, "foo", false); - - assertEquals(orig[4], filelist.get(0)); - assertEquals(orig[1], filelist.get(1)); - assertEquals(orig[0], filelist.get(2)); - assertEquals(orig[3], filelist.get(3)); - assertEquals(orig[2], filelist.get(4)); - } - - public void testGetLogFiles() { - File[] files = new File[5]; - - files[0] = new File("log.10027c6de"); - files[1] = new File("log.10027c6df"); - files[2] = new File("snapshot.10027c6dd"); - files[3] = new File("log.10027c6dc"); - files[4] = new File("log.20027c6dc"); - - File[] orig = files.clone(); - - File[] filelist = - ZooKeeperServer.getLogFiles(files, - Long.parseLong("10027c6de", 16)); - - assertEquals(3, filelist.length); - assertEquals(orig[0], filelist[0]); - assertEquals(orig[1], filelist[1]); - assertEquals(orig[4], filelist[2]); - } - -} +package org.apache.zookeeper.server; + +import java.io.File; +import java.util.List; + +import junit.framework.TestCase; + +public class ZooKeeperServerTest extends TestCase { + public void testSortDataDirAscending() { + File[] files = new File[5]; + + files[0] = new File("foo.10027c6de"); + files[1] = new File("foo.10027c6df"); + files[2] = new File("bar.10027c6dd"); + files[3] = new File("foo.10027c6dc"); + files[4] = new File("foo.20027c6dc"); + + File[] orig = files.clone(); + + List filelist = ZooKeeperServer.sortDataDir(files, "foo", true); + + assertEquals(orig[2], filelist.get(0)); + assertEquals(orig[3], filelist.get(1)); + assertEquals(orig[0], filelist.get(2)); + assertEquals(orig[1], filelist.get(3)); + assertEquals(orig[4], filelist.get(4)); + } + + public void testSortDataDirDescending() { + File[] files = new File[5]; + + files[0] = new File("foo.10027c6de"); + files[1] = new File("foo.10027c6df"); + files[2] = new File("bar.10027c6dd"); + files[3] = new File("foo.10027c6dc"); + files[4] = new File("foo.20027c6dc"); + + File[] orig = files.clone(); + + List filelist = ZooKeeperServer.sortDataDir(files, "foo", false); + + assertEquals(orig[4], filelist.get(0)); + assertEquals(orig[1], filelist.get(1)); + assertEquals(orig[0], filelist.get(2)); + assertEquals(orig[3], filelist.get(3)); + assertEquals(orig[2], filelist.get(4)); + } + + public void testGetLogFiles() { + File[] files = new File[5]; + + files[0] = new File("log.10027c6de"); + files[1] = new File("log.10027c6df"); + files[2] = new File("snapshot.10027c6dd"); + files[3] = new File("log.10027c6dc"); + files[4] = new File("log.20027c6dc"); + + File[] orig = files.clone(); + + File[] filelist = + ZooKeeperServer.getLogFiles(files, + Long.parseLong("10027c6de", 16)); + + assertEquals(3, filelist.length); + assertEquals(orig[0], filelist[0]); + assertEquals(orig[1], filelist[1]); + assertEquals(orig[4], filelist[2]); + } + +}