drill-commits mailing list archives

From j..@apache.org
Subject [13/16] incubator-drill git commit: - Update Parquet writer to always make binary copy of data for statistics holding purposes. (Fixes JVM crash in certain cases.) - Update WriterRecordBatch to stop doing premature cleanup. In the case that a downstream operator is still holding memory allocated in the writer record batch, it was possible that the operator would try to close the allocator before that memory had been released.
Date Sat, 08 Nov 2014 00:03:14 GMT
- Update Parquet writer to always make a binary copy of data held for statistics.
  (Fixes a JVM crash in certain cases; a sketch of the underlying issue follows the
  pom.xml hunk below.)
- Update WriterRecordBatch to stop doing premature cleanup. When a downstream
  operator still holds memory that was allocated in the writer record batch, the
  batch could otherwise close its allocator before that memory had been released
  (see the sketch below).
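
To make the second fix concrete, here is a minimal sketch of the lifetime hazard. The
Allocator class is a hypothetical stand-in, not Drill's actual BufferAllocator API:
closing an allocator while a consumer still holds one of its buffers either fails
loudly, as below, or frees memory out from under the consumer.

class Allocator implements AutoCloseable {
  private int outstanding = 0;

  byte[] buffer(int size) {
    outstanding++;                      // track every buffer handed out
    return new byte[size];
  }

  void release() {
    outstanding--;                      // a consumer is done with one buffer
  }

  @Override
  public void close() {
    // Premature cleanup lands here with outstanding > 0; a real allocator
    // either fails like this or frees memory still referenced downstream.
    if (outstanding > 0) {
      throw new IllegalStateException(outstanding + " buffer(s) still in use");
    }
  }
}

class PrematureCleanupDemo {
  public static void main(String[] args) {
    Allocator allocator = new Allocator();
    byte[] held = allocator.buffer(64); // a downstream operator still holds this
    allocator.close();                  // too early: throws IllegalStateException
    // Correct order: the consumer calls release() first, then close() succeeds.
  }
}

Deferring cleanup() until the operator is actually torn down, as the WriterRecordBatch
diff below does, keeps the allocator alive for as long as downstream operators may
still reference its memory.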


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/06f0e178
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/06f0e178
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/06f0e178

Branch: refs/heads/master
Commit: 06f0e178e7738dc7ecd81faad8d1f73003c84fb8
Parents: 66d5be4
Author: Jacques Nadeau <jacques@apache.org>
Authored: Wed Nov 5 18:59:40 2014 -0800
Committer: Jinfeng Ni <jni@maprtech.com>
Committed: Fri Nov 7 10:50:57 2014 -0800

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |  2 +-
 .../exec/physical/impl/WriterRecordBatch.java   | 82 ++++++++++----------
 2 files changed, 42 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/06f0e178/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 55721d1..0dea38a 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -125,7 +125,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.5.1-drill-r4</version>
+      <version>1.5.1-drill-r5</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>

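The version bump above (parquet-column 1.5.1-drill-r4 to 1.5.1-drill-r5) carries the
Parquet-side statistics fix from the first bullet of the commit message. As a hedged
illustration only, with hypothetical names rather than the actual parquet-column
statistics classes: keeping a reference into a buffer that the writer later recycles
leaves the min/max statistics aliasing memory that may be overwritten or freed, and
since Drill's buffers live off-heap, a stale reference of that kind can crash the JVM
outright rather than merely corrupt the statistics. A defensive copy owns its bytes:

import java.util.Arrays;

class BinaryStatsSketch {
  private byte[] min;   // must own its bytes: the source buffer gets recycled

  void update(byte[] buffer, int offset, int length) {
    // Defensive copy: without it, 'min' would alias 'buffer', whose contents
    // change (or whose backing memory is freed) when the buffer is reused.
    byte[] copy = Arrays.copyOfRange(buffer, offset, offset + length);
    if (min == null || compareUnsigned(copy, min) < 0) {
      min = copy;
    }
  }

  private static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}

Making the copy unconditional ("always make binary copy") trades one small allocation
per value for the guarantee that a statistics holder never outlives the buffer it was
read from.
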
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/06f0e178/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index acbb815..07302d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -88,7 +88,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   @Override
   public IterOutcome innerNext() {
     if(processed) {
-      cleanup();
+//      cleanup();
       // if the upstream record batch is already processed and next() is called by
       // downstream then return NONE to indicate completion
       return IterOutcome.NONE;
@@ -96,48 +96,52 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
 
     // process the complete upstream in one next() call
     IterOutcome upstream;
-    do {
-      upstream = next(incoming);
-
-      switch(upstream) {
-        case NOT_YET:
-        case NONE:
-        case STOP:
-          if (upstream == IterOutcome.STOP) {
-            return upstream;
-          }
-          break;
-
-        case OK_NEW_SCHEMA:
-          try{
-            setupNewSchema();
-          } catch(Exception ex) {
-            kill(false);
-            logger.error("Failure during query", ex);
-            context.fail(ex);
+    try{
+      do {
+        upstream = next(incoming);
+
+        switch(upstream) {
+          case STOP:
             return IterOutcome.STOP;
-          }
-          // fall through.
-        case OK:
-          try {
+
+          case NOT_YET:
+          case NONE:
+            break;
+
+          case OK_NEW_SCHEMA:
+            setupNewSchema();
+            // fall through.
+          case OK:
             counter += eventBasedRecordWriter.write(incoming.getRecordCount());
             logger.debug("Total records written so far: {}", counter);
-          } catch(IOException ex) {
-            throw new RuntimeException(ex);
-          }
 
-          for(VectorWrapper v : incoming) {
-            v.getValueVector().clear();
-          }
-          break;
+            for(VectorWrapper<?> v : incoming) {
+              v.getValueVector().clear();
+            }
+            break;
+
+          default:
+            throw new UnsupportedOperationException();
+        }
+      } while(upstream != IterOutcome.NONE);
+    }catch(Exception ex){
+      kill(false);
+      logger.error("Failure during query", ex);
+      context.fail(ex);
+      return IterOutcome.STOP;
+    }
 
-        default:
-          throw new UnsupportedOperationException();
-      }
-    } while(upstream != IterOutcome.NONE);
+    addOutputContainerData();
+    processed = true;
 
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
 
-    VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById(VarCharVector.class, container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds()).getValueVector();
+  private void addOutputContainerData(){
+    VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById( //
+        VarCharVector.class, //
+        container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds() //
+        ).getValueVector();
     AllocationHelper.allocate(fragmentIdVector, 1, 50);
     BigIntVector summaryVector = (BigIntVector) container.getValueAccessorById(BigIntVector.class,
             container.getValueVectorId(SchemaPath.getSimplePath("Number of records written")).getFieldIds()).getValueVector();
@@ -148,9 +152,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     summaryVector.getMutator().setValueCount(1);
 
     container.setRecordCount(1);
-    processed = true;
-
-    return IterOutcome.OK_NEW_SCHEMA;
   }
 
   protected void setupNewSchema() throws Exception {
@@ -167,8 +168,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
       container.addOrGet(fragmentIdField);
       container.addOrGet(summaryField);
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    } catch(IOException ex) {
-      throw new RuntimeException("Failed to update schema in RecordWriter", ex);
     } finally{
       stats.stopSetup();
     }
@@ -186,6 +185,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
         recordWriter.cleanup();
       }
     } catch(IOException ex) {
+      logger.error("Failure while closing record writer", ex);
       throw new RuntimeException("Failed to close RecordWriter", ex);
     }
   }

