asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject [02/24] incubator-asterixdb git commit: Introduces Feeds 2.0
Date Mon, 29 Jun 2015 19:44:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmSchemafullRecordParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmSchemafullRecordParserFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmSchemafullRecordParserFactory.java
deleted file mode 100644
index 2ce0e61..0000000
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmSchemafullRecordParserFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.runtime.operators.file;
-
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * A Tuple parser factory for creating a tuple parser capable of parsing
- * ADM data.
- */
-public class AdmSchemafullRecordParserFactory implements ITupleParserFactory {
-
-    private static final long serialVersionUID = 1L;
-    protected ARecordType recType;
-
-    public AdmSchemafullRecordParserFactory(ARecordType recType) {
-        this.recType = recType;
-    }
-
-    @Override
-    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) throws HyracksDataException {
-        return new AdmTupleParser(ctx, recType);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmTupleParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmTupleParser.java
deleted file mode 100644
index 8aab2db..0000000
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmTupleParser.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.runtime.operators.file;
-
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * An extension of AbstractTupleParser that provides functionality for
- * parsing ADM data. Each call to {@link #getDataParser()} creates a new
- * {@link ADMDataParser} constructed with {@code filename}.
- */
-public class AdmTupleParser extends AbstractTupleParser {
-
-    public AdmTupleParser(IHyracksTaskContext ctx, ARecordType recType) throws HyracksDataException {
-        super(ctx, recType);
-    }
-
-    @Override
-    public IDataParser getDataParser() {
-        return new ADMDataParser(filename);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AsterixTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AsterixTupleParserFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AsterixTupleParserFactory.java
new file mode 100644
index 0000000..f1bd0f6
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AsterixTupleParserFactory.java
@@ -0,0 +1,254 @@
+package edu.uci.ics.asterix.runtime.operators.file;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy.TupleForwardPolicyType;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+/**
+ * Tuple parser factory for ADM and delimited-text input, configured through a string property map.
+ * <p>
+ * If the configuration names a class under {@link #KEY_PARSER_FACTORY}, that {@link ITupleParserFactory}
+ * is instantiated reflectively and creates the parser. Otherwise this factory builds an {@link IDataParser}
+ * for the configured {@link InputDataFormat} (reading the {@link #KEY_FORMAT} property when the format is
+ * {@link InputDataFormat#UNKNOWN}) and pairs it with the {@link ITupleForwardPolicy} named by
+ * {@link ITupleForwardPolicy#PARSER_POLICY}.
+ */
+public class AsterixTupleParserFactory implements ITupleParserFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Input formats this factory can parse; {@code UNKNOWN} defers to the {@link #KEY_FORMAT} property. */
+    public static enum InputDataFormat {
+        ADM,
+        DELIMITED,
+        UNKNOWN
+    }
+
+    public static final String HAS_HEADER = "has.header";
+    public static final String KEY_FORMAT = "format";
+    public static final String FORMAT_ADM = "adm";
+    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+    public static final String FORMAT_BINARY = "binary";
+
+    public static final String KEY_PATH = "path";
+    public static final String KEY_SOURCE_DATATYPE = "type-name";
+    public static final String KEY_DELIMITER = "delimiter";
+    public static final String KEY_PARSER_FACTORY = "parser";
+    public static final String KEY_HEADER = "header";
+    public static final String KEY_QUOTE = "quote";
+    public static final String TIME_TRACKING = "time.tracking";
+    public static final String DEFAULT_QUOTE = "\"";
+    public static final String AT_LEAST_ONE_SEMANTICS = FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS;
+    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
+    public static final String DEFAULT_DELIMITER = ",";
+
+    /** Value parser factory for each primitive field type supported in delimited text. */
+    private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
+
+    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
+        Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
+        m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+        m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+        m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+        m.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+        m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+        return m;
+    }
+
+    private final ARecordType recordType;
+    private final Map<String, String> configuration;
+    private final InputDataFormat inputDataFormat;
+
+    /**
+     * @param configuration
+     *            string properties (format, delimiter, quote, header, parser, forward policy, ...)
+     * @param recType
+     *            type of the records being parsed
+     * @param dataFormat
+     *            input format, or {@link InputDataFormat#UNKNOWN} to read it from {@link #KEY_FORMAT}
+     */
+    public AsterixTupleParserFactory(Map<String, String> configuration, ARecordType recType, InputDataFormat dataFormat) {
+        this.recordType = recType;
+        this.configuration = configuration;
+        this.inputDataFormat = dataFormat;
+    }
+
+    /**
+     * Creates a tuple parser for one task.
+     *
+     * @throws HyracksDataException
+     *             wrapping any failure (bad configuration, reflective instantiation, parser creation)
+     */
+    @Override
+    public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException {
+        try {
+            String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
+            if (parserFactoryClassname != null) {
+                // A user-supplied parser factory takes precedence over the built-in formats.
+                ITupleParserFactory parserFactory = (ITupleParserFactory) Class.forName(parserFactoryClassname)
+                        .newInstance();
+                return parserFactory.createTupleParser(ctx);
+            }
+            IDataParser dataParser = createDataParser();
+            ITupleForwardPolicy policy = getTupleParserPolicy(configuration);
+            policy.configure(configuration);
+            return new GenericTupleParser(ctx, recordType, dataParser, policy);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Tuple parser that simply exposes a pre-built data parser and forward policy. */
+    private static class GenericTupleParser extends AbstractTupleParser {
+
+        private final IDataParser dataParser;
+
+        private final ITupleForwardPolicy policy;
+
+        public GenericTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
+                ITupleForwardPolicy policy) throws HyracksDataException {
+            super(ctx, recType);
+            this.dataParser = dataParser;
+            this.policy = policy;
+        }
+
+        @Override
+        public IDataParser getDataParser() {
+            return dataParser;
+        }
+
+        @Override
+        public ITupleForwardPolicy getTupleParserPolicy() {
+            return policy;
+        }
+
+    }
+
+    /** Builds the data parser for {@link #inputDataFormat}, consulting {@link #KEY_FORMAT} when it is UNKNOWN. */
+    private IDataParser createDataParser() throws AsterixException {
+        switch (inputDataFormat) {
+            case ADM:
+                return new ADMDataParser();
+            case DELIMITED:
+                return configureDelimitedDataParser();
+            case UNKNOWN:
+            default:
+                String specifiedFormat = configuration.get(KEY_FORMAT);
+                if (specifiedFormat == null) {
+                    throw new IllegalArgumentException(" Unspecified data format");
+                }
+                // equalsIgnoreCase already ignores case; no locale-sensitive upper-casing needed.
+                if (FORMAT_ADM.equalsIgnoreCase(specifiedFormat)) {
+                    return new ADMDataParser();
+                }
+                if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
+                    return configureDelimitedDataParser();
+                }
+                throw new IllegalArgumentException(" format " + specifiedFormat + " not supported");
+        }
+    }
+
+    /**
+     * Instantiates the tuple forward policy named by {@link ITupleForwardPolicy#PARSER_POLICY}
+     * (case-insensitive, surrounding whitespace ignored), defaulting to {@code FRAME_FULL}.
+     * The returned policy has not been configured yet.
+     *
+     * @return the new policy, or {@code null} for a policy type this method does not handle
+     * @throws IllegalArgumentException
+     *             if the property does not name a {@link TupleForwardPolicyType}
+     */
+    public static ITupleForwardPolicy getTupleParserPolicy(Map<String, String> configuration) {
+        String propValue = configuration.get(ITupleForwardPolicy.PARSER_POLICY);
+        // Locale.ROOT: under e.g. a Turkish default locale "counter_timer_expired" would not map to the enum name.
+        TupleForwardPolicyType policyType = propValue == null ? TupleForwardPolicyType.FRAME_FULL
+                : TupleForwardPolicyType.valueOf(propValue.trim().toUpperCase(Locale.ROOT));
+        switch (policyType) {
+            case FRAME_FULL:
+                return new FrameFullTupleForwardPolicy();
+            case COUNTER_TIMER_EXPIRED:
+                return new CounterTimerTupleForwardPolicy();
+            case RATE_CONTROLLED:
+                return new RateControlledTupleForwardPolicy();
+            default:
+                return null;
+        }
+    }
+
+    /** Builds a delimited-text parser from the delimiter, quote and header settings. */
+    private IDataParser configureDelimitedDataParser() throws AsterixException {
+        IValueParserFactory[] valueParserFactories = getValueParserFactories();
+        char delimiter = getDelimiter(configuration);
+        char quote = getQuote(configuration, delimiter);
+        boolean hasHeader = getHasHeader(configuration);
+        return new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, hasHeader);
+    }
+
+    /**
+     * Picks a value parser factory for every field of {@link #recordType}. Optional fields, i.e. a
+     * UNION of exactly {@code (NULL, T)}, are parsed as {@code T}.
+     *
+     * @throws NotImplementedException
+     *             for any other UNION or for a field type without a delimited-text parser
+     */
+    private IValueParserFactory[] getValueParserFactories() {
+        IAType[] fieldTypes = recordType.getFieldTypes();
+        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[fieldTypes.length];
+        for (int i = 0; i < fieldTypes.length; i++) {
+            ATypeTag tag;
+            if (fieldTypes[i].getTypeTag() == ATypeTag.UNION) {
+                List<IAType> unionTypes = ((AUnionType) fieldTypes[i]).getUnionList();
+                // Reject anything that is not exactly (NULL, T); '&&' here let malformed unions through.
+                if (unionTypes.size() != 2 || unionTypes.get(0).getTypeTag() != ATypeTag.NULL) {
+                    throw new NotImplementedException("Non-optional UNION type is not supported.");
+                }
+                tag = unionTypes.get(1).getTypeTag();
+            } else {
+                tag = fieldTypes[i].getTypeTag();
+            }
+            if (tag == null) {
+                throw new NotImplementedException("Failed to get the type information for field " + i + ".");
+            }
+            IValueParserFactory vpf = valueParserFactoryMap.get(tag);
+            if (vpf == null) {
+                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
+            }
+            fieldParserFactories[i] = vpf;
+        }
+        return fieldParserFactories;
+    }
+
+    /**
+     * Returns the single-character delimiter from {@link #KEY_DELIMITER}, or {@link #DEFAULT_DELIMITER}.
+     *
+     * @throws AsterixException
+     *             if the configured delimiter is not exactly one character long
+     */
+    public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
+        String delimiterValue = configuration.get(AsterixTupleParserFactory.KEY_DELIMITER);
+        if (delimiterValue == null) {
+            delimiterValue = AsterixTupleParserFactory.DEFAULT_DELIMITER;
+        } else if (delimiterValue.length() != 1) {
+            throw new AsterixException("'" + delimiterValue
+                    + "' is not a valid delimiter. The length of a delimiter should be 1.");
+        }
+        return delimiterValue.charAt(0);
+    }
+
+    /**
+     * Returns the single-character quote from {@link #KEY_QUOTE}, or {@link #DEFAULT_QUOTE}. The
+     * delimiter is passed in so that a quote equal to it can be rejected.
+     *
+     * @throws AsterixException
+     *             if the quote is not one character long or equals the delimiter
+     */
+    public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
+        String quoteValue = configuration.get(AsterixTupleParserFactory.KEY_QUOTE);
+        if (quoteValue == null) {
+            quoteValue = AsterixTupleParserFactory.DEFAULT_QUOTE;
+        } else if (quoteValue.length() != 1) {
+            throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
+        }
+
+        // Since delimiter (char type value) can't be null,
+        // we only check whether delimiter and quote use the same character
+        if (quoteValue.charAt(0) == delimiter) {
+            throw new AsterixException("Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter
+                    + "'. ");
+        }
+
+        return quoteValue.charAt(0);
+    }
+
+    /** Returns the {@link #KEY_HEADER} flag; absent or non-"true" values mean {@code false}. */
+    public static boolean getHasHeader(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(AsterixTupleParserFactory.KEY_HEADER));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
new file mode 100644
index 0000000..261eb13
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
@@ -0,0 +1,140 @@
+package edu.uci.ics.asterix.runtime.operators.file;
+
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class CounterTimerTupleForwardPolicy implements ITupleForwardPolicy {
+
+    public static final String BATCH_SIZE = "batch-size";
+    public static final String BATCH_INTERVAL = "batch-interval";
+
+    private static final Logger LOGGER = Logger.getLogger(CounterTimerTupleForwardPolicy.class.getName());
+   
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+    private int batchSize;
+    private long batchInterval;
+    private int tuplesInFrame = 0;
+    private TimeBasedFlushTask flushTask;
+    private Timer timer;
+    private Object lock = new Object();
+    private boolean activeTimer = false;
+
+    public void configure(Map<String, String> configuration) {
+        String propValue = (String) configuration.get(BATCH_SIZE);
+        if (propValue != null) {
+            batchSize = Integer.parseInt(propValue);
+        } else {
+            batchSize = -1;
+        }
+
+        propValue = (String) configuration.get(BATCH_INTERVAL);
+        if (propValue != null) {
+            batchInterval = Long.parseLong(propValue);
+            activeTimer = true;
+        }
+    }
+
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        appender.reset(frame, true);
+        this.writer = writer;
+        if (activeTimer) {
+            this.timer = new Timer();
+            this.flushTask = new TimeBasedFlushTask(writer, lock);
+            timer.scheduleAtFixedRate(flushTask, 0, batchInterval);
+        }
+    }
+
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (activeTimer) {
+            synchronized (lock) {
+                addTupleToFrame(tb);
+            }
+        } else {
+            addTupleToFrame(tb);
+        }
+        tuplesInFrame++;
+    }
+
+    private void addTupleToFrame(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (tuplesInFrame == batchSize || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("flushing frame containg (" + tuplesInFrame + ") tuples");
+            }
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            tuplesInFrame = 0;
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            if (activeTimer) {
+                synchronized (lock) {
+                    FrameUtils.flushFrame(frame.getBuffer(), writer);
+                }
+            } else {
+                FrameUtils.flushFrame(frame.getBuffer(), writer);
+            }
+        }
+
+        if (timer != null) {
+            timer.cancel();
+        }
+    }
+
+    private class TimeBasedFlushTask extends TimerTask {
+
+        private IFrameWriter writer;
+        private final Object lock;
+
+        public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
+            this.writer = writer;
+            this.lock = lock;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (tuplesInFrame > 0) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
+                    }
+                    synchronized (lock) {
+                        FrameUtils.flushFrame(frame.getBuffer(), writer);
+                        appender.reset(frame, true);
+                        tuplesInFrame = 0;
+                    }
+                }
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    @Override
+    public TupleForwardPolicyType getType() {
+        return TupleForwardPolicyType.COUNTER_TIMER_EXPIRED;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java
deleted file mode 100644
index 42e336d..0000000
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.runtime.operators.file;
-
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-
-/**
- * An {@link AbstractTupleParser} for delimited text. {@link #getDataParser()} always returns the
- * same {@link DelimitedDataParser}, built once in the constructor from the supplied settings.
- */
-public class DelimitedDataTupleParser extends AbstractTupleParser {
-
-    private final DelimitedDataParser dataParser;
-
-    public DelimitedDataTupleParser(IHyracksTaskContext ctx, ARecordType recType,
-            IValueParserFactory[] valueParserFactories, char fieldDelimiter, char quote, boolean hasHeader)
-            throws HyracksDataException {
-        super(ctx, recType);
-        this.dataParser = new DelimitedDataParser(recType, valueParserFactories, fieldDelimiter, quote,
-                hasHeader);
-    }
-
-    @Override
-    public IDataParser getDataParser() {
-        return this.dataParser;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
new file mode 100644
index 0000000..6ce3dfa
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
@@ -0,0 +1,58 @@
+package edu.uci.ics.asterix.runtime.operators.file;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Tuple forward policy that pushes a frame downstream only when it cannot hold the next tuple,
+ * plus one final flush of any leftover tuples on {@link #close()}.
+ */
+public class FrameFullTupleForwardPolicy implements ITupleForwardPolicy {
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+
+    /** This policy has no tunable properties; the configuration is ignored. */
+    public void configure(Map<String, String> configuration) {
+        // no-op
+    }
+
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        appender.reset(frame, true);
+    }
+
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (tryAppend(tb)) {
+            return;
+        }
+        // Current frame is full: ship it, start over on an empty frame, and retry once.
+        FrameUtils.flushFrame(frame.getBuffer(), writer);
+        appender.reset(frame, true);
+        if (!tryAppend(tb)) {
+            throw new IllegalStateException();
+        }
+    }
+
+    /** Attempts to copy the built tuple into the current frame; returns whether it fit. */
+    private boolean tryAppend(ArrayTupleBuilder tb) throws HyracksDataException {
+        return appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+    }
+
+    public void close() throws HyracksDataException {
+        boolean hasPendingTuples = appender.getTupleCount() > 0;
+        if (hasPendingTuples) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+        }
+    }
+
+    @Override
+    public TupleForwardPolicyType getType() {
+        return TupleForwardPolicyType.FRAME_FULL;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java
deleted file mode 100644
index 0e85fd2..0000000
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.runtime.operators.file;
-
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * Tuple parser factory for delimited data. Each call to
- * {@link #createTupleParser(IHyracksTaskContext)} returns a new {@link DelimitedDataTupleParser}
- * built from the record type, value parser factories, delimiter, quote and header flag captured here.
- */
-public class NtDelimitedDataTupleParserFactory implements ITupleParserFactory {
-    private static final long serialVersionUID = 1L;
-    protected final ARecordType recordType;
-    protected IValueParserFactory[] valueParserFactories;
-    protected final char fieldDelimiter;
-    /** Character used to enclose a string value that contains the delimiter. */
-    protected final char quote;
-    /** Whether the delimited text starts with a header line, which should be ignored. */
-    protected final boolean hasHeader;
-
-    public NtDelimitedDataTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, char quote, boolean hasHeader) {
-        this.recordType = recordType;
-        this.valueParserFactories = valueParserFactories;
-        this.fieldDelimiter = fieldDelimiter;
-        this.quote = quote;
-        this.hasHeader = hasHeader;
-    }
-
-    @Override
-    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) throws HyracksDataException {
-        DelimitedDataTupleParser parser = new DelimitedDataTupleParser(ctx, recordType, valueParserFactories,
-                fieldDelimiter, quote, hasHeader);
-        return parser;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/RateContolledParserPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/RateContolledParserPolicy.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/RateContolledParserPolicy.java
new file mode 100644
index 0000000..3711f94
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/RateContolledParserPolicy.java
@@ -0,0 +1,96 @@
+package edu.uci.ics.asterix.runtime.operators.file;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.parse.ITupleParserPolicy;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Tuple parser policy that packs parsed tuples into frames, pushing each frame to the
+ * downstream writer when it fills up (and any partially filled frame on {@link #close()}).
+ * When the {@code tuple-interval} property is non-zero, the policy sleeps that many
+ * milliseconds before each tuple, throttling the ingestion rate.
+ */
+public class RateContolledParserPolicy implements ITupleParserPolicy {
+
+    protected FrameTupleAppender appender;
+    protected IFrame frame;
+    private IFrameWriter writer;
+    /** Delay in milliseconds applied before each tuple; 0 disables throttling. */
+    private long interTupleInterval;
+    private boolean delayConfigured;
+
+    /** Configuration key holding the inter-tuple delay in milliseconds. */
+    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
+
+    public RateContolledParserPolicy() {
+    }
+
+    public TupleParserPolicy getType() {
+        return ITupleParserPolicy.TupleParserPolicy.FRAME_FULL;
+    }
+
+    /**
+     * Appends one tuple to the current frame, first sleeping for the configured interval.
+     * A full frame is flushed to the writer and the tuple is retried on a fresh frame.
+     *
+     * @throws HyracksDataException if interrupted while throttling or if flushing a frame fails
+     * @throws IllegalStateException if the tuple does not fit even in an empty frame
+     */
+    @Override
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (delayConfigured) {
+            try {
+                Thread.sleep(interTupleInterval);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so the owning thread can still observe it.
+                Thread.currentThread().interrupt();
+                throw new HyracksDataException(e);
+            }
+        }
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    /** Flushes the partially filled frame, if it holds any tuples. */
+    @Override
+    public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+        }
+    }
+
+    /**
+     * Reads {@link #INTER_TUPLE_INTERVAL} (milliseconds); a missing value means no delay.
+     *
+     * @throws NumberFormatException if the configured value is not a valid long
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws HyracksDataException {
+        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+        interTupleInterval = propValue != null ? Long.parseLong(propValue) : 0;
+        delayConfigured = interTupleInterval != 0;
+    }
+
+    /** Allocates the frame, binds the appender to it and records the downstream writer. */
+    @Override
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        // Bind the appender to the empty frame before the first append.
+        appender.reset(frame, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
new file mode 100644
index 0000000..896ac73
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
@@ -0,0 +1,94 @@
+package edu.uci.ics.asterix.runtime.operators.file;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Tuple forward policy that packs tuples into frames, pushing each frame to the
+ * downstream writer when it fills up (and any partially filled frame on {@link #close()}).
+ * When the {@code tuple-interval} property is non-zero, the policy sleeps that many
+ * milliseconds before each tuple, throttling the forwarding rate.
+ */
+public class RateControlledTupleForwardPolicy implements ITupleForwardPolicy {
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
+    /** Delay in milliseconds applied before each tuple; 0 disables throttling. */
+    private long interTupleInterval;
+    private boolean delayConfigured;
+
+    /** Configuration key holding the inter-tuple delay in milliseconds. */
+    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
+
+    /**
+     * Reads {@link #INTER_TUPLE_INTERVAL} (milliseconds); a missing value means no delay.
+     * The interval is reset on every call, so reconfiguring without the key disables throttling.
+     *
+     * @throws NumberFormatException if the configured value is not a valid long
+     */
+    @Override
+    public void configure(Map<String, String> configuration) {
+        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+        interTupleInterval = propValue != null ? Long.parseLong(propValue) : 0;
+        delayConfigured = interTupleInterval != 0;
+    }
+
+    /** Allocates the frame, binds the appender to it and records the downstream writer. */
+    @Override
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        appender.reset(frame, true);
+    }
+
+    /**
+     * Appends one tuple to the current frame, first sleeping for the configured interval.
+     * A full frame is flushed to the writer and the tuple is retried on a fresh frame.
+     *
+     * @throws HyracksDataException if interrupted while throttling or if flushing a frame fails
+     * @throws IllegalStateException if the tuple does not fit even in an empty frame
+     */
+    @Override
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (delayConfigured) {
+            try {
+                Thread.sleep(interTupleInterval);
+            } catch (InterruptedException e) {
+                // Do not swallow the interrupt: restore the flag and fail this tuple.
+                Thread.currentThread().interrupt();
+                throw new HyracksDataException(e);
+            }
+        }
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    /** Flushes the partially filled frame, if it holds any tuples. */
+    @Override
+    public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+        }
+    }
+
+    @Override
+    public TupleForwardPolicyType getType() {
+        return TupleForwardPolicyType.RATE_CONTROLLED;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
index ce22887..b8243a3 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
@@ -1,5 +1,5 @@
 /*
-x * Copyright 2009-2013 by The Regents of the University of California
+ * Copyright 2009-2013 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -21,21 +21,18 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 
+import edu.uci.ics.asterix.external.util.Datatypes;
+
 public class DataGenerator {
 
     private RandomDateGenerator randDateGen;
-
     private RandomNameGenerator randNameGen;
-
     private RandomMessageGenerator randMessageGen;
-
     private RandomLocationGenerator randLocationGen;
-
     private Random random = new Random();
-
     private TwitterUser twUser = new TwitterUser();
-
     private TweetMessage twMessage = new TweetMessage();
+    private static final String DEFAULT_COUNTRY = "US";
 
     public DataGenerator(InitializationInfo info) {
         initialize(info);
@@ -44,28 +41,32 @@ public class DataGenerator {
     public class TweetMessageIterator implements Iterator<TweetMessage> {
 
         private final int duration;
-        private final GULongIDGenerator idGen;
         private long startTime = 0;
+        private int tweetId;
 
-        public TweetMessageIterator(int duration, GULongIDGenerator idGen) {
+        public TweetMessageIterator(int duration) {
             this.duration = duration;
-            this.idGen = idGen;
             this.startTime = System.currentTimeMillis();
         }
 
         @Override
         public boolean hasNext() {
+            if (duration == TweetGenerator.INFINITY) {
+                return true;
+            }
             return System.currentTimeMillis() - startTime <= duration * 1000;
         }
 
         @Override
         public TweetMessage next() {
+            tweetId++;
             TweetMessage msg = null;
             getTwitterUser(null);
             Message message = randMessageGen.getNextRandomMessage();
             Point location = randLocationGen.getRandomPoint();
             DateTime sendTime = randDateGen.getNextRandomDatetime();
-            twMessage.reset(idGen.getNextULong(), twUser, location, sendTime, message.getReferredTopics(), message);
+            twMessage.reset(tweetId, twUser, location.getLatitude(), location.getLongitude(), sendTime.toString(),
+                    message, DEFAULT_COUNTRY);
             msg = twMessage;
             return msg;
         }
@@ -73,6 +74,7 @@ public class DataGenerator {
         @Override
         public void remove() {
             // TODO Auto-generated method stub
+
         }
 
     }
@@ -218,8 +220,7 @@ public class DataGenerator {
 
         public String toString() {
             StringBuilder builder = new StringBuilder();
-            builder.append("datetime");
-            builder.append("(\"");
+            builder.append("\"");
             builder.append(super.getYear());
             builder.append("-");
             builder.append(super.getMonth() < 10 ? "0" + super.getMonth() : super.getMonth());
@@ -227,7 +228,7 @@ public class DataGenerator {
             builder.append(super.getDay() < 10 ? "0" + super.getDay() : super.getDay());
             builder.append("T");
             builder.append(hour + ":" + min + ":" + sec);
-            builder.append("\")");
+            builder.append("\"");
             return builder.toString();
         }
     }
@@ -475,78 +476,112 @@ public class DataGenerator {
 
     public static class TweetMessage {
 
-        private long tweetid;
+        private static final String[] DEFAULT_FIELDS = new String[] { TweetFields.TWEETID, TweetFields.USER,
+                TweetFields.LATITUDE, TweetFields.LONGITUDE, TweetFields.MESSAGE_TEXT, TweetFields.CREATED_AT,
+                TweetFields.COUNTRY };
+
+        private int id;
         private TwitterUser user;
-        private Point senderLocation;
-        private DateTime sendTime;
-        private List<String> referredTopics;
+        private double latitude;
+        private double longitude;
+        private String created_at;
         private Message messageText;
+        private String country;
+
+        public static final class TweetFields {
+            public static final String TWEETID = "id";
+            public static final String USER = "user";
+            public static final String LATITUDE = "latitude";
+            public static final String LONGITUDE = "longitude";
+            public static final String MESSAGE_TEXT = "message_text";
+            public static final String CREATED_AT = "created_at";
+            public static final String COUNTRY = "country";
+
+        }
 
         public TweetMessage() {
         }
 
-        public TweetMessage(long tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
-                List<String> referredTopics, Message messageText) {
-            this.tweetid = tweetid;
+        public TweetMessage(int tweetid, TwitterUser user, double latitude, double longitude, String created_at,
+                Message messageText, String country) {
+            this.id = tweetid;
             this.user = user;
-            this.senderLocation = senderLocation;
-            this.sendTime = sendTime;
-            this.referredTopics = referredTopics;
+            this.latitude = latitude;
+            this.longitude = longitude;
+            this.created_at = created_at;
             this.messageText = messageText;
+            this.country = country;
         }
 
-        public void reset(long tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
-                List<String> referredTopics, Message messageText) {
-            this.tweetid = tweetid;
+        public void reset(int tweetid, TwitterUser user, double latitude, double longitude, String created_at,
+                Message messageText, String country) {
+            this.id = tweetid;
             this.user = user;
-            this.senderLocation = senderLocation;
-            this.sendTime = sendTime;
-            this.referredTopics = referredTopics;
+            this.latitude = latitude;
+            this.longitude = longitude;
+            this.created_at = created_at;
             this.messageText = messageText;
+            this.country = country;
         }
 
-        public String toString() {
+        public String getAdmEquivalent(String[] fields) {
+            if (fields == null) {
+                fields = DEFAULT_FIELDS;
+            }
             StringBuilder builder = new StringBuilder();
             builder.append("{");
-            builder.append("\"tweetid\":");
-            builder.append("int64(\"" + tweetid + "\")");
-            builder.append(",");
-            builder.append("\"user\":");
-            builder.append(user);
-            builder.append(",");
-            builder.append("\"sender-location\":");
-            builder.append(senderLocation);
-            builder.append(",");
-            builder.append("\"send-time\":");
-            builder.append(sendTime);
-            builder.append(",");
-            builder.append("\"referred-topics\":");
-            builder.append("{{");
-            for (String topic : referredTopics) {
-                builder.append("\"" + topic + "\"");
+            for (String field : fields) {
+                switch (field) {
+                    case Datatypes.Tweet.ID:
+                        appendFieldName(builder, Datatypes.Tweet.ID);
+                        builder.append("int64(\"" + id + "\")");
+                        break;
+                    case Datatypes.Tweet.USER:
+                        appendFieldName(builder, Datatypes.Tweet.USER);
+                        builder.append(user);
+                        break;
+                    case Datatypes.Tweet.LATITUDE:
+                        appendFieldName(builder, Datatypes.Tweet.LATITUDE);
+                        builder.append(latitude);
+                        break;
+                    case Datatypes.Tweet.LONGITUDE:
+                        appendFieldName(builder, Datatypes.Tweet.LONGITUDE);
+                        builder.append(longitude);
+                        break;
+                    case Datatypes.Tweet.MESSAGE:
+                        appendFieldName(builder, Datatypes.Tweet.MESSAGE);
+                        builder.append("\"");
+                        for (int i = 0; i < messageText.getLength(); i++) {
+                            builder.append(messageText.charAt(i));
+                        }
+                        builder.append("\"");
+                        break;
+                    case Datatypes.Tweet.CREATED_AT:
+                        appendFieldName(builder, Datatypes.Tweet.CREATED_AT);
+                        builder.append(created_at);
+                        break;
+                    case Datatypes.Tweet.COUNTRY:
+                        appendFieldName(builder, Datatypes.Tweet.COUNTRY);
+                        builder.append("\"" + country + "\"");
+                        break;
+                }
                 builder.append(",");
             }
-            if (referredTopics.size() > 0) {
-                builder.deleteCharAt(builder.lastIndexOf(","));
-            }
-            builder.append("}}");
-            builder.append(",");
-            builder.append("\"message-text\":");
-            builder.append("\"");
-            for (int i = 0; i < messageText.getLength(); i++) {
-                builder.append(messageText.charAt(i));
-            }
-            builder.append("\"");
+            builder.deleteCharAt(builder.length() - 1);
             builder.append("}");
-            return new String(builder);
+            return builder.toString();
         }
 
-        public long getTweetid() {
-            return tweetid;
+        private void appendFieldName(StringBuilder builder, String fieldName) {
+            builder.append("\"" + fieldName + "\":");
         }
 
-        public void setTweetid(long tweetid) {
-            this.tweetid = tweetid;
+        public int getTweetid() {
+            return id;
+        }
+
+        public void setTweetid(int tweetid) {
+            this.id = tweetid;
         }
 
         public TwitterUser getUser() {
@@ -557,28 +592,12 @@ public class DataGenerator {
             this.user = user;
         }
 
-        public Point getSenderLocation() {
-            return senderLocation;
-        }
-
-        public void setSenderLocation(Point senderLocation) {
-            this.senderLocation = senderLocation;
-        }
-
-        public DateTime getSendTime() {
-            return sendTime;
-        }
-
-        public void setSendTime(DateTime sendTime) {
-            this.sendTime = sendTime;
-        }
-
-        public List<String> getReferredTopics() {
-            return referredTopics;
+        public double getLatitude() {
+            return latitude;
         }
 
-        public void setReferredTopics(List<String> referredTopics) {
-            this.referredTopics = referredTopics;
+        public String getSendTime() {
+            return created_at;
         }
 
         public Message getMessageText() {
@@ -589,6 +608,10 @@ public class DataGenerator {
             this.messageText = messageText;
         }
 
+        public String getCountry() {
+            return country;
+        }
+
     }
 
     public static class TwitterUser {
@@ -643,13 +666,13 @@ public class DataGenerator {
         public String toString() {
             StringBuilder builder = new StringBuilder();
             builder.append("{");
-            builder.append("\"screen-name\":" + "\"" + screenName + "\"");
+            builder.append("\"screen_name\":" + "\"" + screenName + "\"");
             builder.append(",");
-            builder.append("\"lang\":" + "\"" + lang + "\"");
+            builder.append("\"language\":" + "\"" + lang + "\"");
             builder.append(",");
             builder.append("\"friends_count\":" + friendsCount);
             builder.append(",");
-            builder.append("\"statuses_count\":" + statusesCount);
+            builder.append("\"status_count\":" + statusesCount);
             builder.append(",");
             builder.append("\"name\":" + "\"" + name + "\"");
             builder.append(",");
@@ -1158,5 +1181,4 @@ public class DataGenerator {
             "Lexicone", "Fax-fax", "Viatechi", "Inchdox", "Kongreen", "Doncare", "Y-geohex", "Opeelectronics",
             "Medflex", "Dancode", "Roundhex", "Labzatron", "Newhotplus", "Sancone", "Ronholdings", "Quoline",
             "zoomplus", "Fix-touch", "Codetechno", "Tanzumbam", "Indiex", "Canline" };
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
index 20b9be1..993d9c9 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
@@ -7,8 +7,8 @@ import java.net.Socket;
 import java.util.logging.Level;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -18,12 +18,14 @@ public class GenericSocketFeedAdapter extends StreamBasedAdapter implements IFee
 
     private static final long serialVersionUID = 1L;
 
+    private final int port;
     private SocketFeedServer socketFeedServer;
 
-    public GenericSocketFeedAdapter(ITupleParserFactory parserFactory, ARecordType outputtype, int port,
-            IHyracksTaskContext ctx) throws AsterixException, IOException {
-        super(parserFactory, outputtype, ctx);
-        this.socketFeedServer = new SocketFeedServer(outputtype, port);
+    public GenericSocketFeedAdapter(ITupleParserFactory parserFactory, ARecordType outputType, int port,
+            IHyracksTaskContext ctx, int partition) throws AsterixException, IOException {
+        super(parserFactory, outputType, ctx, partition);
+        this.port = port;
+        this.socketFeedServer = new SocketFeedServer(outputType, port);
     }
 
     @Override
@@ -90,4 +92,14 @@ public class GenericSocketFeedAdapter extends StreamBasedAdapter implements IFee
         return DataExchangeMode.PUSH;
     }
 
+    @Override
+    public boolean handleException(Exception e) {
+        try {
+            this.socketFeedServer = new SocketFeedServer((ARecordType) sourceDatatype, port);
+            return true;
+        } catch (Exception re) {
+            return false;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
index 17d989b..37d5141 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -23,12 +23,14 @@ import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
 
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
 import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -40,7 +42,7 @@ import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
  * adapter listens at a port for receiving data (from external world).
  * Data received is transformed into Asterix Data Format (ADM).
  */
-public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
+public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements IFeedAdapterFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -65,11 +67,6 @@ public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory i
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.GENERIC;
-    }
-
-    @Override
     public SupportedOperation getSupportedOperations() {
         return SupportedOperation.READ;
     }
@@ -81,9 +78,9 @@ public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory i
     @Override
     public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
-        outputType = (ARecordType) outputType;
-        this.configureFormat(outputType);
         this.configureSockets(configuration);
+        this.configureFormat(outputType);
+        this.outputType = (ARecordType) outputType;
     }
 
     @Override
@@ -98,7 +95,7 @@ public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory i
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
         Pair<String, Integer> socket = sockets.get(partition);
-        return new GenericSocketFeedAdapter(parserFactory, outputType, socket.second, ctx);
+        return new GenericSocketFeedAdapter(parserFactory, outputType, socket.second, ctx, partition);
     }
 
     private void configureSockets(Map<String, String> configuration) throws Exception {
@@ -117,8 +114,8 @@ public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory i
         Random random = new Random();
         for (String socket : socketsArray) {
             String[] socketTokens = socket.split(":");
-            String host = socketTokens[0];
-            int port = Integer.parseInt(socketTokens[1]);
+            String host = socketTokens[0].trim();
+            int port = Integer.parseInt(socketTokens[1].trim());
             Pair<String, Integer> p = null;
             switch (mode) {
                 case IP:
@@ -148,6 +145,23 @@ public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory i
     }
 
     @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+
+    @Override
+    public InputDataFormat getInputDataFormat() {
+        return InputDataFormat.UNKNOWN;
+    }
+
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        throw new UnsupportedOperationException("Tracking of ingested records not enabled");
+    }
+    
     public void setFiles(List<ExternalFile> files) throws AlgebricksException {
         throw new AlgebricksException("files access not supported for this adapter");
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 40342cd..e18ba42 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -18,8 +18,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -49,7 +49,7 @@ public class RateControlledFileSystemBasedAdapter extends FileSystemBasedAdapter
 
     @Override
     public void stop() {
-        ((RateControlledTupleParser) tupleParser).stop();
+       // ((RateControlledTupleParser) tupleParser).stop();
     }
 
     @Override
@@ -57,4 +57,9 @@ public class RateControlledFileSystemBasedAdapter extends FileSystemBasedAdapter
         return DataExchangeMode.PULL;
     }
 
+    @Override
+    public boolean handleException(Exception e) {
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index d3f13f1..2a18133 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -14,34 +14,21 @@
  */
 package edu.uci.ics.asterix.tools.external.data;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
 import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
-import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
-import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
-import edu.uci.ics.asterix.runtime.operators.file.IDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
 /**
  * Factory class for creating @see{RateControllerFileSystemBasedAdapter} The
@@ -50,7 +37,7 @@ import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
  * source file has been ingested.
  */
 public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdapterFactory implements
-        IGenericAdapterFactory {
+        IFeedAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String KEY_FILE_SYSTEM = "fs";
@@ -59,9 +46,8 @@ public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdap
     public static final String KEY_PATH = "path";
     public static final String KEY_FORMAT = "format";
 
-    private IGenericAdapterFactory adapterFactory;
+    private IAdapterFactory adapterFactory;
     private String format;
-    private Map<String, String> configuration;
     private ARecordType atype;
 
     @Override
@@ -79,8 +65,8 @@ public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdap
         if (configuration.get(KEY_FILE_SYSTEM) == null) {
             throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
         }
-        if (configuration.get(IGenericAdapterFactory.KEY_TYPE_NAME) == null) {
-            throw new Exception("Record type not specified (output-type-name=?)");
+        if (configuration.get(IAdapterFactory.KEY_TYPE_NAME) == null) {
+            throw new Exception("Record type not specified (type-name=?)");
         }
         if (configuration.get(KEY_PATH) == null) {
             throw new Exception("File path not specified (path=?)");
@@ -91,17 +77,12 @@ public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdap
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.GENERIC;
-    }
-
-    @Override
     public SupportedOperation getSupportedOperations() {
         return SupportedOperation.READ;
     }
 
     @Override
-    public void configure(Map<String, String> configuration, ARecordType recordType) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
         checkRequiredArgs(configuration);
         String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
@@ -113,12 +94,11 @@ public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdap
         } else {
             throw new AsterixException("Unsupported file system type " + fileSystem);
         }
+        this.atype = outputType;
         format = configuration.get(KEY_FORMAT);
-        adapterFactory = (IGenericAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
-        adapterFactory.configure(configuration, recordType);
-
-        atype = (ARecordType) recordType;
-        configureFormat();
+        adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+        adapterFactory.configure(configuration, outputType);
+        configureFormat(outputType);
     }
 
     @Override
@@ -126,155 +106,26 @@ public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdap
         return adapterFactory.getPartitionConstraint();
     }
 
-    private void configureFormat() throws AsterixException {
-        switch (format) {
-            case FORMAT_ADM:
-                parserFactory = new RateControlledTupleParserFactory(atype, configuration);
-                break;
-
-            case FORMAT_DELIMITED_TEXT:
-                char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
-                char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
-                boolean hasHeader = StreamBasedAdapterFactory.getHasHeader(configuration);
-                IValueParserFactory[] valueParserFactories = getValueParserFactories(atype);
-                parserFactory = new RateControlledTupleParserFactory(atype, valueParserFactories, delimiter, quote,
-                        hasHeader, configuration);
-                break;
-        }
-    }
-
-    protected IValueParserFactory[] getValueParserFactories(ARecordType recordType) throws AsterixException {
-        int n = recordType.getFieldTypes().length;
-        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
-        for (int i = 0; i < n; i++) {
-            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
-            IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
-            if (vpf == null) {
-                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
-            }
-            fieldParserFactories[i] = vpf;
-
-        }
-        return fieldParserFactories;
-    }
-
     @Override
-    public void setFiles(List<ExternalFile> files) throws AlgebricksException {
-        throw new AlgebricksException("can't set files for this Adapter");
-    }
-
-}
-
-class RateControlledTupleParserFactory implements ITupleParserFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private final ARecordType recordType;
-    private final Map<String, String> configuration;
-    private IValueParserFactory[] valueParserFactories;
-    private char delimiter;
-    private char quote;
-    private boolean hasHeader;
-    private final ParserType parserType;
-
-    public enum ParserType {
-        ADM,
-        DELIMITED_DATA
-    }
-
-    public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, char quote, boolean hasHeader, Map<String, String> configuration) {
-        this.recordType = recordType;
-        this.valueParserFactories = valueParserFactories;
-        this.delimiter = fieldDelimiter;
-        this.quote = quote;
-        this.hasHeader = hasHeader;
-        this.configuration = configuration;
-        this.parserType = ParserType.DELIMITED_DATA;
-    }
-
-    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
-        this.recordType = recordType;
-        this.configuration = configuration;
-        this.parserType = ParserType.ADM;
+    public ARecordType getAdapterOutputType() {
+        return atype;
     }
 
     @Override
-    public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException {
-        IDataParser dataParser = null;
-        switch (parserType) {
-            case ADM:
-                dataParser = new ADMDataParser();
-                break;
-            case DELIMITED_DATA:
-                dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, hasHeader);
-                break;
-        }
-        return new RateControlledTupleParser(ctx, recordType, dataParser, configuration);
+    public InputDataFormat getInputDataFormat() {
+        return InputDataFormat.UNKNOWN;
     }
 
-}
-
-class RateControlledTupleParser extends AbstractTupleParser {
-
-    private final IDataParser dataParser;
-    private long interTupleInterval;
-    private boolean delayConfigured;
-    private boolean continueIngestion = true;
-
-    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
-
-    public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
-            Map<String, String> configuration) throws HyracksDataException {
-        super(ctx, recType);
-        this.dataParser = dataParser;
-        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
-        if (propValue != null) {
-            interTupleInterval = Long.parseLong(propValue);
-        } else {
-            interTupleInterval = 0;
-        }
-        delayConfigured = interTupleInterval != 0;
-    }
-
-    public void setInterTupleInterval(long val) {
-        this.interTupleInterval = val;
-        this.delayConfigured = val > 0;
-    }
-
-    public void stop() {
-        continueIngestion = false;
+    /** This adapter does not track ingested records, so it never offers an intake progress tracker. */
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
     }
 
-    @Override
-    public IDataParser getDataParser() {
-        return dataParser;
+    /** Always throws {@link UnsupportedOperationException}: record tracking is disabled for this adapter. */
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        throw new UnsupportedOperationException("Tracking of ingested records not enabled");
     }
 
-    @Override
-    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
-
-        IDataParser parser = getDataParser();
-        try {
-            parser.initialize(in, recType, true);
-            while (continueIngestion) {
-                tb.reset();
-                if (!parser.parse(tb.getDataOutput())) {
-                    break;
-                }
-                tb.addFieldEndOffset();
-                if (delayConfigured) {
-                    Thread.sleep(interTupleInterval);
-                }
-                addTupleToFrame(writer);
-            }
-            appender.flush(writer, true);
-        } catch (AsterixException ae) {
-            throw new HyracksDataException(ae);
-        } catch (IOException ioe) {
-            throw new HyracksDataException(ioe);
-        } catch (InterruptedException ie) {
-            throw new HyracksDataException(ie);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapter.java
index 4cbc4f1..dcdf9c1 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapter.java
@@ -21,7 +21,7 @@ import java.net.Socket;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
@@ -33,6 +33,8 @@ public class SocketClientAdapter implements IFeedAdapter {
 
     private static final String LOCALHOST = "127.0.0.1";
 
+    private static final long RECONNECT_PERIOD = 2000;
+
     private final String localFile;
 
     private final int port;
@@ -49,7 +51,7 @@ public class SocketClientAdapter implements IFeedAdapter {
 
     @Override
     public void start(int partition, IFrameWriter writer) throws Exception {
-        Socket socket = new Socket(LOCALHOST, port);
+        Socket socket = waitForReceiver();
         OutputStream os = socket.getOutputStream();
         FileInputStream fin = new FileInputStream(new File(localFile));
         byte[] chunk = new byte[1024];
@@ -63,13 +65,36 @@ public class SocketClientAdapter implements IFeedAdapter {
                     break;
                 }
             }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Finished streaming file " + localFile + " to port [" + port + "]");
+            }
+
         } finally {
             socket.close();
             fin.close();
         }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Finished streaming file " + localFile + "to port [" + port + "]");
+
+    }
+
+    /**
+     * Opens a connection to the receiver on {@code LOCALHOST:port}, retrying every
+     * {@code RECONNECT_PERIOD} ms while the attempt fails with an I/O error (e.g. nothing is
+     * listening yet). Non-I/O failures, such as an out-of-range port, propagate instead of looping.
+     */
+    private Socket waitForReceiver() throws Exception {
+        Socket socket = null;
+        while (socket == null) {
+            try {
+                socket = new Socket(LOCALHOST, port);
+            } catch (java.io.IOException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Receiver not ready, would wait for " + (RECONNECT_PERIOD / 1000)
+                            + " seconds before reconnecting");
+                }
+                Thread.sleep(RECONNECT_PERIOD);
+            }
         }
+        return socket;
     }
 
     @Override
@@ -82,4 +107,9 @@ public class SocketClientAdapter implements IFeedAdapter {
         continueStreaming = false;
     }
 
+    @Override
+    public boolean handleException(Exception e) {
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapterFactory.java
index 24a89dd..ef18197 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapterFactory.java
@@ -16,23 +16,20 @@ package edu.uci.ics.asterix.tools.external.data;
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.AUnorderedListType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-public class SocketClientAdapterFactory implements ITypedAdapterFactory {
+public class SocketClientAdapterFactory implements IFeedAdapterFactory {
 
     private static final long serialVersionUID = 1L;
 
-    private static final ARecordType outputType = initOutputType();
+    private ARecordType outputType;
 
     private GenericSocketFeedAdapterFactory genericSocketAdapterFactory;
 
@@ -41,42 +38,26 @@ public class SocketClientAdapterFactory implements ITypedAdapterFactory {
     public static final String KEY_FILE_SPLITS = "file_splits";
 
     @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    private static ARecordType initOutputType() {
-        ARecordType outputType = null;
-        try {
-            String[] userFieldNames = new String[] { "screen-name", "lang", "friends_count", "statuses_count", "name",
-                    "followers_count" };
-
-            IAType[] userFieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
-                    BuiltinType.AINT32, BuiltinType.ASTRING, BuiltinType.AINT32 };
-            ARecordType userRecordType = new ARecordType("TwitterUserType", userFieldNames, userFieldTypes, false);
-
-            String[] fieldNames = new String[] { "tweetid", "user", "sender-location", "send-time", "referred-topics",
-                    "message-text" };
-
-            AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
-            IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, userRecordType, BuiltinType.APOINT,
-                    BuiltinType.ADATETIME, unorderedListType, BuiltinType.ASTRING };
-            outputType = new ARecordType("TweetMessageType", fieldNames, fieldTypes, false);
-
-        } catch (AsterixException | HyracksDataException e) {
-            throw new IllegalStateException("Unable to initialize output type");
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        this.outputType = outputType;
+        String fileSplitsValue = configuration.get(KEY_FILE_SPLITS);
+        if (fileSplitsValue == null) {
+            throw new IllegalArgumentException(
+                    "File splits not specified. File split is specified as a comma separated list of paths");
         }
-        return outputType;
+        fileSplits = fileSplitsValue.trim().split(",");
+        genericSocketAdapterFactory = new GenericSocketFeedAdapterFactory();
+        genericSocketAdapterFactory.configure(configuration, outputType);
     }
 
     @Override
-    public String getName() {
-        return "socket_client";
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.TYPED;
+    public String getName() {
+        return "socket_client";
     }
 
     @Override
@@ -96,14 +77,14 @@ public class SocketClientAdapterFactory implements ITypedAdapterFactory {
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        String fileSplitsValue = configuration.get(KEY_FILE_SPLITS);
-        if (fileSplitsValue == null) {
-            throw new IllegalArgumentException(
-                    "File splits not specified. File split is specified as a comma separated list of paths");
-        }
-        fileSplits = fileSplitsValue.trim().split(",");
-        genericSocketAdapterFactory = new GenericSocketFeedAdapterFactory();
-        genericSocketAdapterFactory.configure(configuration, outputType);
+    public boolean isRecordTrackingEnabled() {
+        return false;
     }
+
+    /** Always throws {@link UnsupportedOperationException}: record tracking is disabled for this adapter. */
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        throw new UnsupportedOperationException("Tracking of ingested records not enabled");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
index 8f252e6..961f497 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -1,5 +1,5 @@
 /*
-x * Copyright 2009-2013 by The Regents of the University of California
+ * Copyright 2009-2013 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -17,6 +17,8 @@ package edu.uci.ics.asterix.tools.external.data;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -31,45 +33,37 @@ public class TweetGenerator {
 
     public static final String KEY_DURATION = "duration";
     public static final String KEY_TPS = "tps";
-    public static final String KEY_GUID_SEED = "guid-seed";
+    public static final String KEY_VERBOSE = "verbose";
+    public static final String KEY_FIELDS = "fields";
+    public static final int INFINITY = 0;
 
-    public static final String OUTPUT_FORMAT = "output-format";
-    public static final String OUTPUT_FORMAT_ARECORD = "arecord";
-    public static final String OUTPUT_FORMAT_ADM_STRING = "adm-string";
-
-    private static final int DEFAULT_DURATION = 60; //seconds
-    private static final int DEFAULT_GUID_SEED = 0;
+    private static final int DEFAULT_DURATION = INFINITY;
 
     private int duration;
     private TweetMessageIterator tweetIterator = null;
     private int partition;
-    private int tweetCount = 0;
+    private long tweetCount = 0;
     private int frameTweetCount = 0;
     private int numFlushedTweets = 0;
-    private OutputStream os;
     private DataGenerator dataGenerator = null;
     private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
-    private GULongIDGenerator uidGenerator;
-
-    public int getTweetCount() {
-        return tweetCount;
-    }
+    private String[] fields;
+    private final List<OutputStream> subscribers;
+    private final Object lock = new Object();
+    private final List<OutputStream> subscribersForRemoval = new ArrayList<OutputStream>();
 
-    public TweetGenerator(Map<String, String> configuration, int partition, String format, OutputStream os)
-            throws Exception {
+    public TweetGenerator(Map<String, String> configuration, int partition) throws Exception {
         this.partition = partition;
         String value = configuration.get(KEY_DURATION);
         this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
-        int guidSeed = configuration.get(KEY_GUID_SEED) != null ? Integer.parseInt(configuration.get(KEY_GUID_SEED))
-                : DEFAULT_GUID_SEED;
-        uidGenerator = new GULongIDGenerator(partition, (byte) (guidSeed));
         dataGenerator = new DataGenerator(new InitializationInfo());
-        tweetIterator = dataGenerator.new TweetMessageIterator(duration, uidGenerator);
-        this.os = os;
+        tweetIterator = dataGenerator.new TweetMessageIterator(duration);
+        this.fields = configuration.get(KEY_FIELDS) != null ? configuration.get(KEY_FIELDS).split(",") : null;
+        this.subscribers = new ArrayList<OutputStream>();
     }
 
     private void writeTweetString(TweetMessage tweetMessage) throws IOException {
-        String tweet = tweetMessage.toString() + "\n";
+        String tweet = tweetMessage.getAdmEquivalent(fields) + "\n";
         tweetCount++;
         byte[] b = tweet.getBytes();
         if (outputBuffer.position() + b.length > outputBuffer.limit()) {
@@ -83,18 +77,26 @@ public class TweetGenerator {
         frameTweetCount++;
     }
 
-    public int getNumFlushedTweets() {
-        return numFlushedTweets;
-    }
-
     private void flush() throws IOException {
         outputBuffer.flip();
-        os.write(outputBuffer.array(), 0, outputBuffer.limit());
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                try {
+                    os.write(outputBuffer.array(), 0, outputBuffer.limit());
+                } catch (Exception e) {
+                    subscribersForRemoval.add(os);
+                }
+            }
+            if (!subscribersForRemoval.isEmpty()) {
+                subscribers.removeAll(subscribersForRemoval);
+                subscribersForRemoval.clear();
+            }
+        }
         outputBuffer.position(0);
         outputBuffer.limit(32 * 1024);
     }
 
-    public boolean setNextRecordBatch(int numTweetsInBatch) throws Exception {
+    public boolean generateNextBatch(int numTweets) throws Exception {
         boolean moreData = tweetIterator.hasNext();
         if (!moreData) {
             if (outputBuffer.position() > 0) {
@@ -106,11 +108,67 @@ public class TweetGenerator {
             return false;
         } else {
             int count = 0;
-            while (count < numTweetsInBatch) {
+            while (count < numTweets) {
                 writeTweetString(tweetIterator.next());
                 count++;
             }
             return true;
         }
     }
+
+    public int getNumFlushedTweets() {
+        return numFlushedTweets;
+    }
+
+    /** Adds {@code os} to the streams that receive every flushed batch of tweets. */
+    public void registerSubscriber(OutputStream os) {
+        synchronized (lock) {
+            subscribers.add(os);
+        }
+    }
+
+    /** Removes {@code os} from the subscribers so that later flushes no longer write to it. */
+    public void deregisterSubscribers(OutputStream os) {
+        synchronized (lock) {
+            subscribers.remove(os);
+        }
+    }
+
+    /**
+     * Closes every registered subscriber stream and then drops them all. Every stream is
+     * attempted even if an earlier one fails; the first failure is rethrown with any later
+     * failures attached as suppressed exceptions.
+     */
+    public void close() throws IOException {
+        IOException failure = null;
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                try {
+                    os.close();
+                } catch (IOException e) {
+                    if (failure == null) {
+                        failure = e;
+                    } else {
+                        failure.addSuppressed(e);
+                    }
+                }
+            }
+            subscribers.clear();
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /** Returns true if at least one subscriber is registered; reads under the same lock as the writers. */
+    public boolean isSubscribed() {
+        synchronized (lock) {
+            return !subscribers.isEmpty();
+        }
+    }
+
+    public long getTweetCount() {
+        return tweetCount;
+    }
+
 }
\ No newline at end of file


Mime
View raw message