asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject [10/24] incubator-asterixdb git commit: Introduces Feeds 2.0
Date Mon, 29 Jun 2015 19:45:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index c2a8a95..f9793f6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -17,9 +17,8 @@ package edu.uci.ics.asterix.external.dataset.adapter;
 import java.io.IOException;
 import java.io.InputStream;
 
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -34,23 +33,22 @@ public abstract class FileSystemBasedAdapter implements IDatasourceAdapter {
 
     public abstract InputStream getInputStream(int partition) throws IOException;
 
-    protected final ITupleParser tupleParser;
+    protected final ITupleParserFactory parserFactory;
+    protected ITupleParser tupleParser;
     protected final IAType sourceDatatype;
     protected IHyracksTaskContext ctx;
 
     public FileSystemBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
             throws HyracksDataException {
-        this.tupleParser = parserFactory.createTupleParser(ctx);
+        this.parserFactory = parserFactory;
         this.sourceDatatype = sourceDatatype;
         this.ctx = ctx;
     }
 
     @Override
     public void start(int partition, IFrameWriter writer) throws Exception {
+        tupleParser = parserFactory.createTupleParser(ctx);
         InputStream in = getInputStream(partition);
-        if (tupleParser instanceof AbstractTupleParser) {
-            ((AbstractTupleParser) tupleParser).setFilename(getFilename(partition));
-        }
         tupleParser.parse(in, writer);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 9f4b97c..8036a7b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
+
 import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
 import edu.uci.ics.asterix.external.indexing.input.GenericFileAwareRecordReader;
 import edu.uci.ics.asterix.external.indexing.input.GenericRecordReader;
@@ -30,6 +31,7 @@ import edu.uci.ics.asterix.external.indexing.input.TextualDataReader;
 import edu.uci.ics.asterix.external.indexing.input.TextualFullScanDataReader;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -71,9 +73,9 @@ public class HDFSAdapter extends FileSystemBasedAdapter {
     @Override
     public InputStream getInputStream(int partition) throws IOException {
         if ((conf.getInputFormat() instanceof TextInputFormat || conf.getInputFormat() instanceof SequenceFileInputFormat)
-                && (HDFSAdapterFactory.FORMAT_ADM.equalsIgnoreCase((String) configuration
-                        .get(HDFSAdapterFactory.KEY_FORMAT)) || HDFSAdapterFactory.FORMAT_DELIMITED_TEXT
-                        .equalsIgnoreCase((String) configuration.get(HDFSAdapterFactory.KEY_FORMAT)))) {
+                && (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase((String) configuration
+                        .get(AsterixTupleParserFactory.KEY_FORMAT)) || AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT
+                        .equalsIgnoreCase((String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT)))) {
             if (files != null) {
                 return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
index 9af1dd7..f730f28 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
@@ -27,6 +27,7 @@ import edu.uci.ics.asterix.external.indexing.input.RCFileDataReader;
 import edu.uci.ics.asterix.external.indexing.input.TextualDataReader;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -64,8 +65,8 @@ public class HDFSIndexingAdapter extends FileSystemBasedAdapter {
     public InputStream getInputStream(int partition) throws IOException {
         if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
             return new RCFileDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
-        } else if (format.equals(HDFSAdapterFactory.FORMAT_ADM)
-                || format.equals(HDFSAdapterFactory.FORMAT_DELIMITED_TEXT)) {
+        } else if (format.equals(AsterixTupleParserFactory.FORMAT_ADM)
+                || format.equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
             return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
         } else {
             return new GenericFileAwareRecordReader(inputSplits, readSchedule, nodeName, conf, executed, files);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClient.java
new file mode 100644
index 0000000..02e60b4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClient.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public interface IFeedClient {
+
+    public enum InflowState {
+        NO_MORE_DATA,
+        DATA_AVAILABLE,
+        DATA_NOT_AVAILABLE
+    }
+
+    /**
+     * Writes the next fetched tuple into the provided instance of DataOutput. Invocation of this method blocks until
+     * a new tuple has been written or the specified time has expired.
+     * 
+     * @param dataOutput
+     *            The receiving channel for the feed client to write ADM records to.
+     * @param timeout
+     *            Threshold time (expressed in seconds) for the next tuple to be obtained from the external source.
+     * @return
+     * @throws AsterixException
+     */
+    public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 78554f2..b31e824 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.InputStream;
 
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -58,15 +57,7 @@ public class NCFileSystemAdapter extends FileSystemBasedAdapter {
         }
     }
 
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        InputStream in = getInputStream(partition);
-        if (tupleParser instanceof AbstractTupleParser) {
-            ((AbstractTupleParser) tupleParser).setFilename(getFilename(partition));
-        }
-        tupleParser.parse(in, writer);
-    }
-
+   
     @Override
     public String getFilename(int partition) {
         final FileSplit fileSplit = fileSplits[partition];

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
index c739ca3..63181dc 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
@@ -9,7 +9,7 @@ import java.util.logging.Logger;
 import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
@@ -77,4 +77,9 @@ public class PullBasedAzureTwitterAdapter extends PullBasedAdapter implements ID
     public DataExchangeMode getDataExchangeMode() {
         return DataExchangeMode.PULL;
     }
+
+    @Override
+    public boolean handleException(Exception e) {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
deleted file mode 100644
index 37d93ad..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.dataset.adapter;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.asterix.builders.IARecordBuilder;
-import edu.uci.ics.asterix.builders.RecordBuilder;
-import edu.uci.ics.asterix.builders.UnorderedListBuilder;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ABoolean;
-import edu.uci.ics.asterix.om.base.AInt32;
-import edu.uci.ics.asterix.om.base.AMutableDateTime;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutablePoint;
-import edu.uci.ics.asterix.om.base.AMutableRecord;
-import edu.uci.ics.asterix.om.base.AMutableString;
-import edu.uci.ics.asterix.om.base.AMutableUnorderedList;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.base.IACursor;
-import edu.uci.ics.asterix.om.base.IAObject;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnorderedListType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-
-public abstract class PullBasedFeedClient implements IPullBasedFeedClient {
-
-    protected static final Logger LOGGER = Logger.getLogger(PullBasedFeedClient.class.getName());
-
-    protected ARecordSerializerDeserializer recordSerDe;
-    protected AMutableRecord mutableRecord;
-    protected boolean messageReceived;
-    protected boolean continueIngestion = true;
-    protected IARecordBuilder recordBuilder = new RecordBuilder();
-
-    protected AMutableString aString = new AMutableString("");
-    protected AMutableInt32 aInt32 = new AMutableInt32(0);
-    protected AMutablePoint aPoint = new AMutablePoint(0, 0);
-    protected AMutableDateTime aDateTime = new AMutableDateTime(0);
-
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ASTRING);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT32);
-
-    public abstract InflowState setNextRecord() throws Exception;
-
-    @Override
-    public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException {
-        try {
-            InflowState state = null;
-            int waitCount = 0;
-            boolean continueWait = true;
-            while ((state == null || state.equals(InflowState.DATA_NOT_AVAILABLE)) && continueWait) {
-                state = setNextRecord();
-                switch (state) {
-                    case DATA_AVAILABLE:
-                        IAType t = mutableRecord.getType();
-                        ATypeTag tag = t.getTypeTag();
-                        dataOutput.writeByte(tag.serialize());
-                        recordBuilder.reset(mutableRecord.getType());
-                        recordBuilder.init();
-                        writeRecord(mutableRecord, dataOutput, recordBuilder);
-                        break;
-                    case DATA_NOT_AVAILABLE:
-                        if (waitCount > timeout) {
-                            continueWait = false;
-                        } else {
-                            if (LOGGER.isLoggable(Level.WARNING)) {
-                                LOGGER.warning("Waiting to obtaing data from pull based adapter");
-                            }
-                            Thread.sleep(1000);
-                            waitCount++;
-                        }
-                        break;
-                    case NO_MORE_DATA:
-                        break;
-                }
-            }
-            return state;
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-
-    }
-
-    private void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)
-            throws IOException, AsterixException {
-        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
-        int numFields = record.getType().getFieldNames().length;
-        for (int pos = 0; pos < numFields; pos++) {
-            fieldValue.reset();
-            IAObject obj = record.getValueByPos(pos);
-            writeObject(obj, fieldValue.getDataOutput());
-            recordBuilder.addField(pos, fieldValue);
-        }
-        recordBuilder.write(dataOutput, false);
-    }
-
-    private void writeObject(IAObject obj, DataOutput dataOutput) throws IOException, AsterixException {
-        switch (obj.getType().getTypeTag()) {
-            case RECORD:
-                ATypeTag tag = obj.getType().getTypeTag();
-                try {
-                    dataOutput.writeByte(tag.serialize());
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-                IARecordBuilder recordBuilder = new RecordBuilder();
-                recordBuilder.reset((ARecordType) obj.getType());
-                recordBuilder.init();
-                writeRecord((AMutableRecord) obj, dataOutput, recordBuilder);
-                break;
-            case UNORDEREDLIST:
-                tag = obj.getType().getTypeTag();
-                try {
-                    dataOutput.writeByte(tag.serialize());
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-                UnorderedListBuilder listBuilder = new UnorderedListBuilder();
-                listBuilder.reset((AUnorderedListType) ((AMutableUnorderedList) obj).getType());
-                IACursor cursor = ((AMutableUnorderedList) obj).getCursor();
-                ArrayBackedValueStorage listItemValue = new ArrayBackedValueStorage();
-                while (cursor.next()) {
-                    listItemValue.reset();
-                    IAObject item = cursor.get();
-                    writeObject(item, listItemValue.getDataOutput());
-                    listBuilder.addItem(listItemValue);
-                }
-                listBuilder.write(dataOutput, false);
-                break;
-            default:
-                AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(obj.getType()).serialize(obj,
-                        dataOutput);
-                break;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index 838cfeb..27d7049 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -17,30 +17,33 @@ package edu.uci.ics.asterix.external.dataset.adapter;
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * An adapter that provides the functionality of receiving tweets from the
  * Twitter service in the form of ADM formatted records.
  */
-public class PullBasedTwitterAdapter extends PullBasedAdapter implements IFeedAdapter {
+public class PullBasedTwitterAdapter extends ClientBasedFeedAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
 
-    public static final String QUERY = "query";
-    public static final String INTERVAL = "interval";
+    private static final int DEFAULT_BATCH_SIZE = 5;
 
     private ARecordType recordType;
     private PullBasedTwitterFeedClient tweetClient;
 
     @Override
-    public IPullBasedFeedClient getFeedClient(int partition) {
+    public IFeedClient getFeedClient(int partition) {
         return tweetClient;
     }
 
-    public PullBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx) throws AsterixException {
+    public PullBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
+            throws AsterixException {
         super(configuration, ctx);
         tweetClient = new PullBasedTwitterFeedClient(ctx, recordType, this);
     }
@@ -54,4 +57,20 @@ public class PullBasedTwitterAdapter extends PullBasedAdapter implements IFeedAd
         return DataExchangeMode.PULL;
     }
 
+    @Override
+    public boolean handleException(Exception e) {
+        return true;
+    }
+
+    @Override
+    public ITupleForwardPolicy getTupleParserPolicy() {
+        configuration.put(ITupleForwardPolicy.PARSER_POLICY,
+                ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
+        String propValue = configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE);
+        if (propValue == null) {
+            configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, "" + DEFAULT_BATCH_SIZE);
+        }
+        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2c8d659..2b68c88 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -16,93 +16,86 @@ package edu.uci.ics.asterix.external.dataset.adapter;
 
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 import twitter4j.Query;
 import twitter4j.QueryResult;
-import twitter4j.Tweet;
+import twitter4j.Status;
 import twitter4j.Twitter;
 import twitter4j.TwitterException;
-import twitter4j.TwitterFactory;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.om.base.AMutableRecord;
-import edu.uci.ics.asterix.om.base.AMutableString;
-import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.external.util.TweetProcessor;
+import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.external.util.TwitterUtil.SearchAPIConstants;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
- * An implementation of @see {PullBasedFeedClient} for the Twitter service.
- * The feed client fetches data from Twitter service by sending request at
- * regular (configurable) interval.
+ * An implementation of {@link FeedClient} for the Twitter service. The
+ * feed client fetches data from Twitter service by sending request at regular
+ * (configurable) interval.
  */
-public class PullBasedTwitterFeedClient extends PullBasedFeedClient {
+public class PullBasedTwitterFeedClient extends FeedClient {
 
     private String keywords;
     private Query query;
     private Twitter twitter;
-    private int requestInterval = 10; // seconds
+    private int requestInterval = 5; // seconds
     private QueryResult result;
 
-    private IAObject[] mutableFields;
-    private String[] tupleFieldValues;
     private ARecordType recordType;
     private int nextTweetIndex = 0;
+    private long lastTweetIdReceived = 0;
+    private TweetProcessor tweetProcessor;
 
     public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PullBasedTwitterAdapter adapter) {
-        twitter = new TwitterFactory().getInstance();
-        mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
-                new AMutableString(null), new AMutableString(null) };
+        this.twitter = TwitterUtil.getTwitterService(adapter.getConfiguration());
         this.recordType = recordType;
-        recordSerDe = new ARecordSerializerDeserializer(recordType);
-        mutableRecord = new AMutableRecord(recordType, mutableFields);
-        tupleFieldValues = new String[recordType.getFieldNames().length];
-        initialize(adapter.getConfiguration());
+        this.tweetProcessor = new TweetProcessor(recordType);
+        this.recordSerDe = new ARecordSerializerDeserializer(recordType);
+        this.mutableRecord = tweetProcessor.getMutableRecord();
+        this.initialize(adapter.getConfiguration());
     }
 
     public ARecordType getRecordType() {
         return recordType;
     }
 
-    public AMutableRecord getMutableRecord() {
-        return mutableRecord;
-    }
-
     @Override
-    public InflowState setNextRecord() throws Exception {
-        Tweet tweet;
+    public InflowState retrieveNextRecord() throws Exception {
+        Status tweet;
         tweet = getNextTweet();
         if (tweet == null) {
             return InflowState.DATA_NOT_AVAILABLE;
         }
-        int numFields = recordType.getFieldNames().length;
-        tupleFieldValues[0] = UUID.randomUUID().toString();
-        tupleFieldValues[1] = tweet.getFromUser();
-        tupleFieldValues[2] = tweet.getLocation() == null ? "" : tweet.getLocation();
-        tupleFieldValues[3] = tweet.getText();
-        tupleFieldValues[4] = tweet.getCreatedAt().toString();
-        for (int i = 0; i < numFields; i++) {
-            ((AMutableString) mutableFields[i]).setValue(tupleFieldValues[i]);
-            mutableRecord.setValueAtPos(i, mutableFields[i]);
-        }
+
+        tweetProcessor.processNextTweet(tweet);
         return InflowState.DATA_AVAILABLE;
     }
 
     private void initialize(Map<String, String> params) {
-        this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
-        this.requestInterval = Integer.parseInt((String) params.get(PullBasedTwitterAdapter.INTERVAL));
+        this.keywords = (String) params.get(SearchAPIConstants.QUERY);
+        this.requestInterval = Integer.parseInt((String) params.get(SearchAPIConstants.INTERVAL));
         this.query = new Query(keywords);
-        query.setRpp(100);
+        this.query.setCount(100);
     }
 
-    private Tweet getNextTweet() throws TwitterException, InterruptedException {
+    private Status getNextTweet() throws TwitterException, InterruptedException {
         if (result == null || nextTweetIndex >= result.getTweets().size()) {
             Thread.sleep(1000 * requestInterval);
+            query.setSinceId(lastTweetIdReceived);
             result = twitter.search(query);
             nextTweetIndex = 0;
         }
-        List<Tweet> tw = result.getTweets();
-        return tw.get(nextTweetIndex++);
+        if (result != null && !result.getTweets().isEmpty()) {
+            List<Status> tw = result.getTweets();
+            Status tweet = tw.get(nextTweetIndex++);
+            if (lastTweetIdReceived < tweet.getId()) {
+                lastTweetIdReceived = tweet.getId();
+            }
+            return tweet;
+        } else {
+            return null;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
new file mode 100644
index 0000000..8bc9a37
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
@@ -0,0 +1,52 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class PushBasedTwitterAdapter extends ClientBasedFeedAdapter { // feed adapter that reports DataExchangeMode.PUSH and serves a PushBasedTwitterFeedClient
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int DEFAULT_BATCH_SIZE = 50; // written into the configuration by getTupleParserPolicy() when no batch size is set
+
+    private PushBasedTwitterFeedClient tweetClient; // created once in the constructor and returned for every partition
+
+    public PushBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx) throws AsterixException {
+        super(configuration, ctx);
+        this.configuration = configuration; // NOTE(review): presumably already stored by super(configuration, ctx) - confirm
+        this.tweetClient = new PushBasedTwitterFeedClient(ctx, recordType, this); // the client reads this adapter's settings via getConfiguration()
+    }
+
+    @Override
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PUSH;
+    }
+
+    @Override
+    public boolean handleException(Exception e) {
+        return true; // unconditional; NOTE(review): the meaning of true is defined by the overridden method - confirm there
+    }
+
+    @Override
+    public IFeedClient getFeedClient(int partition) throws Exception {
+        return tweetClient; // the partition argument is not used: the same client instance is returned each time
+    }
+
+    @Override
+    public ITupleForwardPolicy getTupleParserPolicy() {
+        configuration.put(ITupleForwardPolicy.PARSER_POLICY,
+                ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name()); // side effect: overwrites any parser policy already in the configuration map
+        String propValue = configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE);
+        if (propValue == null) { // keep a batch size that is already configured
+            configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, "" + DEFAULT_BATCH_SIZE);
+        }
+        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
new file mode 100644
index 0000000..908fd34
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import twitter4j.FilterQuery;
+import twitter4j.Query;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.external.util.TweetProcessor;
+import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * An implementation of {@link FeedClient} for the Twitter service. The feed
+ * client registers a listener on a TwitterStream, queues every Status it
+ * receives, and hands the queued tweets out through retrieveNextRecord().
+ */
+public class PushBasedTwitterFeedClient extends FeedClient {
+
+    private String keywords; // assigned in initialize(); not read anywhere else in this class
+    private Query query; // assigned in initialize(); the stream itself is driven by the FilterQuery local in the constructor
+
+    private ARecordType recordType;
+    private TweetProcessor tweetProcessor;
+    private LinkedBlockingQueue<Status> inputQ; // filled by TweetListener.onStatus(), drained by retrieveNextRecord()
+
+    public PushBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PushBasedTwitterAdapter adapter) throws AsterixException {
+        this.recordType = recordType;
+        this.tweetProcessor = new TweetProcessor(recordType);
+        this.recordSerDe = new ARecordSerializerDeserializer(recordType); // NOTE(review): recordSerDe is not declared here, presumably inherited from FeedClient - confirm
+        this.mutableRecord = tweetProcessor.getMutableRecord(); // NOTE(review): mutableRecord is presumably inherited as well - confirm
+        this.initialize(adapter.getConfiguration());
+        this.inputQ = new LinkedBlockingQueue<Status>();
+        TwitterStream twitterStream = TwitterUtil.getTwitterStream(adapter.getConfiguration());
+        twitterStream.addListener(new TweetListener(inputQ)); // listener is attached before filter()/sample() is called
+        FilterQuery query = TwitterUtil.getFilterQuery(adapter.getConfiguration()); // local FilterQuery shadows the Query field of the same name
+        if (query != null) {
+            twitterStream.filter(query);
+        } else {
+            twitterStream.sample(); // no FilterQuery was returned for this configuration, so sample() is used instead
+        }
+    }
+
+    public ARecordType getRecordType() {
+        return recordType;
+    }
+
+    private class TweetListener implements StatusListener { // only onStatus() does work; every other callback body is empty
+
+        private LinkedBlockingQueue<Status> inputQ; // same queue instance as the outer inputQ (passed in by the constructor)
+
+        public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onStatus(Status tweet) {
+            inputQ.add(tweet);
+        }
+
+        @Override
+        public void onException(Exception arg0) { // NOTE(review): exceptions reported to the listener are silently dropped here
+
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice arg0) {
+        }
+
+        @Override
+        public void onScrubGeo(long arg0, long arg1) {
+        }
+
+        @Override
+        public void onStallWarning(StallWarning arg0) {
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int arg0) {
+        }
+    }
+
+    @Override
+    public InflowState retrieveNextRecord() throws Exception {
+        Status tweet = inputQ.take(); // blocks until the listener has queued a tweet
+        tweetProcessor.processNextTweet(tweet);
+        return InflowState.DATA_AVAILABLE; // the only state this method returns
+    }
+
+    private void initialize(Map<String, String> params) {
+        this.keywords = (String) params.get(SearchAPIConstants.QUERY);
+        this.query = new Query(keywords);
+        this.query.setCount(100); // NOTE(review): this Query is never passed to the stream in this class - confirm whether it is still needed
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 4eea034..bcf809d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -19,14 +19,16 @@ import java.util.List;
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * RSSFeedAdapter provides the functionality of fetching an RSS based feed.
  */
-public class RSSFeedAdapter extends PullBasedAdapter implements IFeedAdapter {
+public class RSSFeedAdapter extends ClientBasedFeedAdapter implements IFeedAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -35,7 +37,7 @@ public class RSSFeedAdapter extends PullBasedAdapter implements IFeedAdapter {
     private List<String> feedURLs = new ArrayList<String>();
     private String id_prefix = "";
 
-    private IPullBasedFeedClient rssFeedClient;
+    private IFeedClient rssFeedClient;
 
     private ARecordType recordType;
 
@@ -62,7 +64,7 @@ public class RSSFeedAdapter extends PullBasedAdapter implements IFeedAdapter {
     }
 
     @Override
-    public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
+    public IFeedClient getFeedClient(int partition) throws Exception {
         if (rssFeedClient == null) {
             rssFeedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
         }
@@ -78,4 +80,14 @@ public class RSSFeedAdapter extends PullBasedAdapter implements IFeedAdapter {
         return DataExchangeMode.PULL;
     }
 
+    @Override
+    public boolean handleException(Exception e) {
+        return false;
+    }
+
+    @Override
+    public ITupleForwardPolicy getTupleParserPolicy() {
+        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
index 41ed923..e7cbd16 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
@@ -39,7 +39,7 @@ import edu.uci.ics.asterix.om.types.ARecordType;
  * fetching from an RSS feed source at regular interval.
  */
 @SuppressWarnings("rawtypes")
-public class RSSFeedClient extends PullBasedFeedClient {
+public class RSSFeedClient extends FeedClient {
 
     private long id = 0;
     private String idPrefix;
@@ -79,7 +79,7 @@ public class RSSFeedClient extends PullBasedFeedClient {
     }
 
     @Override
-    public InflowState setNextRecord() throws Exception {
+    public InflowState retrieveNextRecord() throws Exception {
         SyndEntryImpl feedEntry = getNextRSSFeed();
         if (feedEntry == null) {
             return InflowState.DATA_NOT_AVAILABLE;
@@ -133,9 +133,9 @@ public class RSSFeedClient extends PullBasedFeedClient {
 
 class FetcherEventListenerImpl implements FetcherListener {
 
-    private final IPullBasedFeedClient feedClient;
+    private final IFeedClient feedClient;
 
-    public FetcherEventListenerImpl(IPullBasedFeedClient feedClient) {
+    public FetcherEventListenerImpl(IFeedClient feedClient) {
         this.feedClient = feedClient;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
index 80965b0..25a0221 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -5,7 +5,7 @@ import java.io.InputStream;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -25,7 +25,7 @@ public abstract class StreamBasedAdapter implements IDatasourceAdapter {
 
     protected final IAType sourceDatatype;
 
-    public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
+    public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
         this.tupleParser = parserFactory.createTupleParser(ctx);
         this.sourceDatatype = sourceDatatype;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
index e2ef9fd..ca15b35 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
@@ -24,6 +24,7 @@ import edu.uci.ics.asterix.external.adapter.factory.HDFSIndexingAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -84,11 +85,11 @@ public class HDFSIndexingParserFactory implements ITupleParserFactory {
          * 2. RC indexing tuple parser
          * 3. textual data tuple parser
          */
-        if (format.equalsIgnoreCase(StreamBasedAdapterFactory.FORMAT_ADM)) {
+        if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_ADM)) {
             // choice 3 with adm data parser
             ADMDataParser dataParser = new ADMDataParser();
             return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
-        } else if (format.equalsIgnoreCase(StreamBasedAdapterFactory.FORMAT_DELIMITED_TEXT)) {
+        } else if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
             // choice 3 with delimited data parser
             DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser(atype,
                 delimiter, quote); 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
index c6176fc..7e0d9f9 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
@@ -35,6 +35,7 @@ import edu.uci.ics.asterix.metadata.external.IControlledAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -81,7 +82,7 @@ public class HDFSLookupAdapter implements IControlledAdapter, Serializable {
         // Create the lookup reader and the controlled parser
         if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
             configureRCFile(jobConf, iNullWriterFactory);
-        } else if (configuration.get(HDFSAdapterFactory.KEY_FORMAT).equals(HDFSAdapterFactory.FORMAT_ADM)) {
+        } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_ADM)) {
             // create an adm parser
             ADMDataParser dataParser = new ADMDataParser();
             if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
@@ -95,10 +96,10 @@ public class HDFSLookupAdapter implements IControlledAdapter, Serializable {
                 parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
                         inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
             }
-        } else if (configuration.get(HDFSAdapterFactory.KEY_FORMAT).equals(HDFSAdapterFactory.FORMAT_DELIMITED_TEXT)) {
+        } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
             // create a delimited text parser
-            char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
-            char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
+            char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
+            char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter);
 
             DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser((ARecordType) atype,
                     delimiter, quote);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
index a148e66..57a8e48 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
@@ -90,9 +90,9 @@ public abstract class ExternalFunction implements IExternalFunction {
                 castBuffer.reset();
                 ATypeHierarchy.convertNumericTypeByteArray(inputVal.getByteArray(), inputVal.getStartOffset(),
                         inputVal.getLength(), targetTypeTag, castBuffer.getDataOutput());
-                functionHelper.setArgument(i, castBuffer.getByteArray());
+                functionHelper.setArgument(i, castBuffer);
             } else {
-                functionHelper.setArgument(i, inputVal.getByteArray());
+                functionHelper.setArgument(i, inputVal);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
index fc629ea..d989323 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
@@ -14,9 +14,6 @@
  */
 package edu.uci.ics.asterix.external.library;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
@@ -26,19 +23,14 @@ import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class ExternalFunctionProvider {
 
-    private static Map<IExternalFunctionInfo, ExternalScalarFunction> functionRepo = new HashMap<IExternalFunctionInfo, ExternalScalarFunction>();
-
     public static IExternalFunction getExternalFunctionEvaluator(IExternalFunctionInfo finfo,
             ICopyEvaluatorFactory args[], IDataOutputProvider outputProvider) throws AlgebricksException {
         switch (finfo.getKind()) {
             case SCALAR:
-                ExternalScalarFunction function = functionRepo.get(finfo);
-                function = new ExternalScalarFunction(finfo, args, outputProvider);
-                // functionRepo.put(finfo, function);
-                return function;
+                return new ExternalScalarFunction(finfo, args, outputProvider);
             case AGGREGATE:
             case UNNEST:
-                throw new IllegalArgumentException(" not supported function kind" + finfo.getKind());
+                throw new IllegalArgumentException(" UDF of kind" + finfo.getKind() + " not supported.");
             default:
                 throw new IllegalArgumentException(" unknown function kind" + finfo.getKind());
         }
@@ -62,9 +54,10 @@ class ExternalScalarFunction extends ExternalFunction implements IExternalScalar
         try {
             setArguments(tuple);
             evaluate(functionHelper);
+            functionHelper.reset();
         } catch (Exception e) {
             e.printStackTrace();
-            throw new AlgebricksException(e);
+            //throw new AlgebricksException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
index 43eef52..6109588 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
@@ -29,4 +29,6 @@ public interface IFunctionHelper {
     public void setResult(IJObject result) throws IOException, AsterixException;
 
     public IJObject getObject(JTypeTag jtypeTag);
+
+    public void reset();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
index 3c5ddfd..4beb259 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.asterix.external.library;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import edu.uci.ics.asterix.external.library.java.IJObject;
@@ -48,6 +47,11 @@ import edu.uci.ics.asterix.om.util.container.IObjectFactory;
 
 public class JTypeObjectFactory implements IObjectFactory<IJObject, IAType> {
 
+    public static final JTypeObjectFactory INSTANCE = new JTypeObjectFactory();
+
+    private JTypeObjectFactory() {
+    }
+
     @Override
     public IJObject create(IAType type) {
         IJObject retValue = null;
@@ -77,7 +81,7 @@ public class JTypeObjectFactory implements IObjectFactory<IJObject, IAType> {
                 retValue = new JPoint3D(0, 0, 0);
                 break;
             case POLYGON:
-                retValue = new JPolygon(new ArrayList<JPoint>());
+                retValue = new JPolygon(new JPoint[] {});
                 break;
             case LINE:
                 retValue = new JLine(new JPoint(0, 0), new JPoint(0, 0));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
index 192cf3e..7f5b3bc 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
@@ -14,59 +14,55 @@
  */
 package edu.uci.ics.asterix.external.library;
 
-import java.io.DataOutput;
 import java.io.IOException;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
-import edu.uci.ics.asterix.builders.RecordBuilder;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.library.java.IJObject;
-import edu.uci.ics.asterix.external.library.java.JObjectUtil;
-import edu.uci.ics.asterix.external.library.java.JObjects.ByteArrayAccessibleDataInputStream;
-import edu.uci.ics.asterix.external.library.java.JObjects.ByteArrayAccessibleInputStream;
-import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjectPointableVisitor;
 import edu.uci.ics.asterix.external.library.java.JTypeTag;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ARecord;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.base.IAObject;
 import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.pointables.AFlatValuePointable;
+import edu.uci.ics.asterix.om.pointables.AListPointable;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.pointables.PointableAllocator;
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.container.IObjectPool;
 import edu.uci.ics.asterix.om.util.container.ListObjectPool;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
 
 public class JavaFunctionHelper implements IFunctionHelper {
 
     private final IExternalFunctionInfo finfo;
     private final IDataOutputProvider outputProvider;
-    private IJObject[] arguments;
+    private final IJObject[] arguments;
     private IJObject resultHolder;
-    private ISerializerDeserializer resultSerde;
-    private IObjectPool<IJObject, IAType> objectPool = new ListObjectPool<IJObject, IAType>(new JTypeObjectFactory());
-    byte[] buffer = new byte[32768];
-    ByteArrayAccessibleInputStream bis = new ByteArrayAccessibleInputStream(buffer, 0, buffer.length);
-    ByteArrayAccessibleDataInputStream dis = new ByteArrayAccessibleDataInputStream(bis);
+    private final IObjectPool<IJObject, IAType> objectPool = new ListObjectPool<IJObject, IAType>(
+            JTypeObjectFactory.INSTANCE);
+    private final JObjectPointableVisitor pointableVisitor;
+    private final PointableAllocator pointableAllocator;
+    private final Map<Integer, TypeInfo> poolTypeInfo;
 
     public JavaFunctionHelper(IExternalFunctionInfo finfo, IDataOutputProvider outputProvider)
             throws AlgebricksException {
         this.finfo = finfo;
         this.outputProvider = outputProvider;
-        List<IAType> params = finfo.getParamList();
-        arguments = new IJObject[params.size()];
+        this.pointableVisitor = new JObjectPointableVisitor();
+        this.pointableAllocator = new PointableAllocator();
+        this.arguments = new IJObject[finfo.getParamList().size()];
         int index = 0;
-        for (IAType param : params) {
-            this.arguments[index] = objectPool.allocate(param);
-            index++;
+        for (IAType param : finfo.getParamList()) {
+            this.arguments[index++] = objectPool.allocate(param);
         }
-        resultHolder = objectPool.allocate(finfo.getReturnType());
+        this.resultHolder = objectPool.allocate(finfo.getReturnType());
+        this.poolTypeInfo = new HashMap<Integer, TypeInfo>();
+
     }
 
     @Override
@@ -76,110 +72,55 @@ public class JavaFunctionHelper implements IFunctionHelper {
 
     @Override
     public void setResult(IJObject result) throws IOException, AsterixException {
-        IAObject obj = result.getIAObject();
         try {
-            outputProvider.getDataOutput().writeByte(obj.getType().getTypeTag().serialize());
-        } catch (IOException e) {
+            result.serialize(outputProvider.getDataOutput(), true);
+            result.reset();
+        } catch (IOException  | AlgebricksException e) {
             throw new HyracksDataException(e);
         }
-
-        if (obj.getType().getTypeTag().equals(ATypeTag.RECORD)) {
-            ARecordType recType = (ARecordType) obj.getType();
-            if (recType.isOpen()) {
-                writeOpenRecord((JRecord) result, outputProvider.getDataOutput());
-            } else {
-                resultSerde = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(recType);
-                resultSerde.serialize(obj, outputProvider.getDataOutput());
-            }
-        } else {
-            resultSerde = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(obj.getType());
-            resultSerde.serialize(obj, outputProvider.getDataOutput());
-        }
-        reset();
-    }
-
-    private void writeOpenRecord(JRecord jRecord, DataOutput dataOutput) throws AsterixException, IOException {
-        ARecord aRecord = (ARecord) jRecord.getIAObject();
-        RecordBuilder recordBuilder = new RecordBuilder();
-        ARecordType recordType = aRecord.getType();
-        recordBuilder.reset(recordType);
-        ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage();
-        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
-        List<Boolean> openField = jRecord.getOpenField();
-
-        int fieldIndex = 0;
-        int closedFieldId = 0;
-        for (IJObject field : jRecord.getFields()) {
-            fieldValue.reset();
-            switch (field.getTypeTag()) {
-                case RECORD:
-                    ARecordType recType = (ARecordType) field.getIAObject().getType();
-                    if (recType.isOpen()) {
-                        fieldValue.getDataOutput().writeByte(recType.getTypeTag().serialize());
-                        writeOpenRecord((JRecord) field, fieldValue.getDataOutput());
-                    } else {
-                        AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(
-                                field.getIAObject().getType()).serialize(field.getIAObject(),
-                                fieldValue.getDataOutput());
-                    }
-                    break;
-                default:
-                    AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(field.getIAObject().getType())
-                            .serialize(field.getIAObject(), fieldValue.getDataOutput());
-                    break;
-            }
-            if (openField.get(fieldIndex)) {
-                String fName = jRecord.getFieldNames().get(fieldIndex);
-                fieldName.reset();
-                AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING).serialize(
-                        new AString(fName), fieldName.getDataOutput());
-                recordBuilder.addField(fieldName, fieldValue);
-            } else {
-                recordBuilder.addField(closedFieldId, fieldValue);
-                closedFieldId++;
-            }
-            fieldIndex++;
-        }
-
-        recordBuilder.write(dataOutput, false);
-
     }
 
-    private void reset() {
-        for (IJObject jObject : arguments) {
-            switch (jObject.getTypeTag()) {
-                case RECORD:
-                    reset((JRecord) jObject);
-                    break;
-            }
-        }
-        switch (resultHolder.getTypeTag()) {
+    public void setArgument(int index, IValueReference valueReference) throws IOException, AsterixException {
+        IVisitablePointable pointable = null;
+        IJObject jObject = null;
+        IAType type = finfo.getParamList().get(index);
+        switch (type.getTypeTag()) {
             case RECORD:
-                reset((JRecord) resultHolder);
+                pointable = pointableAllocator.allocateRecordValue(type);
+                pointable.set(valueReference);
+                jObject = pointableVisitor.visit((ARecordPointable) pointable, getTypeInfo(index, type));
+                break;
+            case ORDEREDLIST:
+            case UNORDEREDLIST:
+                pointable = pointableAllocator.allocateListValue(type);
+                pointable.set(valueReference);
+                jObject = pointableVisitor.visit((AListPointable) pointable, getTypeInfo(index, type));
+                break;
+            case ANY:
+                throw new IllegalStateException("Cannot handle a function argument of type " + type.getTypeTag());
+            default:
+                pointable = pointableAllocator.allocateFieldValue(type);
+                pointable.set(valueReference);
+                jObject = pointableVisitor.visit((AFlatValuePointable) pointable, getTypeInfo(index, type));
                 break;
         }
+        arguments[index] = jObject;
     }
 
-    private void reset(JRecord jRecord) {
-        List<IJObject> fields = ((JRecord) jRecord).getFields();
-        for (IJObject field : fields) {
-            switch (field.getTypeTag()) {
-                case RECORD:
-                    reset((JRecord) field);
-                    break;
-            }
+    private TypeInfo getTypeInfo(int index, IAType type) {
+        TypeInfo typeInfo = poolTypeInfo.get(index);
+        if (typeInfo == null) {
+            typeInfo = new TypeInfo(objectPool, type, type.getTypeTag());
+            poolTypeInfo.put(index, typeInfo);
         }
-        jRecord.close();
-    }
-
-    public void setArgument(int index, byte[] argument) throws IOException, AsterixException {
-        bis.setContent(argument, 1, argument.length);
-        IAType type = finfo.getParamList().get(index);
-        arguments[index] = JObjectUtil.getJType(type.getTypeTag(), type, dis, objectPool);
+        return typeInfo;
     }
 
     @Override
     public IJObject getResultObject() {
+        if (resultHolder == null) {
+            resultHolder = objectPool.allocate(finfo.getReturnType());
+        }
         return resultHolder;
     }
 
@@ -197,4 +138,10 @@ public class JavaFunctionHelper implements IFunctionHelper {
         return retValue;
     }
 
+    @Override
+    public void reset() {
+        pointableAllocator.reset();
+        objectPool.reset();
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/TypeInfo.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/TypeInfo.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/TypeInfo.java
new file mode 100644
index 0000000..e2c66ca
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/TypeInfo.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.IJObject;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+
+public class TypeInfo {
+
+    private IObjectPool<IJObject, IAType> objectPool;
+    private IAType atype;
+    private ATypeTag typeTag;
+
+    public TypeInfo(IObjectPool<IJObject, IAType> objectPool, IAType atype, ATypeTag typeTag) {
+        this.objectPool = objectPool;
+        this.atype = atype;
+        this.typeTag = typeTag;
+    }
+
+    public void reset(IAType atype, ATypeTag typeTag) {
+        this.atype = atype;
+        this.typeTag = typeTag;
+    }
+
+    public IObjectPool<IJObject, IAType> getObjectPool() {
+        return objectPool;
+    }
+
+    public void setObjectPool(IObjectPool<IJObject, IAType> objectPool) {
+        this.objectPool = objectPool;
+    }
+
+    public IAType getAtype() {
+        return atype;
+    }
+
+    public void setAtype(IAType atype) {
+        this.atype = atype;
+    }
+
+    public ATypeTag getTypeTag() {
+        return typeTag;
+    }
+
+    public void setTypeTag(ATypeTag typeTag) {
+        this.typeTag = typeTag;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJListAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJListAccessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJListAccessor.java
new file mode 100644
index 0000000..87db84a
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJListAccessor.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import edu.uci.ics.asterix.om.pointables.AListPointable;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IJListAccessor {
+
+    IJObject access(AListPointable pointable, IObjectPool<IJObject, IAType> objectPool, IAType listType,
+            JObjectPointableVisitor pointableVisitor) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
index ff8e563..3567e7f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
@@ -14,12 +14,20 @@
  */
 package edu.uci.ics.asterix.external.library.java;
 
+import java.io.DataOutput;
+
 import edu.uci.ics.asterix.om.base.IAObject;
 import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface IJObject {
 
     public ATypeTag getTypeTag();
 
     public IAObject getIAObject();
+
+    public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException;
+
+    public void reset() throws AlgebricksException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObjectAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObjectAccessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObjectAccessor.java
new file mode 100644
index 0000000..6967243
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObjectAccessor.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IJObjectAccessor {
+    IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> obj) throws HyracksDataException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJRecordAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJRecordAccessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJRecordAccessor.java
new file mode 100644
index 0000000..55ae262
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJRecordAccessor.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IJRecordAccessor {
+
+    public JRecord access(ARecordPointable pointable, IObjectPool<IJObject, IAType> objectPool, ARecordType recordType,
+            JObjectPointableVisitor pointableVisitor) throws HyracksDataException;
+
+}


Mime
View raw message