asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [1/2] incubator-asterixdb-hyracks git commit: Move to non-copy-based evaluator interfaces for scalar functions, aggregate functions, running aggregate functions and unnest functions.
Date Sat, 13 Feb 2016 02:14:34 GMT
Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master 6800b9e16 -> 7a27293c3


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7a27293c/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index b7f11d8..2d5c929 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -24,8 +24,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -35,7 +35,8 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -48,12 +49,13 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
     private static final long serialVersionUID = 1L;
     public static int NO_DEFAULT_BRANCH = -1;
 
-    private final ICopyEvaluatorFactory[] evalFactories;
+    private final IScalarEvaluatorFactory[] evalFactories;
     private final IBinaryBooleanInspector boolInspector;
     private final int defaultBranchIndex;
 
-    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, ICopyEvaluatorFactory[]
evalFactories,
-            IBinaryBooleanInspector boolInspector, int defaultBranchIndex, RecordDescriptor
rDesc) {
+    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IScalarEvaluatorFactory[] evalFactories, IBinaryBooleanInspector boolInspector,
int defaultBranchIndex,
+            RecordDescriptor rDesc) {
         super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length
+ 1 : evalFactories.length);
         for (int i = 0; i < evalFactories.length; i++) {
             recordDescriptors[i] = rDesc;
@@ -71,8 +73,8 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
             private final IFrameWriter[] writers = new IFrameWriter[outputArity];
             private final boolean[] isOpen = new boolean[outputArity];
             private final IFrame[] writeBuffers = new IFrame[outputArity];
-            private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
-            private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
+            private final IScalarEvaluator[] evals = new IScalarEvaluator[outputArity];
+            private final IPointable evalPointable = new VoidPointable();
             private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
                     0);
             private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
@@ -149,12 +151,12 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
                     boolean found = false;
                     for (int j = 0; j < evals.length; j++) {
                         try {
-                            evalBuf.reset();
-                            evals[j].evaluate(frameTuple);
+                            evals[j].evaluate(frameTuple, evalPointable);
                         } catch (AlgebricksException e) {
                             throw new HyracksDataException(e);
                         }
-                        found = boolInspector.getBooleanValue(evalBuf.getByteArray(), 0,
1);
+                        found = boolInspector.getBooleanValue(evalPointable.getByteArray(),
+                                evalPointable.getStartOffset(), evalPointable.getLength());
                         if (found) {
                             copyAndAppendTuple(j);
                             break;
@@ -199,7 +201,7 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
                 // Create evaluators for partitioning.
                 try {
                     for (int i = 0; i < evalFactories.length; i++) {
-                        evals[i] = evalFactories[i].createEvaluator(evalBuf);
+                        evals[i] = evalFactories[i].createScalarEvaluator(ctx);
                     }
                 } catch (AlgebricksException e) {
                     throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7a27293c/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 5a26f36..bb6cc73 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -100,7 +100,7 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun
                     int n = runningAggregates.length;
                     for (int i = 0; i < n; i++) {
                         try {
-                            raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
+                            raggs[i] = runningAggregates[i].createRunningAggregateEvaluator(ctx);
                         } catch (AlgebricksException ae) {
                             throw new HyracksDataException(ae);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7a27293c/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 2c04003..e9d4ec3 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
-import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -73,7 +73,7 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
         this.positionWriter = positionWriter;
         this.posOffsetEvalFactory = posOffsetEvalFactory;
         if (this.posOffsetEvalFactory == null) {
-            this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
+            this.posOffsetEvalFactory = new ConstantEvalFactory(new byte[5]);
         }
     }
 
@@ -88,7 +88,7 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
 
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
             private IPointable p = VoidPointable.FACTORY.createPointable();
-            private IUnnestingEvaluator agg;
+            private IUnnestingEvaluator unnest;
             private ArrayTupleBuilder tupleBuilder;
 
             private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
@@ -98,7 +98,7 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                 writer.open();
                 initAccessAppendRef(ctx);
                 try {
-                    agg = unnestingFactory.createUnnestingEvaluator(ctx);
+                    unnest = unnestingFactory.createUnnestingEvaluator(ctx);
                 } catch (AlgebricksException ae) {
                     throw new HyracksDataException(ae);
                 }
@@ -118,7 +118,7 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                     }
                     int offset = IntegerPointable.getInteger(p.getByteArray(), p.getStartOffset());
                     try {
-                        agg.init(tRef);
+                        unnest.init(tRef);
                         // assume that when unnesting the tuple, each step() call for each
element
                         // in the tuple will increase the positionIndex, and the positionIndex
will
                         // be reset when a new tuple is to be processed.
@@ -126,7 +126,7 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                         boolean goon = true;
                         do {
                             tupleBuilder.reset();
-                            if (!agg.step(p)) {
+                            if (!unnest.step(p)) {
                                 goon = false;
                             } else {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7a27293c/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
index fd979fc..f7a97f4 100644
--- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
+++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
@@ -29,7 +29,6 @@ import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerAddEvalFactory implements IScalarEvaluatorFactory {
 
@@ -52,7 +51,6 @@ public class IntegerAddEvalFactory implements IScalarEvaluatorFactory {
             private IScalarEvaluator evalLeft = evalLeftFactory.createScalarEvaluator(ctx);
             private IScalarEvaluator evalRight = evalRightFactory.createScalarEvaluator(ctx);
 
-            @SuppressWarnings("static-access")
             @Override
             public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException
{
                 evalLeft.evaluate(tuple, p);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7a27293c/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
index ea415c8..66d3848 100644
--- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
+++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
@@ -27,7 +27,6 @@ import org.apache.hyracks.data.std.primitive.BooleanPointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerEqualsEvalFactory implements IScalarEvaluatorFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7a27293c/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
index aebc406..46a7ab6 100644
--- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
+++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
@@ -27,7 +27,6 @@ import org.apache.hyracks.data.std.primitive.BooleanPointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerGreaterThanEvalFactory implements IScalarEvaluatorFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7a27293c/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
index e311fa6..7e834db 100644
--- a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
@@ -94,7 +94,8 @@ public final class UTF8StringPointable extends AbstractPointable implements
IHas
      * Returns the character at the given byte offset. The caller is responsible for making
sure that
      * the provided offset is within bounds and points to the beginning of a valid UTF8 character.
      *
-     * @param offset - Byte offset
+     * @param offset
+     *            - Byte offset
      * @return Character at the given offset.
      */
     public char charAt(int offset) {
@@ -157,6 +158,7 @@ public final class UTF8StringPointable extends AbstractPointable implements
IHas
         UTF8StringUtil.toString(buffer, bytes, start);
     }
 
+    @Override
     public String toString() {
         return new String(this.bytes, this.getCharStartOffset(), this.getUTF8Length(), Charset.forName("UTF-8"));
     }
@@ -166,8 +168,8 @@ public final class UTF8StringPointable extends AbstractPointable implements
IHas
      */
 
     public int ignoreCaseCompareTo(UTF8StringPointable other) {
-        return UTF8StringUtil.lowerCaseCompareTo(this.getByteArray(), this.getStartOffset(),
-                other.getByteArray(), other.getStartOffset());
+        return UTF8StringUtil.lowerCaseCompareTo(this.getByteArray(), this.getStartOffset(),
other.getByteArray(),
+                other.getStartOffset());
     }
 
     public int find(UTF8StringPointable pattern, boolean ignoreCase) {
@@ -228,8 +230,9 @@ public final class UTF8StringPointable extends AbstractPointable implements
IHas
     public static boolean startsWith(UTF8StringPointable src, UTF8StringPointable pattern,
boolean ignoreCase) {
         int utflen1 = src.getUTF8Length();
         int utflen2 = pattern.getUTF8Length();
-        if (utflen2 > utflen1)
+        if (utflen2 > utflen1) {
             return false;
+        }
 
         int s1Start = src.getMetaDataLength();
         int s2Start = pattern.getMetaDataLength();
@@ -257,8 +260,9 @@ public final class UTF8StringPointable extends AbstractPointable implements
IHas
     public static boolean endsWith(UTF8StringPointable src, UTF8StringPointable pattern,
boolean ignoreCase) {
         int len1 = src.getUTF8Length();
         int len2 = pattern.getUTF8Length();
-        if (len2 > len1)
+        if (len2 > len1) {
             return false;
+        }
 
         int s1Start = src.getMetaDataLength();
         int s2Start = pattern.getMetaDataLength();
@@ -351,10 +355,7 @@ public final class UTF8StringPointable extends AbstractPointable implements
IHas
      * @param out
      * @throws IOException
      */
-    public static void substrBefore(
-            UTF8StringPointable src,
-            UTF8StringPointable match,
-            UTF8StringBuilder builder,
+    public static void substrBefore(UTF8StringPointable src, UTF8StringPointable match, UTF8StringBuilder
builder,
             GrowableArray out) throws IOException {
 
         int byteOffset = find(src, match, false);
@@ -367,7 +368,7 @@ public final class UTF8StringPointable extends AbstractPointable implements
IHas
         final int srcMetaLen = src.getMetaDataLength();
 
         builder.reset(out, byteOffset);
-        for (int idx = 0; idx < byteOffset; ) {
+        for (int idx = 0; idx < byteOffset;) {
             builder.appendChar(src.charAt(srcMetaLen + idx));
             idx += src.charSize(srcMetaLen + idx);
         }
@@ -387,10 +388,7 @@ public final class UTF8StringPointable extends AbstractPointable implements
IHas
      * @param builder
      * @param out
      */
-    public static void substrAfter(
-            UTF8StringPointable src,
-            UTF8StringPointable match,
-            UTF8StringBuilder builder,
+    public static void substrAfter(UTF8StringPointable src, UTF8StringPointable match, UTF8StringBuilder
builder,
             GrowableArray out) throws IOException {
 
         int byteOffset = find(src, match, false);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/7a27293c/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
b/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
index 5a716b4..0e70273 100644
--- a/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
+++ b/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
@@ -24,17 +24,14 @@ import java.io.IOException;
 
 /**
  * Encodes positive integers in a variable-bytes format.
- *
  * Each byte stores seven bits of the number. The first bit of each byte notifies if it is
the last byte.
  * Specifically, if the first bit is set, then we need to shift the current value by seven
and
  * continue to read the next byte until we meet a byte whose first bit is unset.
- *
  * e.g. if the number is < 128, it will be stored using one byte and the byte value is kept
unchanged.
  * To store the number 255 (0xff) , it will be encoded as [0x81,0x7f]. To decode that value,
it reads the 0x81
  * to know that the current value is (0x81 & 0x7f)= 0x01, and the first bit tells that
there are more bytes to
  * be read. When it meets 0x7f, whose first flag is unset, it knows that it is the final
byte to decode.
  * Finally it will return (0x01 << 7) + 0x7f == 255.
- *
  */
 public class VarLenIntEncoderDecoder {
     // sometimes the dec number is easier to get the sense of how big it is.
@@ -75,11 +72,16 @@ public class VarLenIntEncoderDecoder {
 
     public static int decode(byte[] srcBytes, int startPos) {
         int sum = 0;
-        while ((srcBytes[startPos] & CONTINUE_CHUNK) == CONTINUE_CHUNK) {
+        while (startPos < srcBytes.length && (srcBytes[startPos] & CONTINUE_CHUNK)
== CONTINUE_CHUNK) {
             sum = (sum + (srcBytes[startPos] & DECODE_MASK)) << 7;
             startPos++;
         }
-        sum += srcBytes[startPos++];
+        if (startPos < srcBytes.length) {
+            sum += srcBytes[startPos];
+        } else {
+            throw new IllegalStateException("Corrupted string bytes: trying to access entry
" + startPos
+                    + " in a byte array of length " + srcBytes.length);
+        }
         return sum;
     }
 


Mime
View raw message