manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1551439 - in /manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss: RSSConnector.java Robots.java ThrottledFetcher.java
Date Tue, 17 Dec 2013 01:45:10 GMT
Author: kwright
Date: Tue Dec 17 01:45:10 2013
New Revision: 1551439

URL: http://svn.apache.org/r1551439
Log:
Hook up RSS connector to use new module - except for polling

Modified:
    manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
    manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
    manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java

Modified: manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java?rev=1551439&r1=1551438&r2=1551439&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
(original)
+++ manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
Tue Dec 17 01:45:10 2013
@@ -52,7 +52,7 @@ public class RSSConnector extends org.ap
 {
   public static final String _rcsid = "@(#)$Id: RSSConnector.java 994959 2010-09-08 10:04:42Z
kwright $";
 
-
+  protected final static String rssThrottleGroupType = "_RSS_";
 
   // Usage flag values
   protected static final int ROBOTS_NONE = 0;
@@ -105,7 +105,7 @@ public class RSSConnector extends org.ap
   protected Robots robots = null;
 
   /** Storage for fetcher objects */
-  protected static Map fetcherMap = new HashMap();
+  protected static Map<String,ThrottledFetcher> fetcherMap = new HashMap<String,ThrottledFetcher>();
   /** Storage for robots objects */
   protected static Map robotsMap = new HashMap();
 
@@ -231,10 +231,16 @@ public class RSSConnector extends org.ap
 
       }
 
+      IThrottleGroups tg = ThrottleGroupsFactory.make(currentContext);
+      // Create the throttle group
+      tg.createOrUpdateThrottleGroup(rssThrottleGroupType, throttleGroupName, new ThrottleSpec(maxOpenConnectionsPerServer,
+        minimumMillisecondsPerFetchPerServer, minimumMillisecondsPerBytePerServer));
+      
       isInitialized = true;
     }
   }
 
+  
   /** Return the list of activities that this connector supports (i.e. writes into the log).
   *@return the list.
   */
@@ -936,11 +942,9 @@ public class RSSConnector extends org.ap
             String pathPart = url.getFile();
 
             // Check with robots to see if it's allowed
-            if (robotsUsage >= ROBOTS_DATA && !robots.isFetchAllowed(protocol,port,hostName,url.getPath(),
+            if (robotsUsage >= ROBOTS_DATA && !robots.isFetchAllowed(currentContext,throttleGroupName,
+              protocol,port,hostName,url.getPath(),
               userAgent,from,
-              minimumMillisecondsPerBytePerServer,
-              maxOpenConnectionsPerServer,
-              minimumMillisecondsPerFetchPerServer,
               proxyHost, proxyPort, proxyAuthDomain, proxyAuthUsername, proxyAuthPassword,
               activities, connectionLimit))
             {
@@ -955,10 +959,9 @@ public class RSSConnector extends org.ap
             {
 
               // Now, use the fetcher, and get the file.
-              IThrottledConnection connection = fetcher.createConnection(hostName,
-                minimumMillisecondsPerBytePerServer,
-                maxOpenConnectionsPerServer,
-                minimumMillisecondsPerFetchPerServer,
+              IThrottledConnection connection = fetcher.createConnection(currentContext,
+                throttleGroupName,
+                hostName,
                 connectionLimit,
                 feedTimeout,
                 proxyHost,
@@ -5404,7 +5407,7 @@ public class RSSConnector extends org.ap
   {
     synchronized (fetcherMap)
     {
-      ThrottledFetcher tf = (ThrottledFetcher)fetcherMap.get(throttleGroupName);
+      ThrottledFetcher tf = fetcherMap.get(throttleGroupName);
       if (tf == null)
       {
         tf = new ThrottledFetcher();
@@ -5497,6 +5500,47 @@ public class RSSConnector extends org.ap
 
   // Protected classes
 
+  /** The throttle specification class.  Each server name is a different bin in this model.
+  */
+  protected static class ThrottleSpec implements IThrottleSpec
+  {
+    protected final int maxOpenConnectionsPerServer;
+    protected final long minimumMillisecondsPerFetchPerServer;
+    protected final double minimumMillisecondsPerBytePerServer;
+    
+    public ThrottleSpec(int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer,
+      double minimumMillisecondsPerBytePerServer)
+    {
+      this.maxOpenConnectionsPerServer = maxOpenConnectionsPerServer;
+      this.minimumMillisecondsPerFetchPerServer = minimumMillisecondsPerFetchPerServer;
+      this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
+    }
+    
+    /** Given a bin name, find the max open connections to use for that bin.
+    *@return Integer.MAX_VALUE if no limit found.
+    */
+    public int getMaxOpenConnections(String binName)
+    {
+      return maxOpenConnectionsPerServer;
+    }
+
+    /** Look up minimum milliseconds per byte for a bin.
+    *@return 0.0 if no limit found.
+    */
+    public double getMinimumMillisecondsPerByte(String binName)
+    {
+      return minimumMillisecondsPerBytePerServer;
+    }
+
+    /** Look up minimum milliseconds for a fetch for a bin.
+    *@return 0 if no limit found.
+    */
+    public long getMinimumMillisecondsPerFetch(String binName)
+    {
+      return minimumMillisecondsPerFetchPerServer;
+    }
+  }
+
   /** Name/value class */
   protected static class NameValue
   {

Modified: manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java?rev=1551439&r1=1551438&r2=1551439&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
(original)
+++ manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
Tue Dec 17 01:45:10 2013
@@ -109,10 +109,9 @@ public class Robots
   *@param pathString is the path (non-query) part of the URL
   *@return true if fetch is allowed, false otherwise.
   */
-  public boolean isFetchAllowed(String protocol, int port, String hostName, String pathString,
+  public boolean isFetchAllowed(IThreadContext threadContext, String throttleGroupName,
+    String protocol, int port, String hostName, String pathString,
     String userAgent, String from,
-    double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
-    long minimumMillisecondsPerFetchPerServer,
     String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String
proxyAuthPassword,
     IVersionActivity activities, int connectionLimit)
     throws ManifoldCFException, ServiceInterruption
@@ -134,9 +133,9 @@ public class Robots
       }
     }
 
-    return host.isFetchAllowed(System.currentTimeMillis(),pathString,
+    return host.isFetchAllowed(threadContext,throttleGroupName,
+      System.currentTimeMillis(),pathString,
       userAgent,from,
-      minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,
       proxyHost, proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,activities,connectionLimit);
   }
 
@@ -257,10 +256,9 @@ public class Robots
     *@param pathString is the path string to check.
     *@return true if crawling is allowed, false otherwise.
     */
-    public boolean isFetchAllowed(long currentTime, String pathString,
+    public boolean isFetchAllowed(IThreadContext threadContext, String throttleGroupName,
+      long currentTime, String pathString,
       String userAgent, String from,
-      double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
-      long minimumMillisecondsPerFetchPerServer,
       String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername,
String proxyAuthPassword,
       IVersionActivity activities, int connectionLimit)
       throws ServiceInterruption, ManifoldCFException
@@ -323,9 +321,7 @@ public class Robots
 
         if (readingRobots)
           // This doesn't need to be synchronized because readingRobots blocks all other
threads from getting at this object
-          makeValid(currentTime,userAgent,from,
-          minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,
-          minimumMillisecondsPerFetchPerServer,
+          makeValid(threadContext,throttleGroupName,currentTime,userAgent,from,
           proxyHost, proxyPort, proxyAuthDomain, proxyAuthUsername, proxyAuthPassword,
           hostName, activities, connectionLimit);
 
@@ -435,9 +431,8 @@ public class Robots
     /** Initialize the record.  This method reads the robots file on the specified protocol/host/port,
     * and parses it according to the rules.
     */
-    protected void makeValid(long currentTime, String userAgent, String from,
-      double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
-      long minimumMillisecondsPerFetchPerServer,
+    protected void makeValid(IThreadContext threadContext, String throttleGroupName,
+      long currentTime, String userAgent, String from,
       String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername,
String proxyAuthPassword,
       String hostName, IVersionActivity activities, int connectionLimit)
       throws ServiceInterruption, ManifoldCFException
@@ -445,8 +440,8 @@ public class Robots
       invalidTime = currentTime + 24L * 60L * 60L * 1000L;
 
       // Do the fetch
-      IThrottledConnection connection = fetcher.createConnection(hostName,minimumMillisecondsPerBytePerServer,
-        maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
+      IThrottledConnection connection = fetcher.createConnection(threadContext,throttleGroupName,
+        hostName,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
         proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
       try
       {

Modified: manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java?rev=1551439&r1=1551438&r2=1551439&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
(original)
+++ manifoldcf/branches/CONNECTORS-829/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
Tue Dec 17 01:45:10 2013
@@ -88,9 +88,9 @@ public class ThrottledFetcher
   /** This is the lock object for that global handle counter */
   protected static Integer globalHandleCounterLock = new Integer(0);
 
-  /** This hash maps the server string (without port) to a server object, where
+  /** This hash maps the server string (without port) to a pool throttling object, where
   * we can track the statistics and make sure we throttle appropriately */
-  protected Map serverMap = new HashMap();
+  protected final Map<String,IConnectionThrottler> serverMap = new HashMap<String,IConnectionThrottler>();
 
   /** Reference count for how many connections to this pool there are */
   protected int refCount = 0;
@@ -151,35 +151,25 @@ public class ThrottledFetcher
 
   /** Establish a connection to a specified URL.
   * @param serverName is the FQDN of the server, e.g. foo.metacarta.com
-  * @param minimumMillisecondsPerBytePerServer is the average number of milliseconds to wait
-  *       between bytes, on
-  *       average, over all streams reading from this server.  That means that the
-  *       stream will block on fetch until the number of bytes being fetched, done
-  *       in the average time interval required for that fetch, would not exceed
-  *       the desired bandwidth.
-  * @param minimumMillisecondsPerFetchPerServer is the number of milliseconds
-  *        between fetches, as a minimum, on a per-server basis.  Set
-  *        to zero for no limit.
-  * @param maxOpenConnectionsPerServer is the maximum number of open connections to allow
for a single server.
-  *        If more than this number of connections would need to be open, then this connection
request will block
-  *        until this number will no longer be exceeded.
   * @param connectionLimit is the maximum desired outstanding connections at any one time.
   * @param connectionTimeoutMilliseconds is the number of milliseconds to wait for the connection
before timing out.
   */
-  public synchronized IThrottledConnection createConnection(String serverName, double minimumMillisecondsPerBytePerServer,
-    int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer, int connectionLimit,
int connectionTimeoutMilliseconds,
+  public synchronized IThrottledConnection createConnection(IThreadContext threadContext,
String throttleGroupName,
+    String serverName, int connectionLimit, int connectionTimeoutMilliseconds,
     String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String
proxyAuthPassword)
     throws ManifoldCFException, ServiceInterruption
   {
-    Server server;
-    server = (Server)serverMap.get(serverName);
+    IConnectionThrottler server;
+    server = serverMap.get(serverName);
     if (server == null)
     {
-      server = new Server(serverName);
+      // Create a connection throttler for this server
+      IThrottleGroups tg = ThrottleGroupsFactory.make(threadContext);
+      server = tg.obtainConnectionThrottler(RSSConnector.rssThrottleGroupType, throttleGroupName,
new String[]{serverName});
       serverMap.put(serverName,server);
     }
 
-    return new ThrottledConnection(server,minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,
+    return new ThrottledConnection(serverName, server,
       connectionTimeoutMilliseconds,connectionLimit,
       proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
   }
@@ -206,14 +196,8 @@ public class ThrottledFetcher
     refCount--;
     if (refCount == 0)
     {
-      // Close all the servers one by one
-      Iterator iter = serverMap.keySet().iterator();
-      while (iter.hasNext())
-      {
-        String serverName = (String)iter.next();
-        Server server = (Server)serverMap.get(serverName);
-        server.discard();
-      }
+      // Since we don't have any actual pools here, this can be a no-op for now
+      // MHL
       serverMap.clear();
     }
   }
@@ -222,14 +206,12 @@ public class ThrottledFetcher
   */
   protected static class ThrottledConnection implements IThrottledConnection
   {
-    /** The connection bandwidth we want */
-    protected final double minimumMillisecondsPerBytePerServer;
-    /** The maximum open connections per server */
-    protected final int maxOpenConnectionsPerServer;
-    /** The minimum time between fetches */
-    protected final long minimumMillisecondsPerFetchPerServer;
-    /** The server object we use to track connections and fetches. */
-    protected final Server server;
+    /** The server fqdn */
+    protected final String serverName;
+    /** The throttling object we use to track connections */
+    protected final IConnectionThrottler connectionThrottler;
+    /** The throttling object we use to track fetches */
+    protected final IFetchThrottler fetchThrottler;
     /** Connection timeout in milliseconds */
     protected final int connectionTimeoutMilliseconds;
     /** The client connection manager */
@@ -237,6 +219,8 @@ public class ThrottledFetcher
     /** The httpclient */
     protected final HttpClient httpClient;
 
+    /** The stream throttler */
+    protected IStreamThrottler streamThrottler = null;
     /** The method object */
     protected HttpRequestBase executeMethod = null;
     /** The start-fetch time */
@@ -259,15 +243,14 @@ public class ThrottledFetcher
     
     /** Constructor.
     */
-    public ThrottledConnection(Server server, double minimumMillisecondsPerBytePerServer,
int maxOpenConnectionsPerServer,
-      long minimumMillisecondsPerFetchPerServer, int connectionTimeoutMilliseconds, int connectionLimit,
+    public ThrottledConnection(String serverName,
+      IConnectionThrottler connectionThrottler,
+      int connectionTimeoutMilliseconds, int connectionLimit,
       String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername,
String proxyAuthPassword)
       throws ManifoldCFException
     {
-      this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
-      this.maxOpenConnectionsPerServer = maxOpenConnectionsPerServer;
-      this.minimumMillisecondsPerFetchPerServer = minimumMillisecondsPerFetchPerServer;
-      this.server = server;
+      this.serverName = serverName;
+      this.connectionThrottler = connectionThrottler;
       this.connectionTimeoutMilliseconds = connectionTimeoutMilliseconds;
 
       // Create the https scheme for this connection
@@ -330,7 +313,17 @@ public class ThrottledFetcher
       httpClient = localHttpClient;
 
       registerGlobalHandle(connectionLimit);
-      server.registerConnection(maxOpenConnectionsPerServer);
+      try
+      {
+        int result = connectionThrottler.waitConnectionAvailable();
+        if (result != IConnectionThrottler.CONNECTION_FROM_CREATION)
+          throw new IllegalStateException("Got back unexpected value from waitForAConnection()
of "+result);
+        fetchThrottler = connectionThrottler.getNewConnectionFetchThrottler();
+      }
+      catch (InterruptedException e)
+      {
+        throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
+      }
     }
 
     /** Begin the fetch process.
@@ -344,7 +337,7 @@ public class ThrottledFetcher
       fetchCounter = 0L;
       try
       {
-        server.beginFetch(minimumMillisecondsPerFetchPerServer);
+        streamThrottler = fetchThrottler.obtainFetchDocumentPermission();
       }
       catch (InterruptedException e)
       {
@@ -386,7 +379,7 @@ public class ThrottledFetcher
     {
 
       StringBuilder sb = new StringBuilder(protocol);
-      sb.append("://").append(server.getServerName());
+      sb.append("://").append(serverName);
       if (port != -1)
         sb.append(":").append(Integer.toString(port));
       sb.append(urlPath);
@@ -407,8 +400,8 @@ public class ThrottledFetcher
       if (lastModified != null)
         executeMethod.setHeader(new BasicHeader("Last-Modified",lastModified));
       // Create the execution thread.
-      methodThread = new ExecuteMethodThread(this, server,
-        minimumMillisecondsPerBytePerServer, httpClient, executeMethod);
+      methodThread = new ExecuteMethodThread(this, streamThrottler,
+        httpClient, executeMethod);
       // Start the method thread, which will start the transaction
       try
       {
@@ -702,7 +695,6 @@ public class ThrottledFetcher
         if (methodThread != null && threadStarted)
           methodThread.abort();
         long endTime = System.currentTimeMillis();
-        server.endFetch();
 
         activities.recordActivity(new Long(startFetchTime),RSSConnector.ACTIVITY_FETCH,
           new Long(fetchCounter),myUrl,Integer.toString(statusCode),(throwable==null)?null:throwable.getMessage(),null);
@@ -739,6 +731,7 @@ public class ThrottledFetcher
         myUrl = null;
         statusCode = -1;
         fetchType = null;
+        streamThrottler = null;
       }
     }
 
@@ -749,7 +742,7 @@ public class ThrottledFetcher
     {
       // Clean up the connection pool.  This should do the necessary bookkeeping to release
the one connection that's sitting there.
       connectionManager.shutdown();
-      server.releaseConnection();
+      connectionThrottler.noteConnectionDestroyed();
       releaseGlobalHandle();
     }
 
@@ -760,23 +753,20 @@ public class ThrottledFetcher
   */
   protected static class ThrottledInputstream extends InputStream
   {
-    /** Stream throttling parameters */
-    protected double minimumMillisecondsPerBytePerServer;
-    /** The throttled connection we belong to */
-    protected ThrottledConnection throttledConnection;
-    /** The server object we use to track throttling */
-    protected Server server;
+    /** Throttled connection */
+    protected final ThrottledConnection throttledConnection;
+    /** Stream throttler */
+    protected final IStreamThrottler streamThrottler;
     /** The stream we are wrapping. */
-    protected InputStream inputStream;
+    protected final InputStream inputStream;
 
     /** Constructor.
     */
-    public ThrottledInputstream(ThrottledConnection connection, Server server, InputStream
is, double minimumMillisecondsPerBytePerServer)
+    public ThrottledInputstream(ThrottledConnection throttledConnection, IStreamThrottler
streamThrottler, InputStream is)
     {
-      this.throttledConnection = connection;
-      this.server = server;
+      this.throttledConnection = throttledConnection;
+      this.streamThrottler = streamThrottler;
       this.inputStream = is;
-      this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
     }
 
     /** Read a byte.
@@ -839,7 +829,8 @@ public class ThrottledFetcher
     {
       try
       {
-        server.beginRead(len,minimumMillisecondsPerBytePerServer);
+        if (streamThrottler.obtainReadPermission(len) == false)
+          throw new IllegalStateException("Throttler shut down while still active");
         int amt = 0;
         try
         {
@@ -849,10 +840,10 @@ public class ThrottledFetcher
         finally
         {
           if (amt == -1)
-            server.endRead(len,0);
+            streamThrottler.releaseReadPermission(len,0);
           else
           {
-            server.endRead(len,amt);
+            streamThrottler.releaseReadPermission(len,amt);
             throttledConnection.logFetchCount(amt);
           }
         }
@@ -909,308 +900,18 @@ public class ThrottledFetcher
     public void close()
       throws IOException
     {
-      inputStream.close();
-    }
-
-  }
-
-  /** This class represents the throttling stuff kept around for a single server.
-  *
-  * In order to calculate
-  * the effective "burst" fetches per second and bytes per second, we need to have some idea
what the window is.
-  * For example, a long hiatus from fetching could cause overuse of the server when fetching
resumes, if the
-  * window length is too long.
-  *
-  * One solution to this problem would be to keep a list of the individual fetches as records.
 Then, we could
-  * "expire" a fetch by discarding the old record.  However, this is quite memory consumptive
for all but the
-  * smallest intervals.
-  *
-  * Another, better, solution is to hook into the start and end of individual fetches.  These
will, presumably, occur
-  * at the fastest possible rate without long pauses spent doing something else.  The only
complication is that
-  * fetches may well overlap, so we need to "reference count" the fetches to know when to
reset the counters.
-  * For "fetches per second", we can simply make sure we "schedule" the next fetch at an
appropriate time, rather
-  * than keep records around.  The overall rate may therefore be somewhat less than the specified
rate, but that's perfectly
-  * acceptable.
-  *
-  * For the "maximum open connections" limit, the best thing would be to establish a separate
MultiThreadedConnectionPool
-  * for each Server.  Then, the limit would be automatic.
-  *
-  * Some notes on the algorithms used to limit server bandwidth impact
-  * ==================================================================
-  *
-  * In a single connection case, the algorithm we'd want to use works like this.  On the
first chunk of a series,
-  * the total length of time and the number of bytes are recorded.  Then, prior to each subsequent
chunk, a calculation
-  * is done which attempts to hit the bandwidth target by the end of the chunk read, using
the rate of the first chunk
-  * access as a way of estimating how long it will take to fetch those next n bytes.
-  *
-  * For a multi-connection case, which this is, it's harder to either come up with a good
maximum bandwidth estimate,
-  * and harder still to "hit the target", because simultaneous fetches will intrude.  The
strategy is therefore:
-  *
-  * 1) The first chunk of any series should proceed without interference from other connections
to the same server.
-  *    The goal here is to get a decent quality estimate without any possibility of overwhelming
the server.
-  *
-  * 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection".
 That is, if other
-  *    connections are going on, we can presume that each connection will use at most the
bandwidth that the first fetch
-  *    took.  Thus, by generating end-time estimates based on this number, we are actually
being conservative and
-  *    using less server bandwidth.
-  *
-  * 3) For chunks that have started but not finished, we keep track of their size and estimated
elapsed time in order to schedule when
-  *    new chunks from other connections can start.
-  *
-  */
-  protected class Server
-  {
-    /** The fqdn of the server */
-    protected final String serverName;
-    /** This is the time of the next allowed fetch (in ms since epoch) */
-    protected long nextFetchTime = 0L;
-
-    // Bandwidth throttling variables
-    /** Reference count for bandwidth variables */
-    protected volatile int refCount = 0;
-    /** The inverse rate estimate of the first fetch, in ms/byte */
-    protected double rateEstimate = 0.0;
-    /** Flag indicating whether a rate estimate is needed */
-    protected volatile boolean estimateValid = false;
-    /** Flag indicating whether rate estimation is in progress yet */
-    protected volatile boolean estimateInProgress = false;
-    /** The start time of this series */
-    protected long seriesStartTime = -1L;
-    /** Total actual bytes read in this series; this includes fetches in progress */
-    protected long totalBytesRead = -1L;
-
-    /** Outstanding connection counter */
-    protected volatile int outstandingConnections = 0;
-
-    /** Constructor */
-    public Server(String serverName)
-    {
-      this.serverName = serverName;
-    }
-
-    /** Get the fqdn of the server */
-    public String getServerName()
-    {
-      return serverName;
-    }
-
-    /** Register an outstanding connection (and wait until it can be obtained before proceeding)
*/
-    public synchronized void registerConnection(int maxOutstandingConnections)
-      throws ManifoldCFException
-    {
+      IOException rval = null;
       try
       {
-        while (outstandingConnections >= maxOutstandingConnections)
-        {
-          wait();
-        }
-        outstandingConnections++;
-      }
-      catch (InterruptedException e)
-      {
-        throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
-      }
-    }
-
-    /** Release an outstanding connection back into the pool */
-    public synchronized void releaseConnection()
-    {
-      outstandingConnections--;
-      notifyAll();
-    }
-
-    /** Note the start of a fetch operation.  Call this method just before the actual stream
access begins.
-    * May wait until schedule allows.
-    */
-    public void beginFetch(long minimumMillisecondsPerFetchPerServer)
-      throws InterruptedException
-    {
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: Note begin fetch for '"+serverName+"'");
-      // First, do any waiting, and reschedule as needed
-      long waitAmount = 0L;
-      long currentTime = System.currentTimeMillis();
-
-      // System.out.println("Begin fetch for server "+this.toString()+" with minimum milliseconds
per fetch of "+new Long(minimumMillisecondsPerFetchPerServer).toString()+
-      //      " Current time: "+new Long(currentTime).toString()+ " Next fetch time: "+new
Long(nextFetchTime).toString());
-
-      synchronized (this)
-      {
-        if (currentTime < nextFetchTime)
-        {
-          waitAmount = nextFetchTime-currentTime;
-          nextFetchTime = nextFetchTime + minimumMillisecondsPerFetchPerServer;
-        }
-        else
-          nextFetchTime = currentTime + minimumMillisecondsPerFetchPerServer;
-      }
-      if (waitAmount > 0L)
-      {
-        if (Logging.connectors.isDebugEnabled())
-          Logging.connectors.debug("RSS: Performing a fetch wait for server '"+serverName+"'
for "+
-          new Long(waitAmount).toString()+" ms.");
-        ManifoldCF.sleep(waitAmount);
-      }
-
-      // System.out.println("For server "+this.toString()+", at "+new Long(System.currentTimeMillis()).toString()+",
the next fetch time is now "+new Long(nextFetchTime).toString());
-
-      synchronized (this)
-      {
-        if (refCount == 0)
-        {
-          // Now, reset bandwidth throttling counters
-          estimateValid = false;
-          rateEstimate = 0.0;
-          totalBytesRead = 0L;
-          estimateInProgress = false;
-          seriesStartTime = -1L;
-        }
-        refCount++;
+        inputStream.close();
       }
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: Begin fetch noted for '"+serverName+"'");
-
-    }
-
-    /** Note the end of a fetch operation.  Call this method just after the fetch completes.
-    */
-    public void endFetch()
-    {
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: Note end fetch for '"+serverName+"'");
-
-      synchronized (this)
-      {
-        refCount--;
-      }
-
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: End fetch noted for '"+serverName+"'");
-
-    }
-
-    /** Note the start of an individual byte read of a specified size.  Call this method
just before the
-    * read request takes place.  Performs the necessary delay prior to reading specified
number of bytes from the server.
-    */
-    public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
-      throws InterruptedException
-    {
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: Note begin read for '"+serverName+"'");
-
-      long currentTime = System.currentTimeMillis();
-
-      synchronized (this)
-      {
-        while (estimateInProgress)
-          wait();
-        if (estimateValid == false)
-        {
-          seriesStartTime = currentTime;
-          estimateInProgress = true;
-          // Add these bytes to the estimated total
-          totalBytesRead += (long)byteCount;
-          // Exit early; this thread isn't going to do any waiting
-          //if (Logging.connectors.isTraceEnabled())
-          //      Logging.connectors.trace("RSS: Read begin noted; gathering stats for '"+serverName+"'");
-
-          return;
-        }
-      }
-
-      // It is possible for the following code to get interrupted.  If that happens,
-      // we have to unstick the threads that are waiting on the estimate!
-      boolean finished = false;
-      try
-      {
-        long waitTime = 0L;
-        synchronized (this)
-        {
-          // Add these bytes to the estimated total
-          totalBytesRead += (long)byteCount;
-
-          // Estimate the time this read will take, and wait accordingly
-          long estimatedTime = (long)(rateEstimate * (double)byteCount);
-
-          // Figure out how long the total byte count should take, to meet the constraint
-          long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
-
-          // The wait time is the different between our desired end time, minus the estimated
time to read the data, and the
-          // current time.  But it can't be negative.
-          waitTime = (desiredEndTime - estimatedTime) - currentTime;
-        }
-
-        if (waitTime > 0L)
-        {
-          if (Logging.connectors.isDebugEnabled())
-            Logging.connectors.debug("RSS: Performing a read wait on server '"+serverName+"'
of "+
-            new Long(waitTime).toString()+" ms.");
-          ManifoldCF.sleep(waitTime);
-        }
-
-        //if (Logging.connectors.isTraceEnabled())
-        //      Logging.connectors.trace("RSS: Begin read noted for '"+serverName+"'");
-        finished = true;
-      }
-      finally
-      {
-        if (!finished)
-        {
-          abortRead();
-        }
-      }
-    }
-
-    /** Abort a read in progress.
-    */
-    public void abortRead()
-    {
-      synchronized (this)
-      {
-        if (estimateInProgress)
-        {
-          estimateInProgress = false;
-          notifyAll();
-        }
-      }
-    }
-
-    /** Note the end of an individual read from the server.  Call this just after an individual
read completes.
-    * Pass the actual number of bytes read to the method.
-    */
-    public void endRead(int originalCount, int actualCount)
-    {
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: Note end read for '"+serverName+"'");
-
-      long currentTime = System.currentTimeMillis();
-
-      synchronized (this)
+      catch (IOException e)
       {
-        totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
-        if (estimateInProgress)
-        {
-          if (actualCount == 0)
-            // Didn't actually get any bytes, so use 0.0
-            rateEstimate = 0.0;
-          else
-            rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount;
-          estimateValid = true;
-          estimateInProgress = false;
-          notifyAll();
-        }
+        rval = e;
       }
-
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: End read noted for '"+serverName+"'");
-
-    }
-
-    /** Discard this server.
-    */
-    public void discard()
-    {
-      // Nothing needed anymore
+      streamThrottler.closeStream();
+      if (rval != null)
+        throw rval;
     }
 
   }
@@ -1235,10 +936,8 @@ public class ThrottledFetcher
   {
     /** The connection */
     protected final ThrottledConnection theConnection;
-    /** The connection bandwidth we want */
-    protected final double minimumMillisecondsPerBytePerServer;
-    /** The server object we use to track connections and fetches. */
-    protected final Server server;
+    /** The stream throttler */
+    protected final IStreamThrottler streamThrottler;
     /** Client and method, all preconfigured */
     protected final HttpClient httpClient;
     protected final HttpRequestBase executeMethod;
@@ -1256,15 +955,13 @@ public class ThrottledFetcher
 
     protected Throwable generalException = null;
     
-    public ExecuteMethodThread(ThrottledConnection theConnection, Server server,
-      double minimumMillisecondsPerBytePerServer,
+    public ExecuteMethodThread(ThrottledConnection theConnection, IStreamThrottler streamThrottler,
       HttpClient httpClient, HttpRequestBase executeMethod)
     {
       super();
       setDaemon(true);
       this.theConnection = theConnection;
-      this.server = server;
-      this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
+      this.streamThrottler = streamThrottler;
       this.httpClient = httpClient;
       this.executeMethod = executeMethod;
     }
@@ -1316,7 +1013,7 @@ public class ThrottledFetcher
                   bodyStream = response.getEntity().getContent();
                   if (bodyStream != null)
                   {
-                    bodyStream = new ThrottledInputstream(theConnection,server,bodyStream,minimumMillisecondsPerBytePerServer);
+                    bodyStream = new ThrottledInputstream(theConnection,streamThrottler,bodyStream);
                     threadStream = new XThreadInputStream(bodyStream);
                   }
                   streamCreated = true;



Mime
View raw message