Return-Path: Delivered-To: apmail-lucene-solr-commits-archive@minotaur.apache.org Received: (qmail 18985 invoked from network); 9 Oct 2009 18:36:54 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 9 Oct 2009 18:36:54 -0000 Received: (qmail 61242 invoked by uid 500); 9 Oct 2009 18:36:53 -0000 Delivered-To: apmail-lucene-solr-commits-archive@lucene.apache.org Received: (qmail 61177 invoked by uid 500); 9 Oct 2009 18:36:53 -0000 Mailing-List: contact solr-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: solr-dev@lucene.apache.org Delivered-To: mailing list solr-commits@lucene.apache.org Received: (qmail 61168 invoked by uid 99); 9 Oct 2009 18:36:53 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Oct 2009 18:36:53 +0000 X-ASF-Spam-Status: No, hits=-1998.8 required=10.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Oct 2009 18:36:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1A2052388896; Fri, 9 Oct 2009 18:36:29 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r823654 - in /lucene/solr/trunk/src/java/org/apache/solr: core/IndexDeletionPolicyWrapper.java handler/ReplicationHandler.java Date: Fri, 09 Oct 2009 18:36:29 -0000 To: solr-commits@lucene.apache.org From: yonik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091009183629.1A2052388896@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: yonik Date: Fri Oct 9 18:36:28 2009 New Revision: 823654 URL: http://svn.apache.org/viewvc?rev=823654&view=rev Log: SOLR-1458: use saved commit points when replicating optimized indexes Modified: 
lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java Modified: lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java?rev=823654&r1=823653&r2=823654&view=diff ============================================================================== --- lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java (original) +++ lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java Fri Oct 9 18:36:28 2009 @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * A wrapper for an IndexDeletionPolicy instance. @@ -39,6 +40,7 @@ private volatile Map solrVersionVsCommits = new ConcurrentHashMap(); private final Map reserves = new ConcurrentHashMap(); private volatile IndexCommit latestCommit; + private final ConcurrentHashMap savedCommits = new ConcurrentHashMap(); public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy) { this.deletionPolicy = deletionPolicy; @@ -98,6 +100,25 @@ return result; } + /** Permanently prevent this commit point from being deleted. + * A counter is used to allow a commit point to be correctly saved and released + * multiple times. 
*/ + public synchronized void saveCommitPoint(Long indexCommitVersion) { + AtomicInteger reserveCount = savedCommits.get(indexCommitVersion); + if (reserveCount == null) reserveCount = new AtomicInteger(); + reserveCount.incrementAndGet(); + } + + /** Release a previously saved commit point */ + public synchronized void releaseCommmitPoint(Long indexCommitVersion) { + AtomicInteger reserveCount = savedCommits.get(indexCommitVersion); + if (reserveCount == null) return;// this should not happen + if (reserveCount.decrementAndGet() <= 0) { + savedCommits.remove(indexCommitVersion); + } + } + + /** * Internal use for Lucene... do not explicitly call. */ @@ -121,55 +142,74 @@ private class IndexCommitWrapper extends IndexCommit { IndexCommit delegate; + IndexCommitWrapper(IndexCommit delegate) { this.delegate = delegate; } + @Override public String getSegmentsFileName() { return delegate.getSegmentsFileName(); } + @Override public Collection getFileNames() throws IOException { return delegate.getFileNames(); } + @Override public Directory getDirectory() { return delegate.getDirectory(); } + @Override public void delete() { - Long reserve = reserves.get(delegate.getVersion()); + Long version = delegate.getVersion(); + Long reserve = reserves.get(version); if (reserve != null && System.currentTimeMillis() < reserve) return; + if(savedCommits.contains(version)) return; delegate.delete(); } + @Override public boolean isOptimized() { return delegate.isOptimized(); } + @Override public boolean equals(Object o) { return delegate.equals(o); } + @Override public int hashCode() { return delegate.hashCode(); } + @Override public long getVersion() { return delegate.getVersion(); } + @Override public long getGeneration() { return delegate.getGeneration(); } + @Override public boolean isDeleted() { return delegate.isDeleted(); } + @Override public long getTimestamp() throws IOException { return delegate.getTimestamp(); } + + @Override + public Map getUserData() throws IOException { 
+ return delegate.getUserData(); + } } /** Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=823654&r1=823653&r2=823654&view=diff ============================================================================== --- lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java (original) +++ lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java Fri Oct 9 18:36:28 2009 @@ -18,6 +18,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.index.IndexReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; @@ -815,15 +816,32 @@ replicateOnStart = true; RefCounted s = core.getNewestSearcher(false); try { + IndexReader reader = s==null ? null : s.get().getReader(); + if (reader!=null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) { + try { + if(!replicateOnCommit && replicateOnOptimize){ + Collection commits = IndexReader.listCommits(reader.directory()); + for (IndexCommit ic : commits) { + if(ic.isOptimized()){ + if(indexCommitPoint == null || indexCommitPoint.getVersion() < ic.getVersion()) indexCommitPoint = ic; + } + } + } else{ + indexCommitPoint = reader.getIndexCommit(); + } + } finally { + if(indexCommitPoint != null){ + core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getVersion()); + } + } + } if (core.getUpdateHandler() instanceof DirectUpdateHandler2) { ((DirectUpdateHandler2) core.getUpdateHandler()).forceOpenWriter(); } else { LOG.warn("The update handler being used is not an instance or sub-class of DirectUpdateHandler2. 
" + "Replicate on Startup cannot work."); - } - if (s.get().getReader().getIndexCommit() != null) - if (s.get().getReader().getIndexCommit().getGeneration() != 1L) - indexCommitPoint = s.get().getReader().getIndexCommit(); + } + } catch (IOException e) { LOG.warn("Unable to get IndexCommit on startup", e); } finally { @@ -892,13 +910,18 @@ * This refreshes the latest replicateable index commit and optionally can create Snapshots as well */ public void postCommit() { - if (getCommit || snapshoot) { + if (getCommit) { + IndexCommit oldCommitPoint = indexCommitPoint; indexCommitPoint = core.getDeletionPolicy().getLatestCommit(); + core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getVersion()); + if(oldCommitPoint != null){ + core.getDeletionPolicy().releaseCommmitPoint(oldCommitPoint.getVersion()); + } } if (snapshoot) { try { SnapShooter snapShooter = new SnapShooter(core, null); - snapShooter.createSnapAsync(indexCommitPoint.getFileNames(), ReplicationHandler.this); + snapShooter.createSnapAsync(core.getDeletionPolicy().getLatestCommit().getFileNames(), ReplicationHandler.this); } catch (Exception e) { LOG.error("Exception while snapshooting", e); }