asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prest...@apache.org
Subject [2/8] incubator-asterixdb git commit: Adding introspection for getting record details.
Date Fri, 10 Jul 2015 03:15:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByIndexDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByIndexDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByIndexDescriptor.java
new file mode 100644
index 0000000..072edac
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByIndexDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/** Function descriptor for FIELD_ACCESS_BY_INDEX; it creates the evaluator factory that reads a field by its index. */
+public class FieldAccessByIndexDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new FieldAccessByIndexDescriptor();
+        }
+    };
+    // Record type handed to the evaluator factory; it stays null until reset(...) has been called.
+    private ARecordType recType;
+
+    public void reset(ARecordType recType) {
+        this.recType = recType;
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX;
+    }
+    // args[0] becomes the record evaluator factory and args[1] the field-index evaluator factory.
+    @Override
+    public ICopyEvaluatorFactory createEvaluatorFactory(ICopyEvaluatorFactory[] args) {
+        return new FieldAccessByIndexEvalFactory(args[0], args[1], recType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
new file mode 100644
index 0000000..2b9f666
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class FieldAccessByIndexEvalFactory implements ICopyEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private ICopyEvaluatorFactory recordEvalFactory;
+    private ICopyEvaluatorFactory fieldIndexEvalFactory;
+    private int nullBitmapSize;
+    private ARecordType recordType;
+    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
+    private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
+
+    public FieldAccessByIndexEvalFactory(ICopyEvaluatorFactory recordEvalFactory,
+            ICopyEvaluatorFactory fieldIndexEvalFactory, ARecordType recordType) {
+        this.recordEvalFactory = recordEvalFactory;
+        this.fieldIndexEvalFactory = fieldIndexEvalFactory;
+        this.recordType = recordType;
+        this.nullBitmapSize = ARecordType.computeNullBitmapSize(recordType);
+
+    }
+
+    @Override
+    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new ICopyEvaluator() {
+
+            private DataOutput out = output.getDataOutput();
+
+            private ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
+            private ArrayBackedValueStorage outInput1 = new ArrayBackedValueStorage();
+            private ICopyEvaluator eval0 = recordEvalFactory.createEvaluator(outInput0);
+            private ICopyEvaluator eval1 = fieldIndexEvalFactory.createEvaluator(outInput1);
+            @SuppressWarnings("unchecked")
+            private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(BuiltinType.ANULL);
+            private int fieldIndex;
+            private int fieldValueOffset;
+            private int fieldValueLength;
+            private IAType fieldValueType;
+            private ATypeTag fieldValueTypeTag = ATypeTag.NULL;
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                try {
+                    outInput0.reset();
+                    eval0.evaluate(tuple);
+                    outInput1.reset();
+                    eval1.evaluate(tuple);
+                    byte[] serRecord = outInput0.getByteArray();
+
+                    if (serRecord[0] == SER_NULL_TYPE_TAG) {
+                        nullSerde.serialize(ANull.NULL, out);
+                        return;
+                    }
+
+                    if (serRecord[0] != SER_RECORD_TYPE_TAG) {
+                        throw new AlgebricksException("Field accessor is not defined for values of type "
+                                + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[0]));
+                    }
+
+                    fieldIndex = IntegerPointable.getInteger(outInput1.getByteArray(), 1);
+                    fieldValueOffset = ARecordSerializerDeserializer.getFieldOffsetById(serRecord, fieldIndex,
+                            nullBitmapSize, recordType.isOpen());
+
+                    if (fieldValueOffset == 0) {
+                        // the field is null, we checked the null bit map
+                        out.writeByte(SER_NULL_TYPE_TAG);
+                        return;
+                    }
+
+                    fieldValueType = recordType.getFieldTypes()[fieldIndex];
+                    if (fieldValueType.getTypeTag().equals(ATypeTag.UNION)) {
+                        if (NonTaggedFormatUtil.isOptionalField((AUnionType) fieldValueType)) {
+                            fieldValueTypeTag = ((AUnionType) fieldValueType).getUnionList()
+                                    .get(AUnionType.OPTIONAL_TYPE_INDEX_IN_UNION_LIST).getTypeTag();
+                            fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, fieldValueOffset,
+                                    fieldValueTypeTag, false);
+                            out.writeByte(fieldValueTypeTag.serialize());
+                        } else {
+                            // union .. the general case
+                            throw new NotImplementedException();
+                        }
+                    } else {
+                        fieldValueTypeTag = fieldValueType.getTypeTag();
+                        fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, fieldValueOffset,
+                                fieldValueTypeTag, false);
+                        out.writeByte(fieldValueTypeTag.serialize());
+                    }
+                    out.write(serRecord, fieldValueOffset, fieldValueLength);
+
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                } catch (AsterixException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByNameDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByNameDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByNameDescriptor.java
new file mode 100644
index 0000000..316eb9a
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByNameDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/** Function descriptor for FIELD_ACCESS_BY_NAME; it creates the evaluator factory that reads a field by its name. */
+public class FieldAccessByNameDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new FieldAccessByNameDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME;
+    }
+    // args[0] becomes the record evaluator factory and args[1] the field-name evaluator factory.
+    @Override
+    public ICopyEvaluatorFactory createEvaluatorFactory(ICopyEvaluatorFactory[] args) {
+        return new FieldAccessByNameEvalFactory(args[0], args[1]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
new file mode 100644
index 0000000..d9a10de
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/** Evaluator factory that looks a field up in a record by its serialized name and copies the field's bytes out. */
+public class FieldAccessByNameEvalFactory implements ICopyEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
+    private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
+
+    private ICopyEvaluatorFactory recordEvalFactory;
+    private ICopyEvaluatorFactory fldNameEvalFactory;
+
+    /** Keeps the factories that produce the record argument and the field-name argument. */
+    public FieldAccessByNameEvalFactory(ICopyEvaluatorFactory recordEvalFactory,
+            ICopyEvaluatorFactory fldNameEvalFactory) {
+        this.fldNameEvalFactory = fldNameEvalFactory;
+        this.recordEvalFactory = recordEvalFactory;
+    }
+
+    /** Creates an evaluator that writes the named field, or a null if the input is null or the lookup fails. */
+    @Override
+    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new ICopyEvaluator() {
+
+            private DataOutput out = output.getDataOutput();
+
+            private ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
+            private ArrayBackedValueStorage outInput1 = new ArrayBackedValueStorage();
+            private ICopyEvaluator eval0 = recordEvalFactory.createEvaluator(outInput0);
+            private ICopyEvaluator eval1 = fldNameEvalFactory.createEvaluator(outInput1);
+            @SuppressWarnings("unchecked")
+            private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(BuiltinType.ANULL);
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                try {
+                    outInput0.reset();
+                    eval0.evaluate(tuple);
+                    outInput1.reset();
+                    eval1.evaluate(tuple);
+
+                    final byte[] recordBytes = outInput0.getByteArray();
+                    final byte inputTag = recordBytes[0];
+                    if (inputTag == SER_NULL_TYPE_TAG) {
+                        nullSerde.serialize(ANull.NULL, out);
+                        return;
+                    }
+                    if (inputTag != SER_RECORD_TYPE_TAG) {
+                        throw new AlgebricksException(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME.getName()
+                                + ": expects input type NULL or RECORD, but got "
+                                + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputTag));
+                    }
+
+                    final int valueOffset = ARecordSerializerDeserializer.getFieldOffsetByName(recordBytes,
+                            outInput1.getByteArray());
+                    if (valueOffset < 0) {
+                        // a negative offset is answered with a bare null tag
+                        out.writeByte(ATypeTag.NULL.serialize());
+                        return;
+                    }
+
+                    // The byte at the offset is decoded as the value's type tag; the "+ 1" presumably covers that byte.
+                    final ATypeTag valueTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                            .deserialize(recordBytes[valueOffset]);
+                    final int copyLength = NonTaggedFormatUtil.getFieldValueLength(recordBytes, valueOffset,
+                            valueTag, true) + 1;
+                    out.write(recordBytes, valueOffset, copyLength);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                } catch (AsterixException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessNestedDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessNestedDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessNestedDescriptor.java
new file mode 100644
index 0000000..4a79d93
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessNestedDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/** Function descriptor for FIELD_ACCESS_NESTED; it creates the evaluator factory that follows a path of field names. */
+public class FieldAccessNestedDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new FieldAccessNestedDescriptor();
+        }
+    };
+    // Record type and field-name path handed to the evaluator factory; both stay null until reset(...) is called.
+    private ARecordType recType;
+    private List<String> fldName;
+
+    public void reset(ARecordType recType, List<String> fldName) {
+        this.recType = recType;
+        this.fldName = fldName;
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.FIELD_ACCESS_NESTED;
+    }
+    // Only args[0] (the record evaluator factory) is consumed; the path comes from reset(...), not from an argument.
+    @Override
+    public ICopyEvaluatorFactory createEvaluatorFactory(ICopyEvaluatorFactory[] args) {
+        return new FieldAccessNestedEvalFactory(args[0], recType, fldName);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
new file mode 100644
index 0000000..06c2109
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import java.io.DataOutput;
+import java.util.List;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/** Factory for evaluators that follow a path of field names into a record, delegating the work to FieldAccessUtil. */
+public class FieldAccessNestedEvalFactory implements ICopyEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+    private final ICopyEvaluatorFactory recordEvalFactory;
+    private final ARecordType recordType;
+    private final List<String> fieldPath;
+
+    /** Keeps the record evaluator factory, the record's type and the field-name path ({@code fldName}) to follow. */
+    public FieldAccessNestedEvalFactory(ICopyEvaluatorFactory recordEvalFactory, ARecordType recordType,
+            List<String> fldName) {
+        this.recordEvalFactory = recordEvalFactory;
+        this.recordType = recordType;
+        this.fieldPath = fldName;
+    }
+
+    /** Creates an evaluator that owns its buffers, its serialized path elements and its own copy of the record type. */
+    @Override
+    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new ICopyEvaluator() {
+            private DataOutput out = output.getDataOutput();
+            private ByteArrayAccessibleOutputStream subRecordTmpStream = new ByteArrayAccessibleOutputStream();
+            private ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
+            private ICopyEvaluator eval0 = recordEvalFactory.createEvaluator(outInput0);
+            private ArrayBackedValueStorage[] abvsFields = new ArrayBackedValueStorage[fieldPath.size()];
+            private DataOutput[] doFields = new DataOutput[fieldPath.size()];
+            // Held per evaluator: writing the copy back into the factory field would let a later createEvaluator call
+            // swap the type under evaluators that already exist, and would mutate state shared by all of them.
+            private final ARecordType evalRecordType;
+            {
+                FieldAccessUtil.getFieldsAbvs(abvsFields, doFields, fieldPath);
+                evalRecordType = recordType.deepCopy(recordType);
+            }
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                FieldAccessUtil.evaluate(tuple, out, eval0, abvsFields, outInput0, subRecordTmpStream, evalRecordType);
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessUtil.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessUtil.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessUtil.java
new file mode 100644
index 0000000..ef790fc
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/FieldAccessUtil.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class FieldAccessUtil {
+
+    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize(); // serialized tag of ATypeTag.NULL
+    private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize(); // serialized tag of ATypeTag.RECORD
+    // Serializer that checkType uses to write ANull.NULL to the output.
+    @SuppressWarnings("unchecked")
+    private static ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+
+    /** Fills each slot with a fresh storage and its DataOutput, then writes that path element into it as an AString. */
+    @SuppressWarnings("unchecked")
+    public static void getFieldsAbvs(ArrayBackedValueStorage[] abvsFields, DataOutput[] doFields,
+            List<String> fieldPaths) throws AlgebricksException {
+        try {
+            for (int slot = 0; slot < fieldPaths.size(); slot++) {
+                abvsFields[slot] = new ArrayBackedValueStorage();
+                doFields[slot] = abvsFields[slot].getDataOutput();
+                final AString pathElement = new AString(fieldPaths.get(slot));
+                AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(pathElement.getType())
+                        .serialize(pathElement, doFields[slot]);
+            }
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    /** Writes a null and returns true for a NULL tag, returns false for a RECORD tag, and rejects any other tag. */
+    public static boolean checkType(byte tagId, DataOutput out) throws AlgebricksException {
+        if (tagId != SER_NULL_TYPE_TAG) {
+            if (tagId == SER_RECORD_TYPE_TAG) {
+                return false;
+            }
+            throw new AlgebricksException("Field accessor is not defined for values of type "
+                    + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tagId));
+        }
+        try {
+            nullSerde.serialize(ANull.NULL, out);
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+        return true;
+    }
+
+    /**
+     * Evaluates the record argument and follows the path of field names in {@code abvsFields},
+     * writing the value found at the end of the path to {@code out}.
+     * <p>
+     * Names are first resolved by position against {@code recordType} (closed fields); as soon
+     * as a name is not found in the type, the remaining names are looked up by name in the
+     * serialized record (open fields). A null is written instead of a value when the record or
+     * an intermediate value is NULL, when a closed field's offset is reported as 0, or when an
+     * open field cannot be found.
+     *
+     * @param tuple              input tuple handed to {@code eval0}
+     * @param out                destination of the resulting value
+     * @param eval0              evaluator producing the record to access
+     * @param abvsFields         field names forming the access path, outermost first
+     * @param abvsRecord         storage that is reset here and read once {@code eval0} has run
+     * @param subRecordTmpStream scratch stream that holds intermediate nested closed-field values
+     * @param recordType         declared type used to resolve closed fields
+     * @throws AlgebricksException if a traversed value is neither NULL nor a record, or wrapping
+     *                             any {@code IOException} / {@code AsterixException} raised on the way
+     */
+    public static void evaluate(IFrameTupleReference tuple, DataOutput out, ICopyEvaluator eval0,
+            ArrayBackedValueStorage[] abvsFields, ArrayBackedValueStorage abvsRecord,
+            ByteArrayAccessibleOutputStream subRecordTmpStream, ARecordType recordType) throws AlgebricksException {
+
+        try {
+            abvsRecord.reset();
+            eval0.evaluate(tuple);
+
+            // State carried from one path step to the next; -1 means "not determined yet".
+            int subFieldIndex = -1;
+            int subFieldOffset = -1;
+            int subFieldLength = -1;
+            int nullBitmapSize = -1;
+            IAType subType = recordType;
+            ATypeTag subTypeTag = ATypeTag.NULL;
+            byte[] subRecord = abvsRecord.getByteArray();
+            boolean openField = false;
+            // Shared by both loops below: the open-field loop resumes where the closed-field loop stopped.
+            int i = 0;
+
+            // The first byte of the evaluated value is its type tag.
+            if (checkType(subRecord[0], out)) {
+                return;
+            }
+
+            //Moving through closed fields
+            for (; i < abvsFields.length; i++) {
+                if (subType.getTypeTag().equals(ATypeTag.UNION)) {
+                    //enforced SubType
+                    subType = ((AUnionType) subType).getUnionList().get(AUnionType.OPTIONAL_TYPE_INDEX_IN_UNION_LIST);
+                    if (subType.getTypeTag().serialize() != SER_RECORD_TYPE_TAG) {
+                        throw new AlgebricksException("Field accessor is not defined for values of type " + subTypeTag);
+                    }
+
+                }
+                // NOTE(review): the "+ 1" presumably skips a type tag in front of the serialized name - confirm.
+                subFieldIndex = ((ARecordType) subType).findFieldPosition(abvsFields[i].getByteArray(),
+                        abvsFields[i].getStartOffset() + 1, abvsFields[i].getLength());
+                if (subFieldIndex == -1) {
+                    // Not part of the declared type: continue with the open-field loop from this index.
+                    break;
+                }
+                nullBitmapSize = ARecordType.computeNullBitmapSize((ARecordType) subType);
+                subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetById(subRecord, subFieldIndex,
+                        nullBitmapSize, ((ARecordType) subType).isOpen());
+                if (subFieldOffset == 0) {
+                    // the field is null, we checked the null bit map
+                    out.writeByte(SER_NULL_TYPE_TAG);
+                    return;
+                }
+                subType = ((ARecordType) subType).getFieldTypes()[subFieldIndex];
+                if (subType.getTypeTag().equals(ATypeTag.UNION)) {
+                    if (NonTaggedFormatUtil.isOptionalField((AUnionType) subType)) {
+                        // Optional field: the value length is computed from the union's non-null member type.
+                        subTypeTag = ((AUnionType) subType).getUnionList()
+                                .get(AUnionType.OPTIONAL_TYPE_INDEX_IN_UNION_LIST).getTypeTag();
+                        subFieldLength = NonTaggedFormatUtil.getFieldValueLength(subRecord, subFieldOffset, subTypeTag,
+                                false);
+                    } else {
+                        // union .. the general case
+                        throw new NotImplementedException();
+                    }
+                } else {
+                    subTypeTag = subType.getTypeTag();
+                    subFieldLength = NonTaggedFormatUtil.getFieldValueLength(subRecord, subFieldOffset, subTypeTag,
+                            false);
+                }
+
+                if (i < abvsFields.length - 1) {
+                    //setup next iteration
+                    // The closed-field value is copied behind an explicitly written tag so that the
+                    // scratch buffer starts with a type tag, as checkType() below expects.
+                    subRecordTmpStream.reset();
+                    subRecordTmpStream.write(subTypeTag.serialize());
+                    subRecordTmpStream.write(subRecord, subFieldOffset, subFieldLength);
+                    subRecord = subRecordTmpStream.getByteArray();
+
+                    if (checkType(subRecord[0], out)) {
+                        return;
+                    }
+                }
+            }
+
+            //Moving through open fields
+            for (; i < abvsFields.length; i++) {
+                openField = true;
+                subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetByName(subRecord,
+                        abvsFields[i].getByteArray());
+                if (subFieldOffset < 0) {
+                    // No field with that name: the result is NULL.
+                    out.writeByte(SER_NULL_TYPE_TAG);
+                    return;
+                }
+
+                // The byte at the field offset is read as the value's type tag.
+                // NOTE(review): the "+ 1" presumably adds that tag byte to the value length - confirm.
+                subTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(subRecord[subFieldOffset]);
+                subFieldLength = NonTaggedFormatUtil.getFieldValueLength(subRecord, subFieldOffset, subTypeTag, true) + 1;
+
+                if (i < abvsFields.length - 1) {
+                    //setup next iteration
+                    subRecord = Arrays.copyOfRange(subRecord, subFieldOffset, subFieldOffset + subFieldLength);
+
+                    if (checkType(subRecord[0], out)) {
+                        return;
+                    }
+                }
+            }
+            // A value reached through closed fields only has its tag written here, in front of the
+            // value bytes; on the open-field path the tag is part of the copied range.
+            if (!openField) {
+                out.writeByte(subTypeTag.serialize());
+            }
+            out.write(subRecord, subFieldOffset, subFieldLength);
+
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldValueDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldValueDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldValueDescriptor.java
new file mode 100644
index 0000000..dfd8db0
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldValueDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/**
+ * Function descriptor registered under {@code AsterixBuiltinFunctions.GET_RECORD_FIELD_VALUE}.
+ * It creates a {@link GetRecordFieldValueEvalFactory} from its two argument factories and the
+ * record type supplied through {@link #reset(ARecordType)}.
+ */
+public class GetRecordFieldValueDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Factory handing out a new descriptor instance on every call. */
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            final IFunctionDescriptor descriptor = new GetRecordFieldValueDescriptor();
+            return descriptor;
+        }
+    };
+
+    /** Record type forwarded to the evaluator factory; {@code null} until {@link #reset} is called. */
+    private ARecordType recType;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.GET_RECORD_FIELD_VALUE;
+    }
+
+    /**
+     * Sets the record type that subsequently created evaluator factories receive.
+     *
+     * @param recType type of the record argument
+     */
+    public void reset(ARecordType recType) {
+        this.recType = recType;
+    }
+
+    /**
+     * @param args argument factories: {@code args[0]} yields the record, {@code args[1]} the field name
+     * @return an evaluator factory bound to those arguments and the current record type
+     */
+    @Override
+    public ICopyEvaluatorFactory createEvaluatorFactory(ICopyEvaluatorFactory[] args) {
+        final ICopyEvaluatorFactory recordArg = args[0];
+        final ICopyEvaluatorFactory fieldNameArg = args[1];
+        return new GetRecordFieldValueEvalFactory(recordArg, fieldNameArg, recType);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
new file mode 100644
index 0000000..b07049a
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Evaluator factory that looks up one field of a record, where the field name is itself the
+ * result of evaluating an expression. The lookup is delegated to {@code FieldAccessUtil.evaluate}.
+ */
+public class GetRecordFieldValueEvalFactory implements ICopyEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private ICopyEvaluatorFactory recordEvalFactory;
+    private ICopyEvaluatorFactory fldNameEvalFactory;
+    private ARecordType recordType;
+
+    private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
+
+    /**
+     * @param recordEvalFactory  factory for the evaluator that produces the record
+     * @param fldNameEvalFactory factory for the evaluator that produces the field name
+     * @param recordType         declared type of the record
+     */
+    public GetRecordFieldValueEvalFactory(ICopyEvaluatorFactory recordEvalFactory,
+            ICopyEvaluatorFactory fldNameEvalFactory, ARecordType recordType) {
+        this.recordEvalFactory = recordEvalFactory;
+        this.fldNameEvalFactory = fldNameEvalFactory;
+        this.recordType = recordType;
+    }
+
+    /**
+     * Creates an evaluator that writes the value of the requested field to {@code output}.
+     * A null is written when the field-name argument does not evaluate to a string.
+     *
+     * @param output provider of the stream that receives the result
+     * @throws AlgebricksException if one of the argument evaluators cannot be created
+     */
+    @Override
+    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new ICopyEvaluator() {
+
+            private DataOutput out = output.getDataOutput();
+            private ByteArrayAccessibleOutputStream subRecordTmpStream = new ByteArrayAccessibleOutputStream();
+
+            // outInput0 receives the record, outInput1 the field name.
+            private ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
+            private ArrayBackedValueStorage outInput1 = new ArrayBackedValueStorage();
+            private ICopyEvaluator eval0 = recordEvalFactory.createEvaluator(outInput0);
+            private ICopyEvaluator eval1 = fldNameEvalFactory.createEvaluator(outInput1);
+
+            // The access path handed to FieldAccessUtil always consists of exactly one name.
+            int size = 1;
+            private ArrayBackedValueStorage abvsFields[] = new ArrayBackedValueStorage[size];
+            private DataOutput[] doFields = new DataOutput[size];
+
+            @SuppressWarnings("unchecked")
+            private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(BuiltinType.ANULL);
+
+            {
+                abvsFields[0] = new ArrayBackedValueStorage();
+                doFields[0] = abvsFields[0].getDataOutput();
+                // Replaces the factory's record type with a deep copy each time an evaluator is created.
+                recordType = recordType.deepCopy(recordType);
+            }
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                try {
+                    outInput1.reset();
+                    eval1.evaluate(tuple);
+
+                    byte[] serFldName = outInput1.getByteArray();
+                    int serFldOffset = outInput1.getStartOffset();
+                    int serFldLength = outInput1.getLength();
+                    if (serFldName[serFldOffset] != SER_STRING_TYPE_TAG) {
+                        // Only string field names are supported; anything else yields NULL.
+                        nullSerde.serialize(ANull.NULL, out);
+                        return;
+                    }
+                    abvsFields[0].reset();
+                    // Copy only the bytes that belong to the name. getByteArray() exposes the whole
+                    // backing array, which can be larger than the value and hold bytes left over
+                    // from earlier evaluations.
+                    doFields[0].write(serFldName, serFldOffset, serFldLength);
+
+                    FieldAccessUtil.evaluate(tuple, out, eval0, abvsFields, outInput0, subRecordTmpStream, recordType);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldsDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldsDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldsDescriptor.java
new file mode 100644
index 0000000..98abc61
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldsDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/**
+ * Function descriptor registered under {@code AsterixBuiltinFunctions.GET_RECORD_FIELDS}.
+ * It creates a {@link GetRecordFieldsEvalFactory} from its single argument factory and the
+ * record type supplied through {@link #reset(ARecordType)}.
+ */
+public class GetRecordFieldsDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Factory handing out a new descriptor instance on every call. */
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            final IFunctionDescriptor descriptor = new GetRecordFieldsDescriptor();
+            return descriptor;
+        }
+    };
+
+    /** Record type forwarded to the evaluator factory; {@code null} until {@link #reset} is called. */
+    private ARecordType recType;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.GET_RECORD_FIELDS;
+    }
+
+    /**
+     * Sets the record type that subsequently created evaluator factories receive.
+     *
+     * @param recType type of the record argument
+     */
+    public void reset(ARecordType recType) {
+        this.recType = recType;
+    }
+
+    /**
+     * @param args argument factories; {@code args[0]} yields the record to inspect
+     * @return an evaluator factory bound to that argument and the current record type
+     */
+    @Override
+    public ICopyEvaluatorFactory createEvaluatorFactory(ICopyEvaluatorFactory[] args) {
+        final ICopyEvaluatorFactory recordArg = args[0];
+        return new GetRecordFieldsEvalFactory(recordArg, recType);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldsEvalFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldsEvalFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldsEvalFactory.java
new file mode 100644
index 0000000..c0c77a3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/GetRecordFieldsEvalFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.pointables.nonvisitor.ARecordPointable;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class GetRecordFieldsEvalFactory implements ICopyEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private ICopyEvaluatorFactory recordEvalFactory;
+    private ARecordType recordType;
+
+    private final byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
+    private final byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
+
+    public GetRecordFieldsEvalFactory(ICopyEvaluatorFactory recordEvalFactory, ARecordType recordType) {
+        this.recordEvalFactory = recordEvalFactory;
+        this.recordType = recordType;
+    }
+
+    @Override
+    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new ICopyEvaluator() {
+
+            @SuppressWarnings("unchecked")
+            private final ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(BuiltinType.ANULL);
+
+            private final ARecordPointable recordPointable = (ARecordPointable) ARecordPointable.FACTORY
+                    .createPointable();
+
+            private ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
+            private ICopyEvaluator eval0 = recordEvalFactory.createEvaluator(outInput0);
+            private DataOutput out = output.getDataOutput();
+            private RecordFieldsUtil rfu = new RecordFieldsUtil();
+
+            {
+                recordType = recordType.deepCopy(recordType);
+            }
+
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                outInput0.reset();
+                eval0.evaluate(tuple);
+
+                if (outInput0.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                    try {
+                        nullSerde.serialize(ANull.NULL, out);
+                    } catch (HyracksDataException e) {
+                        throw new AlgebricksException(e);
+                    }
+                }
+
+                if (outInput0.getByteArray()[0] != SER_RECORD_TYPE_TAG) {
+                    throw new AlgebricksException("Field accessor is not defined for values of type "
+                            + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(outInput0.getByteArray()[0]));
+                }
+
+                recordPointable.set(outInput0.getByteArray(), outInput0.getStartOffset(), outInput0.getLength());
+
+                try {
+                    rfu.processRecord(recordPointable, recordType, out, 0);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                } catch (AsterixException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/RecordFieldsUtil.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/RecordFieldsUtil.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/RecordFieldsUtil.java
new file mode 100644
index 0000000..fbb9a1b
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/RecordFieldsUtil.java
@@ -0,0 +1,303 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.builders.AbvsBuilderFactory;
+import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.IAsterixListBuilder;
+import edu.uci.ics.asterix.builders.ListBuilderFactory;
+import edu.uci.ics.asterix.builders.OrderedListBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilderFactory;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.pointables.base.DefaultOpenFieldType;
+import edu.uci.ics.asterix.om.pointables.nonvisitor.AListPointable;
+import edu.uci.ics.asterix.om.pointables.nonvisitor.ARecordPointable;
+import edu.uci.ics.asterix.om.types.AOrderedListType;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AbstractCollectionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.asterix.om.util.container.ListObjectPool;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IMutableValueStorage;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * Builds the introspection result for a record: an ordered list with one record per field,
+ * carrying the entries {@code "field-name"}, {@code "field-type"} and {@code "is-open"}, plus
+ * {@code "nested"} for record-valued fields and {@code "list"} for list-valued fields.
+ * <p>
+ * Builders, buffers and pointables are taken from object pools owned by this instance. The pools
+ * are recycled whenever a top-level record ({@code level == 0}) is processed, so an instance
+ * must not be shared by callers that process records concurrently.
+ */
+public class RecordFieldsUtil {
+
+    private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
+    private final static byte SER_ORDERED_LIST_TYPE_TAG = ATypeTag.ORDEREDLIST.serialize();
+    private final static byte SER_UNORDERED_LIST_TYPE_TAG = ATypeTag.UNORDEREDLIST.serialize();
+
+    // Names of the entries written for every described field.
+    private final static AString fieldName = new AString("field-name");
+    private final static AString typeName = new AString("field-type");
+    private final static AString isOpenName = new AString("is-open");
+    private final static AString nestedName = new AString("nested");
+    private final static AString listName = new AString("list");
+
+    private IObjectPool<IARecordBuilder, String> recordBuilderPool = new ListObjectPool<IARecordBuilder, String>(
+            new RecordBuilderFactory());
+    private IObjectPool<IAsterixListBuilder, String> listBuilderPool = new ListObjectPool<IAsterixListBuilder, String>(
+            new ListBuilderFactory());
+    private IObjectPool<IMutableValueStorage, String> abvsBuilderPool = new ListObjectPool<IMutableValueStorage, String>(
+            new AbvsBuilderFactory());
+    private IObjectPool<IPointable, String> recordPointablePool = new ListObjectPool<IPointable, String>(
+            ARecordPointable.ALLOCATOR);
+    private IObjectPool<IPointable, String> listPointablePool = new ListObjectPool<IPointable, String>(
+            AListPointable.ALLOCATOR);
+
+    private final static AOrderedListType listType = new AOrderedListType(BuiltinType.ANY, "fields");
+    @SuppressWarnings("unchecked")
+    protected final static ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+    @SuppressWarnings("unchecked")
+    protected final static ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+
+    // Type assumed for nested records whose declared type is unknown (values of open fields).
+    private final static ARecordType openType = DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE;
+
+    /**
+     * Writes the list of field descriptions for one record to {@code out}. Closed fields are
+     * listed first in schema order, followed by the open fields from the last index to the first.
+     *
+     * @param recordAccessor pointable positioned on the serialized record
+     * @param recType        type used to interpret the record
+     * @param out            destination of the serialized list
+     * @param level          nesting depth; {@code 0} marks a top-level record and triggers recycling
+     *                       of all pooled objects, nested calls pass a value greater than 0
+     */
+    public void processRecord(ARecordPointable recordAccessor, ARecordType recType, DataOutput out, int level)
+            throws IOException, AsterixException, AlgebricksException {
+        if (level == 0) {
+            // Objects handed out while describing the previous top-level record are no longer in
+            // use; recycle them so the pools do not grow with every processed record.
+            resetPools();
+        }
+
+        ArrayBackedValueStorage itemValue = getTempBuffer();
+        ArrayBackedValueStorage nameBuffer = getTempBuffer();
+
+        OrderedListBuilder orderedListBuilder = getOrderedListBuilder();
+        orderedListBuilder.reset(listType);
+        IARecordBuilder fieldRecordBuilder = getRecordBuilder();
+        fieldRecordBuilder.reset(null);
+
+        int schemeFieldCount = recordAccessor.getSchemeFieldCount(recType);
+        for (int i = 0; i < schemeFieldCount; ++i) {
+            itemValue.reset();
+            fieldRecordBuilder.init();
+
+            // write name
+            nameBuffer.reset();
+            recordAccessor.getClosedFieldName(recType, i, nameBuffer.getDataOutput());
+            addNameField(nameBuffer, fieldRecordBuilder);
+
+            // write type
+            byte tag = recordAccessor.getClosedFieldTag(recType, i);
+            addFieldType(tag, fieldRecordBuilder);
+
+            // write open
+            addIsOpenField(false, fieldRecordBuilder);
+
+            // write nested or list types
+            if (tag == SER_RECORD_TYPE_TAG || tag == SER_ORDERED_LIST_TYPE_TAG || tag == SER_UNORDERED_LIST_TYPE_TAG) {
+                if (!recordAccessor.isClosedFieldNull(recType, i)) {
+                    IAType fieldType = recordAccessor.getClosedFieldType(recType, i);
+                    ArrayBackedValueStorage tmpValue = getTempBuffer();
+                    tmpValue.reset();
+                    recordAccessor.getClosedFieldValue(recType, i, tmpValue.getDataOutput());
+                    if (tag == SER_RECORD_TYPE_TAG) {
+                        addNestedField(tmpValue, fieldType, fieldRecordBuilder, level + 1);
+                    } else {
+                        addListField(tmpValue, fieldType, fieldRecordBuilder, level + 1);
+                    }
+                }
+            }
+
+            // write record
+            fieldRecordBuilder.write(itemValue.getDataOutput(), true);
+
+            // add item to the list of fields
+            orderedListBuilder.addItem(itemValue);
+        }
+        for (int i = recordAccessor.getOpenFieldCount(recType) - 1; i >= 0; --i) {
+            itemValue.reset();
+            fieldRecordBuilder.init();
+
+            // write name
+            nameBuffer.reset();
+            recordAccessor.getOpenFieldName(recType, i, nameBuffer.getDataOutput());
+            addNameField(nameBuffer, fieldRecordBuilder);
+
+            // write type
+            byte tag = recordAccessor.getOpenFieldTag(recType, i);
+            addFieldType(tag, fieldRecordBuilder);
+
+            // write open
+            addIsOpenField(true, fieldRecordBuilder);
+
+            // write nested or list types
+            if (tag == SER_RECORD_TYPE_TAG || tag == SER_ORDERED_LIST_TYPE_TAG || tag == SER_UNORDERED_LIST_TYPE_TAG) {
+                // Open fields have no declared type; null is passed on as "unknown".
+                // NOTE(review): for list values this null reaches processListValue as the
+                // collection type - confirm that AListPointable accepts a null type.
+                IAType fieldType = null;
+                ArrayBackedValueStorage tmpValue = getTempBuffer();
+                tmpValue.reset();
+                recordAccessor.getOpenFieldValue(recType, i, tmpValue.getDataOutput());
+                if (tag == SER_RECORD_TYPE_TAG) {
+                    addNestedField(tmpValue, fieldType, fieldRecordBuilder, level + 1);
+                } else {
+                    addListField(tmpValue, fieldType, fieldRecordBuilder, level + 1);
+                }
+            }
+
+            // write record
+            fieldRecordBuilder.write(itemValue.getDataOutput(), true);
+
+            // add item to the list of fields
+            orderedListBuilder.addItem(itemValue);
+        }
+        orderedListBuilder.write(out, true);
+    }
+
+    /**
+     * Adds the {@code "field-name"} entry to {@code fieldRecordBuilder}.
+     *
+     * @param nameArg serialized name of the described field
+     */
+    public void addNameField(IValueReference nameArg, IARecordBuilder fieldRecordBuilder)
+            throws HyracksDataException, AsterixException {
+        ArrayBackedValueStorage fieldAbvs = getTempBuffer();
+
+        fieldAbvs.reset();
+        stringSerde.serialize(fieldName, fieldAbvs.getDataOutput());
+        fieldRecordBuilder.addField(fieldAbvs, nameArg);
+    }
+
+    /**
+     * Adds the {@code "field-type"} entry, whose value is the string form of the type tag.
+     *
+     * @param tagId serialized type tag of the described value
+     */
+    public void addFieldType(byte tagId, IARecordBuilder fieldRecordBuilder) throws HyracksDataException,
+            AsterixException {
+        ArrayBackedValueStorage fieldAbvs = getTempBuffer();
+        ArrayBackedValueStorage valueAbvs = getTempBuffer();
+
+        // Name
+        fieldAbvs.reset();
+        stringSerde.serialize(typeName, fieldAbvs.getDataOutput());
+        // Value
+        valueAbvs.reset();
+        ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tagId);
+        AMutableString aString = new AMutableString("");
+        aString.setValue(tag.toString());
+        stringSerde.serialize(aString, valueAbvs.getDataOutput());
+        fieldRecordBuilder.addField(fieldAbvs, valueAbvs);
+    }
+
+    /**
+     * Adds the {@code "is-open"} entry.
+     *
+     * @param isOpen {@code true} when the described field is an open field
+     */
+    public void addIsOpenField(boolean isOpen, IARecordBuilder fieldRecordBuilder) throws HyracksDataException,
+            AsterixException {
+        ArrayBackedValueStorage fieldAbvs = getTempBuffer();
+        ArrayBackedValueStorage valueAbvs = getTempBuffer();
+
+        // Name
+        fieldAbvs.reset();
+        stringSerde.serialize(isOpenName, fieldAbvs.getDataOutput());
+        // Value
+        valueAbvs.reset();
+        if (isOpen) {
+            booleanSerde.serialize(ABoolean.TRUE, valueAbvs.getDataOutput());
+        } else {
+            booleanSerde.serialize(ABoolean.FALSE, valueAbvs.getDataOutput());
+        }
+        fieldRecordBuilder.addField(fieldAbvs, valueAbvs);
+    }
+
+    /**
+     * Adds the {@code "list"} entry, whose value is the description produced by
+     * {@link #processListValue}.
+     *
+     * @param listArg   serialized list value
+     * @param fieldType declared type of the list, or {@code null} when unknown
+     * @param level     nesting depth of the list's items; expected to be greater than 0
+     */
+    public void addListField(IValueReference listArg, IAType fieldType, IARecordBuilder fieldRecordBuilder,
+            int level) throws AsterixException, IOException, AlgebricksException {
+        ArrayBackedValueStorage fieldAbvs = getTempBuffer();
+        ArrayBackedValueStorage valueAbvs = getTempBuffer();
+
+        // Name
+        fieldAbvs.reset();
+        stringSerde.serialize(listName, fieldAbvs.getDataOutput());
+        // Value
+        valueAbvs.reset();
+        processListValue(listArg, fieldType, valueAbvs.getDataOutput(), level);
+        fieldRecordBuilder.addField(fieldAbvs, valueAbvs);
+    }
+
+    /**
+     * Adds the {@code "nested"} entry, whose value is the field description of the nested record.
+     *
+     * @param recordArg serialized nested record
+     * @param fieldType declared type of the nested record; {@code null} selects the default open
+     *                  record type
+     * @param level     nesting depth of the nested record; must be greater than 0, because
+     *                  level 0 recycles the pooled objects this method is still using
+     */
+    public void addNestedField(IValueReference recordArg, IAType fieldType, IARecordBuilder fieldRecordBuilder,
+            int level) throws HyracksDataException, AlgebricksException, IOException, AsterixException {
+        ArrayBackedValueStorage fieldAbvs = getTempBuffer();
+        ArrayBackedValueStorage valueAbvs = getTempBuffer();
+
+        // Name
+        fieldAbvs.reset();
+        stringSerde.serialize(nestedName, fieldAbvs.getDataOutput());
+        // Value
+        valueAbvs.reset();
+        ARecordType newType;
+        if (fieldType == null) {
+            newType = openType.deepCopy(openType);
+        } else {
+            newType = ((ARecordType) fieldType).deepCopy((ARecordType) fieldType);
+        }
+        ARecordPointable recordP = getRecordPointable();
+        recordP.set(recordArg);
+        processRecord(recordP, newType, valueAbvs.getDataOutput(), level);
+        fieldRecordBuilder.addField(fieldAbvs, valueAbvs);
+    }
+
+    /**
+     * Writes the description of a list value to {@code out}: an ordered list with one record per
+     * item, holding the item's {@code "field-type"} and, for record items, a {@code "nested"} entry.
+     *
+     * @param listArg   serialized list value
+     * @param fieldType collection type of the list; it is cast to {@code AbstractCollectionType}
+     * @param out       destination of the serialized description
+     * @param level     nesting depth of the list's items; expected to be greater than 0
+     */
+    public void processListValue(IValueReference listArg, IAType fieldType, DataOutput out, int level)
+            throws AsterixException, IOException, AlgebricksException {
+        ArrayBackedValueStorage itemValue = getTempBuffer();
+        IARecordBuilder listRecordBuilder = getRecordBuilder();
+
+        AListPointable list = getListPointable();
+        list.set(listArg);
+
+        OrderedListBuilder innerListBuilder = getOrderedListBuilder();
+        innerListBuilder.reset(listType);
+
+        listRecordBuilder.reset(null);
+        AbstractCollectionType act = (AbstractCollectionType) fieldType;
+        int itemCount = list.getItemCount();
+        for (int l = 0; l < itemCount; l++) {
+            itemValue.reset();
+            listRecordBuilder.init();
+
+            byte tagId = list.getItemTag(act, l);
+            addFieldType(tagId, listRecordBuilder);
+
+            if (tagId == SER_RECORD_TYPE_TAG) {
+                ArrayBackedValueStorage tmpAbvs = getTempBuffer();
+                // Pooled buffers are recycled between top-level records, so clear any old content.
+                tmpAbvs.reset();
+                list.getItemValue(act, l, tmpAbvs.getDataOutput());
+                addNestedField(tmpAbvs, act.getItemType(), listRecordBuilder, level + 1);
+            }
+
+            listRecordBuilder.write(itemValue.getDataOutput(), true);
+            innerListBuilder.addItem(itemValue);
+        }
+        innerListBuilder.write(out, true);
+    }
+
+    /** Makes every pooled object available for reuse. */
+    private void resetPools() {
+        abvsBuilderPool.reset();
+        recordBuilderPool.reset();
+        listBuilderPool.reset();
+        recordPointablePool.reset();
+        listPointablePool.reset();
+    }
+
+    private ARecordPointable getRecordPointable() {
+        return (ARecordPointable) recordPointablePool.allocate("record");
+    }
+
+    private AListPointable getListPointable() {
+        return (AListPointable) listPointablePool.allocate("list");
+    }
+
+    private IARecordBuilder getRecordBuilder() {
+        return (RecordBuilder) recordBuilderPool.allocate("record");
+    }
+
+    private OrderedListBuilder getOrderedListBuilder() {
+        return (OrderedListBuilder) listBuilderPool.allocate("ordered");
+    }
+
+    private ArrayBackedValueStorage getTempBuffer() {
+        return (ArrayBackedValueStorage) abvsBuilderPool.allocate("buffer");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c66d23a5/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
new file mode 100644
index 0000000..dfe9200
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.evaluators.functions.records;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Stack;
+
+import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.pointables.ARecordVisitablePointable;
+import edu.uci.ics.asterix.om.pointables.PointableAllocator;
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
+import edu.uci.ics.asterix.om.typecomputer.impl.RecordMergeTypeComputer;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Descriptor for the record-merge function, which combines two records into one.
+ * <p>
+ * A field that appears in only one input is copied to the output unchanged. When both inputs carry a field
+ * with the same name, both values must themselves be records; they are then merged recursively, so nested
+ * records can be combined at any depth. A name collision on any other kind of value makes the merge fail
+ * with an {@link AlgebricksException}.
+ * <p>
+ * For instance, if both records have a nested field called "metadata", where metadata from A is
+ * {"comments":"this rocks"} and metadata from B is {"index":7, "priority":5}, the result contains a single
+ * nested record "metadata" holding all three fields.
+ */
+public class RecordMergeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
+    private static final byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new RecordMergeDescriptor();
+        }
+    };
+
+    // Record types of the merged output and of the two inputs; assigned by reset(...).
+    private ARecordType outRecType;
+    private ARecordType inRecType0;
+    private ARecordType inRecType1;
+
+    /**
+     * Stores the types this descriptor works with. Each argument is narrowed to a record type through
+     * {@link RecordMergeTypeComputer#extractRecordType}.
+     *
+     * @param outType
+     *            type of the merged result
+     * @param inType0
+     *            type of the first (left) argument
+     * @param inType1
+     *            type of the second (right) argument
+     */
+    public void reset(IAType outType, IAType inType0, IAType inType1) {
+        outRecType = RecordMergeTypeComputer.extractRecordType(outType);
+        inRecType0 = RecordMergeTypeComputer.extractRecordType(inType0);
+        inRecType1 = RecordMergeTypeComputer.extractRecordType(inType1);
+    }
+
+    /**
+     * Builds the factory for the merge evaluator.
+     *
+     * @param args
+     *            evaluator factories for the two record arguments; {@code args[0]} is the left record and
+     *            {@code args[1]} the right one
+     * @return a factory whose evaluators write either the merged record or a serialized null
+     */
+    @Override
+    public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopyEvaluatorFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @SuppressWarnings("unchecked")
+            private final ISerializerDeserializer<ANull> nullSerDe = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(BuiltinType.ANULL);
+
+            @Override
+            public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+                final ARecordType recType;
+                try {
+                    recType = new ARecordType(outRecType.getTypeName(), outRecType.getFieldNames(),
+                            outRecType.getFieldTypes(), outRecType.isOpen());
+                } catch (AsterixException | HyracksDataException e) {
+                    // Preserve the cause so the failure to copy the output type can be diagnosed.
+                    throw new IllegalStateException(e);
+                }
+
+                final PointableAllocator pa = new PointableAllocator();
+                final IVisitablePointable vp0 = pa.allocateRecordValue(inRecType0);
+                final IVisitablePointable vp1 = pa.allocateRecordValue(inRecType1);
+
+                final ArrayBackedValueStorage abvs0 = new ArrayBackedValueStorage();
+                final ArrayBackedValueStorage abvs1 = new ArrayBackedValueStorage();
+
+                final ICopyEvaluator eval0 = args[0].createEvaluator(abvs0);
+                final ICopyEvaluator eval1 = args[1].createEvaluator(abvs1);
+
+                // One builder per nesting depth, created lazily and reused across tuples.
+                final Stack<RecordBuilder> rbStack = new Stack<RecordBuilder>();
+
+                // Scratch buffer holding a freshly merged nested record before it is added to its parent.
+                final ArrayBackedValueStorage tabvs = new ArrayBackedValueStorage();
+
+                return new ICopyEvaluator() {
+
+                    @Override
+                    public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                        abvs0.reset();
+                        abvs1.reset();
+
+                        eval0.evaluate(tuple);
+                        eval1.evaluate(tuple);
+
+                        // A null on either side makes the whole result null.
+                        if (abvs0.getByteArray()[0] == SER_NULL_TYPE_TAG
+                                || abvs1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                            try {
+                                nullSerDe.serialize(ANull.NULL, output.getDataOutput());
+                            } catch (HyracksDataException e) {
+                                throw new AlgebricksException(e);
+                            }
+                            return;
+                        }
+
+                        vp0.set(abvs0);
+                        vp1.set(abvs1);
+
+                        try {
+                            mergeFields(recType, (ARecordVisitablePointable) vp0, (ARecordVisitablePointable) vp1,
+                                    0);
+                            rbStack.get(0).write(output.getDataOutput(), true);
+                        } catch (IOException | AsterixException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+
+                    /**
+                     * Tells whether the serialized value starts with the record type tag. The tag is read at
+                     * the pointable's own start offset, not at index 0 of its backing array.
+                     */
+                    private boolean isRecord(IVisitablePointable value) {
+                        return value.getByteArray()[value.getStartOffset()] == SER_RECORD_TYPE_TAG;
+                    }
+
+                    /**
+                     * Returns the index of the first field of {@code record} whose name equals {@code name},
+                     * or -1 when there is none.
+                     */
+                    private int indexOfField(ARecordVisitablePointable record, IVisitablePointable name) {
+                        for (int i = 0; i < record.getFieldNames().size(); i++) {
+                            if (record.getFieldNames().get(i).equals(name)) {
+                                return i;
+                            }
+                        }
+                        return -1;
+                    }
+
+                    /**
+                     * Decodes a field name pointable into a Java string. The first byte is skipped, so the
+                     * decoder sees exactly the remaining {@code getLength() - 1} bytes. Reading straight from
+                     * the pointable avoids going through an intermediate buffer that may be reallocated.
+                     */
+                    private String decodeFieldName(IVisitablePointable fieldNamePointable) throws IOException {
+                        DataInputStream nameInput = new DataInputStream(new ByteArrayInputStream(
+                                fieldNamePointable.getByteArray(), fieldNamePointable.getStartOffset() + 1,
+                                fieldNamePointable.getLength() - 1));
+                        return AStringSerializerDeserializer.INSTANCE.deserialize(nameInput).getStringValue();
+                    }
+
+                    /**
+                     * Fills the builder at {@code nestedLevel} with the union of the fields of both records.
+                     *
+                     * @param combinedType
+                     *            type of the record being built at this level
+                     * @param leftRecord
+                     *            left input record
+                     * @param rightRecord
+                     *            right input record
+                     * @param nestedLevel
+                     *            nesting depth, 0 for the top-level records; selects the builder in rbStack
+                     * @throws AlgebricksException
+                     *             if both records share a field name and the two values are not both records
+                     */
+                    private void mergeFields(ARecordType combinedType, ARecordVisitablePointable leftRecord,
+                            ARecordVisitablePointable rightRecord, int nestedLevel) throws IOException,
+                            AsterixException, AlgebricksException {
+                        if (rbStack.size() < (nestedLevel + 1)) {
+                            rbStack.push(new RecordBuilder());
+                        }
+
+                        RecordBuilder builder = rbStack.get(nestedLevel);
+                        builder.reset(combinedType);
+                        builder.init();
+
+                        // Left side: copy unmatched fields, recursively merge fields present on both sides.
+                        for (int i = 0; i < leftRecord.getFieldNames().size(); i++) {
+                            IVisitablePointable leftName = leftRecord.getFieldNames().get(i);
+                            IVisitablePointable leftValue = leftRecord.getFieldValues().get(i);
+                            int rightIndex = indexOfField(rightRecord, leftName);
+                            if (rightIndex < 0) {
+                                addFieldToSubRecord(combinedType, leftName, leftValue, null, nestedLevel);
+                                continue;
+                            }
+                            IVisitablePointable rightValue = rightRecord.getFieldValues().get(rightIndex);
+                            if (!isRecord(leftValue) || !isRecord(rightValue)) {
+                                //The fields need to be records in order to merge
+                                throw new AlgebricksException("Duplicate field found");
+                            }
+                            addFieldToSubRecord(combinedType, leftName, leftValue, rightValue, nestedLevel);
+                        }
+
+                        // Right side: only fields the left record does not have; shared ones are done above.
+                        for (int j = 0; j < rightRecord.getFieldNames().size(); j++) {
+                            IVisitablePointable rightName = rightRecord.getFieldNames().get(j);
+                            if (indexOfField(leftRecord, rightName) < 0) {
+                                addFieldToSubRecord(combinedType, rightName, rightRecord.getFieldValues().get(j),
+                                        null, nestedLevel);
+                            }
+                        }
+                    }
+
+                    /**
+                     * Adds one field to the builder at {@code nestedLevel}.
+                     * <p>
+                     * When {@code rightValue} is null, {@code leftValue} is added as is. Otherwise both values
+                     * are treated as records, merged at {@code nestedLevel + 1}, and the merged record is added.
+                     * Fields that are closed in {@code combinedType} are added by position, all others by name.
+                     *
+                     * @param combinedType
+                     *            type of the record that receives the field
+                     * @param fieldNamePointable
+                     *            serialized field name
+                     * @param leftValue
+                     *            value to add, or the left operand of the nested merge
+                     * @param rightValue
+                     *            right operand of the nested merge, or null when there is nothing to merge
+                     * @param nestedLevel
+                     *            nesting depth of the record that receives the field
+                     */
+                    private void addFieldToSubRecord(ARecordType combinedType, IVisitablePointable fieldNamePointable,
+                            IVisitablePointable leftValue, IVisitablePointable rightValue, int nestedLevel)
+                            throws IOException, AsterixException, AlgebricksException {
+
+                        String fieldName = decodeFieldName(fieldNamePointable);
+                        RecordBuilder builder = rbStack.get(nestedLevel);
+
+                        boolean closed = combinedType.isClosedField(fieldName);
+                        int pos = closed ? combinedType.findFieldPosition(fieldName) : -1;
+
+                        if (rightValue == null) {
+                            if (closed) {
+                                builder.addField(pos, leftValue);
+                            } else {
+                                builder.addField(fieldNamePointable, leftValue);
+                            }
+                            return;
+                        }
+
+                        // Merge the two nested records one level down, then serialize the result into the
+                        // scratch buffer so it can be handed to this level's builder.
+                        mergeFields((ARecordType) combinedType.getFieldType(fieldName),
+                                (ARecordVisitablePointable) leftValue, (ARecordVisitablePointable) rightValue,
+                                nestedLevel + 1);
+                        tabvs.reset();
+                        rbStack.get(nestedLevel + 1).write(tabvs.getDataOutput(), true);
+                        if (closed) {
+                            builder.addField(pos, tabvs);
+                        } else {
+                            builder.addField(fieldNamePointable, tabvs);
+                        }
+                    }
+
+                };
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.RECORD_MERGE;
+    }
+}


Mime
View raw message