Subject: svn commit: r397995 - /lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
From: ab@apache.org
To: nutch-commits@lucene.apache.org
Reply-To: nutch-dev@lucene.apache.org
Date: Fri, 28 Apr 2006 19:56:48 -0000

Author: ab
Date: Fri Apr 28 12:56:47 2006
New Revision: 397995

URL: http://svn.apache.org/viewcvs?rev=397995&view=rev
Log:
Implement fully incremental LinkDb updates.
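In short: LinkDb.invert() no longer overwrites an existing linkdb. It first
inverts the new segments into a temporary output and then, if a linkdb
already exists, runs a merge job whose reducer (LinkDbMerger) concatenates
the old and new inlink sets per URL, capped at db.max.inlinks entries
(default 10000). The merge rule is roughly the following sketch (illustrative
only, not the committed code; the class name is made up, and the accumulator
is a fresh Inlinks rather than the first value as in the actual reducer):

    import java.util.Iterator;
    import org.apache.nutch.crawl.Inlink;
    import org.apache.nutch.crawl.Inlinks;

    // Combine any number of Inlinks sets for one URL, keeping at most
    // maxInlinks entries; further inlinks are silently dropped.
    public class InlinksMergeSketch {
      public static Inlinks merge(Iterator values, int maxInlinks) {
        Inlinks merged = new Inlinks();
        while (values.hasNext()) {
          Inlinks part = (Inlinks) values.next();
          for (Iterator it = part.iterator(); it.hasNext(); ) {
            if (merged.size() >= maxInlinks) return merged; // cap reached
            merged.add((Inlink) it.next());
          }
        }
        return merged;
      }
    }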
Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java?rev=397995&r1=397994&r2=397995&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java Fri Apr 28 12:56:47 2006
@@ -23,6 +23,7 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.LogFormatter;
 import org.apache.hadoop.mapred.*;
@@ -42,6 +43,34 @@
   private int maxAnchorLength;
   private int maxInlinks;
   private boolean ignoreInternalLinks;
+
+  public static class LinkDbMerger extends MapReduceBase implements Reducer {
+    private int _maxInlinks;
+
+    public void configure(JobConf job) {
+      super.configure(job);
+      _maxInlinks = job.getInt("db.max.inlinks", 10000);
+    }
+
+    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+      Inlinks inlinks = null;
+      while (values.hasNext()) {
+        if (inlinks == null) {
+          inlinks = (Inlinks)values.next();
+          continue;
+        }
+        Inlinks val = (Inlinks)values.next();
+        for (Iterator it = val.iterator(); it.hasNext(); ) {
+          if (inlinks.size() >= _maxInlinks) {
+            output.collect(key, inlinks);
+            return;
+          }
+          inlinks.add((Inlink)it.next());
+        }
+      }
+      output.collect(key, inlinks);
+    }
+  }
 
   public LinkDb() {
     super(null);
@@ -126,18 +155,17 @@
     output.collect(key, result);
   }
-
-  public void invert(File linkDb, File segmentsDir) throws IOException {
-    LOG.info("LinkDb: starting");
-    LOG.info("LinkDb: linkdb: " + linkDb);
-    LOG.info("LinkDb: segments: " + segmentsDir);
-
-    JobConf job = LinkDb.createJob(getConf(), linkDb);
-    job.setInputDir(segmentsDir);
-    job.set("mapred.input.subdir", ParseData.DIR_NAME);
-    JobClient.runJob(job);
-    LinkDb.install(job, linkDb);
-    LOG.info("LinkDb: done");
+  public void invert(File linkDb, final File segmentsDir) throws IOException {
+    final FileSystem fs = FileSystem.get(getConf());
+    File[] files = fs.listFiles(segmentsDir, new FileFilter() {
+      public boolean accept(File f) {
+        try {
+          if (fs.isDirectory(f)) return true;
+        } catch (IOException ioe) {};
+        return false;
+      }
+    });
+    invert(linkDb, files);
   }
 
   public void invert(File linkDb, File[] segments) throws IOException {
@@ -149,13 +177,24 @@
       job.addInputDir(new File(segments[i], ParseData.DIR_NAME));
     }
     JobClient.runJob(job);
+    FileSystem fs = FileSystem.get(getConf());
+    if (fs.exists(linkDb)) {
+      LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
+      // try to merge
+      File newLinkDb = job.getOutputDir();
+      job = LinkDb.createMergeJob(getConf(), linkDb);
+      job.addInputDir(new File(linkDb, CURRENT_NAME));
+      job.addInputDir(newLinkDb);
+      JobClient.runJob(job);
+      fs.delete(newLinkDb);
+    }
     LinkDb.install(job, linkDb);
     LOG.info("LinkDb: done");
   }
 
   private static JobConf createJob(Configuration config, File linkDb) {
     File newLinkDb =
-      new File(linkDb,
+      new File("linkdb-" +
               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
     JobConf job = new NutchJob(config);
@@ -178,6 +217,29 @@
     return job;
   }
 
+  private static JobConf createMergeJob(Configuration config, File linkDb) {
+    File newLinkDb =
+      new File("linkdb-merge-" +
+              Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("linkdb merge " + linkDb);
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(Inlinks.class);
+
+    job.setReducerClass(LinkDbMerger.class);
+
+    job.setOutputDir(newLinkDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setBoolean("mapred.output.compress", true);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(Inlinks.class);
+
+    return job;
+  }
+
   public static void install(JobConf job, File linkDb) throws IOException {
     File newLinkDb = job.getOutputDir();
     FileSystem fs = new JobClient(job).getFs();
@@ -190,25 +252,33 @@
   }
 
   public static void main(String[] args) throws Exception {
-    LinkDb linkDb = new LinkDb(NutchConfiguration.create());
+    Configuration conf = NutchConfiguration.create();
+    LinkDb linkDb = new LinkDb(conf);
     if (args.length < 2) {
       System.err.println("Usage: <linkdb> (-dir segmentsDir | segment1 segment2 ...)");
       return;
     }
-    boolean dir = false;
     File segDir = null;
+    final FileSystem fs = FileSystem.get(conf);
     File db = new File(args[0]);
     ArrayList segs = new ArrayList();
    for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-dir")) {
-        dir = true;
         segDir = new File(args[++i]);
+        File[] files = fs.listFiles(segDir, new FileFilter() {
+          public boolean accept(File f) {
+            try {
+              if (fs.isDirectory(f)) return true;
+            } catch (IOException ioe) {};
+            return false;
+          }
+        });
+        if (files != null) segs.addAll(Arrays.asList(files));
         break;
       } else segs.add(new File(args[i]));
     }
-    if (dir) linkDb.invert(db, segDir);
-    else linkDb.invert(db, (File[])segs.toArray(new File[segs.size()]));
+    linkDb.invert(db, (File[])segs.toArray(new File[segs.size()]));
   }
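For reference, driving the new incremental update from user code could look
like the following (a minimal sketch against the API shown in the diff; the
linkdb location and segment path are illustrative, not part of the commit):

    import java.io.File;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.crawl.LinkDb;
    import org.apache.nutch.util.NutchConfiguration;

    public class IncrementalLinkDbUpdate {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        LinkDb linkDb = new LinkDb(conf);
        // The first run creates crawl/linkdb from scratch; every later run
        // merges the newly inverted links into it, keeping at most
        // db.max.inlinks (default 10000) inlinks per target URL.
        File db = new File("crawl/linkdb");
        File[] segments = { new File("crawl/segments/20060428125647") };
        linkDb.invert(db, segments);
      }
    }

The command-line path behaves the same way, since main() now always goes
through invert(File, File[]).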