asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [01/29] incubator-asterixdb git commit: Supports Left Outer Join and Left Outer Unnest in SQL++.
Date Sat, 04 Jun 2016 02:44:05 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master dde37e358 -> b0fe0ac0c


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/b0fe0ac0/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index e9d4ec3..3afa808 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -18,22 +18,23 @@
  */
 package org.apache.hyracks.algebricks.runtime.operators.std;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
-import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriter;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
@@ -43,23 +44,19 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
 
     private final int outCol;
     private final IUnnestingEvaluatorFactory unnestingFactory;
-    private int outColPos;
-    private final boolean outColIsProjected;
-
+    private final boolean unnestColIsProjected;
     private final IUnnestingPositionWriter positionWriter;
-    private IScalarEvaluatorFactory posOffsetEvalFactory;
-
-    // Each time step() is called on the aggregate, a new value is written in
-    // its output. One byte is written before that value and is neglected.
-    // By convention, if the aggregate function writes nothing, it means it
-    // produced the last value.
+    private final boolean leftOuter;
+    private final IMissingWriterFactory missingWriterFactory;
+    private int outColPos;
 
-    public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
-        this(outCol, unnestingFactory, projectionList, null, null);
+    public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
+            boolean leftOuter, IMissingWriterFactory missingWriterFactory) {
+        this(outCol, unnestingFactory, projectionList, null, leftOuter, missingWriterFactory);
     }
 
     public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
-            IUnnestingPositionWriter positionWriter, IScalarEvaluatorFactory posOffsetEvalFactory) {
+            IUnnestingPositionWriter positionWriter, boolean leftOuter, IMissingWriterFactory missingWriterFactory) {
         super(projectionList);
         this.outCol = outCol;
         this.unnestingFactory = unnestingFactory;
@@ -69,12 +66,10 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                 outColPos = f;
             }
         }
-        outColIsProjected = outColPos >= 0;
+        unnestColIsProjected = outColPos >= 0;
         this.positionWriter = positionWriter;
-        this.posOffsetEvalFactory = posOffsetEvalFactory;
-        if (this.posOffsetEvalFactory == null) {
-            this.posOffsetEvalFactory = new ConstantEvalFactory(new byte[5]);
-        }
+        this.leftOuter = leftOuter;
+        this.missingWriterFactory = missingWriterFactory;
     }
 
     @Override
@@ -85,23 +80,27 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws AlgebricksException {
-
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutput output = new DataOutputStream(bos);
+        if (missingWriterFactory != null) {
+            IMissingWriter missingWriter = missingWriterFactory.createMissingWriter();
+            try {
+                missingWriter.writeMissing(output);
+            } catch (HyracksDataException e) {
+                throw new AlgebricksException(e);
+            }
+        }
+        byte[] missingBytes = bos.toByteArray();
+        int missingBytesLen = bos.size();
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
             private IPointable p = VoidPointable.FACTORY.createPointable();
-            private IUnnestingEvaluator unnest;
             private ArrayTupleBuilder tupleBuilder;
-
-            private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
+            private IUnnestingEvaluator unnest = unnestingFactory.createUnnestingEvaluator(ctx);
 
             @Override
             public void open() throws HyracksDataException {
                 writer.open();
                 initAccessAppendRef(ctx);
-                try {
-                    unnest = unnestingFactory.createUnnestingEvaluator(ctx);
-                } catch (AlgebricksException ae) {
-                    throw new HyracksDataException(ae);
-                }
                 tupleBuilder = new ArrayTupleBuilder(projectionList.length);
             }
 
@@ -112,59 +111,71 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                 for (int t = 0; t < nTuple; t++) {
                     tRef.reset(tAccess, t);
                     try {
-                        offsetEval.evaluate(tRef, p);
-                    } catch (AlgebricksException e) {
-                        throw new HyracksDataException(e);
-                    }
-                    int offset = IntegerPointable.getInteger(p.getByteArray(), p.getStartOffset());
-                    try {
                         unnest.init(tRef);
-                        // assume that when unnesting the tuple, each step() call for each
element
-                        // in the tuple will increase the positionIndex, and the positionIndex
will
-                        // be reset when a new tuple is to be processed.
-                        int positionIndex = 1;
-                        boolean goon = true;
-                        do {
-                            tupleBuilder.reset();
-                            if (!unnest.step(p)) {
-                                goon = false;
-                            } else {
-
-                                if (!outColIsProjected && positionWriter == null)
{
-                                    appendProjectionToFrame(t, projectionList);
-                                } else {
-                                    for (int f = 0; f < outColPos; f++) {
-                                        tupleBuilder.addField(tAccess, t, f);
-                                    }
-                                    if (outColIsProjected) {
-                                        tupleBuilder.addField(p.getByteArray(), p.getStartOffset(),
p.getLength());
-                                    } else {
-                                        tupleBuilder.addField(tAccess, t, outColPos);
-                                    }
-                                    for (int f = outColPos + 1; f < (positionWriter !=
null ? projectionList.length - 1
-                                            : projectionList.length); f++) {
-                                        tupleBuilder.addField(tAccess, t, f);
-                                    }
-                                }
-                                if (positionWriter != null) {
-                                    // Write the positional variable
-                                    positionWriter.write(tupleBuilder.getDataOutput(), offset
+ positionIndex++);
-                                    tupleBuilder.addFieldEndOffset();
-                                }
-                                appendToFrameFromTupleBuilder(tupleBuilder);
-                            }
-                        } while (goon);
+                        unnesting(t);
                     } catch (AlgebricksException | IOException ae) {
                         throw new HyracksDataException(ae);
                     }
                 }
             }
 
+            private void unnesting(int t) throws AlgebricksException, IOException {
+                // Assumes that when unnesting the tuple, each step() call for each element
+                // in the tuple will increase the positionIndex, and the positionIndex will
+                // be reset when a new tuple is to be processed.
+                int positionIndex = 1;
+                boolean emitted = false;
+                do {
+                    if (!unnest.step(p)) {
+                        break;
+                    }
+                    writeOutput(t, positionIndex++, false);
+                    emitted = true;
+                } while (true);
+                if (leftOuter && !emitted) {
+                    writeOutput(t, -1, true);
+                }
+            }
+
+            private void writeOutput(int t, int positionIndex, boolean missing)
+                    throws HyracksDataException, IOException {
+                if (!unnestColIsProjected && positionWriter == null) {
+                    appendProjectionToFrame(t, projectionList);
+                    appendToFrameFromTupleBuilder(tupleBuilder);
+                    return;
+                }
+
+                tupleBuilder.reset();
+                for (int f = 0; f < outColPos; f++) {
+                    tupleBuilder.addField(tAccess, t, f);
+                }
+                if (unnestColIsProjected) {
+                    if (missing) {
+                        tupleBuilder.addField(missingBytes, 0, missingBytesLen);
+                    } else {
+                        tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                    }
+                }
+                for (int f = unnestColIsProjected ? outColPos + 1 : outColPos; f < (positionWriter != null
+                        ? projectionList.length - 1 : projectionList.length); f++) {
+                    tupleBuilder.addField(tAccess, t, f);
+                }
+                if (positionWriter != null) {
+                    // Write the positional variable
+                    if (missing) {
+                        tupleBuilder.addField(missingBytes, 0, missingBytesLen);
+                    } else {
+                        positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
+                        tupleBuilder.addFieldEndOffset();
+                    }
+                }
+                appendToFrameFromTupleBuilder(tupleBuilder);
+            }
+
             @Override
             public void flush() throws HyracksDataException {
                 appender.flush(writer);
             }
         };
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/b0fe0ac0/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 4dafd0e..020cffe 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -23,12 +23,6 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 
-import junit.framework.Assert;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.algebricks.data.impl.BinaryBooleanInspectorImpl;
 import org.apache.hyracks.algebricks.data.impl.BinaryIntegerInspectorImpl;
@@ -91,6 +85,11 @@ import org.apache.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
 
 public class PushRuntimeTest {
 
@@ -127,13 +126,13 @@ public class PushRuntimeTest {
 
         EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
         RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, new IScalarEvaluatorFactory[]
{
-                const1, const2 }, new int[] { 0, 1 });
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 },
+                new IScalarEvaluatorFactory[] { const1, const2 }, new int[] { 0, 1 });
         RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[]
{
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE
});
 
-        PrinterRuntimeFactory printer = new PrinterRuntimeFactory(new int[] { 0, 1 }, new
IPrinterFactory[] {
-                IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE }, assignDesc);
+        PrinterRuntimeFactory printer = new PrinterRuntimeFactory(new int[] { 0, 1 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE
}, assignDesc);
 
         AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec,
0, 0,
                 new IPushRuntimeFactory[] { ets, assign, printer },
@@ -151,19 +150,20 @@ public class PushRuntimeTest {
 
         EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
         RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, new IScalarEvaluatorFactory[]
{
-                const1, const2 }, new int[] { 0, 1 });
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 },
+                new IScalarEvaluatorFactory[] { const1, const2 }, new int[] { 0, 1 });
         RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[]
{
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE
});
 
         String filePath = PATH_ACTUAL + SEPARATOR + "etsAssignWrite.out";
         File outFile = new File(filePath);
-        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0, 1 },
new IPrinterFactory[] {
-                IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE }, outFile,
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0, 1 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE
}, outFile,
                 PrinterBasedWriterFactory.INSTANCE, assignDesc);
 
         AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec,
0, 0,
-                new IPushRuntimeFactory[] { ets, assign, writer }, new RecordDescriptor[]
{ etsDesc, assignDesc, null });
+                new IPushRuntimeFactory[] { ets, assign, writer },
+                new RecordDescriptor[] { etsDesc, assignDesc, null });
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp, DEFAULT_NODES);
         spec.addRoot(algebricksOp);
         AlgebricksHyracksIntegrationUtil.runJob(spec);
@@ -180,8 +180,8 @@ public class PushRuntimeTest {
 
         // the scanner
         FileSplit[] intFileSplits = new FileSplit[1];
-        intFileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new
File(
-                "data/simple/int-part1.tbl")));
+        intFileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID,
+                new FileReference(new File("data/simple/int-part1.tbl")));
         IFileSplitProvider intSplitProvider = new ConstantFileSplitProvider(intFileSplits);
         RecordDescriptor intScannerDesc = new RecordDescriptor(
                 new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
@@ -228,8 +228,8 @@ public class PushRuntimeTest {
 
         EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
         RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, new IScalarEvaluatorFactory[]
{
-                const1, const2 }, new int[] { 0, 1 });
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 },
+                new IScalarEvaluatorFactory[] { const1, const2 }, new int[] { 0, 1 });
         RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[]
{
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE
});
         StreamProjectRuntimeFactory project = new StreamProjectRuntimeFactory(new int[] {
1 });
@@ -243,8 +243,8 @@ public class PushRuntimeTest {
                 projectDesc);
 
         AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec,
0, 0,
-                new IPushRuntimeFactory[] { ets, assign, project, writer }, new RecordDescriptor[]
{ etsDesc,
-                        assignDesc, projectDesc, null });
+                new IPushRuntimeFactory[] { ets, assign, project, writer },
+                new RecordDescriptor[] { etsDesc, assignDesc, projectDesc, null });
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp, DEFAULT_NODES);
 
@@ -263,8 +263,8 @@ public class PushRuntimeTest {
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
-        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new
File(
-                "data/tpch0.001/customer.tbl")));
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID,
+                new FileReference(new File("data/tpch0.001/customer.tbl")));
         IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
 
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[]
{
@@ -315,7 +315,7 @@ public class PushRuntimeTest {
         EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
         RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
         IUnnestingEvaluatorFactory aggregFactory = new IntArrayUnnester(new int[] { 100,
200, 300 });
-        UnnestRuntimeFactory unnest = new UnnestRuntimeFactory(0, aggregFactory, new int[] { 0 });
+        UnnestRuntimeFactory unnest = new UnnestRuntimeFactory(0, aggregFactory, new int[] { 0 }, false, null);
         RecordDescriptor unnestDesc = new RecordDescriptor(
                 new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
 
@@ -326,7 +326,8 @@ public class PushRuntimeTest {
                 unnestDesc);
 
         AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec,
0, 0,
-                new IPushRuntimeFactory[] { ets, unnest, writer }, new RecordDescriptor[]
{ etsDesc, unnestDesc, null });
+                new IPushRuntimeFactory[] { ets, unnest, writer },
+                new RecordDescriptor[] { etsDesc, unnestDesc, null });
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
         spec.addRoot(algebricksOp);
@@ -344,8 +345,8 @@ public class PushRuntimeTest {
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
-        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new
File(
-                "data/tpch0.001/customer-part1.tbl")));
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID,
+                new FileReference(new File("data/tpch0.001/customer-part1.tbl")));
         IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[]
{
                 IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer(),
@@ -396,8 +397,8 @@ public class PushRuntimeTest {
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
-        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new
File(
-                "data/tpch0.001/customer.tbl")));
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID,
+                new FileReference(new File("data/tpch0.001/customer.tbl")));
         IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[]
{
                 IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer(),
@@ -437,8 +438,8 @@ public class PushRuntimeTest {
         RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE
});
         PreclusteredGroupOperatorDescriptor gby = new PreclusteredGroupOperatorDescriptor(spec,
new int[] { 3 },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY)
},
-                npaaf, gbyDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY)
}, npaaf,
+                gbyDesc);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, gby,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
@@ -482,13 +483,13 @@ public class PushRuntimeTest {
         EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
         RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
         IUnnestingEvaluatorFactory aggregFactory = new IntArrayUnnester(new int[] { 100,
200, 300 });
-        UnnestRuntimeFactory unnest = new UnnestRuntimeFactory(0, aggregFactory, new int[] { 0 });
+        UnnestRuntimeFactory unnest = new UnnestRuntimeFactory(0, aggregFactory, new int[] { 0 }, false, null);
         RecordDescriptor unnestDesc = new RecordDescriptor(
                 new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
 
         RunningAggregateRuntimeFactory ragg = new RunningAggregateRuntimeFactory(new int[]
{ 1 },
-                new IRunningAggregateEvaluatorFactory[] { new TupleCountRunningAggregateFunctionFactory()
}, new int[] {
-                        0, 1 });
+                new IRunningAggregateEvaluatorFactory[] { new TupleCountRunningAggregateFunctionFactory()
},
+                new int[] { 0, 1 });
         RecordDescriptor raggDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE
});
 
@@ -499,8 +500,8 @@ public class PushRuntimeTest {
                 raggDesc);
 
         AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec,
0, 0,
-                new IPushRuntimeFactory[] { ets, unnest, ragg, writer }, new RecordDescriptor[]
{ etsDesc, unnestDesc,
-                        raggDesc, null });
+                new IPushRuntimeFactory[] { ets, unnest, ragg, writer },
+                new RecordDescriptor[] { etsDesc, unnestDesc, raggDesc, null });
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
@@ -522,8 +523,8 @@ public class PushRuntimeTest {
 
         EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
         RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, new IScalarEvaluatorFactory[]
{
-                const1, const2 }, new int[] { 0, 1 });
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 },
+                new IScalarEvaluatorFactory[] { const1, const2 }, new int[] { 0, 1 });
         RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[]
{
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE
});
 
@@ -540,21 +541,21 @@ public class PushRuntimeTest {
             return;
         }
 
-        StringStreamingRuntimeFactory script = new StringStreamingRuntimeFactory(command,
new IPrinterFactory[] {
-                IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE }, ' ',
+        StringStreamingRuntimeFactory script = new StringStreamingRuntimeFactory(command,
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE
}, ' ',
                 new DelimitedDataTupleParserFactory(valueParsers, ' '));
         RecordDescriptor scriptDesc = new RecordDescriptor(new ISerializerDeserializer[]
{
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE
});
 
         String filePath = PATH_ACTUAL + SEPARATOR + "etsAssignScriptWrite.out";
         File outFile = new File(filePath);
-        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0, 1 },
new IPrinterFactory[] {
-                IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE }, outFile,
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0, 1 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE
}, outFile,
                 PrinterBasedWriterFactory.INSTANCE, scriptDesc);
 
         AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec,
0, 0,
-                new IPushRuntimeFactory[] { ets, assign, script, writer }, new RecordDescriptor[]
{ etsDesc,
-                        assignDesc, scriptDesc, null });
+                new IPushRuntimeFactory[] { ets, assign, script, writer },
+                new RecordDescriptor[] { etsDesc, assignDesc, scriptDesc, null });
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
@@ -581,16 +582,16 @@ public class PushRuntimeTest {
             outputFile[i] = File.createTempFile("splitop", null);
         }
 
-        FileSplit[] inputSplits = new FileSplit[] { new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID,
-                new FileReference(inputFile)) };
+        FileSplit[] inputSplits = new FileSplit[] {
+                new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(inputFile))
};
 
         DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory(
                 new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000');
         RecordDescriptor stringRec = new RecordDescriptor(
                 new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), });
 
-        FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider(
-                inputSplits), stringParser, stringRec);
+        FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec,
+                new ConstantFileSplitProvider(inputSplits), stringParser, stringRec);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
@@ -602,8 +603,8 @@ public class PushRuntimeTest {
 
         IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
         for (int i = 0; i < outputArity; i++) {
-            outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { new
FileSplit(
-                    AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(outputFile[i]))
});
+            outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] {
+                    new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(outputFile[i]))
});
             PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i],
                     new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
         }
@@ -629,8 +630,8 @@ public class PushRuntimeTest {
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
-        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new
File(
-                "data/tpch0.001/nation.tbl")));
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID,
+                new FileReference(new File("data/tpch0.001/nation.tbl")));
         IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[]
{
                 IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer(),
@@ -653,9 +654,10 @@ public class PushRuntimeTest {
         String filePath = PATH_ACTUAL + SEPARATOR + fileName;
         String resultFilePath = PATH_EXPECTED + SEPARATOR + fileName;
         File outFile = new File(filePath);
-        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0, 1,
2, 3 }, new IPrinterFactory[] {
-                IntegerPrinterFactory.INSTANCE, UTF8StringPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE,
-                UTF8StringPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
sortDesc);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0, 1,
2, 3 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE, UTF8StringPrinterFactory.INSTANCE,
+                        IntegerPrinterFactory.INSTANCE, UTF8StringPrinterFactory.INSTANCE
},
+                outFile, PrinterBasedWriterFactory.INSTANCE, sortDesc);
 
         AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec,
1, 0,
                 new IPushRuntimeFactory[] { sort, writer }, new RecordDescriptor[] { sortDesc,
null });
@@ -717,8 +719,8 @@ public class PushRuntimeTest {
                 project2Desc);
 
         AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec,
0, 0,
-                new IPushRuntimeFactory[] { ets, assign1, subplan, project2, writer }, new
RecordDescriptor[] {
-                        etsDesc, assign1Desc, subplanDesc, project2Desc, null });
+                new IPushRuntimeFactory[] { ets, assign1, subplan, project2, writer },
+                new RecordDescriptor[] { etsDesc, assign1Desc, subplanDesc, project2Desc,
null });
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp, DEFAULT_NODES);
 
@@ -737,8 +739,8 @@ public class PushRuntimeTest {
 
         // the scanner
         FileSplit[] fileSplits = new FileSplit[1];
-        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new
File(
-                "data/tpch0.001/customer.tbl")));
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID,
+                new FileReference(new File("data/tpch0.001/customer.tbl")));
         IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[]
{
                 IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer(),
@@ -774,8 +776,8 @@ public class PushRuntimeTest {
         RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE
});
         MicroPreClusteredGroupRuntimeFactory gby = new MicroPreClusteredGroupRuntimeFactory(new
int[] { 3 },
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY)
},
-                npaaf, sortDesc, gbyDesc, null);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY)
}, npaaf,
+                sortDesc, gbyDesc, null);
 
         // the algebricks op.
         IScalarEvaluatorFactory cond = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
@@ -792,8 +794,8 @@ public class PushRuntimeTest {
                 selectDesc);
 
         AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec,
1, 0,
-                new IPushRuntimeFactory[] { sort, gby, select, writer }, new RecordDescriptor[]
{ sortDesc, gbyDesc,
-                        selectDesc, null });
+                new IPushRuntimeFactory[] { sort, gby, select, writer },
+                new RecordDescriptor[] { sortDesc, gbyDesc, selectDesc, null });
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });


Mime
View raw message