Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 8216 invoked from network); 6 Jun 2008 05:39:30 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 6 Jun 2008 05:39:30 -0000 Received: (qmail 79622 invoked by uid 500); 6 Jun 2008 05:39:33 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 79587 invoked by uid 500); 6 Jun 2008 05:39:33 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 79575 invoked by uid 99); 6 Jun 2008 05:39:33 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Jun 2008 22:39:33 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Jun 2008 05:38:33 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1F3DD2388A16; Thu, 5 Jun 2008 22:38:55 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r663828 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Client.java src/test/org/apache/hadoop/ipc/TestIPC.java Date: Fri, 06 Jun 2008 05:38:54 -0000 To: core-commits@hadoop.apache.org From: hairong@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080606053855.1F3DD2388A16@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: hairong Date: Thu Jun 5 22:38:54 2008 New Revision: 663828 URL: http://svn.apache.org/viewvc?rev=663828&view=rev Log: HADOOP-3455. Fix NPE in ipc.Client in case of connection failure and improve its synchronization. Contributed by Steve Loughran and Hairong Kuang. Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663828&r1=663827&r2=663828&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Thu Jun 5 22:38:54 2008 @@ -473,6 +473,9 @@ HADOOP-3493. Fix TestStreamingFailure to use FileUtil.fullyDelete to ensure correct cleanup. (Lohit Vijayarenu via acmurthy) + HADOOP-3455. Fix NPE in ipc.Client in case of connection failure and + improve its synchronization. (hairong) + Release 0.17.0 - 2008-05-18 INCOMPATIBLE CHANGES Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=663828&r1=663827&r2=663828&view=diff ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original) +++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Thu Jun 5 22:38:54 2008 @@ -34,6 +34,9 @@ import java.util.Hashtable; import java.util.Iterator; import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import javax.net.SocketFactory; @@ -48,7 +51,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -65,7 +67,7 @@ private Class valueClass; // class of call values private int counter; // counter for call ids - private boolean running = true; // true while client runs + private AtomicBoolean running = new AtomicBoolean(true); // if client runs final private Configuration conf; final private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs @@ -125,7 +127,7 @@ synchronized boolean isZeroReference() { return refCount==0; } - + /** A call waiting for a value. */ private class Call { int id; // call id @@ -175,12 +177,13 @@ private class Connection extends Thread { private ConnectionId remoteId; private Socket socket = null; // connected socket - private DataInputStream in; - private DataOutputStream out; + private DataInputStream in; + private AtomicReference out = + new AtomicReference(); // currently active calls private Hashtable calls = new Hashtable(); - private long lastActivity = 0; // last I/O activity time - private boolean shouldCloseConnection = false; // indicate if the connection is closed + private AtomicLong lastActivity = new AtomicLong();// last I/O activity time + private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed private IOException closeException; // close reason public Connection(InetSocketAddress address) throws IOException { @@ -201,23 +204,25 @@ } /** Update lastActivity with the current time. */ - private synchronized void touch() { - touch(System.currentTimeMillis()); - } - - private synchronized void touch(long curTime) { - lastActivity = curTime; + private void touch() { + lastActivity.set(System.currentTimeMillis()); } - /** Add a call to this connection's call queue */ + /** + * Add a call to this connection's call queue and notify + * a listener; synchronized. + * Returns false if called during shutdown. + * @param call to add + * @return true if the call was added. + */ private synchronized boolean addCall(Call call) { - if (shouldCloseConnection) + if (shouldCloseConnection.get()) return false; calls.put(call.id, call); notify(); return true; } - + /** This class sends a ping to the remote side when timeout on * reading. If no failure is detected, it retries until at least * a byte is read. @@ -233,7 +238,7 @@ * otherwise, throw the timeout exception. */ private void handleTimeout(SocketTimeoutException e) throws IOException { - if (shouldCloseConnection || !running) { + if (shouldCloseConnection.get() || !running.get()) { throw e; } else { sendPing(); @@ -243,6 +248,7 @@ /** Read a byte from the stream. * Send a ping if timeout on read. Retries if no failure is detected * until a byte is read. + * @throws IOException for any IO problem other than socket timeout */ public int read() throws IOException { do { @@ -258,7 +264,7 @@ * Send a ping if timeout on read. Retries if no failure is detected * until a byte is read. * - * @Return the total number of bytes read; -1 if the connection is closed. + * @return the total number of bytes read; -1 if the connection is closed. */ public int read(byte[] buf, int off, int len) throws IOException { do { @@ -276,7 +282,7 @@ * the connection thread that waits for responses. */ private synchronized void setupIOstreams() { - if (socket != null || shouldCloseConnection) { + if (socket != null || shouldCloseConnection.get()) { return; } @@ -305,19 +311,19 @@ } this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(NetUtils.getInputStream(socket)))); - this.out = new DataOutputStream - (new BufferedOutputStream(NetUtils.getOutputStream(socket))); + this.out.set(new DataOutputStream + (new BufferedOutputStream(NetUtils.getOutputStream(socket)))); writeHeader(); // update last activity time touch(); + // start the receiver thread after the socket connection has been set up + start(); } catch (IOException e) { markClosed(e); close(); } - // start the receiver thread after the socket connection has been set up - start(); } /* Handle connection failures @@ -325,7 +331,10 @@ * If the current number of retries is equal to the max number of retries, * stop retrying and throw the exception; Otherwise backoff 1 second and * try connecting again. - * + * + * This Method is only called from inside setupIOstreams(), which is + * synchronized. Hence the sleep is synchronized; the locks will be retained. + * * @param curRetries current number of retries * @param maxRetries max number of retries allowed * @param ioe failure reason @@ -361,6 +370,7 @@ * Out is not synchronized because only the first thread does this. */ private void writeHeader() throws IOException { + DataOutputStream out = this.out.get(); out.write(Server.HEADER.array()); out.write(Server.CURRENT_VERSION); //When there are more fields we can have ConnectionHeader Writable. @@ -379,8 +389,9 @@ * Return true if it is time to read a response; false otherwise. */ private synchronized boolean waitForWork() { - if (calls.isEmpty() && !shouldCloseConnection && running) { - long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity); + if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { + long timeout = maxIdleTime- + (System.currentTimeMillis()-lastActivity.get()); if (timeout>0) { try { wait(timeout); @@ -388,11 +399,11 @@ } } - if (!calls.isEmpty() && !shouldCloseConnection && running) { + if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { return true; - } else if (shouldCloseConnection) { + } else if (shouldCloseConnection.get()) { return false; - } else if (!running) { //get stopped + } else if (!running.get()) { //get stopped markClosed((IOException)new IOException().initCause( new InterruptedException())); return false; @@ -411,9 +422,10 @@ */ private synchronized void sendPing() throws IOException { long curTime = System.currentTimeMillis(); - if ( curTime - lastActivity >= pingInterval) { - touch(curTime); + if ( curTime - lastActivity.get() >= pingInterval) { + lastActivity.set(curTime); synchronized (out) { + DataOutputStream out = this.out.get(); out.writeInt(PING_CALL_ID); out.flush(); } @@ -441,30 +453,36 @@ * threads. */ public void sendParam(Call call) { - synchronized (this) { - if (shouldCloseConnection) { - return; - } + if (shouldCloseConnection.get()) { + return; } + DataOutputBuffer d=null; try { - synchronized (out) { + synchronized (this.out) { + DataOutputStream out = this.out.get(); + if (out==null) return; // socket has closed + if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); - - DataOutputBuffer d = new DataOutputBuffer(); //for serializing the + + //for serializing the //data to be written + d = new DataOutputBuffer(); d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); - out.writeInt(dataLength); //first put the data length out.write(data, 0, dataLength);//write the data out.flush(); } } catch(IOException e) { markClosed(e); + } finally { + //the buffer is just an in-memory buffer, but it is still polite to + // close early + IOUtils.closeStream(d); } } @@ -472,10 +490,8 @@ * Because only one receiver, so no synchronization on in. */ private void receiveResponse() { - synchronized (this) { - if (shouldCloseConnection) { - return; - } + if (shouldCloseConnection.get()) { + return; } touch(); @@ -502,8 +518,7 @@ } private synchronized void markClosed(IOException e) { - if (!shouldCloseConnection) { - shouldCloseConnection = true; + if (shouldCloseConnection.compareAndSet(false, true)) { closeException = e; notifyAll(); } @@ -511,7 +526,7 @@ /** Close the connection. */ private synchronized void close() { - if (!shouldCloseConnection) { + if (!shouldCloseConnection.get()) { LOG.error("The connection is not in the closed state"); return; } @@ -527,15 +542,17 @@ // close the socket and streams IOUtils.closeStream(in); - IOUtils.closeStream(out); + in = null; + IOUtils.closeStream(out.getAndSet(null)); IOUtils.closeSocket(socket); + socket = null; // clean up all calls if (closeException == null) { if (!calls.isEmpty()) { LOG.warn( "A connection is closed for no cause and calls are not empty"); - + // clean up calls anyway closeException = new IOException("Unexpected closed connection"); cleanupCalls(); @@ -544,7 +561,7 @@ // log the info if (LOG.isDebugEnabled()) { LOG.debug("closing ipc connection to " + remoteId.address + ": " + - StringUtils.stringifyException(closeException)); + closeException.getMessage(),closeException); } // cleanup calls @@ -643,11 +660,10 @@ if (LOG.isDebugEnabled()) { LOG.debug("Stopping client"); } - - if (running == false) { + + if (!running.compareAndSet(true, false)) { return; } - running = false; // wake up all connections synchronized (connections) { @@ -716,8 +732,9 @@ Connection connection = getConnection(addresses[i], null, call); connection.sendParam(call); // send each parameter } catch (IOException e) { + // log errors LOG.info("Calling "+addresses[i]+" caught: " + - StringUtils.stringifyException(e)); // log errors + e.getMessage(),e); results.size--; // wait for one fewer result } } @@ -737,11 +754,9 @@ UserGroupInformation ticket, Call call) throws IOException { - synchronized (this) { - if (!running) { - // the client is stopped - throw new IOException("The client is stopped"); - } + if (!running.get()) { + // the client is stopped + throw new IOException("The client is stopped"); } Connection connection; /* we could avoid this allocation for each RPC by having a Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java?rev=663828&r1=663827&r2=663828&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java Thu Jun 5 22:38:54 2008 @@ -208,6 +208,19 @@ } } + public void testStandAloneClient() throws Exception { + testParallel(10, false, 2, 4, 2, 4, 100); + Client client = new Client(LongWritable.class, conf); + boolean hasException = false; + try { + client.call(new LongWritable(RANDOM.nextLong()), + new InetSocketAddress("127.0.0.1", 10)); + } catch (IOException e) { + hasException = true; + } + assertTrue (hasException); + } + public static void main(String[] args) throws Exception { //new TestIPC("test").testSerial(5, false, 2, 10, 1000);