jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From casta...@apache.org
Subject svn commit: r1298851 - /incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
Date Fri, 09 Mar 2012 14:38:54 GMT
Author: castagna
Date: Fri Mar  9 14:38:53 2012
New Revision: 1298851

URL: http://svn.apache.org/viewvc?rev=1298851&view=rev
Log:
JENA-157 - This adds a pre-merge phase to SortedDataBag that merges at most 100 files at a time.
If more than 100 files need to be merged, it is done in multiple rounds.

Modified:
    incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java

Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java?rev=1298851&r1=1298850&r2=1298851&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java Fri Mar  9 14:38:53 2012
@@ -62,6 +62,8 @@ import org.openjena.atlas.lib.Sink ;
  */
 public class SortedDataBag<E> extends AbstractDataBag<E>
 {
+    private static final int MAX_SPILL_FILES = 100 ; // this is the maximum number of files to merge at the same time
+
     protected final ThresholdPolicy<E> policy;
     protected final SerializationFactory<E> serializationFactory;
     protected final Comparator<? super E> comparator;
@@ -177,8 +179,15 @@ public class SortedDataBag<E> extends Ab
      * @return an Iterator
      */
     @Override
+	public Iterator<E> iterator()
+    {
+        preMerge();
+
+        return iterator(getSpillFiles().size());
+    }
+
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public Iterator<E> iterator()
+    private Iterator<E> iterator(int size)
     {
         checkClosed();
         
@@ -197,15 +206,16 @@ public class SortedDataBag<E> extends Ab
         
         if (spilled)
         {
-            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>();
+            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + (memSize > 0 ? 1 : 0));
                         
             if (memSize > 0)
             {
                 inputs.add(memory.iterator());
             }
             
-            for (File spillFile : getSpillFiles())
+            for ( int i = 0; i < size; i++ )
             {
+                File spillFile = getSpillFiles().get(i);
                 try
                 {
                     Iterator<E> irc = getInputIterator(spillFile);
@@ -241,6 +251,38 @@ public class SortedDataBag<E> extends Ab
         }
     }
     
+    private void preMerge() {
+        if (getSpillFiles() == null || getSpillFiles().size() <= MAX_SPILL_FILES) { return; }
+
+        try {
+            while ( getSpillFiles().size() > MAX_SPILL_FILES ) {
+                Sink<E> sink = serializationFactory.createSerializer(getSpillStream()) ;
+                Iterator<E> ssi = iterator(MAX_SPILL_FILES) ;
+                try {
+                    while ( ssi.hasNext() ) {
+                        sink.send( ssi.next() );
+                    }
+                } finally {
+                    Iter.close(ssi) ;
+                    sink.close() ;
+                }
+                
+                List<File> toRemove = new ArrayList<File>(MAX_SPILL_FILES) ;
+                for ( int i = 0; i < MAX_SPILL_FILES; i++ ) {
+                    File file = getSpillFiles().get(i) ;
+                    file.delete() ;
+                    toRemove.add(file) ;
+                }
+
+                getSpillFiles().removeAll(toRemove) ;
+
+                memory = new ArrayList<E>() ;
+            }            
+        } catch (IOException e) {
+            throw new AtlasException(e) ;
+        }
+    }
+
     @Override
     public void close()
     {



Mime
View raw message