hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject svn commit: r684030 - in /hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper: ServerAdminClient.java server/ZooKeeperServer.java server/quorum/FollowerHandler.java server/quorum/QuorumPeer.java
Date Fri, 08 Aug 2008 17:57:58 GMT
Author: fpj
Date: Fri Aug  8 10:57:58 2008
New Revision: 684030

URL: http://svn.apache.org/viewvc?rev=684030&view=rev
Log:
JIRA 109.

Modified:
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ServerAdminClient.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ServerAdminClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ServerAdminClient.java?rev=684030&r1=684029&r2=684030&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ServerAdminClient.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ServerAdminClient.java Fri Aug  8 10:57:58 2008
@@ -66,11 +66,11 @@
     }
 
     public static void ruok(String host, int port) {
+        Socket s = null;
         try {
             byte[] reqBytes = new byte[4];
             ByteBuffer req = ByteBuffer.wrap(reqBytes);
             req.putInt(ByteBuffer.wrap("ruok".getBytes()).getInt());
-            Socket s = null;
             s = new Socket();
             s.setSoLinger(false, 10);
             s.setSoTimeout(20000);
@@ -86,17 +86,25 @@
             int rc = is.read(resBytes);
             String retv = new String(resBytes);
             System.out.println("rc=" + rc + " retv=" + retv);
-        } catch (IOException ioe) {
-            LOG.warn("Unexpected exception", ioe);
+        } catch (IOException e) {
+            LOG.warn("Unexpected exception", e);
+        } finally {
+            if (s != null) {
+                try {
+                    s.close();
+                } catch (IOException e) {
+                    LOG.warn("Unexpected exception", e);
+                }
+            }
         }
     }
 
     public static void dump(String host, int port) {
+        Socket s = null;
         try {
             byte[] reqBytes = new byte[4];
             ByteBuffer req = ByteBuffer.wrap(reqBytes);
             req.putInt(ByteBuffer.wrap("dump".getBytes()).getInt());
-            Socket s = null;
             s = new Socket();
             s.setSoLinger(false, 10);
             s.setSoTimeout(20000);
@@ -112,17 +120,25 @@
             int rc = is.read(resBytes);
             String retv = new String(resBytes);
             System.out.println("rc=" + rc + " retv=" + retv);
-        } catch (IOException ioe) {
-            LOG.warn("Unexpected exception", ioe);
+        } catch (IOException e) {
+            LOG.warn("Unexpected exception", e);
+        } finally {
+            if (s != null) {
+                try {
+                    s.close();
+                } catch (IOException e) {
+                    LOG.warn("Unexpected exception", e);
+                }
+            }
         }
     }
 
     public static void stat(String host, int port) {
+        Socket s = null;
         try {
             byte[] reqBytes = new byte[4];
             ByteBuffer req = ByteBuffer.wrap(reqBytes);
             req.putInt(ByteBuffer.wrap("stat".getBytes()).getInt());
-            Socket s = null;
             s = new Socket();
             s.setSoLinger(false, 10);
             s.setSoTimeout(20000);
@@ -138,17 +154,25 @@
             int rc = is.read(resBytes);
             String retv = new String(resBytes);
             System.out.println("rc=" + rc + " retv=" + retv);
-        } catch (IOException ioe) {
-            LOG.warn("Unexpected exception", ioe);
+        } catch (IOException e) {
+            LOG.warn("Unexpected exception", e);
+        } finally {
+            if (s != null) {
+                try {
+                    s.close();
+                } catch (IOException e) {
+                    LOG.warn("Unexpected exception", e);
+                }
+            }
         }
     }
 
     public static void kill(String host, int port) {
+        Socket s = null;
         try {
             byte[] reqBytes = new byte[4];
             ByteBuffer req = ByteBuffer.wrap(reqBytes);
             req.putInt(ByteBuffer.wrap("kill".getBytes()).getInt());
-            Socket s = null;
             s = new Socket();
             s.setSoLinger(false, 10);
             s.setSoTimeout(20000);
@@ -163,12 +187,21 @@
             int rc = is.read(resBytes);
             String retv = new String(resBytes);
             System.out.println("rc=" + rc + " retv=" + retv);
-        } catch (IOException ioe) {
-            LOG.warn("Unexpected exception", ioe);
+        } catch (IOException e) {
+            LOG.warn("Unexpected exception", e);
+        } finally {
+            if (s != null) {
+                try {
+                    s.close();
+                } catch (IOException e) {
+                    LOG.warn("Unexpected exception", e);
+                }
+            }
         }
     }
 
     public static void setTraceMask(String host, int port, String traceMaskStr) {
+        Socket s = null;
         try {
             byte[] reqBytes = new byte[12];
             ByteBuffer req = ByteBuffer.wrap(reqBytes);
@@ -176,7 +209,6 @@
             req.putInt(ByteBuffer.wrap("stmk".getBytes()).getInt());
             req.putLong(traceMask);
 
-            Socket s = null;
             s = new Socket();
             s.setSoLinger(false, 10);
             s.setSoTimeout(20000);
@@ -196,18 +228,26 @@
                     + Long.toOctalString(retv) + " masks=0"
                     + Long.toOctalString(traceMask));
             assert (retv == traceMask);
-        } catch (IOException ioe) {
-            LOG.warn("Unexpected exception", ioe);
+        } catch (IOException e) {
+            LOG.warn("Unexpected exception", e);
+        } finally {
+            if (s != null) {
+                try {
+                    s.close();
+                } catch (IOException e) {
+                    LOG.warn("Unexpected exception", e);
+                }
+            }
         }
     }
 
     public static void getTraceMask(String host, int port) {
+        Socket s = null;
         try {
             byte[] reqBytes = new byte[12];
             ByteBuffer req = ByteBuffer.wrap(reqBytes);
             req.putInt(ByteBuffer.wrap("gtmk".getBytes()).getInt());
 
-            Socket s = null;
             s = new Socket();
             s.setSoLinger(false, 10);
             s.setSoTimeout(20000);
@@ -225,8 +265,16 @@
             long retv = res.getLong();
             System.out.println("rc=" + rc + " retv=0"
                     + Long.toOctalString(retv));
-        } catch (IOException ioe) {
-            LOG.warn("Unexpected exception", ioe);
+        } catch (IOException e) {
+            LOG.warn("Unexpected exception", e);
+        } finally {
+            if (s != null) {
+                try {
+                    s.close();
+                } catch (IOException e) {
+                    LOG.warn("Unexpected exception", e);
+                }
+            }
         }
     }
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=684030&r1=684029&r2=684030&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Fri Aug  8 10:57:58 2008
@@ -426,7 +426,7 @@
                 TxnHeader hdr = new TxnHeader();
                 Record txn = deserializeTxn(ia, hdr);
                 if (logStream.readByte("EOR") != 'B') {
-                    LOG.error("Last transaction was partial.");
+                    LOG.warn("Last transaction was partial.");
                     throw new EOFException();
                 }
                 if (hdr.getZxid() <= highestZxid && highestZxid != 0) {
@@ -468,6 +468,7 @@
                 addCommittedProposal(r);
             }
         } catch (EOFException e) {
+            // expected in some cases - see comments in try block
         }
         return highestZxid;
     }
@@ -569,6 +570,8 @@
                 while (true) {
                     byte[] bytes = ia.readBuffer("txtEntry");
                     if (bytes.length == 0) {
+                        // Since we preallocate, we define EOF to be an
+                        // empty transaction
                         throw new EOFException();
                     }
                     InputArchive iab = BinaryInputArchive
@@ -576,6 +579,7 @@
                     TxnHeader hdr = new TxnHeader();
                     deserializeTxn(iab, hdr);
                     if (ia.readByte("EOF") != 'B') {
+                        LOG.warn("Last transaction was partial.");
                         throw new EOFException();
                     }
                     if (hdr.getZxid() == finalZxid) {
@@ -586,11 +590,17 @@
                         FileOutputStream fout = new FileOutputStream(f);
                         FileChannel fchanOut = fout.getChannel();
                         fchanOut.truncate(pos);
+                        fchanOut.close();
+                        fout.close();
                         truncated = true;
                         break;
                     }
                 }
             } catch (EOFException eof) {
+                // expected in some cases - see comments in try block
+            } finally {
+                fchan.close();
+                fin.close();
             }
             if (truncated == true) {
                 break;

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=684030&r1=684029&r2=684030&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java Fri Aug  8 10:57:58 2008
@@ -48,7 +48,7 @@
 public class FollowerHandler extends Thread {
     private static final Logger LOG = Logger.getLogger(FollowerHandler.class);
 
-    public Socket s;
+    public Socket sock;
 
     Leader leader;
 
@@ -65,9 +65,9 @@
 
     private BufferedOutputStream bufferedOutput;
 
-    FollowerHandler(Socket s, Leader leader) throws IOException {
-        super("FollowerHandler-" + s.getRemoteSocketAddress());
-        this.s = s;
+    FollowerHandler(Socket sock, Leader leader) throws IOException {
+        super("FollowerHandler-" + sock.getRemoteSocketAddress());
+        this.sock = sock;
         this.leader = leader;
         leader.addFollowerHandler(this);
         start();
@@ -102,7 +102,7 @@
                 oa.writeRecord(p, "packet");
                 bufferedOutput.flush();
             } catch (IOException e) {
-                if (!s.isClosed()) {
+                if (!sock.isClosed()) {
                     LOG.warn("Unexpected exception",e);
                 }
                 break;
@@ -179,9 +179,9 @@
     public void run() {
         try {
 
-            ia = BinaryInputArchive.getArchive(new BufferedInputStream(s
+            ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
                     .getInputStream()));
-            bufferedOutput = new BufferedOutputStream(s.getOutputStream());
+            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
             oa = BinaryOutputArchive.getArchive(bufferedOutput);
 
             QuorumPacket qp = new QuorumPacket();
@@ -264,7 +264,7 @@
             new Thread() {
                 public void run() {
                     Thread.currentThread().setName(
-                            "Sender-" + s.getRemoteSocketAddress());
+                            "Sender-" + sock.getRemoteSocketAddress());
                     try {
                         sendPackets();
                     } catch (InterruptedException e) {
@@ -292,7 +292,7 @@
 
                 switch (qp.getType()) {
                 case Leader.ACK:
-                    leader.processAck(qp.getZxid(), s.getLocalSocketAddress());
+                    leader.processAck(qp.getZxid(), sock.getLocalSocketAddress());
                     break;
                 case Leader.PING:
                     // Process the touches
@@ -338,13 +338,14 @@
                 }
             }
         } catch (IOException e) {
-            if (s != null && !s.isClosed()) {
+            if (sock != null && !sock.isClosed()) {
                 LOG.error("FIXMSG",e);
             }
         } catch (InterruptedException e) {
             LOG.error("FIXMSG",e);
         } finally {
-            LOG.warn("******* GOODBYE " + s.getRemoteSocketAddress()
+            LOG.warn("******* GOODBYE " 
+                    + (sock != null ? sock.getRemoteSocketAddress() : "<null>")
                     + " ********");
             // Send the packet of death
             try {
@@ -358,8 +359,8 @@
 
     public void shutdown() {
         try {
-            if (s != null && !s.isClosed()) {
-                s.close();
+            if (sock != null && !sock.isClosed()) {
+                sock.close();
             }
         } catch (IOException e) {
             LOG.error("FIXMSG",e);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=684030&r1=684029&r2=684030&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Aug  8 10:57:58 2008
@@ -520,9 +520,9 @@
             if (leader != null) {
                 synchronized (leader.followers) {
                     for (FollowerHandler fh : leader.followers) {
-                        if (fh.s == null)
+                        if (fh.sock == null)
                             continue;
-                        String s = fh.s.getRemoteSocketAddress().toString();
+                        String s = fh.sock.getRemoteSocketAddress().toString();
                         if (leader.isFollowerSynced(fh))
                             s += "*";
                         l.add(s);



Mime
View raw message