asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [15/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master' into hyracks-merge2
Date Thu, 07 Apr 2016 14:59:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 00c1eb7,0000000..3c3b8fb
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@@ -1,182 -1,0 +1,178 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.stream;
 +
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.IOException;
- import java.nio.file.Path;
- import java.util.Map;
 +
 +import org.apache.asterix.external.api.AsterixInputStream;
 +import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.FeedLogManager;
 +import org.apache.asterix.external.util.FileSystemWatcher;
- import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
- import org.apache.hyracks.dataflow.std.file.FileSplit;
 +import org.apache.log4j.Logger;
 +
 +public class LocalFSInputStream extends AsterixInputStream {
 +
 +    private static final Logger LOGGER = Logger.getLogger(LocalFSInputStream.class.getName());
-     private final Path path;
 +    private final FileSystemWatcher watcher;
 +    private FileInputStream in;
 +    private byte lastByte;
 +    private File currentFile;
 +
-     public LocalFSInputStream(final FileSplit[] fileSplits, final IHyracksTaskContext ctx,
-             final Map<String, String> configuration, final int partition, final String expression, final boolean isFeed)
-             throws IOException {
-         this.path = fileSplits[partition].getLocalFile().getFile().toPath();
-         this.watcher = new FileSystemWatcher(path, expression, isFeed);
-         this.watcher.init();
-     }
- 
-     @Override
-     public void setFeedLogManager(FeedLogManager logManager) {
-         super.setFeedLogManager(logManager);
-         watcher.setFeedLogManager(logManager);
++    public LocalFSInputStream(FileSystemWatcher watcher) {
++        this.watcher = watcher;
 +    }
 +
 +    @Override
 +    public void setController(AbstractFeedDataFlowController controller) {
 +        super.setController(controller);
-         watcher.setController(controller);
 +    }
 +
 +    @Override
++    public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
++        super.setFeedLogManager(logManager);
++        watcher.setFeedLogManager(logManager);
++    };
++
++    @Override
 +    public void close() throws IOException {
 +        IOException ioe = null;
 +        if (in != null) {
 +            try {
 +                closeFile();
 +            } catch (Exception e) {
 +                ioe = new IOException(e);
 +            }
 +        }
 +        try {
 +            watcher.close();
 +        } catch (Exception e) {
 +            if (ioe == null) {
 +                throw e;
 +            }
 +            ioe.addSuppressed(e);
 +            throw ioe;
 +        }
 +    }
 +
 +    private void closeFile() throws IOException {
 +        if (in != null) {
++            if (logManager != null) {
++                logManager.endPartition(currentFile.getAbsolutePath());
++            }
 +            try {
 +                in.close();
 +            } finally {
 +                in = null;
 +                currentFile = null;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Closes the current input stream and opens the next one, if any.
 +     */
 +    private boolean advance() throws IOException {
 +        closeFile();
-         if (watcher.hasNext()) {
-             currentFile = watcher.next();
++        currentFile = watcher.poll();
++        if (currentFile == null) {
++            if (controller != null) {
++                controller.flush();
++            }
++            currentFile = watcher.take();
++        }
++        if (currentFile != null) {
 +            in = new FileInputStream(currentFile);
++            if (notificationHandler != null) {
++                notificationHandler.notifyNewSource();
++            }
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    @Override
 +    public int read() throws IOException {
 +        throw new HyracksDataException(
 +                "read() is not supported with this stream. use read(byte[] b, int off, int len)");
 +    }
 +
 +    @Override
 +    public int read(byte[] b, int off, int len) throws IOException {
 +        if (in == null) {
 +            if (!advance()) {
 +                return -1;
 +            }
 +        }
 +        int result = in.read(b, off, len);
 +        while ((result < 0) && advance()) {
 +            // return a new line at the end of every file <--Might create problems for some cases
 +            // depending on the parser implementation-->
 +            if ((lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_LF)) {
 +                lastByte = ExternalDataConstants.BYTE_LF;
 +                b[off] = ExternalDataConstants.BYTE_LF;
 +                return 1;
 +            }
 +            // recursive call
 +            result = in.read(b, off, len);
 +        }
 +        if (result > 0) {
 +            lastByte = b[(off + result) - 1];
 +        }
 +        return result;
 +    }
 +
 +    @Override
 +    public boolean stop() throws Exception {
++        closeFile();
 +        watcher.close();
 +        return true;
 +    }
 +
 +    @Override
 +    public boolean handleException(Throwable th) {
 +        if (in == null) {
 +            return false;
 +        }
 +        if (th instanceof IOException) {
 +            // TODO: Change from string check to exception type
 +            if (th.getCause().getMessage().contains("Malformed input stream")) {
 +                if (currentFile != null) {
 +                    try {
 +                        logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
 +                    } catch (IOException e) {
 +                        LOGGER.warn("Filed to write to feed log file", e);
 +                    }
 +                    LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath());
 +                }
 +                try {
 +                    advance();
 +                    return true;
 +                } catch (Exception e) {
-                     return false;
-                 }
-             } else {
-                 try {
-                     watcher.init();
-                 } catch (IOException e) {
-                     LOGGER.warn("Failed to initialize watcher during failure recovery", e);
-                     return false;
++                    LOGGER.warn("An exception was thrown while trying to skip a file", e);
 +                }
 +            }
-             return true;
 +        }
++        LOGGER.warn("Failed to recover from failure", th);
 +        return false;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 85d0e41,0000000..ae012f3
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@@ -1,158 -1,0 +1,164 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.stream.factory;
 +
 +import java.io.File;
- import java.io.IOException;
++import java.nio.file.Path;
++import java.util.ArrayList;
 +import java.util.Map;
++import java.util.Set;
++import java.util.TreeSet;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.AsterixInputStream;
 +import org.apache.asterix.external.api.IInputStreamFactory;
 +import org.apache.asterix.external.api.INodeResolver;
 +import org.apache.asterix.external.api.INodeResolverFactory;
 +import org.apache.asterix.external.input.stream.LocalFSInputStream;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataUtils;
- import org.apache.asterix.external.util.FeedUtils;
++import org.apache.asterix.external.util.FileSystemWatcher;
 +import org.apache.asterix.external.util.NodeResolverFactory;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.io.FileReference;
 +import org.apache.hyracks.dataflow.std.file.FileSplit;
 +
 +public class LocalFSInputStreamFactory implements IInputStreamFactory {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    protected static final INodeResolver DEFAULT_NODE_RESOLVER = new NodeResolverFactory().createNodeResolver();
 +    protected static final Logger LOGGER = Logger.getLogger(LocalFSInputStreamFactory.class.getName());
 +    protected static INodeResolver nodeResolver;
 +    protected Map<String, String> configuration;
 +    protected FileSplit[] inputFileSplits;
-     protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
 +    protected boolean isFeed;
 +    protected String expression;
 +    // transient fields (They don't need to be serialized and transferred)
 +    private transient AlgebricksAbsolutePartitionConstraint constraints;
++    private transient FileSystemWatcher watcher;
 +
 +    @Override
-     public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
-         try {
-             return new LocalFSInputStream(inputFileSplits, ctx, configuration, partition, expression, isFeed);
-         } catch (IOException e) {
-             throw new HyracksDataException(e);
++    public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
++            throws HyracksDataException {
++        if (watcher == null) {
++            String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
++            ArrayList<Path> inputResources = new ArrayList<>();
++            for (int i = 0; i < inputFileSplits.length; i++) {
++                if (inputFileSplits[i].getNodeName().equals(nodeName)) {
++                    inputResources.add(inputFileSplits[i].getLocalFile().getFile().toPath());
++                }
++            }
++            watcher = new FileSystemWatcher(inputResources, expression, isFeed);
 +        }
++        return new LocalFSInputStream(watcher);
 +    }
 +
 +    @Override
 +    public DataSourceType getDataSourceType() {
 +        return DataSourceType.STREAM;
 +    }
 +
 +    @Override
 +    public boolean isIndexible() {
 +        return false;
 +    }
 +
 +    @Override
 +    public void configure(Map<String, String> configuration) throws AsterixException {
 +        this.configuration = configuration;
 +        String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
 +        configureFileSplits(splits);
 +        configurePartitionConstraint();
 +        this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration);
-         if (isFeed) {
-             feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
-                     ExternalDataUtils.getFeedName(configuration), constraints);
-         }
 +        this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
 +    }
 +
 +    @Override
 +    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
 +        return constraints;
 +    }
 +
 +    private void configureFileSplits(String[] splits) throws AsterixException {
++        INodeResolver resolver = getNodeResolver();
 +        if (inputFileSplits == null) {
 +            inputFileSplits = new FileSplit[splits.length];
 +            String nodeName;
 +            String nodeLocalPath;
 +            int count = 0;
 +            String trimmedValue;
 +            for (String splitPath : splits) {
 +                trimmedValue = splitPath.trim();
 +                if (!trimmedValue.contains("://")) {
 +                    throw new AsterixException(
 +                            "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
 +                }
-                 nodeName = trimmedValue.split(":")[0];
++                nodeName = resolver.resolveNode(trimmedValue.split(":")[0]);
 +                nodeLocalPath = trimmedValue.split("://")[1];
 +                FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
 +                inputFileSplits[count++] = fileSplit;
 +            }
 +        }
 +    }
 +
 +    private void configurePartitionConstraint() throws AsterixException {
-         String[] locs = new String[inputFileSplits.length];
-         String location;
++        Set<String> locs = new TreeSet<>();
 +        for (int i = 0; i < inputFileSplits.length; i++) {
-             location = getNodeResolver().resolveNode(inputFileSplits[i].getNodeName());
-             locs[i] = location;
++            String location = inputFileSplits[i].getNodeName();
++            locs.add(location);
 +        }
-         constraints = new AlgebricksAbsolutePartitionConstraint(locs);
++        constraints = new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[locs.size()]));
 +    }
 +
 +    protected INodeResolver getNodeResolver() {
 +        if (nodeResolver == null) {
 +            synchronized (DEFAULT_NODE_RESOLVER) {
 +                if (nodeResolver == null) {
 +                    nodeResolver = initializeNodeResolver();
 +                }
 +            }
 +        }
 +        return nodeResolver;
 +    }
 +
 +    private static INodeResolver initializeNodeResolver() {
 +        INodeResolver nodeResolver = null;
 +        String configuredNodeResolverFactory = System.getProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY);
 +        if (configuredNodeResolverFactory != null) {
 +            try {
 +                nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
 +                        .createNodeResolver();
 +
 +            } catch (Exception e) {
 +                if (LOGGER.isLoggable(Level.WARNING)) {
 +                    LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname "
 +                            + configuredNodeResolverFactory + "\n" + e.getMessage());
 +                }
 +                nodeResolver = DEFAULT_NODE_RESOLVER;
 +            }
 +        } else {
 +            nodeResolver = DEFAULT_NODE_RESOLVER;
 +        }
 +        return nodeResolver;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index d362201,0000000..6ba27d8
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@@ -1,129 -1,0 +1,124 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.provider;
 +
 +import java.io.IOException;
 +import java.util.Map;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.AsterixInputStream;
 +import org.apache.asterix.external.api.IDataFlowController;
 +import org.apache.asterix.external.api.IDataParserFactory;
 +import org.apache.asterix.external.api.IExternalDataSourceFactory;
 +import org.apache.asterix.external.api.IIndexingDatasource;
 +import org.apache.asterix.external.api.IInputStreamFactory;
 +import org.apache.asterix.external.api.IRecordDataParser;
 +import org.apache.asterix.external.api.IRecordDataParserFactory;
 +import org.apache.asterix.external.api.IRecordReader;
 +import org.apache.asterix.external.api.IRecordReaderFactory;
 +import org.apache.asterix.external.api.IRecordWithPKDataParser;
 +import org.apache.asterix.external.api.IStreamDataParser;
 +import org.apache.asterix.external.api.IStreamDataParserFactory;
 +import org.apache.asterix.external.dataflow.ChangeFeedDataFlowController;
 +import org.apache.asterix.external.dataflow.ChangeFeedWithMetaDataFlowController;
 +import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
 +import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
 +import org.apache.asterix.external.dataflow.FeedTupleForwarder;
 +import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
 +import org.apache.asterix.external.dataflow.IndexingDataFlowController;
 +import org.apache.asterix.external.dataflow.RecordDataFlowController;
 +import org.apache.asterix.external.dataflow.StreamDataFlowController;
 +import org.apache.asterix.external.parser.RecordWithMetadataParser;
 +import org.apache.asterix.external.util.DataflowUtils;
 +import org.apache.asterix.external.util.ExternalDataUtils;
 +import org.apache.asterix.external.util.FeedLogManager;
 +import org.apache.asterix.external.util.FeedUtils;
 +import org.apache.asterix.om.types.ARecordType;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
- import org.apache.hyracks.dataflow.std.file.FileSplit;
 +
 +public class DataflowControllerProvider {
 +
 +    // TODO: Instead, use a factory just like data source and data parser.
 +    @SuppressWarnings({ "rawtypes", "unchecked" })
 +    public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
 +            int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
-             Map<String, String> configuration, boolean indexingOp, boolean isFeed, FileSplit[] feedLogFileSplits)
-                     throws HyracksDataException {
++            Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
++            throws HyracksDataException {
 +        try {
-             FeedLogManager feedLogManager = null;
-             if (isFeed) {
-                 feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
-             }
 +            switch (dataSourceFactory.getDataSourceType()) {
 +                case RECORDS:
 +                    IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory;
 +                    IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
 +                    IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
 +                    IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
 +                    if (indexingOp) {
 +                        return new IndexingDataFlowController(ctx,
 +                                DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
 +                                recordReader, ((IIndexingDatasource) recordReader).getIndexer());
 +                    } else if (isFeed) {
 +                        FeedTupleForwarder tupleForwarder = (FeedTupleForwarder) DataflowUtils
 +                                .getTupleForwarder(configuration, feedLogManager);
 +                        boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration);
 +                        boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration);
 +                        if (isRecordWithMeta) {
 +                            if (isChangeFeed) {
 +                                int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
 +                                return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
 +                                        numOfKeys + 2, (RecordWithMetadataParser) dataParser, recordReader);
 +                            } else {
 +                                return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
 +                                        (RecordWithMetadataParser) dataParser, recordReader);
 +                            }
 +                        } else if (isChangeFeed) {
 +                            int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
 +                            return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
 +                                    (IRecordWithPKDataParser) dataParser, recordReader);
 +                        } else {
 +                            return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
 +                                    recordReader);
 +                        }
 +                    } else {
 +                        return new RecordDataFlowController(ctx,
 +                                DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
 +                                recordReader, 1);
 +                    }
 +                case STREAM:
 +                    IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory;
 +                    AsterixInputStream stream = streamFactory.createInputStream(ctx, partition);
 +                    IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory;
 +                    IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
 +                    streamParser.setInputStream(stream);
 +                    if (isFeed) {
 +                        return new FeedStreamDataFlowController(ctx,
 +                                (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager),
 +                                feedLogManager, FeedUtils.getNumOfFields(configuration), streamParser, stream);
 +                    } else {
 +                        return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null),
 +                                streamParser);
 +                    }
 +                default:
 +                    throw new HyracksDataException(
 +                            "Unknown data source type: " + dataSourceFactory.getDataSourceType());
 +            }
 +        } catch (IOException | AsterixException e) {
 +            throw new HyracksDataException(e);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index f8d64e0,0000000..0f24f91
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@@ -1,149 -1,0 +1,115 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.provider;
 +
 +import java.util.Map;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.IExternalDataSourceFactory;
 +import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 +import org.apache.asterix.external.api.IInputStreamFactory;
 +import org.apache.asterix.external.api.IRecordReaderFactory;
 +import org.apache.asterix.external.input.HDFSDataSourceFactory;
 +import org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory;
 +import org.apache.asterix.external.input.record.reader.kv.KVReaderFactory;
 +import org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory;
- import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReaderFactory;
- import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
- import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
++import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
 +import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
 +import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
++import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
 +import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
 +import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataUtils;
 +
 +/**
 + * Static helpers that choose the {@link IExternalDataSourceFactory} for an adapter configuration:
 + * either a record reader factory or an input stream factory.
 + */
 +public class DatasourceFactoryProvider {
 +
 +    /**
 +     * Selects the data source factory described by {@code configuration}.
 +     * <p>
 +     * When {@code ExternalDataUtils.getDataSourceType(configuration)} equals {@link DataSourceType#RECORDS},
 +     * the value stored under {@link ExternalDataConstants#KEY_READER} is resolved through
 +     * {@link #getRecordReaderFactory}; otherwise the value stored under
 +     * {@link ExternalDataConstants#KEY_STREAM_SOURCE} is resolved through {@link #getInputStreamFactory}.
 +     *
 +     * @param configuration the adapter configuration
 +     * @return the factory for the configured reader or stream source
 +     * @throws AsterixException propagated from the delegate, e.g. when the configured name is unknown
 +     */
 +    public static IExternalDataSourceFactory getExternalDataSourceFactory(Map<String, String> configuration)
 +            throws AsterixException {
 +        if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) {
 +            String reader = configuration.get(ExternalDataConstants.KEY_READER);
 +            return DatasourceFactoryProvider.getRecordReaderFactory(reader, configuration);
 +        } else {
 +            // get stream source
 +            String streamSource = configuration.get(ExternalDataConstants.KEY_STREAM_SOURCE);
 +            return DatasourceFactoryProvider.getInputStreamFactory(streamSource, configuration);
 +        }
 +    }
 +
 +    /**
 +     * Resolves an input stream factory by name.
 +     * <p>
 +     * If {@code ExternalDataUtils.isExternal(streamSource)} is true, the factory is created through
 +     * {@code ExternalDataUtils.createExternalInputStreamFactory} using the dataverse read from
 +     * {@code configuration}. Otherwise {@code streamSource} is matched against the builtin names
 +     * handled by the switch below.
 +     *
 +     * @param streamSource the stream source name; the switch requires it to be non-null
 +     * @param configuration the adapter configuration, used only for external stream sources
 +     * @return a new input stream factory for {@code streamSource}
 +     * @throws AsterixException if {@code streamSource} is not external and matches no builtin name
 +     */
-     public static IInputStreamFactory getInputStreamFactory(String streamSource,
-             Map<String, String> configuration) throws AsterixException {
++    public static IInputStreamFactory getInputStreamFactory(String streamSource, Map<String, String> configuration)
++            throws AsterixException {
 +        IInputStreamFactory streamSourceFactory;
 +        if (ExternalDataUtils.isExternal(streamSource)) {
 +            String dataverse = ExternalDataUtils.getDataverse(configuration);
 +            streamSourceFactory = ExternalDataUtils.createExternalInputStreamFactory(dataverse, streamSource);
 +        } else {
 +            switch (streamSource) {
-                 case ExternalDataConstants.STREAM_HDFS:
-                     streamSourceFactory = new HDFSDataSourceFactory();
-                     break;
 +                case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
 +                    streamSourceFactory = new LocalFSInputStreamFactory();
 +                    break;
-                 case ExternalDataConstants.STREAM_SOCKET:
++                case ExternalDataConstants.SOCKET:
 +                case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
 +                    streamSourceFactory = new SocketServerInputStreamFactory();
 +                    break;
 +                case ExternalDataConstants.STREAM_SOCKET_CLIENT:
 +                    // socket-client needs the client factory, as in getRecordReaderFactory
 +                    streamSourceFactory = new SocketClientInputStreamFactory();
 +                    break;
 +                case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
 +                    streamSourceFactory = new TwitterFirehoseStreamFactory();
 +                    break;
 +                default:
 +                    throw new AsterixException("unknown input stream factory");
 +            }
 +        }
 +        return streamSourceFactory;
 +    }
 +
 +    /**
 +     * Resolves a record reader factory by name.
 +     * <p>
 +     * The name {@link ExternalDataConstants#EXTERNAL} delegates to
 +     * {@code ExternalDataUtils.createExternalRecordReaderFactory(configuration)}. Every other name is
 +     * matched against the builtin readers in the switch below; stream-backed sources (localfs,
 +     * twitter firehose, sockets) are wrapped in a {@link StreamRecordReaderFactory}.
 +     *
 +     * @param reader the reader name; must be non-null, since it is dereferenced by {@code reader.equals}
 +     * @param configuration the adapter configuration, used only for external readers
 +     * @return a new record reader factory for {@code reader}
 +     * @throws AsterixException if {@code reader} matches no known reader
 +     */
 +    public static IRecordReaderFactory<?> getRecordReaderFactory(String reader, Map<String, String> configuration)
 +            throws AsterixException {
 +        if (reader.equals(ExternalDataConstants.EXTERNAL)) {
 +            return ExternalDataUtils.createExternalRecordReaderFactory(configuration);
 +        }
-         String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
-         IInputStreamFactory inputStreamFactory;
-         switch (parser) {
-             case ExternalDataConstants.FORMAT_ADM:
-             case ExternalDataConstants.FORMAT_JSON:
-             case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
-                 inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                 return new SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-             case ExternalDataConstants.FORMAT_LINE_SEPARATED:
-                 inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                 return new EmptyLineSeparatedRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-             case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
-             case ExternalDataConstants.FORMAT_CSV:
-                 inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                 return new LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-             case ExternalDataConstants.FORMAT_RECORD_WITH_METADATA:
-                 switch (reader) {
-                     case ExternalDataConstants.READER_KV:
-                         return new KVReaderFactory();
-                     case ExternalDataConstants.READER_KV_TEST:
-                         return new KVTestReaderFactory();
-                 }
-         }
-         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
-         if (format != null) {
-             switch (format) {
-                 case ExternalDataConstants.FORMAT_ADM:
-                 case ExternalDataConstants.FORMAT_JSON:
-                 case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
-                     inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                     return new SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-                 case ExternalDataConstants.FORMAT_LINE_SEPARATED:
-                     inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                     return new EmptyLineSeparatedRecordReaderFactory()
-                             .setInputStreamFactoryProvider(inputStreamFactory);
-                 case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
-                 case ExternalDataConstants.FORMAT_CSV:
-                     inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                     return new LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-             }
-         }
 +        switch (reader) {
++            case ExternalDataConstants.READER_KV:
++                return new KVReaderFactory();
++            case ExternalDataConstants.READER_KV_TEST:
++                return new KVTestReaderFactory();
 +            case ExternalDataConstants.READER_HDFS:
 +                return new HDFSDataSourceFactory();
++            case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER:
++                return new StreamRecordReaderFactory(new LocalFSInputStreamFactory());
 +            case ExternalDataConstants.READER_TWITTER_PULL:
 +            case ExternalDataConstants.READER_TWITTER_PUSH:
++            case ExternalDataConstants.READER_PUSH_TWITTER:
++            case ExternalDataConstants.READER_PULL_TWITTER:
 +                return new TwitterRecordReaderFactory();
-             case ExternalDataConstants.READER_KV:
-                 return new KVReaderFactory();
-             case ExternalDataConstants.READER_KV_TEST:
-                 return new KVTestReaderFactory();
 +            case ExternalDataConstants.TEST_RECORD_WITH_PK:
 +                return new RecordWithPKTestReaderFactory();
++            case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
++                return new StreamRecordReaderFactory(new TwitterFirehoseStreamFactory());
++            case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
++            case ExternalDataConstants.SOCKET:
++                return new StreamRecordReaderFactory(new SocketServerInputStreamFactory());
++            case ExternalDataConstants.STREAM_SOCKET_CLIENT:
++                return new StreamRecordReaderFactory(new SocketClientInputStreamFactory());
 +            default:
 +                throw new AsterixException("unknown record reader factory: " + reader);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 06928b3,0000000..682fb89
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@@ -1,76 -1,0 +1,76 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.provider;
 +
 +import java.util.Map;
 +
 +import javax.annotation.Nonnull;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.IDataParserFactory;
 +import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
 +import org.apache.asterix.external.parser.factory.DelimitedDataParserFactory;
 +import org.apache.asterix.external.parser.factory.HiveDataParserFactory;
 +import org.apache.asterix.external.parser.factory.RSSParserFactory;
 +import org.apache.asterix.external.parser.factory.RecordWithMetadataParserFactory;
 +import org.apache.asterix.external.parser.factory.TestRecordWithPKParserFactory;
 +import org.apache.asterix.external.parser.factory.TweetParserFactory;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataUtils;
 +
 +/**
 + * Static lookup of {@link IDataParserFactory} implementations, either from an adapter
 + * configuration or directly from a format name.
 + */
 +public class ParserFactoryProvider {
 +    /**
 +     * Resolves the parser factory for an adapter configuration.
 +     * <p>
 +     * If the value stored under {@link ExternalDataConstants#KEY_DATA_PARSER} is non-null and
 +     * {@code ExternalDataUtils.isExternal} accepts it, the factory is created through
 +     * {@code ExternalDataUtils.createExternalParserFactory} with the configuration's dataverse.
 +     * Otherwise the builtin factory for {@code ExternalDataUtils.getRecordFormat(configuration)}
 +     * is returned.
 +     *
 +     * @param configuration the adapter configuration
 +     * @return the parser factory for the configuration
 +     * @throws AsterixException if the record format matches no builtin parser
 +     */
 +    public static IDataParserFactory getDataParserFactory(Map<String, String> configuration) throws AsterixException {
 +        IDataParserFactory parserFactory = null;
 +        String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
 +        if ((parserFactoryName != null) && ExternalDataUtils.isExternal(parserFactoryName)) {
 +            return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
 +                    parserFactoryName);
 +        } else {
 +            // NOTE(review): if getRecordFormat can return null, the string switch in the
 +            // overload below throws NullPointerException -- confirm against ExternalDataUtils.
 +            parserFactory = ParserFactoryProvider
 +                    .getDataParserFactory(ExternalDataUtils.getRecordFormat(configuration));
 +        }
 +        return parserFactory;
 +    }
 +
 +    /**
 +     * Returns a new builtin parser factory for the given format name.
 +     *
 +     * @param parser the format name (one of the {@code FORMAT_*} constants, {@code PARSER_HIVE} or
 +     *            {@code TEST_RECORD_WITH_PK}); must be non-null because it is the switch selector
 +     * @return a new parser factory for the format
 +     * @throws AsterixException if the name matches none of the cases below
 +     */
 +    @SuppressWarnings("rawtypes")
 +    public static IDataParserFactory getDataParserFactory(@Nonnull String parser) throws AsterixException {
 +        switch (parser) {
 +            case ExternalDataConstants.FORMAT_ADM:
 +            case ExternalDataConstants.FORMAT_JSON:
 +            case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
 +                return new ADMDataParserFactory();
 +            case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
 +            case ExternalDataConstants.FORMAT_CSV:
 +                return new DelimitedDataParserFactory();
 +            case ExternalDataConstants.FORMAT_HIVE:
 +            case ExternalDataConstants.PARSER_HIVE:
 +                return new HiveDataParserFactory();
 +            case ExternalDataConstants.FORMAT_TWEET:
 +                return new TweetParserFactory();
 +            case ExternalDataConstants.FORMAT_RSS:
 +                return new RSSParserFactory();
 +            case ExternalDataConstants.FORMAT_RECORD_WITH_METADATA:
 +                return new RecordWithMetadataParserFactory();
 +            case ExternalDataConstants.TEST_RECORD_WITH_PK:
 +                return new TestRecordWithPKParserFactory();
 +            default:
-                 throw new AsterixException("Unknown parser " + parser);
++                throw new AsterixException("Unknown format: " + parser);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a02152b,0000000..b5ec27a
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@@ -1,232 -1,0 +1,232 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.util;
 +
 +import org.apache.hadoop.hdfs.DistributedFileSystem;
 +import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 +import org.apache.hadoop.mapred.SequenceFileInputFormat;
 +import org.apache.hadoop.mapred.TextInputFormat;
 +
 +/**
 + * String, character and numeric constants shared by the external data code: configuration keys,
 + * builtin reader/stream/format names, adapter aliases and default sizes.
 + * <p>
 + * Several constants below intentionally or accidentally share the same literal value; such
 + * constants must not be used together as labels of one {@code switch}.
 + */
 +public class ExternalDataConstants {
 +    // TODO: Remove unused variables.
 +    /**
 +     * Keys
 +     */
 +    // used to specify the stream factory for an adapter that has a stream data source
 +    public static final String KEY_STREAM = "stream";
 +    // used to specify the dataverse of the adapter
 +    public static final String KEY_DATAVERSE = "dataverse";
 +    // used to specify the socket addresses when reading data from sockets
 +    public static final String KEY_SOCKETS = "sockets";
 +    // specify whether the socket address points to an NC or an IP
 +    public static final String KEY_MODE = "address-type";
 +    // specify the HDFS name node address when reading HDFS data
 +    public static final String KEY_HDFS_URL = "hdfs";
 +    // specify the path when reading from a file system
 +    public static final String KEY_PATH = "path";
 +    // specify the HDFS input format when reading data from HDFS
 +    public static final String KEY_INPUT_FORMAT = "input-format";
 +    // specifies the filesystem (localfs or HDFS) when using a filesystem data source
 +    public static final String KEY_FILESYSTEM = "fs";
 +    // specifies the address of the HDFS name node
 +    public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
 +    // specifies the class implementation of the accessed instance of HDFS
 +    public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
 +    public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
 +    public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class";
 +    public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
 +    public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
 +    public static final String KEY_HADOOP_BUFFER_SIZE = "io.file.buffer.size";
 +    public static final String KEY_SOURCE_DATATYPE = "type-name";
 +    public static final String KEY_DELIMITER = "delimiter";
 +    public static final String KEY_PARSER_FACTORY = "tuple-parser";
 +    public static final String KEY_DATA_PARSER = "parser";
 +    public static final String KEY_HEADER = "header";
 +    public static final String KEY_READER = "reader";
 +    // same value as KEY_STREAM
 +    public static final String KEY_READER_STREAM = "stream";
 +    // same value as KEY_SOURCE_DATATYPE
 +    public static final String KEY_TYPE_NAME = "type-name";
 +    public static final String KEY_RECORD_START = "record-start";
 +    public static final String KEY_RECORD_END = "record-end";
 +    public static final String KEY_EXPRESSION = "expression";
 +    public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
 +    public static final String KEY_FORMAT = "format";
 +    public static final String KEY_QUOTE = "quote";
 +    // same value as KEY_DATA_PARSER
 +    public static final String KEY_PARSER = "parser";
 +    public static final String KEY_DATASET_RECORD = "dataset-record";
 +    public static final String KEY_HIVE_SERDE = "hive-serde";
 +    public static final String KEY_RSS_URL = "url";
 +    public static final String KEY_INTERVAL = "interval";
-     public static final String KEY_PULL = "pull";
-     public static final String KEY_PUSH = "push";
 +    public static final String KEY_IS_FEED = "is-feed";
 +    public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
 +    public static final String KEY_FEED_NAME = "feed";
 +    // a string representing external bucket name
 +    public static final String KEY_BUCKET = "bucket";
 +    // a comma delimited list of nodes
 +    public static final String KEY_NODES = "nodes";
 +    // a string representing the password used to authenticate with the external data source
 +    public static final String KEY_PASSWORD = "password";
 +    // an integer representing the number of raw records that can be buffered in the parsing queue
 +    public static final String KEY_QUEUE_SIZE = "queue-size";
 +    // comma-delimited integers representing the indexes of the meta fields in the raw record (i.e., "3,1,0,2" denotes that the first meta field is in index 3 in the actual record)
 +    public static final String KEY_META_INDEXES = "meta-indexes";
 +    // an integer representing the index of the value field in the data type
 +    public static final String KEY_VALUE_INDEX = "value-index";
 +    // a string representing the format of the raw record in the value field in the data type
 +    public static final String KEY_VALUE_FORMAT = "value-format";
 +    // a boolean indicating whether the feed is a change feed
 +    public static final String KEY_IS_CHANGE_FEED = "change-feed";
 +    // an integer representing the number of keys in a change feed
 +    public static final String KEY_KEY_SIZE = "key-size";
 +    // a format name (not a key): selects RecordWithMetadataParserFactory in ParserFactoryProvider
 +    public static final String FORMAT_RECORD_WITH_METADATA = "record-with-metadata";
 +    // a string representing the format of the record (for adapters which produce records with additional information like pk or metadata)
 +    public static final String KEY_RECORD_FORMAT = "record-format";
 +    public static final String KEY_META_TYPE_NAME = "meta-type-name";
 +    // a reader name (not a key); same value as KEY_STREAM
 +    public static final String READER_STREAM = "stream";
 +    /**
 +     * HDFS class names
 +     */
 +    public static final String CLASS_NAME_TEXT_INPUT_FORMAT = TextInputFormat.class.getName();
 +    public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = SequenceFileInputFormat.class.getName();
 +    public static final String CLASS_NAME_RC_INPUT_FORMAT = RCFileInputFormat.class.getName();
 +    public static final String CLASS_NAME_HDFS_FILESYSTEM = DistributedFileSystem.class.getName();
 +    /**
 +     * input formats aliases
 +     */
 +    public static final String INPUT_FORMAT_TEXT = "text-input-format";
 +    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
 +    public static final String INPUT_FORMAT_RC = "rc-input-format";
 +    /**
 +     * Builtin streams
 +     */
 +    // (no constants are declared in this section; stream names are in "input streams" below)
 +
 +    /**
 +     * Builtin record readers
 +     */
 +    public static final String READER_HDFS = "hdfs";
 +    public static final String READER_KV = "key-value";
-     public static final String READER_TWITTER_PUSH = "twitter-push";
-     public static final String READER_TWITTER_PULL = "twitter-pull";
 +    // READER_PUSH_TWITTER / READER_PULL_TWITTER share their values with
 +    // ALIAS_TWITTER_PUSH_ADAPTER / ALIAS_TWITTER_PULL_ADAPTER declared further down
++    public static final String READER_TWITTER_PUSH = "twitter_push";
++    public static final String READER_PUSH_TWITTER = "push_twitter";
++    public static final String READER_TWITTER_PULL = "twitter_pull";
++    public static final String READER_PULL_TWITTER = "pull_twitter";
 +
 +    public static final String CLUSTER_LOCATIONS = "cluster-locations";
 +    public static final String SCHEDULER = "hdfs-scheduler";
 +    public static final String PARSER_HIVE = "hive-parser";
 +    public static final String HAS_HEADER = "has.header";
 +    public static final String TIME_TRACKING = "time.tracking";
 +    public static final String DEFAULT_QUOTE = "\"";
 +    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
 +    public static final String DEFAULT_DELIMITER = ",";
 +    public static final String EXTERNAL_LIBRARY_SEPARATOR = "#";
 +    public static final String HDFS_INDEXING_ADAPTER = "hdfs-indexing-adapter";
 +    /**
 +     * supported builtin record formats
 +     */
 +    public static final String FORMAT_HIVE = "hive";
 +    public static final String FORMAT_BINARY = "binary";
 +    public static final String FORMAT_ADM = "adm";
 +    public static final String FORMAT_JSON = "json";
 +    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
 +    public static final String FORMAT_TWEET = "twitter-status";
 +    public static final String FORMAT_RSS = "rss";
 +    public static final String FORMAT_SEMISTRUCTURED = "semi-structured";
 +    public static final String FORMAT_LINE_SEPARATED = "line-separated";
 +    public static final String FORMAT_HDFS_WRITABLE = "hdfs-writable";
 +    public static final String FORMAT_KV = "kv";
 +
 +    /**
 +     * input streams
 +     */
 +    // STREAM_HDFS and STREAM_LOCAL_FILESYSTEM share their values with READER_HDFS / ALIAS_HDFS_ADAPTER
 +    // and ALIAS_LOCALFS_ADAPTER respectively
 +    public static final String STREAM_HDFS = "hdfs";
 +    public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
-     public static final String STREAM_SOCKET = "socket";
++    public static final String SOCKET = "socket";
 +    // note the hyphen: differs from ALIAS_SOCKET_CLIENT_ADAPTER ("socket_client")
 +    public static final String STREAM_SOCKET_CLIENT = "socket-client";
 +
 +    /**
 +     * adapter aliases
 +     */
 +    public static final String ALIAS_GENERIC_ADAPTER = "adapter";
 +    public static final String ALIAS_LOCALFS_ADAPTER = "localfs";
 +    public static final String ALIAS_LOCALFS_PUSH_ADAPTER = "push_localfs";
 +    public static final String ALIAS_HDFS_ADAPTER = "hdfs";
 +    public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
 +    public static final String ALIAS_TWITTER_FIREHOSE_ADAPTER = "twitter_firehose";
 +    public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
 +    public static final String ALIAS_RSS_ADAPTER = "rss_feed";
 +    public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
 +    public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
 +    public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
 +    public static final String ALIAS_CNN_ADAPTER = "cnn_feed";
 +    public static final String ALIAS_FEED_WITH_META_ADAPTER = "feed_with_meta";
 +    public static final String ALIAS_CHANGE_FEED_WITH_META_ADAPTER = "change_feed_with_meta";
 +    // for testing purposes
 +    public static final String ALIAS_TEST_CHANGE_ADAPTER = "test_change_feed";
 +
 +    /**
 +     * Constant String values
 +     */
 +    public static final String TRUE = "true";
 +    public static final String FALSE = "false";
 +
 +    /**
 +     * Constant characters
 +     */
 +    public static final char ESCAPE = '\\';
 +    public static final char QUOTE = '"';
 +    public static final char SPACE = ' ';
 +    public static final char TAB = '\t';
 +    public static final char LF = '\n';
 +    public static final char CR = '\r';
 +    public static final char DEFAULT_RECORD_START = '{';
 +    public static final char DEFAULT_RECORD_END = '}';
 +
 +    /**
 +     * Constant byte characters
 +     */
 +    public static final byte BYTE_LF = '\n';
 +    public static final byte BYTE_CR = '\r';
 +    /**
 +     * Size default values
 +     */
 +    public static final int DEFAULT_BUFFER_SIZE = 4096;
 +    public static final int DEFAULT_BUFFER_INCREMENT = 2048;
 +    public static final int DEFAULT_QUEUE_SIZE = 64;
 +    public static final int MAX_RECORD_SIZE = 32000000;
 +
 +    /**
 +     * Expected parameter values
 +     */
 +    public static final String PARAMETER_OF_SIZE_ONE = "Value of size 1";
 +    // same text as ERROR_LARGE_RECORD below
 +    public static final String LARGE_RECORD_ERROR_MESSAGE = "Record is too large";
 +    // the remaining constants are additional keys, reader names and formats rather than
 +    // expected parameter values
 +    public static final String KEY_RECORD_INDEX = "record-index";
 +    public static final String FORMAT_DCP = "dcp";
 +    public static final String KEY_KEY_INDEXES = "key-indexes";
 +    public static final String KEY_KEY_INDICATORS = "key-indicators";
 +    public static final String KEY_STREAM_SOURCE = "stream-source";
 +    public static final String EXTERNAL = "external";
 +    public static final String KEY_READER_FACTORY = "reader-factory";
 +    public static final String READER_KV_TEST = "kv_test";
 +    public static final String READER_RSS = "rss";
 +    public static final String FORMAT_CSV = "csv";
 +    public static final String TEST_RECORD_WITH_PK = "test-record-with-pk";
 +
 +    // error messages
 +    public static final String ERROR_LARGE_RECORD = "Record is too large";
 +    public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 42fe8bf,0000000..76898c2
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@@ -1,342 -1,0 +1,326 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.util;
 +
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.IDataParserFactory;
 +import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 +import org.apache.asterix.external.api.IInputStreamFactory;
 +import org.apache.asterix.external.api.IRecordReaderFactory;
 +import org.apache.asterix.external.library.ExternalLibraryManager;
 +import org.apache.asterix.om.types.ARecordType;
 +import org.apache.asterix.om.types.ATypeTag;
 +import org.apache.asterix.om.types.AUnionType;
 +import org.apache.asterix.om.types.IAType;
 +import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 +import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
 +import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
 +import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 +import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 +import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
 +import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 +
 +public class ExternalDataUtils {
 +
 +    /**
 +     * Reads the field delimiter from the given configuration.
 +     *
 +     * @param configuration the configuration, looked up under {@code ExternalDataConstants.KEY_DELIMITER}
 +     * @return the configured delimiter, or the first character of
 +     *         {@code ExternalDataConstants.DEFAULT_DELIMITER} when no delimiter is configured
 +     * @throws AsterixException if the configured delimiter is not exactly one character long
 +     */
 +    public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
 +        String configured = configuration.get(ExternalDataConstants.KEY_DELIMITER);
 +        if (configured == null) {
 +            return ExternalDataConstants.DEFAULT_DELIMITER.charAt(0);
 +        }
 +        if (configured.length() != 1) {
 +            throw new AsterixException(
 +                    "'" + configured + "' is not a valid delimiter. The length of a delimiter should be 1.");
 +        }
 +        return configured.charAt(0);
 +    }
 +
 +    // Get a quote from the given configuration when the delimiter is given
 +    // Need to pass delimiter to check whether they share the same character
 +    public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
 +        String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
 +        if (quoteValue == null) {
 +            quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
 +        } else if (quoteValue.length() != 1) {
 +            throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
 +        }
 +
 +        // Since delimiter (char type value) can't be null,
 +        // we only check whether delimiter and quote use the same character
 +        if (quoteValue.charAt(0) == delimiter) {
 +            throw new AsterixException(
 +                    "Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter + "'. ");
 +        }
 +
 +        return quoteValue.charAt(0);
 +    }
 +
 +    // Get the header flag
 +    public static boolean getHasHeader(Map<String, String> configuration) {
 +        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_HEADER));
 +    }
 +
 +    /**
 +     * Checks that the mandatory parameters are present in the configuration.
 +     *
 +     * @param configuration the configuration to validate
 +     * @throws AsterixException if the reader parameter ({@code ExternalDataConstants.KEY_READER})
 +     *         or the format parameter is missing
 +     */
 +    public static void validateParameters(Map<String, String> configuration) throws AsterixException {
 +        String reader = configuration.get(ExternalDataConstants.KEY_READER);
 +        if (reader == null) {
 +            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified.");
 +        }
-         String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
++        String parser = configuration.get(ExternalDataConstants.KEY_FORMAT);
 +        if (parser == null) {
-             throw new AsterixException("The parameter " + ExternalDataConstants.KEY_PARSER + " must be specified.");
++            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " must be specified.");
 +        }
 +    }
 +
 +    public static DataSourceType getDataSourceType(Map<String, String> configuration) {
 +        String reader = configuration.get(ExternalDataConstants.KEY_READER);
 +        if ((reader != null) && reader.equals(ExternalDataConstants.READER_STREAM)) {
 +            return DataSourceType.STREAM;
 +        } else {
 +            return DataSourceType.RECORDS;
 +        }
 +    }
 +
 +    /**
 +     * Tells whether the given name contains {@code ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR}
 +     * and is longer than one character once trimmed.
 +     *
 +     * @param aString the name to inspect; may be {@code null}
 +     * @return {@code true} only when both conditions hold
 +     */
 +    public static boolean isExternal(String aString) {
 +        if (aString == null) {
 +            return false;
 +        }
 +        if (!aString.contains(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR)) {
 +            return false;
 +        }
 +        return aString.trim().length() > 1;
 +    }
 +
 +    public static ClassLoader getClassLoader(String dataverse, String library) {
 +        return ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
 +    }
 +
 +    public static String getLibraryName(String aString) {
 +        return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
 +    }
 +
 +    public static String getExternalClassName(String aString) {
 +        return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[1];
 +    }
 +
 +    public static IInputStreamFactory createExternalInputStreamFactory(String dataverse, String stream)
 +            throws AsterixException {
 +        try {
 +            String libraryName = getLibraryName(stream);
 +            String className = getExternalClassName(stream);
 +            ClassLoader classLoader = getClassLoader(dataverse, libraryName);
 +            return ((IInputStreamFactory) (classLoader.loadClass(className).newInstance()));
 +        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
 +            throw new AsterixException("Failed to create stream factory", e);
 +        }
 +    }
 +
 +    public static String getDataverse(Map<String, String> configuration) {
 +        return configuration.get(ExternalDataConstants.KEY_DATAVERSE);
 +    }
 +
 +    /**
 +     * Returns the record format: the value under {@code ExternalDataConstants.KEY_DATA_PARSER}
 +     * when present, otherwise the value under {@code ExternalDataConstants.KEY_FORMAT}.
 +     */
 +    public static String getRecordFormat(Map<String, String> configuration) {
 +        String parserFormat = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
 +        if (parserFormat != null) {
 +            return parserFormat;
 +        }
 +        return configuration.get(ExternalDataConstants.KEY_FORMAT);
 +    }
 +
 +    public static void setRecordFormat(Map<String, String> configuration, String format) {
 +        if (!configuration.containsKey(ExternalDataConstants.KEY_DATA_PARSER)) {
 +            configuration.put(ExternalDataConstants.KEY_DATA_PARSER, format);
 +        }
 +        if (!configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
 +            configuration.put(ExternalDataConstants.KEY_FORMAT, format);
 +        }
 +    }
 +
 +    // Lookup table from a field's type tag to the factory of the parser for that type.
 +    private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
 +
 +    /**
 +     * Builds the type-tag-to-parser-factory table for INT32, INT64, FLOAT, DOUBLE and STRING.
 +     */
 +    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
 +        Map<ATypeTag, IValueParserFactory> factories = new HashMap<>();
 +        factories.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
 +        factories.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
 +        factories.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
 +        factories.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
 +        factories.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
 +        return factories;
 +    }
 +
 +    /**
 +     * Returns one value parser factory per field of the given record type.
 +     * <p>
 +     * A UNION field is accepted only in its optional form, i.e. exactly two members of which the
 +     * first is NULL; the parser is then chosen from the second member.
 +     *
 +     * @param recordType the record type whose fields are to be parsed
 +     * @return the parser factories, in field order
 +     * @throws NotImplementedException if a UNION field is not of the optional form, if a field's
 +     *         type tag is {@code null}, or if no parser factory exists for a field's type
 +     */
 +    public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) {
 +        IAType[] fieldTypes = recordType.getFieldTypes();
 +        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[fieldTypes.length];
 +        for (int i = 0; i < fieldTypes.length; i++) {
 +            ATypeTag tag = fieldTypes[i].getTypeTag();
 +            if (tag == ATypeTag.UNION) {
 +                List<IAType> unionTypes = ((AUnionType) fieldTypes[i]).getUnionList();
 +                // Either violation alone makes the union non-optional, hence "||". The size test
 +                // comes first so that get(0) and get(1) are never reached on a too-short list.
 +                if ((unionTypes.size() != 2) || (unionTypes.get(0).getTypeTag() != ATypeTag.NULL)) {
 +                    throw new NotImplementedException("Non-optional UNION type is not supported.");
 +                }
 +                tag = unionTypes.get(1).getTypeTag();
 +            }
 +            if (tag == null) {
 +                throw new NotImplementedException("Failed to get the type information for field " + i + ".");
 +            }
 +            fieldParserFactories[i] = getParserFactory(tag);
 +        }
 +        return fieldParserFactories;
 +    }
 +
 +    /**
 +     * Looks up the value parser factory registered for the given type tag.
 +     *
 +     * @throws NotImplementedException if no factory is registered for the tag
 +     */
 +    public static IValueParserFactory getParserFactory(ATypeTag tag) {
 +        IValueParserFactory factory = valueParserFactoryMap.get(tag);
 +        if (factory != null) {
 +            return factory;
 +        }
 +        throw new NotImplementedException("No value parser factory for fields of type " + tag);
 +    }
 +
 +    public static String getRecordReaderStreamName(Map<String, String> configuration) {
 +        return configuration.get(ExternalDataConstants.KEY_READER_STREAM);
 +    }
 +
 +    /**
 +     * Tells whether the value under {@code ExternalDataConstants.KEY_HEADER} parses to
 +     * {@code true}; an absent value yields {@code false}.
 +     */
 +    public static boolean hasHeader(Map<String, String> configuration) {
 +        String headerFlag = configuration.get(ExternalDataConstants.KEY_HEADER);
 +        return (headerFlag != null) && Boolean.parseBoolean(headerFlag);
 +    }
 +
-     public static boolean isPull(Map<String, String> configuration) {
-         String pull = configuration.get(ExternalDataConstants.KEY_PULL);
-         if (pull == null) {
-             return false;
-         }
-         return Boolean.parseBoolean(pull);
-     }
- 
-     public static boolean isPush(Map<String, String> configuration) {
-         String push = configuration.get(ExternalDataConstants.KEY_PUSH);
-         if (push == null) {
-             return false;
-         }
-         return Boolean.parseBoolean(push);
-     }
- 
 +    /**
 +     * Instantiates the record reader factory named by
 +     * {@code ExternalDataConstants.KEY_READER_FACTORY}.
 +     * <p>
 +     * The parameter must have the form
 +     * {@code DataverseName.LibraryName#ReaderFactoryFullyQualifiedName}.
 +     *
 +     * @param configuration the configuration holding the reader factory parameter
 +     * @return a new instance of the named factory class
 +     * @throws AsterixException if the parameter is missing or malformed, or if the class cannot
 +     *         be found, instantiated or accessed
 +     */
 +    public static IRecordReaderFactory<?> createExternalRecordReaderFactory(Map<String, String> configuration)
 +            throws AsterixException {
 +        String readerFactory = configuration.get(ExternalDataConstants.KEY_READER_FACTORY);
 +        if (readerFactory == null) {
 +            throw new AsterixException("to use " + ExternalDataConstants.EXTERNAL + " reader, the parameter "
 +                    + ExternalDataConstants.KEY_READER_FACTORY + " must be specified.");
 +        }
 +        String[] libraryAndFactory = readerFactory.split(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR);
 +        if (libraryAndFactory.length != 2) {
 +            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
 +                    + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
 +        }
 +        // String.split takes a regular expression: an unescaped "." matches every character and
 +        // always produces an empty array, so the dot must be escaped to split on a literal dot.
 +        String[] dataverseAndLibrary = libraryAndFactory[0].split("\\.");
 +        if (dataverseAndLibrary.length != 2) {
 +            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
 +                    + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
 +        }
 +
 +        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverseAndLibrary[0],
 +                dataverseAndLibrary[1]);
 +        try {
 +            return (IRecordReaderFactory<?>) classLoader.loadClass(libraryAndFactory[1]).newInstance();
 +        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
 +            throw new AsterixException("Failed to create record reader factory", e);
 +        }
 +    }
 +
 +    public static IDataParserFactory createExternalParserFactory(String dataverse, String parserFactoryName)
 +            throws AsterixException {
 +        try {
 +            String library = parserFactoryName.substring(0,
 +                    parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR));
 +            ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
 +            return (IDataParserFactory) classLoader
 +                    .loadClass(parserFactoryName
 +                            .substring(parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + 1))
 +                    .newInstance();
 +        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
 +            throw new AsterixException("Failed to create an external parser factory", e);
 +        }
 +    }
 +
 +    /**
 +     * Tells whether the configuration has {@code ExternalDataConstants.KEY_IS_FEED} set to a
 +     * value that parses to {@code true}.
 +     */
 +    public static boolean isFeed(Map<String, String> configuration) {
 +        return configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)
 +                && Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_FEED));
 +    }
 +
 +    public static void prepareFeed(Map<String, String> configuration, String dataverseName, String feedName) {
 +        if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
 +            configuration.put(ExternalDataConstants.KEY_IS_FEED, ExternalDataConstants.TRUE);
 +        }
 +        configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataverseName);
 +        configuration.put(ExternalDataConstants.KEY_FEED_NAME, feedName);
 +    }
 +
 +    /**
 +     * Returns {@code true} when {@code ExternalDataConstants.KEY_WAIT_FOR_DATA} is absent;
 +     * otherwise returns its value parsed as a boolean.
 +     */
 +    public static boolean keepDataSourceOpen(Map<String, String> configuration) {
 +        return !configuration.containsKey(ExternalDataConstants.KEY_WAIT_FOR_DATA)
 +                || Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_WAIT_FOR_DATA));
 +    }
 +
 +    public static String getFeedName(Map<String, String> configuration) {
 +        return configuration.get(ExternalDataConstants.KEY_FEED_NAME);
 +    }
 +
 +    /**
 +     * Returns the integer stored under {@code ExternalDataConstants.KEY_QUEUE_SIZE}, or
 +     * {@code ExternalDataConstants.DEFAULT_QUEUE_SIZE} when the key is absent.
 +     */
 +    public static int getQueueSize(Map<String, String> configuration) {
 +        if (!configuration.containsKey(ExternalDataConstants.KEY_QUEUE_SIZE)) {
 +            return ExternalDataConstants.DEFAULT_QUEUE_SIZE;
 +        }
 +        return Integer.parseInt(configuration.get(ExternalDataConstants.KEY_QUEUE_SIZE));
 +    }
 +
 +    public static boolean isRecordWithMeta(Map<String, String> configuration) {
 +        return configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME);
 +    }
 +
 +    public static void setRecordWithMeta(Map<String, String> configuration, String booleanString) {
 +        configuration.put(ExternalDataConstants.FORMAT_RECORD_WITH_METADATA, booleanString);
 +    }
 +
 +    public static boolean isChangeFeed(Map<String, String> configuration) {
 +        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_CHANGE_FEED));
 +    }
 +
 +    public static int getNumberOfKeys(Map<String, String> configuration) throws AsterixException {
 +        String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
 +        if (keyIndexes == null) {
 +            throw new AsterixException(
 +                    "A change feed must have the parameter " + ExternalDataConstants.KEY_KEY_INDEXES);
 +        }
 +        return keyIndexes.split(",").length;
 +    }
 +
 +    public static void setNumberOfKeys(Map<String, String> configuration, int value) {
 +        configuration.put(ExternalDataConstants.KEY_KEY_SIZE, String.valueOf(value));
 +    }
 +
 +    public static void setChangeFeed(Map<String, String> configuration, String booleanString) {
 +        configuration.put(ExternalDataConstants.KEY_IS_CHANGE_FEED, booleanString);
 +    }
 +
 +    /**
 +     * Parses the comma-separated integers stored under
 +     * {@code ExternalDataConstants.KEY_KEY_INDEXES}.
 +     *
 +     * @return the parsed integers, in the order they appear
 +     */
 +    public static int[] getPKIndexes(Map<String, String> configuration) {
 +        String[] tokens = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES).split(",");
 +        int[] indexes = new int[tokens.length];
 +        int next = 0;
 +        for (String token : tokens) {
 +            indexes[next++] = Integer.parseInt(token);
 +        }
 +        return indexes;
 +    }
 +
 +    /**
 +     * Parses the comma-separated integers stored under
 +     * {@code ExternalDataConstants.KEY_KEY_INDICATORS}.
 +     *
 +     * @return the parsed integers, in the order they appear
 +     */
 +    public static int[] getPKSourceIndicators(Map<String, String> configuration) {
 +        String[] tokens = configuration.get(ExternalDataConstants.KEY_KEY_INDICATORS).split(",");
 +        int[] indicators = new int[tokens.length];
 +        int next = 0;
 +        for (String token : tokens) {
 +            indicators[next++] = Integer.parseInt(token);
 +        }
 +        return indicators;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index fc15d3c,0000000..5bb8ec3
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@@ -1,172 -1,0 +1,180 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.util;
 +
 +import java.io.BufferedReader;
 +import java.io.BufferedWriter;
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.charset.StandardCharsets;
 +import java.nio.file.Files;
 +import java.nio.file.Path;
 +import java.nio.file.Paths;
 +import java.nio.file.StandardOpenOption;
 +import java.util.TreeSet;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +
 +public class FeedLogManager {
 +
 +    /**
 +     * Kinds of entries that a feed log can hold.
 +     */
 +    public enum LogEntryType {
 +        START, // partition start
 +        END, // partition end
 +        COMMIT, // a record commit within a partition
-         SNAPSHOT // an identifier that partitions with identifiers before this one should be
-                  // ignored
++        SNAPSHOT // an identifier that partitions with identifiers before this one should be ignored
 +    }
 +
 +    // Names of the three log files kept inside the log directory.
 +    public static final String PROGRESS_LOG_FILE_NAME = "progress.log";
 +    public static final String ERROR_LOG_FILE_NAME = "error.log";
 +    public static final String BAD_RECORDS_FILE_NAME = "failed_record.log";
 +    // Prefixes of the progress-log lines written when a partition starts or ends.
 +    public static final String START_PREFIX = "s:";
 +    public static final String END_PREFIX = "e:";
 +    // Number of leading characters that getSplitId strips from a progress-log line.
 +    public static final int PREFIX_SIZE = 2;
 +    // Partition most recently passed to startPartition or endPartition(String).
 +    private String currentPartition;
 +    // Partitions for which an end entry has been read from or written to the progress log.
 +    private final TreeSet<String> completed;
 +    // Directory that holds the log files.
 +    private final Path dir;
 +    // Writers for the three log files; assigned in open().
 +    private BufferedWriter progressLogger;
 +    private BufferedWriter errorLogger;
 +    private BufferedWriter recordLogger;
 +    // Scratch buffer reused by the log* methods to assemble each entry.
 +    private final StringBuilder stringBuilder = new StringBuilder();
 +    // Incremented by touch() and decremented by close(); the writers are closed only once it
 +    // is no longer positive.
++    private int count = 0;
 +
 +    /**
 +     * Creates a log manager over the given directory, creating the directory and its log files
 +     * first when the directory does not exist, and then opening the logs.
 +     *
 +     * @param file the log directory
 +     * @throws HyracksDataException wrapping any {@link IOException} raised while creating or
 +     *         opening the logs
 +     */
 +    public FeedLogManager(File file) throws HyracksDataException {
 +        this.dir = file.toPath();
 +        this.completed = new TreeSet<>();
 +        try {
 +            boolean alreadyThere = exists();
 +            if (!alreadyThere) {
 +                create();
 +            }
 +            open();
 +        } catch (IOException e) {
 +            throw new HyracksDataException(e);
 +        }
 +    }
 +
-     public void endPartition() throws IOException {
++    /**
++     * Increments the counter that {@code close()} decrements before it closes the writers.
++     */
++    public synchronized void touch() {
++        count += 1;
++    }
++
++    public synchronized void endPartition() throws IOException {
 +        logProgress(END_PREFIX + currentPartition);
 +        completed.add(currentPartition);
 +    }
 +
-     public void endPartition(String partition) throws IOException {
++    public synchronized void endPartition(String partition) throws IOException {
 +        // Make the given partition current, then finish it exactly as endPartition() does.
 +        currentPartition = partition;
 +        endPartition();
 +    }
 +
-     public void startPartition(String partition) throws IOException {
++    public synchronized void startPartition(String partition) throws IOException {
 +        currentPartition = partition;
 +        logProgress(START_PREFIX + currentPartition);
 +    }
 +
 +    public boolean exists() {
 +        return Files.exists(dir);
 +    }
 +
-     public void open() throws IOException {
++    public synchronized void open() throws IOException {
 +        Path logDir = dir.toAbsolutePath();
 +        Path progressLog = logDir.resolve(PROGRESS_LOG_FILE_NAME);
 +
 +        // Replay the progress log to find the partitions that already ended. The reader is in a
 +        // try-with-resources so that it is closed even when readLine() fails.
 +        try (BufferedReader reader = Files.newBufferedReader(progressLog)) {
 +            for (String log = reader.readLine(); log != null; log = reader.readLine()) {
 +                if (log.startsWith(END_PREFIX)) {
 +                    completed.add(getSplitId(log));
 +                }
 +            }
 +        }
 +
 +        // Open all three logs for appending; the files must already exist.
 +        progressLogger = Files.newBufferedWriter(progressLog, StandardCharsets.UTF_8, StandardOpenOption.APPEND);
 +        errorLogger = Files.newBufferedWriter(logDir.resolve(ERROR_LOG_FILE_NAME), StandardCharsets.UTF_8,
 +                StandardOpenOption.APPEND);
 +        recordLogger = Files.newBufferedWriter(logDir.resolve(BAD_RECORDS_FILE_NAME), StandardCharsets.UTF_8,
 +                StandardOpenOption.APPEND);
 +    }
 +
-     public void close() throws IOException {
++    public synchronized void close() throws IOException {
++        // Each close() undoes one touch(); the writers are closed only when none remain.
++        count--;
++        if (count > 0) {
++            return;
++        }
 +        // Nested finally blocks make sure a failure to close one writer does not leave the
 +        // remaining writers open.
 +        try {
 +            progressLogger.close();
 +        } finally {
 +            try {
 +                errorLogger.close();
 +            } finally {
 +                recordLogger.close();
 +            }
 +        }
 +    }
 +
-     public boolean create() throws IOException {
++    public synchronized boolean create() throws IOException {
 +        File f = dir.toFile();
 +        f.mkdirs();
 +        new File(f, PROGRESS_LOG_FILE_NAME).createNewFile();
 +        new File(f, ERROR_LOG_FILE_NAME).createNewFile();
 +        new File(f, BAD_RECORDS_FILE_NAME).createNewFile();
 +        return true;
 +    }
 +
-     public boolean destroy() throws IOException {
++    public synchronized boolean destroy() throws IOException {
 +        // Delete the log directory together with everything inside it.
 +        FileUtils.deleteDirectory(dir.toFile());
 +        return true;
 +    }
 +
-     public void logProgress(String log) throws IOException {
++    public synchronized void logProgress(String log) throws IOException {
 +        stringBuilder.setLength(0);
 +        stringBuilder.append(log);
 +        stringBuilder.append(ExternalDataConstants.LF);
 +        progressLogger.write(stringBuilder.toString());
 +        progressLogger.flush();
 +    }
 +
-     public void logError(String error, Throwable th) throws IOException {
++    public synchronized void logError(String error, Throwable th) throws IOException {
 +        stringBuilder.setLength(0);
 +        stringBuilder.append(error);
 +        stringBuilder.append(ExternalDataConstants.LF);
 +        stringBuilder.append(th.toString());
 +        stringBuilder.append(ExternalDataConstants.LF);
 +        errorLogger.write(stringBuilder.toString());
 +        errorLogger.flush();
 +    }
 +
-     public void logRecord(String record, String errorMessage) throws IOException {
++    public synchronized void logRecord(String record, String errorMessage) throws IOException {
 +        stringBuilder.setLength(0);
 +        stringBuilder.append(record);
 +        stringBuilder.append(ExternalDataConstants.LF);
 +        stringBuilder.append(errorMessage);
 +        stringBuilder.append(ExternalDataConstants.LF);
 +        recordLogger.write(stringBuilder.toString());
 +        recordLogger.flush();
 +    }
 +
 +    public static String getSplitId(String log) {
 +        return log.substring(PREFIX_SIZE);
 +    }
 +
-     public boolean isSplitRead(String split) {
++    public synchronized boolean isSplitRead(String split) {
 +        return completed.contains(split);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 5ab41af,0000000..502a432
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@@ -1,123 -1,0 +1,111 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.util;
 +
 +import java.io.File;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.asterix.common.cluster.ClusterPartition;
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.common.utils.StoragePathUtil;
 +import org.apache.asterix.om.util.AsterixClusterProperties;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
 +import org.apache.hyracks.api.comm.FrameHelper;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.io.FileReference;
 +import org.apache.hyracks.api.io.IIOManager;
 +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 +import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 +import org.apache.hyracks.dataflow.std.file.FileSplit;
 +
 +public class FeedUtils {
 +    private static String prepareDataverseFeedName(String dataverseName, String feedName) {
 +        return dataverseName + File.separator + feedName;
 +    }
 +
-     public static FileSplit splitsForAdapter(String dataverseName, String feedName, int partition,
-             ClusterPartition[] nodePartitions) {
++    public static FileSplit splitsForAdapter(String dataverseName, String feedName, String nodeName,
++            ClusterPartition partition) {
 +        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
 +        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
-         ClusterPartition nodePartition = nodePartitions[0];
 +        String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                 nodePartition.getPartitionId());
-         // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
-         File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
-                 + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + partition);
-         return StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f);
++                partition.getPartitionId());
++        // Note: all feed adapter instances on the same node share a single feed logger
++        // format: 'storage dir name'/partition_#/dataverse/feed/'node name'
++        File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
++        return StoragePathUtil.getFileSplitForClusterPartition(partition, f);
 +    }
 +
 +    public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
 +            AlgebricksPartitionConstraint partitionConstraints) throws AsterixException {
 +        if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
 +            throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
 +        }
-         File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
-         String[] locations = null;
-         locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
++        String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
 +        List<FileSplit> splits = new ArrayList<FileSplit>();
-         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
-         int i = 0;
 +        for (String nd : locations) {
-             // Always get the first partition
-             ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0];
-             String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                     nodePartition.getPartitionId());
-             // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
-             File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
-                     + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + i);
-             splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f));
-             i++;
++            splits.add(splitsForAdapter(dataverseName, feedName, nd,
++                    AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0]));
 +        }
 +        return splits.toArray(new FileSplit[] {});
 +    }
 +
 +    public static FileReference getAbsoluteFileRef(String relativePath, int ioDeviceId, IIOManager ioManager) {
 +        return ioManager.getAbsoluteFileRef(ioDeviceId, relativePath);
 +    }
 +
 +    public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition,
 +            FileSplit[] feedLogFileSplits) throws HyracksDataException {
 +        return new FeedLogManager(
 +                FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getLocalFile().getFile().getPath(),
 +                        feedLogFileSplits[partition].getIODeviceId(), ctx.getIOManager()).getFile());
 +    }
 +
 +    public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit)
 +            throws HyracksDataException {
 +        return new FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getLocalFile().getFile().getPath(),
 +                feedLogFileSplit.getIODeviceId(), ctx.getIOManager()).getFile());
 +    }
 +
 +    public static void processFeedMessage(ByteBuffer input, ByteBuffer message, FrameTupleAccessor fta) {
 +        // copy the last tuple of the frame into message, then drop it by decrementing the tuple count
 +        fta.reset(input);
 +        int tc = fta.getTupleCount() - 1;
 +        int offset = fta.getTupleStartOffset(tc);
 +        int len = fta.getTupleLength(tc);
 +        message.clear();
 +        message.put(input.array(), offset, len);
 +        message.flip();
 +        IntSerDeUtils.putInt(input.array(), FrameHelper.getTupleCountOffset(input.capacity()), tc);
 +    }
 +
 +    public static int getNumOfFields(Map<String, String> configuration) {
 +        return 1;
 +    }
 +
 +    public static String getFeedMetaTypeName(Map<String, String> configuration) {
 +        return configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME);
 +
 +    }
 +}



Mime
View raw message