river-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peter_jo...@apache.org
Subject svn commit: r651151 - /incubator/river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java
Date Thu, 24 Apr 2008 04:39:39 GMT
Author: peter_jones
Date: Wed Apr 23 21:39:38 2008
New Revision: 651151

URL: http://svn.apache.org/viewvc?rev=651151&view=rev
Log:
fix RIVER-254

Modified:
    incubator/river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java

Modified: incubator/river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java?rev=651151&r1=651150&r2=651151&view=diff
==============================================================================
--- incubator/river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java (original)
+++ incubator/river/jtsk/trunk/src/net/jini/jeri/connection/ConnectionManager.java Wed Apr 23 21:39:38 2008
@@ -214,7 +214,7 @@
 			    return mux;
 			}
 		    }
-		    OutboundMux mux = OutboundMux.create(c);
+		    OutboundMux mux = newOutboundMux(c);
 		    mux.newRequestPending();
 		    if (reaper == null) {
 			reaper = new Reaper();
@@ -226,17 +226,17 @@
 		}
 	    }
 	    Connection c = ep.connect(handle);
-	    OutboundMux mux = OutboundMux.create(c);
-	    mux.newRequestPending();
 	    synchronized (this) {
+		OutboundMux mux = newOutboundMux(c);
+		mux.newRequestPending();
 		if (reaper == null) {
 		    reaper = new Reaper();
 		    systemThreadPool.execute(
 			reaper, "ConnectionManager[" + ep + "].Reaper");
 		}
 		muxes.add(mux);
+		return mux;
 	    }
-	    return mux;
 	} finally {
 	    synchronized (this) {
 		assert pendingConnects > 0;
@@ -250,7 +250,8 @@
      * true, removes the mux and adds it to the idle list. Returns true
      * if no connects are pending and no muxes remain.
      */
-    synchronized boolean checkIdle(long now, List idle) {
+    boolean checkIdle(long now, List idle) {
+	assert Thread.holdsLock(this);
 	for (int i = muxes.size(); --i >= 0; ) {
 	    OutboundMux mux = (OutboundMux) muxes.get(i);
 	    if (mux.checkIdle(now)) {
@@ -272,31 +273,57 @@
     }
 
     /**
+     * Constructs an OutboundMux instance from the connection.
+     */
+    private OutboundMux newOutboundMux(Connection c) throws IOException {
+	logger.log(Level.FINEST, "opened {0}", c);
+	OutboundMux mux = null;
+	try {
+	    mux = (c.getChannel() == null) ?
+		new OutboundMux(c) : new OutboundMux(c, true);
+	} finally {
+	    if (mux == null) {
+		try {
+		    c.close();
+		} catch (IOException e) {
+		}
+	    }
+	}
+	return mux;
+    }
+
+    /**
      * Subclass wrapper around MuxClient for outbound connections.
      */
-    private static final class OutboundMux extends MuxClient {
+    private final class OutboundMux extends MuxClient {
 	/**
 	 * The outbound connection.
 	 */
 	private final Connection c;
 	/**
+	 * Lock to enforce single start of mux.
+	 */
+	private final Object startLock = new Object();
+	 /**
 	 * True if the mux needs to be started.
 	 */
 	private boolean pendingStart = true;
 	/**
-	 * Number of pending newRequest calls.
+	 * Number of pending newRequest calls.  Guarded by enclosing
+	 * ConnectionManager's lock.
 	 */
 	private int pendingNewRequests = 0;
 	/**
-	 * The time this mux was found to be idle by the Reaper thread. Set
-	 * to zero each time a request is initiated.
+	 * The time this mux was found to be idle by the Reaper
+	 * thread.  Set to zero each time a request is initiated.
+	 * Guarded by enclosing ConnectionManager's lock.
 	 */
 	private long idleTime = 0;
 
 	/**
 	 * Constructs an instance from the connection's streams.
 	 */
-	private OutboundMux(Connection c) throws IOException {
+	OutboundMux(Connection c) throws IOException {
 	    super(c.getOutputStream(), c.getInputStream());
 	    this.c = c;
 	}
@@ -304,32 +331,12 @@
 	/**
 	 * Constructs an instance from the connection's channel.
 	 */
-	private OutboundMux(Connection c, boolean ignore) throws IOException {
+	OutboundMux(Connection c, boolean ignore) throws IOException {
 	    super(c.getChannel());
 	    this.c = c;
 	}
 
 	/**
-	 * Constructs an instance from the connection.
-	 */
-	static OutboundMux create(Connection c) throws IOException {
-	    logger.log(Level.FINEST, "opened {0}", c);
-	    OutboundMux mux = null;
-	    try {
-		mux = (c.getChannel() == null) ?
-		    new OutboundMux(c) : new OutboundMux(c, true);
-	    } finally {
-		if (mux == null) {
-		    try {
-			c.close();
-		    } catch (IOException e) {
-		    }
-		}
-	    }
-	    return mux;
-	}
-
-	/**
 	 * Returns the outbound connection.
 	 */
 	Connection getConnection() {
@@ -339,7 +346,8 @@
 	/**
 	 * Registers a pending newRequest call.
 	 */
-	synchronized void newRequestPending() {
+	void newRequestPending() {
+	    assert Thread.holdsLock(ConnectionManager.this);
 	    pendingNewRequests++;
 	}
 
@@ -348,21 +356,38 @@
 	 * idle time to zero. Starts the mux if necessary, and decrements
 	 * the pending newRequest count.
 	 */
-	public synchronized OutboundRequest newRequest() throws IOException {
-	    assert pendingNewRequests > 0;
-	    pendingNewRequests--;
-	    if (pendingStart) {
-		pendingStart = false;
-		start();
+	public OutboundRequest newRequest() throws IOException {
+	    assert !Thread.holdsLock(ConnectionManager.this);
+	    boolean ok = false;
+	    try {
+		synchronized (startLock) {
+		    if (pendingStart) {
+			pendingStart = false;
+			start();
+		    }
+		}
+		ok = true;
+	    } finally {
+		if (!ok) {
+		    synchronized (ConnectionManager.this) {
+			assert pendingNewRequests > 0;
+			pendingNewRequests--;
+		    }
+		}
+	    }
+	    synchronized (ConnectionManager.this) {
+		assert pendingNewRequests > 0;
+		pendingNewRequests--;
+		idleTime = 0;
+		return super.newRequest();
 	    }
-	    idleTime = 0;
-	    return super.newRequest();
 	}
 
 	/**
 	 * Returns the number of active and pending requests.
 	 */
-	public synchronized int requestsInProgress() throws IOException {
+	public int requestsInProgress() throws IOException {
+	    assert Thread.holdsLock(ConnectionManager.this);
 	    return super.requestsInProgress() + pendingNewRequests;
 	}
 
@@ -372,7 +397,8 @@
 	 * and returns false otherwise. If the mux is idle and the recorded
 	 * idle time is zero, sets the recorded idle time to now.
 	 */
-	synchronized boolean checkIdle(long now) {
+	boolean checkIdle(long now) {
+	    assert Thread.holdsLock(ConnectionManager.this);
 	    try {
 		if (requestsInProgress() == 0) {
 		    if (idleTime == 0) {
@@ -495,7 +521,7 @@
 	     * Calls readResponseData on the connection, exactly once.
 	     * Sets the handle to null to indicate that it has been called.
 	     */
-	    synchronized private void readFirst() throws IOException {
+	    private synchronized void readFirst() throws IOException {
 		if (handle != null) {
 		    try {
 			IOException e = c.readResponseData(handle, in);



Mime
View raw message