hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r881882 [2/2] - in /hadoop/zookeeper/trunk: ./ docs/ src/docs/src/documentation/content/xdocs/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ sr...
Date Wed, 18 Nov 2009 19:06:41 GMT
Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java?rev=881882&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java Wed Nov 18 19:06:39 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ * A ZooKeeperServer for the Observer node type. Not much is different, but
+ * we anticipate specializing the request processors in the future. 
+ *
+ */
+public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
+    private static final Logger LOG = Logger.getLogger(ObserverZooKeeperServer.class);        
+    
+    /*
+     * Request processors
+     */
+    private CommitProcessor commitProcessor;
+    private SyncRequestProcessor syncProcessor;
+    
+    /*
+     * Pending sync requests
+     */
+    ConcurrentLinkedQueue<Request> pendingSyncs = 
+        new ConcurrentLinkedQueue<Request>();
+        
+    ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
+            DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory, self.tickTime, treeBuilder);
+        this.self = self;        
+    }
+    
+    public Observer getObserver() {
+        return self.observer;
+    }
+    
+    @Override
+    public Learner getLearner() {
+        return self.observer;
+    }       
+    
+    /**
+     * Unlike a Follower, which sees a full request only during the PROPOSAL
+     * phase, Observers get all the data required with the INFORM packet. 
+     * This method commits a request that has been unpacked by from an INFORM
+     * received from the Leader. 
+     *      
+     * @param request
+     */
+    public void commitRequest(Request request) {     
+        commitProcessor.commit(request);        
+    }
+    
+    /**
+     * Set up the request processors for an Observer:
+     * firstProcesor->commitProcessor->finalProcessor
+     */
+    @Override
+    protected void setupRequestProcessors() {      
+        // We might consider changing the processor behaviour of 
+        // Observers to, for example, remove the disk sync requirements.
+        // Currently, they behave almost exactly the same as followers.
+        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+        commitProcessor = new CommitProcessor(finalProcessor,
+                Long.toString(getServerId()), true);
+        commitProcessor.start();
+        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
+        ((ObserverRequestProcessor) firstProcessor).start();
+        syncProcessor = new SyncRequestProcessor(this,
+                new SendAckRequestProcessor(getObserver()));
+        syncProcessor.start();
+    }
+    
+    /*
+     * Process a sync request
+     */
+    synchronized public void sync(){
+        if(pendingSyncs.size() ==0){
+            LOG.warn("Not expecting a sync.");
+            return;
+        }
+                
+        Request r = pendingSyncs.remove();
+        commitProcessor.commit(r);
+    }
+    
+    @Override
+    public String getState() {
+        return "observer";
+    };    
+}

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=881882&r1=881881&r2=881882&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Nov 18 19:06:39 2009
@@ -126,7 +126,7 @@
         SocketChannel channel;
         LOG.debug("Opening channel to server "  + sid);
         channel = SocketChannel
-                .open(self.quorumPeers.get(sid).electionAddr);
+                .open(self.getVotingView().get(sid).electionAddr);
         channel.socket().setTcpNoDelay(true);
         initiateConnection(channel, sid);
     }
@@ -327,7 +327,8 @@
             try {
                 SocketChannel channel;
                 LOG.debug("Opening channel to server "  + sid);
-                channel = SocketChannel.open(electionAddr);
+                channel = SocketChannel
+                        .open(self.getView().get(sid).electionAddr);
                 channel.socket().setTcpNoDelay(true);
                 initiateConnection(channel, sid);
             } catch (UnresolvedAddressException e) {
@@ -510,7 +511,7 @@
                 LOG.warn("Exception while closing socket");
             }
             //channel = null;
-            
+
             this.interrupt();
             if (recvWorker != null)
                 recvWorker.finish();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=881882&r1=881881&r2=881882&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Nov 18 19:06:39 2009
@@ -25,6 +25,8 @@
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -93,22 +95,54 @@
             this.electionAddr = null;
         }
         
+        public QuorumServer(long id, InetSocketAddress addr,
+                    InetSocketAddress electionAddr, LearnerType type) {
+            this.id = id;
+            this.addr = addr;
+            this.electionAddr = electionAddr;
+            this.type = type;
+        }
+        
         public InetSocketAddress addr;
 
         public InetSocketAddress electionAddr;
         
         public long id;
+        
+        public LearnerType type = LearnerType.PARTICIPANT;
     }
 
     public enum ServerState {
-        LOOKING, FOLLOWING, LEADING;
+        LOOKING, FOLLOWING, LEADING, OBSERVING;
+    }
+    
+    /**
+     * A peer can either be participating, which implies that it is willing to
+     * both vote in instances of consensus and to elect or become a Leader, or
+     * it may be observing in which case it isn't.
+     * 
+     * We need this distinction to decide which ServerState to move to when 
+     * conditions change (e.g. which state to become after LOOKING). 
+     */
+    public enum LearnerType {
+        PARTICIPANT, OBSERVER;
+    }
+    
+    private LearnerType peerType = LearnerType.PARTICIPANT;
+    
+    public LearnerType getPeerType() {
+        return peerType;
+    }
+    
+    public void setPeerType(LearnerType p) {
+        peerType = p;
     }
     /**
      * The servers that make up the cluster
      */
-    Map<Long, QuorumServer> quorumPeers;
+    protected Map<Long, QuorumServer> quorumPeers;
     public int getQuorumSize(){
-        return quorumPeers.size();
+        return getVotingView().size();
     }
     
     /**
@@ -226,6 +260,11 @@
                                 // This can happen in state transitions,
                                 // just ignore the request
                             }
+                            break;
+                        case OBSERVING:
+                            // Do nothing, Observers keep themselves to
+                            // themselves. 
+                            break;
                         }
                         packet.setData(b);
                         udpSocket.send(packet);
@@ -233,7 +272,7 @@
                     packet.setLength(b.length);
                 }
             } catch (Exception e) {
-                LOG.warn("Unexpected exception",e);
+                LOG.warn("Unexpected exception in ResponderThread",e);
             } finally {
                 LOG.warn("QuorumPeer responder thread exited");
             }
@@ -282,7 +321,8 @@
             long myid, int tickTime, int initLimit, int syncLimit,
             NIOServerCnxn.Factory cnxnFactory) throws IOException {
         this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, 
-        		initLimit, syncLimit, cnxnFactory, new QuorumMaj(quorumPeers.size()));
+        		initLimit, syncLimit, cnxnFactory, 
+        		new QuorumMaj(countParticipants(quorumPeers)));
     }
     
     public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
@@ -300,7 +340,7 @@
         this.syncLimit = syncLimit;        
         this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
         if(quorumConfig == null)
-            this.quorumConfig = new QuorumMaj(quorumPeers.size());
+            this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
         else this.quorumConfig = quorumConfig;
     }
     
@@ -310,8 +350,10 @@
     
     @Override
     public synchronized void start() {
-        cnxnFactory.start();
-        startLeaderElection();
+        cnxnFactory.start();        
+        if (getPeerType() == LearnerType.PARTICIPANT) {
+            startLeaderElection();
+        }
         super.start();
     }
 
@@ -323,7 +365,7 @@
     }
     synchronized public void startLeaderElection() {
         currentVote = new Vote(myid, getLastLoggedZxid());
-        for (QuorumServer p : quorumPeers.values()) {
+        for (QuorumServer p : getView().values()) {
             if (p.id == myid) {
                 myQuorumAddr = p.addr;
                 break;
@@ -344,6 +386,20 @@
         this.electionAlg = createElectionAlgorithm(electionType);
     }
     
+    /**
+     * Count the number of nodes in the map that could be followers.
+     * @param peers
+     * @return The number of followers in the map
+     */
+    protected static int countParticipants(Map<Long,QuorumServer> peers) {
+      int count = 0;
+      for (QuorumServer q : peers.values()) {
+          if (q.type == LearnerType.PARTICIPANT) {
+              count++;
+          }
+      }
+      return count;
+    }
     
     /**
      * This constructor is only used by the existing unit test code.
@@ -357,7 +413,7 @@
         this(quorumPeers, snapDir, logDir, electionAlg,
                 myid,tickTime, initLimit,syncLimit,
                 new NIOServerCnxn.Factory(clientPort),
-                new QuorumMaj(quorumPeers.size()));
+                new QuorumMaj(countParticipants(quorumPeers)));
     }
     
     /**
@@ -380,6 +436,7 @@
     }
     public Follower follower;
     public Leader leader;
+    public Observer observer;
 
     protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
         return new Follower(this, new FollowerZooKeeperServer(logFactory, 
@@ -390,9 +447,15 @@
         return new Leader(this, new LeaderZooKeeperServer(logFactory,
                 this,new ZooKeeperServer.BasicDataTreeBuilder()));
     }
+    
+    protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
+        return new Observer(this, new ObserverZooKeeperServer(logFactory,
+                this, new ZooKeeperServer.BasicDataTreeBuilder()));
+    }
 
     private Election createElectionAlgorithm(int electionAlgorithm){
         Election le=null;
+                
         //TODO: use a factory rather than a switch
         switch (electionAlgorithm) {
         case 0:
@@ -423,6 +486,8 @@
     protected Election makeLEStrategy(){
         LOG.debug("Initializing leader election protocol...");
 
+        // LeaderElection is the only implementation that correctly
+        // transitions between LOOKING and OBSERVER
         if(electionAlg==null)
             return new LeaderElection(this);
         return electionAlg;
@@ -435,12 +500,18 @@
     synchronized protected void setFollower(Follower newFollower){
         follower=newFollower;
     }
+    
+    synchronized protected void setObserver(Observer newObserver){
+        observer=newObserver;
+    }
 
     synchronized public ZooKeeperServer getActiveServer(){
         if(leader!=null)
             return leader.zk;
         else if(follower!=null)
             return follower.zk;
+        else if (observer != null)
+            return observer.zk;
         return null;
     }
 
@@ -491,6 +562,19 @@
                         setPeerState(ServerState.LOOKING);
                     }
                     break;
+                case OBSERVING:
+                    try {
+                        LOG.info("OBSERVING");
+                        setObserver(makeObserver(logFactory));
+                        observer.observeLeader();
+                    } catch (Exception e) {
+                        LOG.warn("Unexpected exception",e );                        
+                    } finally {
+                        observer.shutdown();
+                        setObserver(null);
+                        setPeerState(ServerState.LOOKING);
+                    }
+                    break;
                 case FOLLOWING:
                     try {
                         LOG.info("FOLLOWING");
@@ -549,11 +633,42 @@
     }
 
     /**
-     * A 'view' is a node's current opinion of the membership of the
-     * ensemble. 
+     * A 'view' is a node's current opinion of the membership of the entire
+     * ensemble.    
      */
     public Map<Long,QuorumPeer.QuorumServer> getView() {
-        return this.quorumPeers;
+        return Collections.unmodifiableMap(this.quorumPeers);
+    }
+    
+    /**
+     * Observers are not contained in this view, only nodes with 
+     * PeerType=PARTICIPANT.     
+     */
+    public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
+        Map<Long,QuorumPeer.QuorumServer> ret = 
+            new HashMap<Long, QuorumPeer.QuorumServer>();
+        Map<Long,QuorumPeer.QuorumServer> view = getView();
+        for (QuorumServer server : view.values()) {            
+            if (server.type == LearnerType.PARTICIPANT) {
+                ret.put(server.id, server);
+            }
+        }        
+        return ret;
+    }
+    
+    /**
+     * Returns only observers, no followers.
+     */
+    public Map<Long,QuorumPeer.QuorumServer> getObservingView() {
+        Map<Long,QuorumPeer.QuorumServer> ret = 
+            new HashMap<Long, QuorumPeer.QuorumServer>();
+        Map<Long,QuorumPeer.QuorumServer> view = getView();
+        for (QuorumServer server : view.values()) {            
+            if (server.type == LearnerType.OBSERVER) {
+                ret.put(server.id, server);
+            }
+        }        
+        return ret;
     }
     
     /**
@@ -565,6 +680,9 @@
         return this.quorumPeers.containsKey(sid);
     }
     
+    /**
+     * Only used by QuorumStats at the moment
+     */
     public String[] getQuorumPeers() {
         List<String> l = new ArrayList<String>();
         synchronized (this) {
@@ -594,6 +712,8 @@
             return QuorumStats.Provider.LEADING_STATE;
         case FOLLOWING:
             return QuorumStats.Provider.FOLLOWING_STATE;
+        case OBSERVING:
+            return QuorumStats.Provider.OBSERVING_STATE;
         }
         return QuorumStats.Provider.UNKNOWN_STATE;
     }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=881882&r1=881881&r2=881882&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Wed Nov 18 19:06:39 2009
@@ -33,6 +33,7 @@
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
@@ -52,12 +53,16 @@
     protected int maxClientCnxns = 10;
     protected final HashMap<Long,QuorumServer> servers =
         new HashMap<Long, QuorumServer>();
+    protected final HashMap<Long,QuorumServer> observers =
+        new HashMap<Long, QuorumServer>();
 
     protected long serverId;
     protected HashMap<Long, Long> serverWeight = new HashMap<Long, Long>();
     protected HashMap<Long, Long> serverGroup = new HashMap<Long, Long>();
     protected int numGroups = 0;
     protected QuorumVerifier quorumVerifier;
+    
+    protected LearnerType peerType = LearnerType.PARTICIPANT;
 
     @SuppressWarnings("serial")
     public static class ConfigException extends Exception {
@@ -128,13 +133,23 @@
                 electionAlg = Integer.parseInt(value);
             } else if (key.equals("maxClientCnxns")) {
                 maxClientCnxns = Integer.parseInt(value);
+            } else if (key.equals("peerType")) {
+                if (value.toLowerCase().equals("observer")) {
+                    peerType = LearnerType.OBSERVER;
+                } else if (value.toLowerCase().equals("participant")) {
+                    peerType = LearnerType.PARTICIPANT;
+                } else
+                {
+                    throw new ConfigException("Unrecognised peertype: " + value);                      
+                }
             } else if (key.startsWith("server.")) {
                 int dot = key.indexOf('.');
                 long sid = Long.parseLong(key.substring(dot + 1));
                 String parts[] = value.split(":");
-                if ((parts.length != 2) && (parts.length != 3)) {
+                if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) {
                     LOG.error(value
-                       + " does not have the form host:port or host:port:port");
+                       + " does not have the form host:port or host:port:port " +
+                       " or host:port:port:type");
                 }
                 InetSocketAddress addr = new InetSocketAddress(parts[0],
                         Integer.parseInt(parts[1]));
@@ -145,6 +160,21 @@
                             parts[0], Integer.parseInt(parts[2]));
                     servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
                             electionAddr));
+                } else if (parts.length == 4) {
+                    InetSocketAddress electionAddr = new InetSocketAddress(
+                            parts[0], Integer.parseInt(parts[2]));
+                    LearnerType type = LearnerType.PARTICIPANT;
+                    if (parts[3].toLowerCase().equals("observer")) {
+                        type = LearnerType.OBSERVER;
+                        observers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
+                                electionAddr,type));
+                    } else if (parts[3].toLowerCase().equals("participant")) {
+                        type = LearnerType.PARTICIPANT;
+                        servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
+                                electionAddr,type));
+                    } else {
+                        throw new ConfigException("Unrecognised peertype: " + value);
+                    }                    
                 }
             } else if (key.startsWith("group")) {
                 int dot = key.indexOf('.');
@@ -169,6 +199,10 @@
                 System.setProperty("zookeeper." + key, value);
             }
         }
+        if (observers.size() > 0 && electionAlg != 0) {
+            throw new IllegalArgumentException("Observers must currently be used with simple leader election" +
+            		" (set electionAlg=0)");
+        }
         if (dataDir == null) {
             throw new IllegalArgumentException("dataDir is not set");
         }
@@ -233,6 +267,10 @@
                 quorumVerifier = new QuorumMaj(servers.size());
             }
             
+            // Now add observers to servers, once the quorums have been 
+            // figured out
+            servers.putAll(observers);
+            
             File myIdFile = new File(dataDir, "myid");
             if (!myIdFile.exists()) {
                 throw new IllegalArgumentException(myIdFile.toString()
@@ -276,4 +314,8 @@
     public long getServerId() { return serverId; }
 
     public boolean isDistributed() { return servers.size() > 1; }
+
+    public LearnerType getPeerType() {
+        return peerType;
+    }
 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=881882&r1=881881&r2=881882&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Wed Nov 18 19:06:39 2009
@@ -133,6 +133,7 @@
           quorumPeer.setSyncLimit(config.getSyncLimit());
           quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
           quorumPeer.setCnxnFactory(cnxnFactory);
+          quorumPeer.setPeerType(config.getPeerType());
   
           quorumPeer.start();
           quorumPeer.join();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java?rev=881882&r1=881881&r2=881882&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java Wed Nov 18 19:06:39 2009
@@ -26,7 +26,7 @@
         static public final String LOOKING_STATE = "leaderelection";
         static public final String LEADING_STATE = "leading";
         static public final String FOLLOWING_STATE = "following";
-        
+        static public final String OBSERVING_STATE = "observing";
         public String[] getQuorumPeers();
         public String getServerState();
     }
@@ -53,7 +53,8 @@
                 sb.append(" ").append(f);
             }
             sb.append("\n");            
-        }else if(state.equals(Provider.FOLLOWING_STATE)){
+        }else if(state.equals(Provider.FOLLOWING_STATE) 
+                || state.equals(Provider.OBSERVING_STATE)){
             sb.append("Leader: ");
             String[] ldr=getQuorumPeers();
             if(ldr.length>0)

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ObserverTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ObserverTest.java?rev=881882&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ObserverTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ObserverTest.java Wed Nov 18 19:06:39 2009
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper.States;
+
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Test;
+
+/**
+ * Test Observer behaviour and specific code paths.
+ *
+ */
+public class ObserverTest extends QuorumPeerTestBase implements Watcher{
+    protected static final Logger LOG =
+        Logger.getLogger(ObserverTest.class);    
+      
+    // We expect two notifications before we want to continue
+    CountDownLatch latch = new CountDownLatch(2);
+    ZooKeeper zk;
+    WatchedEvent lastEvent = null;
+          
+    /**
+     * This test ensures two things:
+     * 1. That Observers can successfully proxy requests to the ensemble.
+     * 2. That Observers don't participate in leader elections.
+     * The second is tested by constructing an ensemble where a leader would
+     * be elected if and only if an Observer voted. 
+     * @throws Exception
+     */
+    @Test
+    public void testObserver() throws Exception {
+        ClientBase.setupTestEnv();
+        final int CLIENT_PORT_QP1 = 3181;
+        final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3;
+        final int CLIENT_PORT_OBS = CLIENT_PORT_QP2 + 3;
+
+        String quorumCfgSection =
+            "electionAlg=0\n" + 
+            "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
+            + ":" + (CLIENT_PORT_QP1 + 2)
+            + "\nserver.2=localhost:" + (CLIENT_PORT_QP2 + 1)
+            + ":" + (CLIENT_PORT_QP2 + 2)
+            + "\nserver.3=localhost:" 
+            + (CLIENT_PORT_OBS+1)+ ":" + (CLIENT_PORT_OBS + 2) + ":observer";
+        String obsCfgSection =  quorumCfgSection + "\npeerType=observer";
+        MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
+        MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
+        MainThread q3 = new MainThread(3, CLIENT_PORT_OBS, obsCfgSection);
+        q1.start();
+        q2.start();
+        q3.start();
+        assertTrue("waiting for server 1 being up",
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP1,
+                        CONNECTION_TIMEOUT));
+        assertTrue("waiting for server 2 being up",
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP2,
+                        CONNECTION_TIMEOUT));
+        assertTrue("waiting for server 3 being up",
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_OBS,
+                        CONNECTION_TIMEOUT));        
+        
+        zk = new ZooKeeper("localhost:" + CLIENT_PORT_OBS,
+                ClientBase.CONNECTION_TIMEOUT, this);
+        zk.create("/obstest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        
+        // Assert that commands are getting forwarded correctly
+        assertEquals(new String(zk.getData("/obstest", null, null)), "test");
+        
+        // Now check that other commands don't blow everything up
+        zk.sync("/", null, null);
+        zk.setData("/obstest", "test2".getBytes(), -1);
+        zk.getChildren("/", false);
+        
+        assertEquals(zk.getState(), States.CONNECTED);
+        
+        // Now kill one of the other real servers        
+        q2.shutdown();
+                
+        assertTrue("Waiting for server 2 to shut down",
+                    ClientBase.waitForServerDown("localhost:"+CLIENT_PORT_QP2, 
+                                    ClientBase.CONNECTION_TIMEOUT));
+        
+        // Now the resulting ensemble shouldn't be quorate         
+        latch.await();        
+        assertNotSame("zk should not be connected", KeeperState.SyncConnected,lastEvent.getState());
+
+        try {
+            assertFalse("Shouldn't get a response when cluster not quorate!",
+                    new String(zk.getData("/obstest", null, null)).equals("test"));
+        }
+        catch (ConnectionLossException c) {
+            LOG.info("Connection loss exception caught - ensemble not quorate (this is expected)");
+        }
+        
+        latch = new CountDownLatch(1);
+        
+        // Bring it back
+        q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
+        q2.start();
+        assertTrue("waiting for server 2 being up",
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP2,
+                        CONNECTION_TIMEOUT));
+        
+        latch.await();
+        // It's possible our session expired - but this is ok, shows we 
+        // were able to talk to the ensemble
+        assertTrue("Didn't reconnect", 
+                (KeeperState.SyncConnected==lastEvent.getState() ||
+                KeeperState.Expired==lastEvent.getState())); 
+                       
+        q1.shutdown();
+        q2.shutdown();
+        q3.shutdown();
+        
+        zk.close();        
+        assertTrue("Waiting for server 1 to shut down",
+                ClientBase.waitForServerDown("localhost:"+CLIENT_PORT_QP1, 
+                                ClientBase.CONNECTION_TIMEOUT));
+        assertTrue("Waiting for server 2 to shut down",
+                ClientBase.waitForServerDown("localhost:"+CLIENT_PORT_QP2, 
+                                ClientBase.CONNECTION_TIMEOUT));
+        assertTrue("Waiting for server 3 to shut down",
+                ClientBase.waitForServerDown("localhost:"+CLIENT_PORT_OBS, 
+                                ClientBase.CONNECTION_TIMEOUT));
+    
+    }
+    
+    public void process(WatchedEvent event) {
+        latch.countDown();
+        lastEvent = event;
+    }    
+    
+    /**
+     * This test ensures that an Observer does not elect itself as a leader, or
+     * indeed come up properly, if it is the lone member of an ensemble.
+     * @throws IOException
+     */
+    @Test
+    public void testSingleObserver() throws IOException{
+        ClientBase.setupTestEnv();
+        final int CLIENT_PORT_QP1 = 3181;        
+
+        String quorumCfgSection =
+            "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
+            + ":" + (CLIENT_PORT_QP1 + 2) + "\npeerType=observer";
+                    
+        MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
+        q1.start();
+        assertFalse("Observer shouldn't come up",
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP1,
+                                            CONNECTION_TIMEOUT));
+        
+        q1.shutdown();
+    }    
+    
+    @Test
+    public void testLeaderElectionFail() throws Exception {        
+        ClientBase.setupTestEnv();
+        final int CLIENT_PORT_QP1 = 3181;
+        final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3;
+        final int CLIENT_PORT_OBS = CLIENT_PORT_QP2 + 3;
+
+        String quorumCfgSection =
+            "electionAlg=1\n" + 
+            "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
+            + ":" + (CLIENT_PORT_QP1 + 2)
+            + "\nserver.2=localhost:" + (CLIENT_PORT_QP2 + 1)
+            + ":" + (CLIENT_PORT_QP2 + 2)
+            + "\nserver.3=localhost:" 
+            + (CLIENT_PORT_OBS+1)+ ":" + (CLIENT_PORT_OBS + 2) + ":observer";
+        QuorumPeerConfig qpc = new QuorumPeerConfig();
+        
+        File tmpDir = ClientBase.createTmpDir();
+        File confFile = new File(tmpDir, "zoo.cfg");
+
+        FileWriter fwriter = new FileWriter(confFile);
+        fwriter.write("tickTime=2000\n");
+        fwriter.write("initLimit=10\n");
+        fwriter.write("syncLimit=5\n");
+
+        File dataDir = new File(tmpDir, "data");
+        if (!dataDir.mkdir()) {
+            throw new IOException("Unable to mkdir " + dataDir);
+        }
+        fwriter.write("dataDir=" + dataDir.toString() + "\n");
+
+        fwriter.write("clientPort=" + CLIENT_PORT_QP1 + "\n");
+        fwriter.write(quorumCfgSection + "\n");
+        fwriter.flush();
+        fwriter.close();
+        try {
+            qpc.parse(confFile.toString());
+        } catch (ConfigException e) {
+            LOG.info("Config exception caught as expected: " + e.getCause());
+            return;
+        }
+        
+        assertTrue("Didn't get the expected config exception", false);        
+    } 
+}

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=881882&r1=881881&r2=881882&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Wed Nov 18 19:06:39 2009
@@ -55,63 +55,8 @@
  * Test stand-alone server.
  *
  */
-public class QuorumPeerMainTest extends TestCase implements Watcher {
-    protected static final Logger LOG =
-        Logger.getLogger(QuorumPeerMainTest.class);
-
-    public static class MainThread extends Thread {
-        final File confFile;
-        final TestQPMain main;
-
-        public MainThread(int myid, int clientPort, String quorumCfgSection)
-            throws IOException
-        {
-            super("QuorumPeer with myid:" + myid
-                    + " and clientPort:" + clientPort);
-            File tmpDir = ClientBase.createTmpDir();
-            confFile = new File(tmpDir, "zoo.cfg");
-
-            FileWriter fwriter = new FileWriter(confFile);
-            fwriter.write("tickTime=2000\n");
-            fwriter.write("initLimit=10\n");
-            fwriter.write("syncLimit=5\n");
-
-            File dataDir = new File(tmpDir, "data");
-            if (!dataDir.mkdir()) {
-                throw new IOException("Unable to mkdir " + dataDir);
-            }
-            fwriter.write("dataDir=" + dataDir.toString() + "\n");
-
-            fwriter.write("clientPort=" + clientPort + "\n");
-            fwriter.write(quorumCfgSection + "\n");
-            fwriter.flush();
-            fwriter.close();
-
-            File myidFile = new File(dataDir, "myid");
-            fwriter = new FileWriter(myidFile);
-            fwriter.write(Integer.toString(myid));
-            fwriter.flush();
-            fwriter.close();
-
-            main = new TestQPMain();
-        }
-
-        public void run() {
-            String args[] = new String[1];
-            args[0] = confFile.toString();
-            try {
-                main.initializeAndRun(args);
-            } catch (Exception e) {
-                // test will still fail even though we just log/ignore
-                LOG.error("unexpected exception in run", e);
-            }
-        }
-
-        public void shutdown() {
-            main.shutdown();
-        }
-    }
-
+public class QuorumPeerMainTest extends QuorumPeerTestBase {
+   
     public static  class TestQPMain extends QuorumPeerMain {
         public void shutdown() {
             super.shutdown();
@@ -358,8 +303,5 @@
         }
         assertTrue("fastleaderelection used", found);
     }
-
-    public void process(WatchedEvent event) {
-        // ignore for this test
-    }
+   
 }

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=881882&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Wed Nov 18 19:06:39 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.QuorumPeerMainTest.TestQPMain;
+import org.apache.zookeeper.test.ClientBase;
+
+import junit.framework.TestCase;
+
+/**
+ * Has some common functionality for tests that work with QuorumPeers.
+ * Override process(WatchedEvent) to implement the Watcher interface
+ */
+public class QuorumPeerTestBase extends TestCase implements Watcher {
+    protected static final Logger LOG =
+        Logger.getLogger(QuorumPeerTestBase.class);
+    
+    public void process(WatchedEvent event) {
+        // ignore for this test
+    }
+    
+    public static class MainThread extends Thread {
+        final File confFile;
+        final TestQPMain main;
+
+        public MainThread(int myid, int clientPort, String quorumCfgSection)
+            throws IOException
+        {
+            super("QuorumPeer with myid:" + myid
+                    + " and clientPort:" + clientPort);
+            File tmpDir = ClientBase.createTmpDir();
+            confFile = new File(tmpDir, "zoo.cfg");
+
+            FileWriter fwriter = new FileWriter(confFile);
+            fwriter.write("tickTime=2000\n");
+            fwriter.write("initLimit=10\n");
+            fwriter.write("syncLimit=5\n");
+
+            File dataDir = new File(tmpDir, "data");
+            if (!dataDir.mkdir()) {
+                throw new IOException("Unable to mkdir " + dataDir);
+            }
+            fwriter.write("dataDir=" + dataDir.toString() + "\n");
+
+            fwriter.write("clientPort=" + clientPort + "\n");
+            fwriter.write(quorumCfgSection + "\n");
+            fwriter.flush();
+            fwriter.close();
+
+            File myidFile = new File(dataDir, "myid");
+            fwriter = new FileWriter(myidFile);
+            fwriter.write(Integer.toString(myid));
+            fwriter.flush();
+            fwriter.close();
+
+            main = new TestQPMain();
+        }
+
+        public void run() {
+            String args[] = new String[1];
+            args[0] = confFile.toString();
+            try {
+                main.initializeAndRun(args);
+            } catch (Exception e) {
+                // test will still fail even though we just log/ignore
+                LOG.error("unexpected exception in run", e);
+            }
+        }
+
+        public void shutdown() {
+            main.shutdown();
+        }
+    }
+}

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java?rev=881882&r1=881881&r2=881882&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java Wed Nov 18 19:06:39 2009
@@ -202,6 +202,26 @@
         LOG.info("Verifying hammers 2");
         qb.verifyRootOfAllServersMatch(qb.hostPort);
     }
+    
+    @Test
+    public void testObserversHammer() throws Exception {
+        qb.tearDown();
+        qb.setUp(true);
+        bang = true;
+        Thread[] hammers = new Thread[100];
+        for (int i = 0; i < hammers.length; i++) {
+            hammers[i] = new HammerThread("HammerThread-" + i);
+            hammers[i].start();
+        }
+        Thread.sleep(5000); // allow the clients to run for max 5sec
+        bang = false;
+        for (int i = 0; i < hammers.length; i++) {
+            hammers[i].interrupt();
+            verifyThreadTerminated(hammers[i], 60000);
+        }
+        // before restart
+        qb.verifyRootOfAllServersMatch(qb.hostPort);          
+    }
 
     @SuppressWarnings("unchecked")
     public void processResult(int rc, String path, Object ctx, String name) {

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java?rev=881882&r1=881881&r2=881882&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java Wed Nov 18 19:06:39 2009
@@ -50,20 +50,20 @@
 
     File s1dir, s2dir, s3dir, s4dir, s5dir;
     QuorumPeer s1, s2, s3, s4, s5;
-    private int port1;
-    private int port2;
-    private int port3;
-    private int port4;
-    private int port5;
-
-    private int leport1;
-    private int leport2;
-    private int leport3;
-    private int leport4;
-    private int leport5;
+    protected int port1;
+    protected int port2;
+    protected int port3;
+    protected int port4;
+    protected int port5;
+
+    protected int leport1;
+    protected int leport2;
+    protected int leport3;
+    protected int leport4;
+    protected int leport5;
 
     Properties qp;
-    private final ClientHammerTest cht = new ClientHammerTest();
+    protected final ClientHammerTest cht = new ClientHammerTest();
     
     @Override
     protected void setUp() throws Exception {
@@ -118,15 +118,29 @@
         LOG.info("Setup finished");
     }
     
-    
+    /**
+     * This method is here to keep backwards compatibility with the test code 
+     * written before observers. 
+     * @throws Exception
+     */
     void startServers() throws Exception {
+        startServers(false);
+    }
+    
+    /**
+     * Starts 5 Learners. When withObservers == false, all 5 are Followers.
+     * When withObservers == true, 3 are Followers and 2 Observers.
+     * @param withObservers
+     * @throws Exception
+     */
+    void startServers(boolean withObservers) throws Exception {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
         HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
         peers.put(Long.valueOf(1), new QuorumServer(1, 
                 new InetSocketAddress("127.0.0.1", port1 + 1000),
-                new InetSocketAddress("127.0.0.1", leport1 + 1000)));
+                new InetSocketAddress("127.0.0.1", leport1 + 1000)));        
         peers.put(Long.valueOf(2), new QuorumServer(2, 
                 new InetSocketAddress("127.0.0.1", port2 + 1000),
                 new InetSocketAddress("127.0.0.1", leport2 + 1000)));
@@ -135,10 +149,14 @@
                 new InetSocketAddress("127.0.0.1", leport3 + 1000)));
         peers.put(Long.valueOf(4), new QuorumServer(4,
                 new InetSocketAddress("127.0.0.1", port4 + 1000),
-                new InetSocketAddress("127.0.0.1", leport4 + 1000)));
+                new InetSocketAddress("127.0.0.1", leport4 + 1000),
+                withObservers ? QuorumPeer.LearnerType.OBSERVER
+                        : QuorumPeer.LearnerType.PARTICIPANT));
         peers.put(Long.valueOf(5), new QuorumServer(5,
                 new InetSocketAddress("127.0.0.1", port5 + 1000),
-                new InetSocketAddress("127.0.0.1", leport5 + 1000)));
+                new InetSocketAddress("127.0.0.1", leport5 + 1000),
+                withObservers ? QuorumPeer.LearnerType.OBSERVER
+                        : QuorumPeer.LearnerType.PARTICIPANT));
 
         LOG.info("creating QuorumPeer 1 port " + port1);
         QuorumHierarchical hq1 = new QuorumHierarchical(qp); 
@@ -158,21 +176,37 @@
         LOG.info("creating QuorumPeer 4 port " + port4);
         QuorumHierarchical hq4 = new QuorumHierarchical(qp); 
         s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit, hq4);
+        if (withObservers) {
+            s4.setPeerType(QuorumPeer.LearnerType.OBSERVER);
+        }
         assertEquals(port4, s4.getClientPort());
-        
+                       
         LOG.info("creating QuorumPeer 5 port " + port5);
         QuorumHierarchical hq5 = new QuorumHierarchical(qp); 
         s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit, hq5);
+        if (withObservers) {
+            s5.setPeerType(QuorumPeer.LearnerType.OBSERVER);
+        }
         assertEquals(port5, s5.getClientPort());
+        
+        // Observers are currently only compatible with LeaderElection
+        if (withObservers) {
+            s1.setElectionType(0);
+            s2.setElectionType(0);
+            s3.setElectionType(0);
+            s4.setElectionType(0);
+            s5.setElectionType(0);
+        }
+        
         LOG.info("start QuorumPeer 1");
         s1.start();
         LOG.info("start QuorumPeer 2");
         s2.start();
         LOG.info("start QuorumPeer 3");
         s3.start();
-        LOG.info("start QuorumPeer 4");
+        LOG.info("start QuorumPeer 4" + (withObservers ? "(observer)" : ""));
         s4.start();
-        LOG.info("start QuorumPeer 5");
+        LOG.info("start QuorumPeer 5" + (withObservers ? "(observer)" : ""));
         s5.start();
         LOG.info("started QuorumPeer 5");
 

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverHierarchicalQuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverHierarchicalQuorumTest.java?rev=881882&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverHierarchicalQuorumTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverHierarchicalQuorumTest.java Wed Nov 18 19:06:39 2009
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.Test;
+
+/**
+ * Mimics QuorumHierarchical test, but on an ensemble that includes 2 
+ * observers.
+ */
+
+public class ObserverHierarchicalQuorumTest extends HierarchicalQuorumTest {
+    private static final Logger LOG = Logger.getLogger(QuorumBase.class);
+       
+    /**
+     * startServers(true) puts two observers into a 5 peer ensemble
+     */
+    void startServers() throws Exception {
+        startServers(true);
+    }
+           
+    protected void shutdown(QuorumPeer qp) {
+        try {
+            /* TODO: when Observers are compatible with fle, shutdown
+             * the leader election */            
+            LOG.info("Done with leader election");
+            qp.shutdown();
+            LOG.info("Done with quorum peer");
+            qp.join(30000);
+            if (qp.isAlive()) {
+                fail("QP failed to shutdown in 30 seconds");
+            }
+        } catch (InterruptedException e) {
+            LOG.debug("QP interrupted", e);
+        }
+    }
+
+    @Test
+    public void testHierarchicalQuorum() throws Throwable {
+        cht.runHammer(5, 10);
+    }
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverQuorumHammerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverQuorumHammerTest.java?rev=881882&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverQuorumHammerTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverQuorumHammerTest.java Wed Nov 18 19:06:39 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Mimics QuorumHammerTest, but with 2 observers in the 5 Learner ensemble.
+ */
+public class ObserverQuorumHammerTest extends QuorumHammerTest {
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+
+    
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        qb.setUp(true);
+        cht.hostPort = qb.hostPort;
+        cht.setUpAll();
+    }
+   
+    @Test
+    public void testHammerBasic() throws Throwable {
+        cht.testHammerBasic();
+    }
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java?rev=881882&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java Wed Nov 18 19:06:39 2009
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.junit.Test;
+
+/**
+ * Test Observer behaviour and specific code paths.
+ *
+ */
+public class ObserverTest extends QuorumPeerTestBase implements Watcher{
+    protected static final Logger LOG =
+        Logger.getLogger(ObserverTest.class);    
+      
+    // We expect two notifications before we want to continue
+    CountDownLatch latch = new CountDownLatch(2);
+    ZooKeeper zk;
+    WatchedEvent lastEvent = null;
+          
+    /**
+     * This test ensures two things:
+     * 1. That Observers can successfully proxy requests to the ensemble.
+     * 2. That Observers don't participate in leader elections.
+     * The second is tested by constructing an ensemble where a leader would
+     * be elected if and only if an Observer voted. 
+     * @throws Exception
+     */
+    @Test
+    public void testObserver() throws Exception {
+        ClientBase.setupTestEnv();
+        final int CLIENT_PORT_QP1 = 3181;
+        final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3;
+        final int CLIENT_PORT_OBS = CLIENT_PORT_QP2 + 3;
+
+        String quorumCfgSection =
+            "electionAlg=0\n" + 
+            "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
+            + ":" + (CLIENT_PORT_QP1 + 2)
+            + "\nserver.2=localhost:" + (CLIENT_PORT_QP2 + 1)
+            + ":" + (CLIENT_PORT_QP2 + 2)
+            + "\nserver.3=localhost:" 
+            + (CLIENT_PORT_OBS+1)+ ":" + (CLIENT_PORT_OBS + 2) + ":observer";
+        String obsCfgSection =  quorumCfgSection + "\npeerType=observer";
+        MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
+        MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
+        MainThread q3 = new MainThread(3, CLIENT_PORT_OBS, obsCfgSection);
+        q1.start();
+        q2.start();
+        q3.start();
+        assertTrue("waiting for server 1 being up",
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP1,
+                        CONNECTION_TIMEOUT));
+        assertTrue("waiting for server 2 being up",
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP2,
+                        CONNECTION_TIMEOUT));
+        assertTrue("waiting for server 3 being up",
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_OBS,
+                        CONNECTION_TIMEOUT));        
+        
+        zk = new ZooKeeper("localhost:" + CLIENT_PORT_OBS,
+                ClientBase.CONNECTION_TIMEOUT, this);
+        zk.create("/obstest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        
+        // Assert that commands are getting forwarded correctly
+        assertEquals(new String(zk.getData("/obstest", null, null)), "test");
+        
+        // Now check that other commands don't blow everything up
+        zk.sync("/", null, null);
+        zk.setData("/obstest", "test2".getBytes(), -1);
+        zk.getChildren("/", false);
+        
+        assertEquals(zk.getState(), States.CONNECTED);
+        
+        // Now kill one of the other real servers        
+        q2.shutdown();
+                
+        assertTrue("Waiting for server 2 to shut down",
+                    ClientBase.waitForServerDown("localhost:"+CLIENT_PORT_QP2, 
+                                    ClientBase.CONNECTION_TIMEOUT));
+        
+        // Now the resulting ensemble shouldn't be quorate         
+        latch.await();        
+        assertNotSame("zk should not be connected", KeeperState.SyncConnected,lastEvent.getState());
+
+        try {
+            assertFalse("Shouldn't get a response when cluster not quorate!",
+                    new String(zk.getData("/obstest", null, null)).equals("test"));
+        }
+        catch (ConnectionLossException c) {
+            LOG.info("Connection loss exception caught - ensemble not quorate (this is expected)");
+        }
+        
+        latch = new CountDownLatch(1);
+        
+        // Bring it back
+        q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
+        q2.start();
+        assertTrue("waiting for server 2 being up",
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP2,
+                        CONNECTION_TIMEOUT));
+        
+        latch.await();
+        // It's possible our session expired - but this is ok, shows we 
+        // were able to talk to the ensemble
+        assertTrue("Didn't reconnect", 
+                (KeeperState.SyncConnected==lastEvent.getState() ||
+                KeeperState.Expired==lastEvent.getState())); 
+                       
+        q1.shutdown();
+        q2.shutdown();
+        q3.shutdown();
+        
+        zk.close();        
+        assertTrue("Waiting for server 1 to shut down",
+                ClientBase.waitForServerDown("localhost:"+CLIENT_PORT_QP1, 
+                                ClientBase.CONNECTION_TIMEOUT));
+        assertTrue("Waiting for server 2 to shut down",
+                ClientBase.waitForServerDown("localhost:"+CLIENT_PORT_QP2, 
+                                ClientBase.CONNECTION_TIMEOUT));
+        assertTrue("Waiting for server 3 to shut down",
+                ClientBase.waitForServerDown("localhost:"+CLIENT_PORT_OBS, 
+                                ClientBase.CONNECTION_TIMEOUT));
+    
+    }
+    
+    /**
+     * Implementation of watcher interface.
+     */
+    public void process(WatchedEvent event) {
+        latch.countDown();
+        lastEvent = event;
+    }    
+    
+    /**
+     * This test ensures that an Observer does not elect itself as a leader, or
+     * indeed come up properly, if it is the lone member of an ensemble.
+     * @throws IOException
+     */
+    @Test
+    public void testSingleObserver() throws IOException{
+        ClientBase.setupTestEnv();
+        final int CLIENT_PORT_QP1 = 3181;        
+
+        String quorumCfgSection =
+            "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
+            + ":" + (CLIENT_PORT_QP1 + 2) + "\npeerType=observer";
+                    
+        MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
+        q1.start();
+        assertFalse("Observer shouldn't come up",
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP1,
+                                            CONNECTION_TIMEOUT));
+        
+        q1.shutdown();
+    }    
+    
+    /**
+     * Check that an attempt to instantiate an ensemble with observers and
+     * electionAlg != 0 fails (this will be removed when the restriction is). 
+     * @throws Exception
+     */
+    @Test
+    public void testLeaderElectionFail() throws Exception {        
+        ClientBase.setupTestEnv();
+        final int CLIENT_PORT_QP1 = 3181;
+        final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3;
+        final int CLIENT_PORT_OBS = CLIENT_PORT_QP2 + 3;
+
+        String quorumCfgSection =
+            "electionAlg=1\n" + 
+            "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
+            + ":" + (CLIENT_PORT_QP1 + 2)
+            + "\nserver.2=localhost:" + (CLIENT_PORT_QP2 + 1)
+            + ":" + (CLIENT_PORT_QP2 + 2)
+            + "\nserver.3=localhost:" 
+            + (CLIENT_PORT_OBS+1)+ ":" + (CLIENT_PORT_OBS + 2) + ":observer";
+        QuorumPeerConfig qpc = new QuorumPeerConfig();
+        
+        File tmpDir = ClientBase.createTmpDir();
+        File confFile = new File(tmpDir, "zoo.cfg");
+
+        FileWriter fwriter = new FileWriter(confFile);
+        fwriter.write("tickTime=2000\n");
+        fwriter.write("initLimit=10\n");
+        fwriter.write("syncLimit=5\n");
+
+        File dataDir = new File(tmpDir, "data");
+        if (!dataDir.mkdir()) {
+            throw new IOException("Unable to mkdir " + dataDir);
+        }
+        fwriter.write("dataDir=" + dataDir.toString() + "\n");
+
+        fwriter.write("clientPort=" + CLIENT_PORT_QP1 + "\n");
+        fwriter.write(quorumCfgSection + "\n");
+        fwriter.flush();
+        fwriter.close();
+        try {
+            qpc.parse(confFile.toString());
+        } catch (ConfigException e) {
+            LOG.info("Config exception caught as expected: " + e.getCause());
+            return;
+        }
+        
+        assertTrue("Didn't get the expected config exception", false);        
+    } 
+}

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=881882&r1=881881&r2=881882&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Wed Nov 18 19:06:39 2009
@@ -31,6 +31,7 @@
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.TestableZooKeeper;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.junit.After;
 
@@ -49,6 +50,10 @@
 
     @Override
     protected void setUp() throws Exception {
+        setUp(false);
+    }
+        
+    protected void setUp(boolean withObservers) throws Exception {
         LOG.info("STARTING " + getName());
         setupTestEnv();
 
@@ -74,7 +79,7 @@
         s4dir = ClientBase.createTmpDir();
         s5dir = ClientBase.createTmpDir();
 
-        startServers();
+        startServers(withObservers);
 
         OperatingSystemMXBean osMbean =
             ManagementFactory.getOperatingSystemMXBean();
@@ -87,7 +92,12 @@
 
         LOG.info("Setup finished");
     }
+    
     void startServers() throws Exception {
+        startServers(false);        
+    }
+    
+    void startServers(boolean withObservers) throws Exception {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
@@ -97,6 +107,11 @@
         peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", port3 + 1000)));
         peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", port4 + 1000)));
         peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", port5 + 1000)));
+        
+        if (withObservers) {
+            peers.get(Long.valueOf(4)).type = LearnerType.OBSERVER;        
+            peers.get(Long.valueOf(5)).type = LearnerType.OBSERVER;
+        }
 
         LOG.info("creating QuorumPeer 1 port " + port1);
         s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 0, 1, tickTime, initLimit, syncLimit);
@@ -113,6 +128,12 @@
         LOG.info("creating QuorumPeer 5 port " + port5);
         s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 0, 5, tickTime, initLimit, syncLimit);
         assertEquals(port5, s5.getClientPort());
+        
+        if (withObservers) {
+            s4.setPeerType(LearnerType.OBSERVER);
+            s5.setPeerType(LearnerType.OBSERVER);
+        }
+        
         LOG.info("start QuorumPeer 1");
         s1.start();
         LOG.info("start QuorumPeer 2");

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumHammerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumHammerTest.java?rev=881882&r1=881881&r2=881882&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumHammerTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumHammerTest.java Wed Nov 18 19:06:39 2009
@@ -22,11 +22,11 @@
 import org.junit.Test;
 
 public class QuorumHammerTest extends QuorumBase {
-    private static final Logger LOG = Logger.getLogger(QuorumHammerTest.class);
+    protected static final Logger LOG = Logger.getLogger(QuorumHammerTest.class);
     public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
 
-    private final QuorumBase qb = new QuorumBase();
-    private final ClientHammerTest cht = new ClientHammerTest();
+    protected final QuorumBase qb = new QuorumBase();
+    protected final ClientHammerTest cht = new ClientHammerTest();
 
     @Before
     @Override



Mime
View raw message