incubator-connectors-commits mailing list archives

From kwri...@apache.org
Subject svn commit: r911418 [6/8] - in /incubator/lcf/trunk/modules: connectors/webcrawler/connector/org/apache/lcf/crawler/connectors/webcrawler/ framework/agents/org/apache/lcf/agents/agentmanager/ framework/agents/org/apache/lcf/agents/incrementalingest/ fr...
Date Thu, 18 Feb 2010 14:31:31 GMT
Modified: incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/HopCount.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/HopCount.java?rev=911418&r1=911417&r2=911418&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/HopCount.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/HopCount.java Thu Feb 18 14:31:31 2010
@@ -30,3325 +30,3303 @@
 */
 public class HopCount extends org.apache.lcf.core.database.BaseTable
 {
-	public static final String _rcsid = "@(#)$Id$";
+        public static final String _rcsid = "@(#)$Id$";
 
-	// Answer constants
-	public static final int ANSWER_UNKNOWN = -1;
-	public static final int ANSWER_INFINITY = -2;
-
-	// Notes on the schema
-	// ===================
-	//
-	// This schema consists of three interrelated tables.  The table controlled directly by this class
-	// is the table where cached distance results are kept.  It has a child table, which keeps track
-	// of certain dependencies, so that we have a way of figuring out relatively accurately which cached links
-	// need to be re-evaluated when there is a change.  Finally, there is a related table where intrinsic
-	// (i.e. direct) link information is kept.
-	//
-	// When links are recorded, a source document refers to target documents.  The convention here is
-	// that the source document is called the "child", and the target document is called the "parent".
-	// Also by convention, a child value of null means "the root".  Since all cached distances are to
-	// the root, we only store the "parent" in the hopcount table.
-	//
-	// Each row in the main hopcount table is linked with the child tables by means of an id field.
-	//
-	// Database table management for hopcount determination
-	// ====================================================
-	//
-	// The critical operation we want to be able to do is to propagate the effects of a change throughout
-	// the cached data.  I originally assumed that that meant "blowing the cache" - deleting all minimum
-	// hop counts stored in the database which corresponded to the link we have added or deleted.
-	// However, after the naive algorithm ran for a while, it became clear that it was not going to perform
-	// well, because the sheer quantity of dependency information made management of dependencies far
-	// exceed reason.  Caching of hopcount, however, still was clearly essential, because when I removed
-	// the caching completely, things just plain wedged.
-	//
-	// Then I realized that by far the most common activity involves adding links to the graph, and therefore
-	// if I could optimize that activity without storing huge quantities of dependency information, the 
-	// performance goals would be met.  So, this is how the thinking went:
-	//
-	// - We always start with a graph where the cached hopcount values only exist IF the hopcount values
-	//   that were needed to come up with that value also exist.  Any changes to the graph MUST preserve this
-	//   situation.
-	// - Under these conditions, adding a link between a source and target could encounter either of two conditions:
-	//   (a) the target has no cached hopcount, or
-	//   (b) the target DOES have a cached hopcount.
-	//   In case (a), we must treat the existing non-record as meaning "infinite distance", which is clearly wrong.
-	//   We therefore must create a record for that location, which has a value of infinity.  After that, treat this
-	//   the exact same way as for (b).
-	//   In the case of (b), we need to re-evaluate the hopcount with the new link in place,
-	//   and compare it against the existing hopcount.  The new value cannot be larger (unless the table was somehow corrupted),
-	//   because adding a link can NEVER increase a hopcount.  If the new hopcount is less than the old, then
-	//   we change the value in the table, and examine all the target nodes in the same way.  Most likely, the
-	//   propagation will stop quickly, because there are lots of ways of getting to a node and this is just one
-	//   of them.
-	// - When a link is deleted, we run the risk of leaving around disconnected loops that evaluate forever, if
-	//   we use the same propagation algorithm.  So instead, we want to keep track of what nodes will need reevaluation
-	//   when a link is destroyed.  This list is relatively small, since only the shortest possible path to a node
-	//   is represented in this dependency information.
-	//   So, when a link is deleted, the following steps take place.  All the dependent hopcount nodes are queued, but
-	//   in such a way as to be reset to having an "infinite" distance.  Then, re-evaluation occurs in the same manner as for
-	//   the add case above.
-	// - In order to determine the hopcount value of a node at any given time, all you need to do is to look for a cached
-	//   hopcount value.  If you find it, that's the right number.  If you don't, you can presume the value is infinity.
-	//
-	//
-	// Activities that should occur when a hopcount changes
-	// ====================================================
-	//
-	// Documents in the job queue may be excluded from consideration based on hopcount.  If the hopcount for a document changes
-	// (decreases), this assessment could well change.  Therefore, this hopcount module MUST cause documents to be switched
-	// to a "pending" state whenever a hopcount change occurs that makes the document pass its hopcount filtering criteria.
-	//
-	//
-
-	// Field names
-	public static final String idField = "id";
-	public static final String jobIDField = "jobid";
-	public static final String linkTypeField = "linktype";
-	public static final String parentIDHashField = "parentidhash";
-	public static final String distanceField = "distance";
-	public static final String markForDeathField = "deathmark";
-
-	// Mark for death status
-	public static final int MARK_NORMAL = 0;
-	public static final int MARK_QUEUED = 1;
-	public static final int MARK_DELETING = 2;
-
-	protected static Map markMap;
-
-	static
-	{
-		markMap = new HashMap();
-		markMap.put("N",new Integer(MARK_NORMAL));
-		markMap.put("Q",new Integer(MARK_QUEUED));
-		markMap.put("D",new Integer(MARK_DELETING));
-	}
-
-	/** Counter for kicking off analyze */
-	protected static AnalyzeTracker tracker = new AnalyzeTracker();
-	/** Counter for kicking off reindex */
-	protected static AnalyzeTracker reindexTracker = new AnalyzeTracker();
-	
-	// The number of updates before doing a reindex
-	protected static long REINDEX_COUNT = 250000L;
-	
-	/** Intrinsic link table manager. */
-	protected IntrinsicLink intrinsicLinkManager;
-	/** Hop "delete" dependencies manager */
-	protected HopDeleteDeps deleteDepsManager;
-
-	/** Constructor.
-	*@param database is the database handle.
-	*/
-	public HopCount(IDBInterface database)
-		throws LCFException
-	{
-		super(database,"hopcount");
-		intrinsicLinkManager = new IntrinsicLink(database);
-		deleteDepsManager = new HopDeleteDeps(database);
-	}
-
-	/** Install or upgrade.
-	*/
-	public void install(String jobsTable, String jobsColumn)
-		throws LCFException
-	{
-                // Do schema first
-		beginTransaction();
-		try
-		{
-			Map existing = getTableSchema(null,null);
-			if (existing == null)
-			{
-				HashMap map = new HashMap();
-				map.put(idField,new ColumnDescription("BIGINT",true,false,null,null,false));
-				map.put(jobIDField,new ColumnDescription("BIGINT",false,false,jobsTable,jobsColumn,false));
-				map.put(linkTypeField,new ColumnDescription("VARCHAR(255)",false,true,null,null,false));
-				map.put(parentIDHashField,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
-				map.put(distanceField,new ColumnDescription("BIGINT",false,true,null,null,false));
-				map.put(markForDeathField,new ColumnDescription("CHAR(1)",false,false,null,null,false));
-				performCreate(map,null);
-
-			}
-			else
-			{
-				ColumnDescription cd;
-				
-				// Get rid of parentID field, if found
-				cd = (ColumnDescription)existing.get("parentid");
-				if (cd != null)
-				{
-					ArrayList list = new ArrayList();
-					list.add("parentid");
-					performAlter(null,null,list,null);
-				}
-			}
-		}
-		catch (LCFException e)
-		{
-			signalRollback();
-			throw e;
-		}
-		catch (Error e)
-		{
-			signalRollback();
-			throw e;
-		}
-		finally
-		{
-			endTransaction();
-		}
-                
-		// Do indexes
-		IndexDescription jobLinktypeParentIndex = new IndexDescription(true,new String[]{jobIDField,linkTypeField,parentIDHashField});
-		IndexDescription jobParentIndex = new IndexDescription(false,new String[]{jobIDField,parentIDHashField});
-		IndexDescription jobDeathIndex = new IndexDescription(false,new String[]{jobIDField,markForDeathField});
+        // Answer constants
+        public static final int ANSWER_UNKNOWN = -1;
+        public static final int ANSWER_INFINITY = -2;
+
+        // Notes on the schema
+        // ===================
+        //
+        // This schema consists of three interrelated tables.  The table controlled directly by this class
+        // is the table where cached distance results are kept.  It has a child table, which keeps track
+        // of certain dependencies, so that we have a way of figuring out relatively accurately which cached links
+        // need to be re-evaluated when there is a change.  Finally, there is a related table where intrinsic
+        // (i.e. direct) link information is kept.
+        //
+        // When links are recorded, a source document refers to target documents.  The convention here is
+        // that the source document is called the "child", and the target document is called the "parent".
+        // Also by convention, a child value of null means "the root".  Since all cached distances are to
+        // the root, we only store the "parent" in the hopcount table.
+        //
+        // Each row in the main hopcount table is linked with the child tables by means of an id field.
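+        //
+        // To make these conventions concrete: a document whose cached minimum distance from the root over
+        // "link"-type links is 2 would be represented (illustrative values only) by a hopcount row such as
+        //   (id=123, jobid=45, linktype="link", parentidhash=<hash of the document identifier>, distance=2, deathmark='N').
+        // A missing row means the distance is presumed infinite; a stored distance of -1 means the value is
+        // not yet known, and is likewise treated as infinity.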
+        //
+        // Database table management for hopcount determination
+        // ====================================================
+        //
+        // The critical operation we want to be able to do is to propagate the effects of a change throughout
+        // the cached data.  I originally assumed that that meant "blowing the cache" - deleting all minimum
+        // hop counts stored in the database which corresponded to the link we have added or deleted.
+        // However, after the naive algorithm ran for a while, it became clear that it was not going to perform
+        // well, because the sheer quantity of dependency information made management of dependencies far
+        // exceed reason.  Caching of hopcount, however, still was clearly essential, because when I removed
+        // the caching completely, things just plain wedged.
+        //
+        // Then I realized that by far the most common activity involves adding links to the graph, and therefore
+        // if I could optimize that activity without storing huge quantities of dependency information, the 
+        // performance goals would be met.  So, this is how the thinking went:
+        //
+        // - We always start with a graph where the cached hopcount values only exist IF the hopcount values
+        //   that were needed to come up with that value also exist.  Any changes to the graph MUST preserve this
+        //   situation.
+        // - Under these conditions, adding a link between a source and target could encounter either of two conditions:
+        //   (a) the target has no cached hopcount, or
+        //   (b) the target DOES have a cached hopcount.
+        //   In case (a), we must treat the existing non-record as meaning "infinite distance", which is clearly wrong.
+        //   We therefore must create a record for that location, which has a value of infinity.  After that, treat this
+        //   the exact same way as for (b).
+        //   In the case of (b), we need to re-evaluate the hopcount with the new link in place,
+        //   and compare it against the existing hopcount.  The new value cannot be larger (unless the table was somehow corrupted),
+        //   because adding a link can NEVER increase a hopcount.  If the new hopcount is less than the old, then
+        //   we change the value in the table, and examine all the target nodes in the same way.  Most likely, the
+        //   propagation will stop quickly, because there are lots of ways of getting to a node and this is just one
+        //   of them.
+        // - When a link is deleted, we run the risk of leaving around disconnected loops that evaluate forever, if
+        //   we use the same propagation algorithm.  So instead, we want to keep track of what nodes will need reevaluation
+        //   when a link is destroyed.  This list is relatively small, since only the shortest possible path to a node
+        //   is represented in this dependency information.
+        //   So, when a link is deleted, the following steps take place.  All the dependent hopcount nodes are queued, but
+        //   in such a way as to be reset to having an "infinite" distance.  Then, re-evaluation occurs in the same manner as for
+        //   the add case above.
+        // - In order to determine the hopcount value of a node at any given time, all you need to do is to look for a cached
+        //   hopcount value.  If you find it, that's the right number.  If you don't, you can presume the value is infinity.
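+        //
+        // In rough pseudocode (a sketch of the reasoning above, not of the bulk implementation in this class),
+        // the add-link update amounts to:
+        //
+        //   addLink(source,target):
+        //     if target has no cached hopcount row, create one with distance "infinity"
+        //     candidate = cachedDistance(source) + 1
+        //     if candidate < cachedDistance(target):
+        //       cachedDistance(target) = candidate
+        //       for each existing link target -> t: addLink(target,t)   // propagation usually dies out quickly
+        //
+        // The delete case resets every dependent row to "infinity" and then re-evaluates those rows using the
+        // same relaxation step.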
+        //
+        //
+        // Activities that should occur when a hopcount changes
+        // ====================================================
+        //
+        // Documents in the job queue may be excluded from consideration based on hopcount.  If the hopcount for a document changes
+        // (decreases), this assessment could well change.  Therefore, this hopcount module MUST cause documents to be switched
+        // to a "pending" state whenever a hopcount change occurs that makes the document pass its hopcount filtering criteria.
+        //
+        //
+
+        // Field names
+        public static final String idField = "id";
+        public static final String jobIDField = "jobid";
+        public static final String linkTypeField = "linktype";
+        public static final String parentIDHashField = "parentidhash";
+        public static final String distanceField = "distance";
+        public static final String markForDeathField = "deathmark";
+
+        // Mark for death status
+        public static final int MARK_NORMAL = 0;
+        public static final int MARK_QUEUED = 1;
+        public static final int MARK_DELETING = 2;
+
+        protected static Map markMap;
+
+        static
+        {
+                markMap = new HashMap();
+                markMap.put("N",new Integer(MARK_NORMAL));
+                markMap.put("Q",new Integer(MARK_QUEUED));
+                markMap.put("D",new Integer(MARK_DELETING));
+        }
+
+        /** Counter for kicking off analyze */
+        protected static AnalyzeTracker tracker = new AnalyzeTracker();
+        /** Counter for kicking off reindex */
+        protected static AnalyzeTracker reindexTracker = new AnalyzeTracker();
+        
+        // The number of updates before doing a reindex
+        protected static long REINDEX_COUNT = 250000L;
+        
+        /** Intrinsic link table manager. */
+        protected IntrinsicLink intrinsicLinkManager;
+        /** Hop "delete" dependencies manager */
+        protected HopDeleteDeps deleteDepsManager;
+
+        /** Constructor.
+        *@param database is the database handle.
+        */
+        public HopCount(IDBInterface database)
+                throws LCFException
+        {
+                super(database,"hopcount");
+                intrinsicLinkManager = new IntrinsicLink(database);
+                deleteDepsManager = new HopDeleteDeps(database);
+        }
+
+        /** Install or upgrade.
+        */
+        public void install(String jobsTable, String jobsColumn)
+                throws LCFException
+        {
+                // Per convention, always have outer loop in install() methods
+                while (true)
+                {
+                        Map existing = getTableSchema(null,null);
+                        if (existing == null)
+                        {
+                                HashMap map = new HashMap();
+                                map.put(idField,new ColumnDescription("BIGINT",true,false,null,null,false));
+                                map.put(jobIDField,new ColumnDescription("BIGINT",false,false,jobsTable,jobsColumn,false));
+                                map.put(linkTypeField,new ColumnDescription("VARCHAR(255)",false,true,null,null,false));
+                                map.put(parentIDHashField,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
+                                map.put(distanceField,new ColumnDescription("BIGINT",false,true,null,null,false));
+                                map.put(markForDeathField,new ColumnDescription("CHAR(1)",false,false,null,null,false));
+                                performCreate(map,null);
+
+                        }
+                        else
+                        {
+                                // Upgrade goes here, if needed
+                        }
+
+                        // Do child tables.
+                        intrinsicLinkManager.install(jobsTable,jobsColumn);
+                        deleteDepsManager.install(jobsTable,jobsColumn,getTableName(),idField);
+                        
+                        // Do indexes
+                        IndexDescription jobLinktypeParentIndex = new IndexDescription(true,new String[]{jobIDField,linkTypeField,parentIDHashField});
+                        IndexDescription jobParentIndex = new IndexDescription(false,new String[]{jobIDField,parentIDHashField});
+                        IndexDescription jobDeathIndex = new IndexDescription(false,new String[]{jobIDField,markForDeathField});
+                        
+                        Map indexes = getTableIndexes(null,null);
+                        Iterator iter = indexes.keySet().iterator();
+                        while (iter.hasNext())
+                        {
+                                String indexName = (String)iter.next();
+                                IndexDescription id = (IndexDescription)indexes.get(indexName);
+                                    
+                                if (jobLinktypeParentIndex != null && id.equals(jobLinktypeParentIndex))
+                                        jobLinktypeParentIndex = null;
+                                else if (jobParentIndex != null && id.equals(jobParentIndex))
+                                        jobParentIndex = null;
+                                else if (jobDeathIndex != null && id.equals(jobDeathIndex))
+                                        jobDeathIndex = null;
+                                else if (indexName.indexOf("_pkey") == -1)
+                                        // This index shouldn't be here; drop it
+                                        performRemoveIndex(indexName);
+                        }
+
+                        if (jobLinktypeParentIndex != null)
+                                performAddIndex(null,jobLinktypeParentIndex);
+
+                        if (jobParentIndex != null)
+                                performAddIndex(null,jobParentIndex);
+
+                        if (jobDeathIndex != null)
+                                performAddIndex(null,jobDeathIndex);
+                        
+                        break;
+                }
+        }
+
+        /** Uninstall.
+        */
+        public void deinstall()
+                throws LCFException
+        {
+                beginTransaction();
+                try
+                {
+                        deleteDepsManager.deinstall();
+                        intrinsicLinkManager.deinstall();
+                        performDrop(null);
+                }
+                catch (LCFException e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                catch (Error e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                finally
+                {
+                        endTransaction();
+                }
+
+        }
+
+        /** Go from string to mark.
+        *@param value is the string.
+        *@return the status value.
+        */
+        public static int stringToMark(String value)
+                throws LCFException
+        {
+                Integer x = (Integer)markMap.get(value);
+                if (x == null)
+                        throw new LCFException("Bad mark value: '"+value+"'");
+                return x.intValue();
+        }
+
+        /** Go from mark to string.
+        *@param mark is the mark.
+        *@return the string.
+        */
+        public static String markToString(int mark)
+                throws LCFException
+        {
+                switch (mark)
+                {
+                case MARK_NORMAL:
+                        return "N";
+                case MARK_QUEUED:
+                        return "Q";
+                case MARK_DELETING:
+                        return "D";
+                default:
+                        throw new LCFException("Bad mark value");
+                }
+        }
+
+        /** Delete an owner (and clean up the corresponding hopcount rows).
+        */
+        public void deleteOwner(Long jobID)
+                throws LCFException
+        {
+                beginTransaction();
+                try
+                {
+                        // Delete the intrinsic rows belonging to this job.
+                        intrinsicLinkManager.deleteOwner(jobID);
+
+                        // Delete the deletedeps rows
+                        deleteDepsManager.deleteJob(jobID);
+
+                        // Delete our own rows.
+                        ArrayList list = new ArrayList();
+                        list.add(jobID);
+                        performDelete("WHERE "+jobIDField+"=?",list,null);
+                        reindexTracker.noteInsert();
+                }
+                catch (LCFException e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                catch (Error e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                finally
+                {
+                        endTransaction();
+                }
+        }
+
+        /** Reset, at startup time.
+        */
+        public void reset()
+                throws LCFException
+        {
+                beginTransaction();
+                try
+                {
+                        intrinsicLinkManager.reset();
+                }
+                catch (LCFException e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                catch (Error e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                finally
+                {
+                        endTransaction();
+                }
+        }
+
+        /** Record references from the root to a set of documents.  These will be marked as "new" or "existing", and
+        * will have a null linktype.
+        */
+        public void recordSeedReferences(Long jobID, String[] legalLinkTypes, String[] targetDocumentIDHashes, int hopcountMethod)
+                throws LCFException
+        {
+                doRecord(jobID,legalLinkTypes,"",targetDocumentIDHashes,"",hopcountMethod);
+        }
+
+        /** Finish seed references.  Seed references are special in that the only source is the root.
+        */
+        public void finishSeedReferences(Long jobID, String[] legalLinkTypes, int hopcountMethod)
+                throws LCFException
+        {
+                doFinish(jobID,legalLinkTypes,new String[]{""},hopcountMethod);
+        }
+
+        /** Record a reference from source to target.  This reference will be marked as "new" or "existing".
+        */
+        public void recordReference(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String targetDocumentIDHash, String linkType,
+                int hopcountMethod)
+                throws LCFException
+        {
+                doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,new String[]{targetDocumentIDHash},linkType,hopcountMethod);
+        }
+
+        /** Record a set of references from source to target.  These references will be marked as "new" or "existing".
+        */
+        public void recordReferences(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
+                int hopcountMethod)
+                throws LCFException
+        {
+                doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,targetDocumentIDHashes,linkType,hopcountMethod);
+        }
+
+        /** Complete a recalculation pass for a set of source documents.  All child links that are not marked as "new"
+        * or "existing" will be removed.  At the completion of this pass, the links will have their "new" flag cleared.
+        */
+        public void finishParents(Long jobID, String[] legalLinkTypes, String[] sourceDocumentHashes, int hopcountMethod)
+                throws LCFException
+        {
+                doFinish(jobID,legalLinkTypes,sourceDocumentHashes,hopcountMethod);
+        }
+
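+        // Sketch of the intended calling sequence, as implied by the methods above: for each job,
+        // recordSeedReferences() followed by finishSeedReferences() records the links from the root to the
+        // seed documents; then, for each processed source document, recordReference()/recordReferences()
+        // records its outgoing links, and finishParents() afterwards removes any links that were not
+        // re-recorded as "new" or "existing".
+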
+        /** Do the work of recording source-target references. */
+        protected void doRecord(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
+                int hopcountMethod)
+                throws LCFException
+        {
+
+                // We have to both add the reference, AND invalidate appropriate cached hopcounts (if it is a NEW
+                // link.)
+                beginTransaction();
+                try
+                {
+                        String[] newReferences = intrinsicLinkManager.recordReferences(jobID,sourceDocumentIDHash,targetDocumentIDHashes,linkType);
+                        if (newReferences.length > 0)
+                        {
+                                // There are added links.
+
+                                // The add causes hopcount records to be queued for processing (and created if they don't exist).
+                                // ALL the hopcount records for the target document ids must be queued, for all the link types
+                                // there are for this job.  Other times, the queuing requirement is less stringent, such as
+                                // when a hopcount for one linktype changes.  In those cases we only want to queue up hopcount
+                                // records corresponding to the changed record.
+
+                                // What we need to do is create a queue which contains only the target hopcount table rows, if they
+                                // exist.  Then we run the update algorithm until the cache is empty.
+
+                                if (Logging.hopcount.isDebugEnabled())
+                                        Logging.hopcount.debug("Queueing "+Integer.toString(targetDocumentIDHashes.length)+" documents");
+
+                                // Since we really want efficiency, we can write the answer in place now, based on the current
+                                // hopcount rows.  This works even if the current row is out of date, because if we change the
+                                // current row's value, the target rows will be requeued at that point.
+
+                                // When we record new links, we must come up with an initial calculation or requeue ALL legal link
+                                // types.  If this isn't done, then we cannot guarantee that the target record will exist - and
+                                // somebody will then interpret the distance as being 'infinity'.
+                                //
+                                // It would be possible to change this but we would then also need to change how a missing record
+                                // would be interpreted.
+
+                                //if (!(linkType == null || linkType.length() == 0))
+                                //	legalLinkTypes = new String[]{linkType};
+
+                                // So, let's load what we have for hopcount and dependencies for sourceDocumentID.
+
+                                Answer[] estimates = new Answer[legalLinkTypes.length];
+
+                                if (sourceDocumentIDHash == null || sourceDocumentIDHash.length() == 0)
+                                {
+                                        int i = 0;
+                                        while (i < estimates.length)
+                                        {
+                                                estimates[i++] = new Answer(0);
+                                        }
+                                }
+                                else
+                                {
+                                        ArrayList list = new ArrayList();
+                                        StringBuffer sb = new StringBuffer("SELECT ");
+                                        sb.append(idField).append(",").append(distanceField).append(",").append(linkTypeField)
+                                                .append(" FROM ").append(getTableName()).append(" WHERE ");
+                                        int i = 0;
+                                        while (i < legalLinkTypes.length)
+                                        {
+                                                if (i > 0)
+                                                        sb.append(" OR ");
+                                                sb.append("(").append(jobIDField).append("=? AND ")
+                                                        .append(linkTypeField).append("=? AND ").append(parentIDHashField).append("=?)");
+                                                list.add(jobID);
+                                                list.add(legalLinkTypes[i++]);
+                                                list.add(sourceDocumentIDHash);
+                                        }
+
+                                        IResultSet set = performQuery(sb.toString(),list,null,null);
+                                        HashMap answerMap = new HashMap();
+                                        i = 0;
+                                        while (i < estimates.length)
+                                        {
+                                                estimates[i] = new Answer(ANSWER_INFINITY);
+                                                answerMap.put(legalLinkTypes[i],estimates[i]);
+                                                i++;
+                                        }
+
+                                        i = 0;
+                                        while (i < set.getRowCount())
+                                        {
+                                                IResultRow row = set.getRow(i++);
+                                                Long id = (Long)row.getValue(idField);
+                                                DeleteDependency[] dds;
+                                                if (hopcountMethod != IJobDescription.HOPCOUNT_NEVERDELETE)
+                                                        dds = deleteDepsManager.getDeleteDependencies(id);
+                                                else
+                                                        dds = new DeleteDependency[0];
+                                                Long distance = (Long)row.getValue(distanceField);
+                                                String recordedLinkType = (String)row.getValue(linkTypeField);
+                                                Answer a = (Answer)answerMap.get(recordedLinkType);
+                                                int recordedDistance = (int)distance.longValue();
+                                                if (recordedDistance != -1)
+                                                {
+                                                        a.setAnswer(recordedDistance,dds);
+                                                }
+                                        }
+                                }
+
+                                // Now add these documents to the processing queue
+                                addToProcessingQueue(jobID,legalLinkTypes,newReferences,estimates,sourceDocumentIDHash,linkType,hopcountMethod);
+
+                                if (Logging.hopcount.isDebugEnabled())
+                                        Logging.hopcount.debug("Done queueing "+Integer.toString(targetDocumentIDHashes.length)+" documents");
+                        }
+                }
+                catch (LCFException e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                catch (Error e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                finally
+                {
+                        endTransaction();
+                }
+        }
+
+        /** Remove a set of document identifiers specified as a criteria.  This will remove hopcount rows and
+        * also intrinsic links that have the specified document identifiers as sources.
+        */
+        public void deleteMatchingDocuments(Long jobID, String[] legalLinkTypes,
+                String sourceTableName,
+                String sourceTableIDColumn, String sourceTableJobColumn, String sourceTableCriteria, int hopcountMethod)
+                throws LCFException
+        {
+                // This should work similarly to deleteDocumentIdentifiers() except that the identifiers
+                // come from a subquery rather than a list.
+                beginTransaction();
+                try
+                {
+                        // This also removes the links themselves...
+                        if (hopcountMethod == IJobDescription.HOPCOUNT_ACCURATE)
+                        {
+                                doDeleteInvalidation(jobID,legalLinkTypes,false,null,sourceTableName,
+                                        sourceTableIDColumn,sourceTableJobColumn,
+                                        sourceTableCriteria);
+                        }
+
+                }
+                catch (LCFException e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                catch (Error e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                finally
+                {
+                        endTransaction();
+                }
+
+        }
+        
+
+        /** Remove a set of document identifier hashes.  This will also remove the intrinsic links that have these document
+        * identifier hashes as sources, as well as invalidating cached hop counts that depend on them.
+        */
+        public void deleteDocumentIdentifiers(Long jobID, String[] legalLinkTypes, String[] sourceDocumentHashes, int hopcountMethod)
+                throws LCFException
+        {
+                beginTransaction();
+                try
+                {
+                        // What I want to do here is to first perform the invalidation of the cached hopcounts.
+                        //
+                        // UPDATE hopcount SET markfordeath='X' WHERE EXISTS(SELECT 'x' FROM hopdeletedeps t0 WHERE t0.ownerid=hopcount.id AND t0.jobid=<jobid>
+                        //	AND EXISTS(SELECT 'x' FROM intrinsiclinks t1 WHERE t1.linktype=t0.linktype AND t1.parentid=t0.parentid
+                        //		AND t1.childid=t0.childid AND t1.jobid=<jobid> AND t1.childid IN(<sourcedocs>)))
+                        //
+                        // ... and then, re-evaluate all hopcount records and their dependencies that are marked for delete.
+                        //
+
+
+                        // This also removes the links themselves...
+                        if (hopcountMethod == IJobDescription.HOPCOUNT_ACCURATE)
+                                doDeleteInvalidation(jobID,legalLinkTypes,false,sourceDocumentHashes,null,null,null,null);
+
+                }
+                catch (LCFException e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                catch (Error e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                finally
+                {
+                        endTransaction();
+                }
+        }
+
+        /** Calculate a bunch of hop-counts.  The values returned are only guaranteed to be an upper bound, unless
+        * the queue has recently been processed (via processQueue below).  -1 will be returned to indicate "infinity".
+        */
+        public int[] findHopCounts(Long jobID, String[] parentIdentifierHashes, String linkType)
+                throws LCFException
+        {
+                // No transaction, since we can happily interpret whatever comes back.
+                StringBuffer sb = new StringBuffer();
+                ArrayList list = new ArrayList();
+
+                int[] rval = new int[parentIdentifierHashes.length];
+                HashMap rvalMap = new HashMap();
+                int i = 0;
+                while (i < rval.length)
+                {
+                        rval[i] = -1;
+                        rvalMap.put(parentIdentifierHashes[i],new Integer(i));
+                        i++;
+                }
+
+                int maxClause = 25;
+                i = 0;
+                int k = 0;
+                while (i < parentIdentifierHashes.length)
+                {
+                        if (k == maxClause)
+                        {
+                                processFind(rval,rvalMap,sb.toString(),list);
+                                k = 0;
+                                sb.setLength(0);
+                                list.clear();
+                        }
+                        if (k > 0)
+                                sb.append(" OR");
+                        sb.append("(").append(jobIDField).append("=? AND ").append(linkTypeField)
+                                .append("=? AND ").append(parentIDHashField).append("=?)");
+                        list.add(jobID);
+                        list.add(linkType);
+                        list.add(parentIdentifierHashes[i]);
+                        k++;
+                        i++;
+                }
+                if (k > 0)
+                        processFind(rval,rvalMap,sb.toString(),list);
+
+                return rval;
+        }
+
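+        // Typical use, sketched: int[] distances = findHopCounts(jobID,hashes,"link"); a value of -1 in the
+        // result means no cached distance exists for the corresponding document, i.e. presumed infinity.
+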
+        /** Process a portion of a find request for hopcount information.
+        */
+        protected void processFind(int[] rval, Map rvalMap, String query, ArrayList list)
+                throws LCFException
+        {
+                IResultSet set = performQuery("SELECT "+distanceField+","+parentIDHashField+" FROM "+getTableName()+" WHERE "+query,list,null,null);
+                int i = 0;
+                while (i < set.getRowCount())
+                {
+                        IResultRow row = set.getRow(i++);
+                        String parentIDHash = (String)row.getValue(parentIDHashField);
+                        Long distance = (Long)row.getValue(distanceField);
+                        rval[((Integer)rvalMap.get(parentIDHash)).intValue()] = (int)distance.longValue();
+                }
+        }
+
+        /** Process a stage of the propagation queue for a job.
+        *@param jobID is the job we need to have the hopcount propagated for.
+        *@return true if the queue is empty.
+        */
+        public boolean processQueue(Long jobID, String[] legalLinkTypes, int hopcountMethod)
+                throws LCFException
+        {
+                // We can't instantiate the DocumentHash object here, because it will wind up having
+                // cached in it the answers from the previous round of calculation.  That round had
+                // a different set of marked nodes than the current round.
+
+                ArrayList list = new ArrayList();
+
+                // Pick off up to n queue items at a time.  We don't want to pick off too many (because
+                // then we wind up delaying other threads too much), nor do we want to do one at a time
+                // (because that is inefficient against the database), so I picked 200 as being roughly 200 times
+                // faster than doing them one at a time...
+                list.clear();
+                list.add(jobID);
+                list.add(markToString(MARK_QUEUED));
+                IResultSet set = performQuery("SELECT "+linkTypeField+","+parentIDHashField+" FROM "+
+                        getTableName()+" WHERE "+jobIDField+"=? AND "+markForDeathField+"=?"+" LIMIT 200 FOR UPDATE",list,null,null);
+
+                // No more entries == we are done
+                if (set.getRowCount() == 0)
+                        return true;
+
+                DocumentHash dh = new DocumentHash(jobID,legalLinkTypes,hopcountMethod);
+
+                Question[] questions = new Question[set.getRowCount()];
+
+                int i = 0;
+                while (i < set.getRowCount())
+                {
+                        IResultRow row = set.getRow(i);
+                        String parentIdentifierHash = (String)row.getValue(parentIDHashField);
+                        String linkType = (String)row.getValue(linkTypeField);
+
+                        // All documents in the set have the same basic assumptions; another set may be queued
+                        // as a side effect of some of these getting resolved, but treating them in chunks
+                        // seems like it should not cause problems (because the same underlying assumptions
+                        // underlie the whole chunk).  The side effects *may* cause other documents that are
+                        // still in the queue to be evaluated as well, in which case they will disappear from 
+                        // the queue and not be processed further.
+
+                        // Create a document hash object.
+                        questions[i] = new Question(parentIdentifierHash,linkType);
+                        i++;
+                }
+
+                // We don't care what the response is; we just want the documents to leave the queue.
+                dh.askQuestions(questions);
+                return false;
+        }
+
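+        // Typical use, sketched: the caller invokes processQueue(jobID,legalLinkTypes,hopcountMethod)
+        // repeatedly until it returns true, i.e. until the propagation queue for the job has been drained.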
+
+        /** Limited find for missing records.
+        */
+        protected void performFindMissingRecords(String query, ArrayList list, Map matchMap)
+                throws LCFException
+        {
+                // The naive query is this - but postgres does not find the index this way:
+                //IResultSet set = performQuery("SELECT "+parentIDField+","+linkTypeField+" FROM "+getTableName()+" WHERE "+
+                //	parentIDField+" IN("+query+") AND "+jobIDField+"=?",list,null,null);
+                IResultSet set = performQuery("SELECT "+parentIDHashField+","+linkTypeField+","+distanceField+" FROM "+getTableName()+" WHERE "+query,list,null,null);
+                int i = 0;
+                while (i < set.getRowCount())
+                {
+                        IResultRow row = set.getRow(i++);
+                        String docIDHash = (String)row.getValue(parentIDHashField);
+                        String linkType = (String)row.getValue(linkTypeField);
+                        Long distance = (Long)row.getValue(distanceField);
+                        Question q = new Question(docIDHash,linkType);
+                        matchMap.put(q,distance);
+                }
+        }
+
+
+        /** Add documents to the processing queue.  For the supplied bunch of link types and document ids,
+        * the corresponding hopcount records will be marked as being queued.  If, for example, the affected link types
+        * are 'link' and 'redirect', and the specified document id's are 'A' and 'B' and 'C', then six hopcount
+        * rows will be created and/or queued.
+        * The values that this code uses for initial distance or delete dependencies for each of the hopcount
+        * rows combinatorially described above are calculated by this method by starting with the passed-in hopcount values
+        * and dependencies for each of the affectedLinkTypes for the specified "source" document.  The result estimates are then
+        * generated by passing these values and dependencies over the links to the target document identifiers, presuming that
+        * the link is of the supplied link type.
+        * 
+        *@param jobID is the job the documents belong to.
+        *@param affectedLinkTypes are the set of affected link types.
+        *@param documentIDHashes are the documents to add.
+        *@param startingAnswers are the current estimated answers and delete dependencies at the source document for
+        * each of the affectedLinkTypes values.  These answers, if not null, are used to calculate the initial
+        * distance estimates for any target hopcount rows that need to be created.
+        *@param sourceDocumentIDHash is the source document identifier for the links from source to target documents.
+        *@param linkType is the type of the links from the source document to the target documents.
+        *@param hopcountMethod is the hopcount method in effect for the job.
+        */
+        protected void addToProcessingQueue(Long jobID, String[] affectedLinkTypes, String[] documentIDHashes,
+                Answer[] startingAnswers, String sourceDocumentIDHash, String linkType, int hopcountMethod)
+                throws LCFException
+        {
+                // If we're given the source hopcount distances, we should write the derived target values into the NEW
+                // hopcount records we create, because it will save much database access in the long run, and handles the
+                // typical case in an inexpensive way.  These records do not even need to be queued - since we are creating
+                // them, we know there are no other paths to them yet (or paths that depend upon them).  So we can write in
+                // 'final' values, which will need to be updated only if the source hopcount row's distance is lowered (and
+                // then, the targets will all be requeued anyhow).
+                //
+                // For EXISTING hopcount rows, I've opted to not consider the passed-in distance estimates.  Even if I should
+                // detect that the hopcount has improved, there would still be the requirement of requeuing all the target's
+                // targets.  This kind of propagation is probably best handled by the normal queue processing code, which does
+                // as much in bulk as is possible.  So, for existing target hopcount rows, they simply get queued.
+
+                if (Logging.hopcount.isDebugEnabled())
+                {
+                        Logging.hopcount.debug("Adding "+Integer.toString(documentIDHashes.length)+" documents to processing queue");
+                        int z = 0;
+                        while (z < documentIDHashes.length)
+                        {
+                                Logging.hopcount.debug("  Adding '"+documentIDHashes[z++]+"' to processing queue");
+                        }
+                        Logging.hopcount.debug("The source id is '"+sourceDocumentIDHash+"' and linktype is '"+linkType+"', and there are "+
+                                Integer.toString(affectedLinkTypes.length)+" affected link types, as below:");
+                        z = 0;
+                        while (z < affectedLinkTypes.length)
+                        {
+                                Logging.hopcount.debug("  Linktype '"+affectedLinkTypes[z]+"', current distance "+Integer.toString(startingAnswers[z].getAnswer())+" with "+
+                                        Integer.toString(startingAnswers[z].countDeleteDependencies())+" delete dependencies.");
+                                z++;
+                        }
+                }
+
+
+                // If hopcount records for the targets for the links don't yet exist, we had better create them,
+                // so we can make sure they are added to the queue properly.
+
+                // Make a map of the combinations of link type and document id we want to have present
+                HashMap matchMap = new HashMap();
+
+                // Make a map from the link type to the corresponding Answer object
+                HashMap answerMap = new HashMap();
+                int u = 0;
+                while (u < affectedLinkTypes.length)
+                {
+                        answerMap.put(affectedLinkTypes[u],startingAnswers[u]);
+                        u++;
+                }
+
+                // Do this in a transaction
+                beginTransaction();
+                try
+                {
+                        // I don't think we have to throw a table lock here, because even though we base decisions for insertion on the lack of existence
+                        // of a record, there can be only one thread in here at a time.
+                        
+                        // Sigh.  Postgresql is not smart enough to recognize that it can use an index when there's
+                        // an IN clause for one index field, and = clauses for the others.  So I have to generate this
+                        // as ORed together match tuples.  I do 25 at a pop, which is arbitrary.
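+                        // In other words, instead of "... WHERE jobid=? AND parentidhash IN (?,?,...)", the query is
+                        // assembled below as "... WHERE (jobid=? AND linktype=? AND parentidhash=?) OR (jobid=? AND ...)",
+                        // with at most 25 tuples per statement.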
+
+                        int maxClause = 25;
+                        StringBuffer sb = new StringBuffer();
+                        ArrayList list = new ArrayList();
+                        int i = 0;
+                        int k = 0;
+                        while (i < documentIDHashes.length)
+                        {
+                            String documentIDHash = documentIDHashes[i];
+                            int j = 0;
+                            while (j < affectedLinkTypes.length)
+                            {
+                                String affectedLinkType = affectedLinkTypes[j];
+                                if (k == maxClause)
+                                {
+                                        performFindMissingRecords(sb.toString(),list,matchMap);
+                                        k = 0;
+                                        list.clear();
+                                        sb.setLength(0);
+                                }
+                                if (k > 0)
+                                        sb.append(" OR");
+                                sb.append(" (").append(jobIDField).append("=? AND ")
+                                        .append(linkTypeField).append("=? AND ").append(parentIDHashField).append("=?)");
+                                list.add(jobID);
+                                list.add(affectedLinkType);
+                                list.add(documentIDHash);
+                                k++;
+                                j++;
+                            }
+                            i++;
+                        }
+                        if (k > 0)
+                                performFindMissingRecords(sb.toString(),list,matchMap);
+
+                        // Repeat our pass through the documents and legal link types.  For each document/legal link type,
+                        // see if there was an existing row.  If not, we create a row.  If so, we compare the recorded
+                        // distance against the distance estimate we would have given it.  If the new distance is LOWER, it gets left around
+                        // for queuing.
+
+                        HashMap map = new HashMap();
+                        i = 0;
+                        while (i < documentIDHashes.length)
+                        {
+                            String documentIDHash = documentIDHashes[i];
+                            int j = 0;
+                            while (j < affectedLinkTypes.length)
+                            {
+                                String affectedLinkType = affectedLinkTypes[j];
+                                Question q = new Question(documentIDHash,affectedLinkType);
+
+                                // Calculate what our new answer would be.
+                                Answer startingAnswer = (Answer)answerMap.get(affectedLinkType);
+                                int newAnswerValue = startingAnswer.getAnswer();
+                                if (newAnswerValue >= 0 && affectedLinkType.equals(linkType))
+                                        newAnswerValue++;
+
+                                // Now, see if there's a distance already present.
+                                Long currentDistance = (Long)matchMap.get(q);
+                                if (currentDistance == null)
+                                {
+                                        // Prepare to do an insert.
+                                        // The dependencies are the old dependencies, plus the one we are about to add.
+                                        DeleteDependency dd = new DeleteDependency(linkType,documentIDHash,sourceDocumentIDHash);
+                                        // Build a new answer, based on the starting answer and the kind of link this is.
+                                        map.clear();
+                                        Long hopCountID = new Long(IDFactory.make());
+                                        map.put(idField,hopCountID);
+                                        map.put(parentIDHashField,q.getDocumentIdentifierHash());
+                                        map.put(linkTypeField,q.getLinkType());
+                                        if (newAnswerValue == ANSWER_INFINITY)
+                                                map.put(distanceField,new Long(-1L));
+                                        else
+                                                map.put(distanceField,new Long((long)newAnswerValue));
+                                        map.put(jobIDField,jobID);
+                                        map.put(markForDeathField,markToString(MARK_NORMAL));
+                                        if (Logging.hopcount.isDebugEnabled())
+                                                Logging.hopcount.debug("Inserting new record for '"+documentIDHash+"' linktype '"+affectedLinkType+"' distance "+Integer.toString(newAnswerValue)+" for job "+jobID);
+                                        performInsert(map,null);
+                                        tracker.noteInsert();
+                                        if (hopcountMethod != IJobDescription.HOPCOUNT_NEVERDELETE)
+                                        {
+                                                deleteDepsManager.writeDependency(hopCountID,jobID,dd);
+                                                Iterator iter2 = startingAnswer.getDeleteDependencies();
+                                                while (iter2.hasNext())
+                                                {
+                                                        dd = (DeleteDependency)iter2.next();
+                                                        deleteDepsManager.writeDependency(hopCountID,jobID,dd);
+                                                }
+                                        }
+                                }
+                                else
+                                {
+                                        // If the new distance >= saved distance, don't queue anything.  That means, remove it from the hash.
+                                        int oldAnswerValue = (int)currentDistance.longValue();
+                                        if (!(newAnswerValue >= 0 && (oldAnswerValue < 0 || newAnswerValue < oldAnswerValue)))
+                                        {
+                                                // New answer is no better than the old answer, so don't queue
+                                                if (Logging.hopcount.isDebugEnabled())
+                                                        Logging.hopcount.debug("Existing record for '"+documentIDHash+"' linktype '"+affectedLinkType+"' has better distance "+Integer.toString(oldAnswerValue)+
+                                                                " than new distance "+Integer.toString(newAnswerValue)+", so not queuing for job "+jobID);
+                                                matchMap.remove(q);
+                                        }
+                                }
+                                j++;
+                            }
+                            i++;
+                        }
+
+                        // For all the records still in the matchmap, queue them.
+
+                        // The query I want to run is:
+                        // UPDATE hopcount SET markfordeath='Q' WHERE jobID=? AND parentid IN (...)
+                        // but postgresql is stupid and won't use the index that way.  So do this instead:
+                        // UPDATE hopcount SET markfordeath='Q' WHERE (jobID=? AND parentid=?) OR (jobid=? AND parentid=?)...
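+                        //
+                        // For illustration only (the loop below also restricts by link type and skips rows already
+                        // marked as queued; exact column names come from the corresponding *Field members), the
+                        // generated statement for two (document,linktype) pairs looks roughly like:
+                        //
+                        // UPDATE hopcount SET markfordeath='Q'
+                        //  WHERE (jobid=? AND linktype=? AND parentidhash=? AND markfordeath!=?)
+                        //     OR (jobid=? AND linktype=? AND parentidhash=? AND markfordeath!=?)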
+
+                        sb = new StringBuffer();
+                        list = new ArrayList();
+                        k = 0;
+                        i = 0;
+                        while (k < documentIDHashes.length)
+                        {
+                                String documentIDHash = documentIDHashes[k];
+                                int j = 0;
+                                while (j < affectedLinkTypes.length)
+                                {
+                                        String affectedLinkType = affectedLinkTypes[j];
+
+                                        Question q = new Question(documentIDHash,affectedLinkType);
+                                        if (matchMap.get(q) != null)
+                                        {
+                                                if (i == maxClause)
+                                                {
+                                                        performMarkAddDeps(sb.toString(),list);
+                                                        i = 0;
+                                                        sb.setLength(0);
+                                                        list.clear();
+                                                }
+                                                if (i > 0)
+                                                        sb.append(" OR");
+
+                                                // We only want to queue up hopcount records that correspond to the affected link types.
+                                                //
+                                                // Also, to reduce deadlock, do not update any records that are already marked as queued.  These would be infrequent,
+                                                // but they nevertheless seem to cause deadlock very easily.
+                                                //
+                                                if (Logging.hopcount.isDebugEnabled())
+                                                        Logging.hopcount.debug("Queuing '"+documentIDHash+"' linktype '"+affectedLinkType+"' for job "+jobID);
+                                                sb.append(" (").append(jobIDField).append("=? AND ")
+                                                        .append(linkTypeField).append("=? AND ").append(parentIDHashField).append("=? AND ").append(markForDeathField)
+                                                        .append("!=?)");
+                                                list.add(jobID);
+                                                list.add(affectedLinkType);
+                                                list.add(documentIDHash);
+                                                list.add(markToString(MARK_QUEUED));
+                                                i++;
+                                        }
+                                        j++;
+                                }
+                                k++;
+                        }
+                        if (i > 0)
+                                performMarkAddDeps(sb.toString(),list);
+
+                        // Leave the dependency records for the queued rows.  This will save lots of work if we decide not to
+                        // update the distance.  It's safe to leave the old dep records, because they must only record links that furnish
+                        // A minimal path, not THE minimal path.
+
+                }
+                catch (LCFException e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                catch (Error e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                finally
+                {
+                        endTransaction();
+                }
+                reindexTracker.noteInsert(documentIDHashes.length);
+        }
+
+        /** Do the work of marking add-dep-dependent links in the hopcount table. */
+        protected void performMarkAddDeps(String query, ArrayList list)
+                throws LCFException
+        {
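+                // Assuming performUpdate renders the map as the SET clause, the statement issued here is roughly:
+                //   UPDATE hopcount SET markfordeath='Q' WHERE <clause built by the caller>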
+                HashMap map = new HashMap();
+                map.put(markForDeathField,markToString(MARK_QUEUED));
+                performUpdate(map,"WHERE "+query,list,null);
+        }
+
+
+        /** Method that does the work of "finishing" a set of child references. */
+        protected void doFinish(Long jobID, String[] legalLinkTypes, String[] sourceDocumentHashes, int hopcountMethod)
+                throws LCFException
+        {
+                // Go into a transaction!
+                beginTransaction();
+                try
+                {
+                        if (hopcountMethod == IJobDescription.HOPCOUNT_ACCURATE)
+                        {
+                                // First, blow the cache.
+                                //
+                                // To do this, I'd like the following queries to occur:
+                                //
+                                // UPDATE hopcount SET markfordeath='Q' WHERE EXISTS(SELECT 'x' FROM hopdeletedeps t0 WHERE t0.ownerid=hopcount.id AND t0.jobid=<jobid>
+                                //	AND EXISTS(SELECT 'x' FROM intrinsiclink t1 WHERE t1.linktype=t0.linktype AND t1.parentid=t0.parentid
+                                //		AND t1.childid=t0.childid AND t1.jobid=<jobid> AND t1.isnew=<base> AND t1.childid IN(<sourcedocs>)))
+                                //
+                                // ... and then, get rid of all hopcount records and their dependencies that are marked for delete.
+
+
+                                // Invalidate all links with the given source documents that match the common expression
+                                doDeleteInvalidation(jobID,legalLinkTypes,true,sourceDocumentHashes,null,null,null,null);
+                        }
+                        // Make all new and existing links become just "base" again.
+                        intrinsicLinkManager.restoreLinks(jobID,sourceDocumentHashes);
+                }
+                catch (LCFException e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                catch (Error e)
+                {
+                        signalRollback();
+                        throw e;
+                }
+                finally
+                {
+                        endTransaction();
+                }
+
+        }
+
+        /** Invalidate links that meet a simple criterion and whose sources are in a given set of documents.  This also
+        * runs a queue which is initialized with all the documents whose sources exist in the hopcount table.  The purpose
+        * of that queue is to re-establish non-infinite values for all nodes described in the intrinsic link table that are
+        * still connected to the root. */
+        protected void doDeleteInvalidation(Long jobID, String[] legalLinkTypes, boolean existingOnly,
+                String[] sourceDocumentHashes, String sourceTableName,
+                String sourceTableIDColumn, String sourceTableJobColumn, String sourceTableCriteria)
+                throws LCFException
+        {
                 
-		Map indexes = getTableIndexes(null,null);
-		Iterator iter = indexes.keySet().iterator();
-		while (iter.hasNext())
-                {
-			String indexName = (String)iter.next();
-			IndexDescription id = (IndexDescription)indexes.get(indexName);
-			    
-			if (jobLinktypeParentIndex != null && id.equals(jobLinktypeParentIndex))
-				jobLinktypeParentIndex = null;
-			else if (jobParentIndex != null && id.equals(jobParentIndex))
-				jobParentIndex = null;
-			else if (jobDeathIndex != null && id.equals(jobDeathIndex))
-				jobDeathIndex = null;
-			else if (indexName.indexOf("_pkey") == -1)
-				// This index shouldn't be here; drop it
-				performRemoveIndex(indexName);
-		}
+                String commonNewExpression = null;
+                if (existingOnly)
+                        commonNewExpression = intrinsicLinkManager.newField+"="+quoteSQLString(intrinsicLinkManager.statusToString(intrinsicLinkManager.LINKSTATUS_BASE));
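+                // When set, this typically evaluates to something like isnew='B' (the "base" link status), as the
+                // example queries further down in this method suggest.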
 
-		if (jobLinktypeParentIndex != null)
-			performAddIndex(null,jobLinktypeParentIndex);
+                // Clear up hopcount table
+                if (sourceDocumentHashes == null || sourceDocumentHashes.length > 0)
+                {
 
-		if (jobParentIndex != null)
-			performAddIndex(null,jobParentIndex);
+                        if (Logging.hopcount.isDebugEnabled())
+                        {
+                                Logging.hopcount.debug("Marking for delete for job "+jobID+" all target document references"+((commonNewExpression==null)?"":" matching '"+commonNewExpression+"'")+
+                                        " from:");
+                                if (sourceDocumentHashes != null)
+                                {
+                                        int k = 0;
+                                        while (k < sourceDocumentHashes.length)
+                                        {
+                                                Logging.hopcount.debug("  "+sourceDocumentHashes[k++]);
+                                        }
+                                }
+                                else
+                                        Logging.hopcount.debug(" table "+sourceTableName+" matching "+sourceTableCriteria);
+                        }
+
+                        
+                        if (sourceDocumentHashes != null)
+                        {
+                                // The query form I found that seems to work ok with postgresql looks like this:
+                                //
+                                // UPDATE hopcount SET x=y WHERE id IN (SELECT ownerid FROM hopdeletedeps t0
+                                //   WHERE ((t0.jobid=? AND t0.childid=?)
+                                //	 OR (t0.jobid=? AND t0.childid=?)
+                                //	 ...
+                                //	 OR (t0.jobid=? AND t0.childid=?))
+                                //	 AND EXISTS(SELECT 'x' FROM intrinsiclink t1 WHERE t1.linktype=t0.linktype
+                                //		AND t1.parentid=t0.parentid AND t1.childid=t0.childid AND t1.jobid=t0.jobid AND t1.isnew='B'))
+                                //
+                                // Here's a revised form that would take advantage of postgres's better ability to work with joins, if this should
+                                // turn out to be necessary:
+                                //
+                                // UPDATE hopcount SET x=y WHERE id IN (SELECT t0.ownerid FROM hopdeletedeps t0, intrinsiclink t1
+                                //	WHERE t1.childidhash=t0.childidhash AND t1.jobid=? AND t1.linktype=t0.linktype AND t1.parentid=t0.parentid AND t1.childid=t0.childid AND t1.isnew='B'
+                                //	AND ((t0.jobid=? AND t0.childidhash=? AND t0.childid=?)
+                                //	 OR (t0.jobid=? AND t0.childidhash=? AND t0.childid=?)
+                                //	 ...
+                                //	 OR (t0.jobid=? AND t0.childidhash=? AND t0.childid=?))
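+                                //
+                                // Note that the loop below batches the source documents maxClause (25) at a time,
+                                // issuing one markForDelete call per batch, so the OR list never grows without bound.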
+
+                                int maxClause = 25;
+                                ArrayList list = new ArrayList();
+                                StringBuffer sb = new StringBuffer();
+                                int i = 0;
+                                int k = 0;
+                                while (i < sourceDocumentHashes.length)
+                                {
+                                        if (k == maxClause)
+                                        {
+                                                markForDelete(sb.toString(),list,commonNewExpression);
+                                                sb.setLength(0);
+                                                list.clear();
+                                                k = 0;
+                                        }
+                                        if (k > 0)
+                                                sb.append(" OR");
+                                        sb.append("(t0.").append(deleteDepsManager.jobIDField).append("=? AND t0.")
+                                                .append(deleteDepsManager.childIDHashField).append("=?)");
+                                        String sourceDocumentHash = sourceDocumentHashes[i];
+                                        list.add(jobID);
+                                        list.add(sourceDocumentHash);
+                                        i++;
+                                        k++;
+                                }
+                                if (k > 0)
+                                        markForDelete(sb.toString(),list,commonNewExpression);
+                                reindexTracker.noteInsert(sourceDocumentHashes.length);
+                        }
+                        else
+                        {
+                                // For this query, postgresql seems to not do the right thing unless the subclause is a three-way join:
+                                //
+                                // UPDATE hopcount SET x=y WHERE id IN(SELECT t0.ownerid FROM hopdeletedeps t0,jobqueue t99,intrinsiclink t1 WHERE
+                                // 	t0.jobid=? and t99.jobid=? and t1.jobid=? and
+                                // 	t0.childidhash=t99.dochash and t0.childid=t99.docid and t99.status='P' and
+                                //	t0.parentidhash=t1.parentidhash and t0.childidhash=t1.childidhash and t0.linktype=t1.linktype and 
+                                //	t0.parentid=t1.parentid and t0.childid=t1.childid)
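+                                //
+                                // Note that jobID is bound three times below, once for each of the three tables
+                                // involved (hopdeletedeps t0, the caller-supplied source table, and intrinsiclink t1).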
+
+                                StringBuffer sb = new StringBuffer("WHERE ");
+                                ArrayList list = new ArrayList();
+                                list.add(jobID);
+                                list.add(jobID);
+                                list.add(jobID);
+                                sb.append(idField).append(" IN(SELECT t0.").append(deleteDepsManager.ownerIDField).append(" FROM ")
+                                        .append(deleteDepsManager.getTableName()).append(" t0,").append(sourceTableName).append(",")
+                                        .append(intrinsicLinkManager.getTableName()).append(" t1 WHERE ")
 
-		if (jobDeathIndex != null)
-			performAddIndex(null,jobDeathIndex);
+                                        .append("t0.").append(deleteDepsManager.jobIDField).append("=? AND ").append(sourceTableJobColumn)
+                                        .append("=? AND t1.").append(intrinsicLinkManager.jobIDField).append("=? AND ")
                                 
-                // Finally, child tables.
-                intrinsicLinkManager.install(jobsTable,jobsColumn);
-                deleteDepsManager.install(jobsTable,jobsColumn,getTableName(),idField);
-	}
-
-	/** Uninstall.
-	*/
-	public void deinstall()
-		throws LCFException
-	{
-		beginTransaction();
-		try
-		{
-			deleteDepsManager.deinstall();
-			intrinsicLinkManager.deinstall();
-			performDrop(null);
-		}
-		catch (LCFException e)
-		{
-			signalRollback();
-			throw e;
-		}
-		catch (Error e)
-		{
-			signalRollback();
-			throw e;
-		}
-		finally
-		{
-			endTransaction();
-		}
-
-	}
-
-	/** Go from string to mark.
-	*@param value is the string.
-	*@return the status value.
-	*/
-	public static int stringToMark(String value)
-		throws LCFException
-	{
-		Integer x = (Integer)markMap.get(value);
-		if (x == null)
-			throw new LCFException("Bad mark value: '"+value+"'");
-		return x.intValue();
-	}
-
-	/** Go from mark to string.
-	*@param mark is the mark.
-	*@return the string.
-	*/
-	public static String markToString(int mark)
-		throws LCFException
-	{
-		switch (mark)
-		{
-		case MARK_NORMAL:
-			return "N";
-		case MARK_QUEUED:
-			return "Q";
-		case MARK_DELETING:
-			return "D";
-		default:
-			throw new LCFException("Bad mark value");
-		}
-	}
-
-	/** Delete an owner (and clean up the corresponding hopcount rows).
-	*/
-	public void deleteOwner(Long jobID)
-		throws LCFException
-	{
-		beginTransaction();
-		try
-		{
-			// Delete the intrinsic rows belonging to this job.
-			intrinsicLinkManager.deleteOwner(jobID);
-
-			// Delete the deletedeps rows
-			deleteDepsManager.deleteJob(jobID);
-
-			// Delete our own rows.
-			ArrayList list = new ArrayList();
-			list.add(jobID);
-			performDelete("WHERE "+jobIDField+"=?",list,null);
-			reindexTracker.noteInsert();
-		}
-		catch (LCFException e)
-		{
-			signalRollback();
-			throw e;
-		}
-		catch (Error e)
-		{
-			signalRollback();
-			throw e;
-		}
-		finally
-		{
-			endTransaction();
-		}
-	}
-
-	/** Reset, at startup time.
-	*/
-	public void reset()
-		throws LCFException
-	{
-		beginTransaction();
-		try
-		{
-			intrinsicLinkManager.reset();
-		}
-		catch (LCFException e)
-		{
-			signalRollback();
-			throw e;
-		}
-		catch (Error e)
-		{
-			signalRollback();
-			throw e;
-		}
-		finally
-		{
-			endTransaction();
-		}
-	}
-
-	/** Record a references from a set of documents to the root.  These will be marked as "new" or "existing", and
-	* will have a null linktype.
-	*/
-	public void recordSeedReferences(Long jobID, String[] legalLinkTypes, String[] targetDocumentIDHashes, int hopcountMethod)
-		throws LCFException
-	{
-		doRecord(jobID,legalLinkTypes,"",targetDocumentIDHashes,"",hopcountMethod);
-	}
-
-	/** Finish seed references.  Seed references are special in that the only source is the root.
-	*/
-	public void finishSeedReferences(Long jobID, String[] legalLinkTypes, int hopcountMethod)
-		throws LCFException
-	{
-		doFinish(jobID,legalLinkTypes,new String[]{""},hopcountMethod);
-	}
-
-	/** Record a reference from source to target.  This reference will be marked as "new" or "existing".
-	*/
-	public void recordReference(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String targetDocumentIDHash, String linkType,
-		int hopcountMethod)
-		throws LCFException
-	{
-		doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,new String[]{targetDocumentIDHash},linkType,hopcountMethod);
-	}
-
-	/** Record a set of references from source to target.  This reference will be marked as "new" or "existing".
-	*/
-	public void recordReferences(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
-		int hopcountMethod)
-		throws LCFException
-	{
-		doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,targetDocumentIDHashes,linkType,hopcountMethod);
-	}
-
-	/** Complete a recalculation pass for a set of source documents.  All child links that are not marked as "new"
-	* or "existing" will be removed.  At the completion of this pass, the links will have their "new" flag cleared.
-	*/
-	public void finishParents(Long jobID, String[] legalLinkTypes, String[] sourceDocumentHashes, int hopcountMethod)
-		throws LCFException
-	{
-		doFinish(jobID,legalLinkTypes,sourceDocumentHashes,hopcountMethod);
-	}
-
-	/** Do the work of recording source-target references. */
-	protected void doRecord(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
-		int hopcountMethod)
-		throws LCFException
-	{
-
-		// We have to both add the reference, AND invalidate appropriate cached hopcounts (if it is a NEW
-		// link.)
-		beginTransaction();
-		try
-		{
-			String[] newReferences = intrinsicLinkManager.recordReferences(jobID,sourceDocumentIDHash,targetDocumentIDHashes,linkType);
-			if (newReferences.length > 0)
-			{
-				// There are added links.
-
-				// The add causes hopcount records to be queued for processing (and created if they don't exist).
-				// ALL the hopcount records for the target document ids must be queued, for all the link types
-				// there are for this job.  Other times, the queuing requirement is less stringent, such as
-				// when a hopcount for one linktype changes.  In those cases we only want to queue up hopcount
-				// records corresponding to the changed record.
-
-				// What we need to do is create a queue which contains only the target hopcount table rows, if they
-				// exist.  Then we run the update algorithm until the cache is empty.
-
-				if (Logging.hopcount.isDebugEnabled())
-					Logging.hopcount.debug("Queueing "+Integer.toString(targetDocumentIDHashes.length)+" documents");
-
-				// Since we really want efficiency, we can write the answer in place now, based on the current
-				// hopcount rows.  This works even if the current row is out of date, because if we change the
-				// current row's value, the target rows will be requeued at that point.
-
-				// When we record new links, we must come up with an initial calculation or requeue ALL legal link
-				// types.  If this isn't done, then we cannot guarantee that the target record will exist - and
-				// somebody will then interpret the distance as being 'infinity'.
-				//
-				// It would be possible to change this but we would then also need to change how a missing record
-				// would be interpreted.
-
-				//if (!(linkType == null || linkType.length() == 0))
-				//	legalLinkTypes = new String[]{linkType};
-
-				// So, let's load what we have for hopcount and dependencies for sourceDocumentID.
-
-				Answer[] estimates = new Answer[legalLinkTypes.length];
-
-				if (sourceDocumentIDHash == null || sourceDocumentIDHash.length() == 0)
-				{
-					int i = 0;
-					while (i < estimates.length)
-					{
-						estimates[i++] = new Answer(0);
-					}
-				}
-				else
-				{
-					ArrayList list = new ArrayList();
-					StringBuffer sb = new StringBuffer("SELECT ");
-					sb.append(idField).append(",").append(distanceField).append(",").append(linkTypeField)
-						.append(" FROM ").append(getTableName()).append(" WHERE ");
-					int i = 0;
-					while (i < legalLinkTypes.length)
-					{
-						if (i > 0)
-							sb.append(" OR ");
-						sb.append("(").append(jobIDField).append("=? AND ")
-							.append(linkTypeField).append("=? AND ").append(parentIDHashField).append("=?)");
-						list.add(jobID);
-						list.add(legalLinkTypes[i++]);
-						list.add(sourceDocumentIDHash);
-					}
-
-					IResultSet set = performQuery(sb.toString(),list,null,null);
-					HashMap answerMap = new HashMap();
-					i = 0;
-					while (i < estimates.length)
-					{
-						estimates[i] = new Answer(ANSWER_INFINITY);
-						answerMap.put(legalLinkTypes[i],estimates[i]);
-						i++;
-					}
-
-					i = 0;
-					while (i < set.getRowCount())
-					{
-						IResultRow row = set.getRow(i++);
-						Long id = (Long)row.getValue(idField);
-						DeleteDependency[] dds;
-						if (hopcountMethod != IJobDescription.HOPCOUNT_NEVERDELETE)
-							dds = deleteDepsManager.getDeleteDependencies(id);
-						else
-							dds = new DeleteDependency[0];
-						Long distance = (Long)row.getValue(distanceField);
-						String recordedLinkType = (String)row.getValue(linkTypeField);
-						Answer a = (Answer)answerMap.get(recordedLinkType);
-						int recordedDistance = (int)distance.longValue();
-						if (recordedDistance != -1)
-						{
-							a.setAnswer(recordedDistance,dds);
-						}
-					}
-				}
-
-				// Now add these documents to the processing queue
-				addToProcessingQueue(jobID,legalLinkTypes,newReferences,estimates,sourceDocumentIDHash,linkType,hopcountMethod);
-
-				if (Logging.hopcount.isDebugEnabled())
-					Logging.hopcount.debug("Done queueing "+Integer.toString(targetDocumentIDHashes.length)+" documents");
-			}
-		}
-		catch (LCFException e)
-		{
-			signalRollback();
-			throw e;
-		}
-		catch (Error e)
-		{
-			signalRollback();
-			throw e;
-		}
-		finally
-		{
-			endTransaction();
-		}
-	}
-
-	/** Remove a set of document identifiers specified as a criteria.  This will remove hopcount rows and
-	* also intrinsic links that have the specified document identifiers are sources.
-	*/
-	public void deleteMatchingDocuments(Long jobID, String[] legalLinkTypes,
-		String sourceTableName,
-		String sourceTableIDColumn, String sourceTableJobColumn, String sourceTableCriteria, int hopcountMethod)
-		throws LCFException
-	{
-		// This should work similarly to deleteDocumentIdentifiers() except that the identifiers
-		// come from a subquery rather than a list.
-		beginTransaction();
-		try
-		{
-			// This also removes the links themselves...
-			if (hopcountMethod == IJobDescription.HOPCOUNT_ACCURATE)
-			{
-				doDeleteInvalidation(jobID,legalLinkTypes,false,null,sourceTableName,
-					sourceTableIDColumn,sourceTableJobColumn,
-					sourceTableCriteria);
-			}
-
-		}
-		catch (LCFException e)
-		{
-			signalRollback();
-			throw e;
-		}
-		catch (Error e)
-		{
-			signalRollback();
-			throw e;
-		}
-		finally
-		{
-			endTransaction();
-		}
-
-	}
-	
-
-	/** Remove a set of document identifier hashes.  This will also remove the intrinsic links that have these document
-	* identifier hashes as sources, as well as invalidating cached hop counts that depend on them.
-	*/
-	public void deleteDocumentIdentifiers(Long jobID, String[] legalLinkTypes, String[] sourceDocumentHashes, int hopcountMethod)
-		throws LCFException
-	{
-		beginTransaction();
-		try
-		{
-			// What I want to do here is to first perform the invalidation of the cached hopcounts.
-			//
-			// UPDATE hopcount SET markfordeath='X' WHERE EXISTS(SELECT 'x' FROM hopdeletedeps t0 WHERE t0.ownerid=hopcount.id AND t0.jobid=<jobid>
-			//	AND EXISTS(SELECT 'x' FROM intrinsiclinks t1 WHERE t1.linktype=t0.linktype AND t1.parentid=t0.parentid
-			//		AND t1.childid=t0.childid AND t1.jobid=<jobid> AND t1.childid IN(<sourcedocs>)))
-			//
-			// ... and then, re-evaluate all hopcount records and their dependencies that are marked for delete.
-			//
-
-
-			// This also removes the links themselves...
-			if (hopcountMethod == IJobDescription.HOPCOUNT_ACCURATE)
-				doDeleteInvalidation(jobID,legalLinkTypes,false,sourceDocumentHashes,null,null,null,null);
-
-		}
-		catch (LCFException e)
-		{
-			signalRollback();
-			throw e;
-		}
-		catch (Error e)
-		{
-			signalRollback();
-			throw e;
-		}
-		finally
-		{
-			endTransaction();
-		}
-	}
-
-	/** Calculate a bunch of hop-counts.  The values returned are only guaranteed to be an upper bound, unless
-	* the queue has recently been processed (via processQueue below).  -1 will be returned to indicate "infinity".
-	*/
-	public int[] findHopCounts(Long jobID, String[] parentIdentifierHashes, String linkType)
-		throws LCFException
-	{
-		// No transaction, since we can happily interpret whatever comes back.
-		StringBuffer sb = new StringBuffer();
-		ArrayList list = new ArrayList();
-
-		int[] rval = new int[parentIdentifierHashes.length];
-		HashMap rvalMap = new HashMap();
-		int i = 0;
-		while (i < rval.length)
-		{
-			rval[i] = -1;
-			rvalMap.put(parentIdentifierHashes[i],new Integer(i));
-			i++;
-		}
-
-		int maxClause = 25;
-		i = 0;
-		int k = 0;
-		while (i < parentIdentifierHashes.length)
-		{
-			if (k == maxClause)
-			{
-				processFind(rval,rvalMap,sb.toString(),list);
-				k = 0;
-				sb.setLength(0);
-				list.clear();
-			}
-			if (k > 0)
-				sb.append(" OR");
-			sb.append("(").append(jobIDField).append("=? AND ").append(linkTypeField)
-				.append("=? AND ").append(parentIDHashField).append("=?)");
-			list.add(jobID);
-			list.add(linkType);
-			list.add(parentIdentifierHashes[i]);
-			k++;
-			i++;
-		}
-		if (k > 0)
-			processFind(rval,rvalMap,sb.toString(),list);
-
-		return rval;
-	}
-
-	/** Process a portion of a find request for hopcount information.
-	*/
-	protected void processFind(int[] rval, Map rvalMap, String query, ArrayList list)
-		throws LCFException
-	{
-		IResultSet set = performQuery("SELECT "+distanceField+","+parentIDHashField+" FROM "+getTableName()+" WHERE "+query,list,null,null);
-		int i = 0;
-		while (i < set.getRowCount())
-		{
-			IResultRow row = set.getRow(i++);
-			String parentIDHash = (String)row.getValue(parentIDHashField);
-			Long distance = (Long)row.getValue(distanceField);
-			rval[((Integer)rvalMap.get(parentIDHash)).intValue()] = (int)distance.longValue();
-		}
-	}
-
-	/** Process a stage of the propagation queue for a job.
-	*@param jobID is the job we need to have the hopcount propagated for.
-	*@return true if the queue is empty.
-	*/
-	public boolean processQueue(Long jobID, String[] legalLinkTypes, int hopcountMethod)
-		throws LCFException
-	{
-		// We can't instantiate the DocumentHash object here, because it will wind up having
-		// cached in it the answers from the previous round of calculation.  That round had
-		// a different set of marked nodes than the current round.
-
-		ArrayList list = new ArrayList();
-
-		// Pick off up to n queue items at a time.  We don't want to pick off too many (because
-		// then we wind up delaying other threads too much), nor do we want to do one at a time
-		// (because that is inefficient against the database), so I picked 200 as being 200+x faster
-		// than 1...
-		list.clear();
-		list.add(jobID);
-		list.add(markToString(MARK_QUEUED));
-		IResultSet set = performQuery("SELECT "+linkTypeField+","+parentIDHashField+" FROM "+
-			getTableName()+" WHERE "+jobIDField+"=? AND "+markForDeathField+"=?"+" LIMIT 200 FOR UPDATE",list,null,null);
-
-		// No more entries == we are done
-		if (set.getRowCount() == 0)
-			return true;
-
-		DocumentHash dh = new DocumentHash(jobID,legalLinkTypes,hopcountMethod);
-
-		Question[] questions = new Question[set.getRowCount()];
-
-		int i = 0;
-		while (i < set.getRowCount())
-		{
-			IResultRow row = set.getRow(i);
-			String parentIdentifierHash = (String)row.getValue(parentIDHashField);
-			String linkType = (String)row.getValue(linkTypeField);
-
-			// All documents in the set have the same basic assumptions; another set may be queued
-			// as a side effect of some of these getting resolved, but treating them in chunks
-			// seems like it should not cause problems (because the same underlying assumptions
-			// underlie the whole chunk).  The side effects *may* cause other documents that are
-			// still in the queue to be evaluated as well, in which case they will disappear from 
-			// the queue and not be processed further.
-
-			// Create a document hash object.
-			questions[i] = new Question(parentIdentifierHash,linkType);
-			i++;
-		}
-
-		// We don't care what the response is; we just want the documents to leave the queue.
-		dh.askQuestions(questions);
-		return false;
-	}
-
-
-	/** Limited find for missing records.
-	*/
-	protected void performFindMissingRecords(String query, ArrayList list, Map matchMap)
-		throws LCFException
-	{
-		// The naive query is this - but postgres does not find the index this way:
-		//IResultSet set = performQuery("SELECT "+parentIDField+","+linkTypeField+" FROM "+getTableName()+" WHERE "+
-		//	parentIDField+" IN("+query+") AND "+jobIDField+"=?",list,null,null);
-		IResultSet set = performQuery("SELECT "+parentIDHashField+","+linkTypeField+","+distanceField+" FROM "+getTableName()+" WHERE "+query,list,null,null);
-		int i = 0;
-		while (i < set.getRowCount())
-		{

[... 4779 lines stripped ...]

