asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [11/19] incubator-asterixdb git commit: Support Change Feeds and Ingestion of Records with MetaData
Date Tue, 15 Mar 2016 23:36:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 926022c..73de838 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -20,7 +20,6 @@ package org.apache.asterix.external.dataflow;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
 
 import javax.annotation.Nonnull;
 
@@ -41,12 +40,12 @@ import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class FeedTupleForwarder implements ITupleForwarder {
 
+    private final FeedLogManager feedLogManager;
     private int maxRecordSize; // temporary until the big object in storage is solved
     private FrameTupleAppender appender;
     private IFrame frame;
     private IFrameWriter writer;
     private boolean paused = false;
-    private final FeedLogManager feedLogManager;
     private boolean initialized;
 
     public FeedTupleForwarder(@Nonnull FeedLogManager feedLogManager) {
@@ -58,10 +57,6 @@ public class FeedTupleForwarder implements ITupleForwarder {
     }
 
     @Override
-    public void configure(Map<String, String> configuration) {
-    }
-
-    @Override
     public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         if (!initialized) {
             this.maxRecordSize = ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
new file mode 100644
index 0000000..203b5a7
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class FeedWithMetaDataFlowController<T, O> extends FeedRecordDataFlowController<T> {
+
+    // This field masks the superclass field dataParser. We do this to avoid down-casting when calling parseMeta.
+    protected RecordWithMetadataParser<T, O> dataParser;
+
+    public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
+            FeedLogManager feedLogManager, int numOfOutputFields, RecordWithMetadataParser<T, O> dataParser,
+            IRecordReader<T> recordReader) {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+        this.dataParser = dataParser;
+    }
+
+    @Override
+    protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
+        dataParser.parseMeta(tb.getDataOutput());
+        tb.addFieldEndOffset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
index 2caf98c..be737ae 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import java.util.Map;
-
 import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -37,11 +35,6 @@ public class FrameFullTupleForwarder implements ITupleForwarder {
     private IFrameWriter writer;
 
     @Override
-    public void configure(Map<String, String> configuration) {
-        // no-op
-    }
-
-    @Override
     public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
index ffa025b..9c8563d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -18,26 +18,34 @@
  */
 package org.apache.asterix.external.dataflow;
 
+import java.io.IOException;
+
 import javax.annotation.Nonnull;
 
 import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexingDatasource;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.ITupleForwarder;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class IndexingDataFlowController<T> extends RecordDataFlowController<T> {
     private final IExternalIndexer indexer;
 
-    public IndexingDataFlowController(@Nonnull IRecordDataParser<T> dataParser,
-            @Nonnull IRecordReader<? extends T> recordReader) throws Exception {
-        super(dataParser, recordReader);
-        indexer = ((IIndexingDatasource) recordReader).getIndexer();
-        numOfTupleFields += indexer.getNumberOfFields();
+    public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
+            @Nonnull IRecordDataParser<T> dataParser, @Nonnull IRecordReader<? extends T> recordReader,
+            IExternalIndexer indexer) throws IOException {
+        super(ctx, tupleForwarder, dataParser, recordReader, 1 + indexer.getNumberOfFields());
+        this.indexer = indexer;
     }
 
     @Override
-    protected void appendOtherTupleFields(ArrayTupleBuilder tb) throws Exception {
-        indexer.index(tb);
+    protected void appendOtherTupleFields(ArrayTupleBuilder tb) throws HyracksDataException {
+        try {
+            indexer.index(tb);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
index f8fcd6f..cb80e45 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -40,13 +40,19 @@ public class RateControlledTupleForwarder implements ITupleForwarder {
 
     public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
 
-    @Override
-    public void configure(Map<String, String> configuration) {
+    private RateControlledTupleForwarder(long interTupleInterval) {
+        this.interTupleInterval = interTupleInterval;
+        delayConfigured = interTupleInterval != 0L;
+    }
+
+    // Factory method
+    public static RateControlledTupleForwarder create(Map<String, String> configuration) {
+        long interTupleInterval = 0L;
         String propValue = configuration.get(INTER_TUPLE_INTERVAL);
         if (propValue != null) {
             interTupleInterval = Long.parseLong(propValue);
         }
-        delayConfigured = interTupleInterval != 0;
+        return new RateControlledTupleForwarder(interTupleInterval);
     }
 
     @Override
@@ -82,6 +88,5 @@ public class RateControlledTupleForwarder implements ITupleForwarder {
         if (appender.getTupleCount() > 0) {
             FrameUtils.flushFrame(frame.getBuffer(), writer);
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index 57f0f3d..99654d0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -23,7 +23,9 @@ import javax.annotation.Nonnull;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
@@ -31,19 +33,22 @@ public class RecordDataFlowController<T> extends AbstractDataFlowController {
 
     protected final IRecordDataParser<T> dataParser;
     protected final IRecordReader<? extends T> recordReader;
-    protected int numOfTupleFields = 1;
+    protected final int numOfTupleFields;
 
-    public RecordDataFlowController(@Nonnull IRecordDataParser<T> dataParser,
-            @Nonnull IRecordReader<? extends T> recordReader) {
+    public RecordDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
+            @Nonnull IRecordDataParser<T> dataParser, @Nonnull IRecordReader<? extends T> recordReader,
+            int numOfTupleFields) {
+        super(ctx, tupleForwarder);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
+        this.numOfTupleFields = numOfTupleFields;
     }
 
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
             ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
-            initializeTupleForwarder(writer);
+            tupleForwarder.initialize(ctx, writer);
             while (recordReader.hasNext()) {
                 IRawRecord<? extends T> record = recordReader.next();
                 tb.reset();
@@ -61,24 +66,4 @@ public class RecordDataFlowController<T> extends AbstractDataFlowController {
 
     protected void appendOtherTupleFields(ArrayTupleBuilder tb) throws Exception {
     }
-
-    @Override
-    public boolean stop() {
-        return recordReader.stop();
-    }
-
-    @Override
-    public boolean handleException(Throwable th) {
-        return false;
-    }
-
-    @Override
-    public boolean pause() throws HyracksDataException {
-        return false;
-    }
-
-    @Override
-    public boolean resume() throws HyracksDataException {
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
index 43738eb..ccf22da 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -19,20 +19,26 @@
 package org.apache.asterix.external.dataflow;
 
 import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.api.IStreamFlowController;
+import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-public class StreamDataFlowController extends AbstractDataFlowController implements IStreamFlowController {
-    private IStreamDataParser dataParser;
-    private static final int NUMBER_OF_TUPLE_FIELDS = 1;
+public class StreamDataFlowController extends AbstractDataFlowController {
+    private final IStreamDataParser dataParser;
+
+    public StreamDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
+            IStreamDataParser dataParser) {
+        super(ctx, tupleForwarder);
+        this.dataParser = dataParser;
+    }
 
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
-            ArrayTupleBuilder tb = new ArrayTupleBuilder(NUMBER_OF_TUPLE_FIELDS);
-            initializeTupleForwarder(writer);
+            ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+            tupleForwarder.initialize(ctx, writer);
             while (true) {
                 tb.reset();
                 if (!dataParser.parse(tb.getDataOutput())) {
@@ -46,29 +52,4 @@ public class StreamDataFlowController extends AbstractDataFlowController impleme
             throw new HyracksDataException(e);
         }
     }
-
-    @Override
-    public boolean stop() {
-        return false;
-    }
-
-    @Override
-    public boolean handleException(Throwable th) {
-        return false;
-    }
-
-    @Override
-    public void setStreamParser(IStreamDataParser dataParser) {
-        this.dataParser = dataParser;
-    }
-
-    @Override
-    public boolean pause() throws HyracksDataException {
-        return false;
-    }
-
-    @Override
-    public boolean resume() throws HyracksDataException {
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
new file mode 100644
index 0000000..d1bde71
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedAdapter implements IDataSourceAdapter {
+    private static final long serialVersionUID = 1L;
+    private final AbstractFeedDataFlowController controller;
+
+    public FeedAdapter(AbstractFeedDataFlowController controller) {
+        this.controller = controller;
+    }
+
+    @Override
+    public void start(int partition, IFrameWriter writer) throws HyracksDataException {
+        controller.start(writer);
+    }
+
+    public boolean stop() throws HyracksDataException {
+        return controller.stop();
+    }
+
+    public boolean handleException(Throwable e) {
+        return controller.handleException(e);
+    }
+
+    public boolean pause() throws HyracksDataException {
+        return controller.pause();
+    }
+
+    public boolean resume() throws HyracksDataException {
+        return controller.resume();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index d19eedf..3ab370e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -19,11 +19,11 @@
 package org.apache.asterix.external.dataset.adapter;
 
 import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class GenericAdapter implements IFeedAdapter {
+public class GenericAdapter implements IDataSourceAdapter {
 
     private static final long serialVersionUID = 1L;
     private final IDataFlowController controller;
@@ -36,24 +36,4 @@ public class GenericAdapter implements IFeedAdapter {
     public void start(int partition, IFrameWriter writer) throws HyracksDataException {
         controller.start(writer);
     }
-
-    @Override
-    public boolean stop() throws HyracksDataException {
-        return controller.stop();
-    }
-
-    @Override
-    public boolean handleException(Throwable e) {
-        return controller.handleException(e);
-    }
-
-    @Override
-    public boolean pause() throws HyracksDataException {
-        return controller.pause();
-    }
-
-    @Override
-    public boolean resume() throws HyracksDataException {
-        return controller.resume();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
index 483ba19..f102f93 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
@@ -35,7 +35,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public class FeedExceptionHandler implements IExceptionHandler {
 
-    private static final Logger LOGGER = Logger.getLogger(FeedExceptionHandler.class.getName());
+    private static Logger LOGGER = Logger.getLogger(FeedExceptionHandler.class.getName());
 
     //TODO: Enable logging
     private final IHyracksTaskContext ctx;
@@ -47,6 +47,11 @@ public class FeedExceptionHandler implements IExceptionHandler {
         this.fta = fta;
     }
 
+    public void prettyPrint(ByteBuffer frame) {
+        fta.reset(frame);
+        fta.prettyPrint();
+    }
+
     @Override
     public ByteBuffer handleException(Exception e, ByteBuffer frame) {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 3370118..1ceb36b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.external.feed.api.IExceptionHandler;
 import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
 import org.apache.asterix.external.feed.api.IFeedMessage;
@@ -49,29 +48,30 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
  * 2. FeedMetaStoreNodePushable.initializeNewFeedRuntime();
  *              ______
  *             |      |
- * ============|core  |============
- * ============| op   |============
+ * ============| core |============
+ * ============|  op  |============
  * ^^^^^^^^^^^^|______|
- *  Input Side
- *  Handler
- *
+ * Input Side
+ * Handler
  **/
 public class FeedRuntimeInputHandler implements IFrameWriter {
 
-    private static Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
 
     private final FeedConnectionId connectionId;
     private final FeedRuntimeId runtimeId;
     private final FeedPolicyAccessor feedPolicyAccessor;
-    private final IExceptionHandler exceptionHandler;
+    private final FeedExceptionHandler exceptionHandler;
     private final FeedFrameDiscarder discarder;
     private final FeedFrameSpiller spiller;
     private final FeedPolicyAccessor fpa;
     private final IFeedManager feedManager;
+    private final MonitoredBuffer mBuffer;
+    private final DataBucketPool pool;
+    private final FrameEventCallback frameEventCallback;
+
     private boolean bufferingEnabled;
     private IFrameWriter coreOperator;
-    private final MonitoredBuffer mBuffer;
-    private DataBucketPool pool;
     private FrameCollection frameCollection;
     private Mode mode;
     private Mode lastMode;
@@ -79,8 +79,6 @@ public class FeedRuntimeInputHandler implements IFrameWriter {
     private long nProcessed;
     private boolean throttlingEnabled;
 
-    private FrameEventCallback frameEventCallback;
-
     public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
             IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
             RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions) throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index 22be702..43d5bce 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -20,7 +20,7 @@ package org.apache.asterix.external.feed.runtime;
 
 import org.apache.asterix.external.api.IAdapterRuntimeManager;
 import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
-import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.log4j.Logger;
@@ -34,10 +34,10 @@ public class AdapterExecutor implements Runnable {
 
     private final DistributeFeedFrameWriter writer;     // A writer that sends frames to multiple receivers (that can
                                                         // increase or decrease at any time)
-    private final IFeedAdapter adapter;                 // The adapter
+    private final FeedAdapter adapter;                 // The adapter
     private final IAdapterRuntimeManager adapterManager;// The runtime manager <-- two way visibility -->
 
-    public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, IFeedAdapter adapter,
+    public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, FeedAdapter adapter,
             IAdapterRuntimeManager adapterManager) {
         this.writer = writer;
         this.adapter = adapter;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 6c3e44d..b0f2517 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -23,7 +23,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.external.api.IAdapterRuntimeManager;
-import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
@@ -38,7 +38,7 @@ public class AdapterRuntimeManager implements IAdapterRuntimeManager {
 
     private final FeedId feedId;                    // (dataverse-feed)
 
-    private final IFeedAdapter feedAdapter;         // The adapter
+    private final FeedAdapter feedAdapter;         // The adapter
 
     private final IIntakeProgressTracker tracker;   // Not used. needs to be fixed soon.
 
@@ -54,7 +54,7 @@ public class AdapterRuntimeManager implements IAdapterRuntimeManager {
     private State state;                            // One of {ACTIVE_INGESTION, NACTIVE_INGESTION, FINISHED_INGESTION,
                                                     // FAILED_INGESTION}
 
-    public AdapterRuntimeManager(FeedId feedId, IFeedAdapter feedAdapter, IIntakeProgressTracker tracker,
+    public AdapterRuntimeManager(FeedId feedId, FeedAdapter feedAdapter, IIntakeProgressTracker tracker,
             DistributeFeedFrameWriter writer, int partition) {
         this.feedId = feedId;
         this.feedAdapter = feedAdapter;
@@ -107,7 +107,7 @@ public class AdapterRuntimeManager implements IAdapterRuntimeManager {
     }
 
     @Override
-    public IFeedAdapter getFeedAdapter() {
+    public FeedAdapter getFeedAdapter() {
         return feedAdapter;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
index 0fbbd2e..873b420 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
@@ -31,14 +31,15 @@ import org.apache.asterix.om.types.BuiltinType;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class FileOffsetIndexer implements IExternalIndexer {
 
     private static final long serialVersionUID = 1L;
     public static final int NUM_OF_FIELDS = 2;
-    protected AMutableInt32 fileNumber = new AMutableInt32(0);
-    protected AMutableInt64 offset = new AMutableInt64(0);
+    protected final AMutableInt32 fileNumber = new AMutableInt32(0);
+    protected final AMutableInt64 offset = new AMutableInt64(0);
     protected RecordReader<?, Writable> recordReader;
 
     @SuppressWarnings("unchecked")
@@ -49,21 +50,29 @@ public class FileOffsetIndexer implements IExternalIndexer {
             .getSerializerDeserializer(BuiltinType.AINT64);
 
     @Override
-    public void reset(IRecordReader<?> reader) throws IOException {
-        //TODO: Make it more generic since we can't assume it is always going to be HDFS records.
-        @SuppressWarnings("unchecked")
-        HDFSRecordReader<?, Writable> hdfsReader = (HDFSRecordReader<?, Writable>) reader;
-        fileNumber.setValue(hdfsReader.getSnapshot().get(hdfsReader.getCurrentSplitIndex()).getFileNumber());
-        recordReader = hdfsReader.getReader();
-        offset.setValue(recordReader.getPos());
+    public void reset(IRecordReader<?> reader) throws HyracksDataException {
+        try {
+            //TODO: Make it more generic since we can't assume it is always going to be HDFS records.
+            @SuppressWarnings("unchecked")
+            HDFSRecordReader<?, Writable> hdfsReader = (HDFSRecordReader<?, Writable>) reader;
+            fileNumber.setValue(hdfsReader.getSnapshot().get(hdfsReader.getCurrentSplitIndex()).getFileNumber());
+            recordReader = hdfsReader.getReader();
+            offset.setValue(recordReader.getPos());
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
     }
 
     @Override
-    public void index(ArrayTupleBuilder tb) throws IOException {
-        tb.addField(intSerde, fileNumber);
-        tb.addField(longSerde, offset);
-        // Get position for next index(tb) call
-        offset.setValue(recordReader.getPos());
+    public void index(ArrayTupleBuilder tb) throws HyracksDataException {
+        try {
+            tb.addField(intSerde, fileNumber);
+            tb.addField(longSerde, offset);
+            // Get position for next index(tb) call
+            offset.setValue(recordReader.getPos());
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
index 9fa26f0..a2641c8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
@@ -31,16 +31,17 @@ import org.apache.asterix.om.types.BuiltinType;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class RecordColumnarIndexer implements IExternalIndexer {
 
     private static final long serialVersionUID = 1L;
     public static final int NUM_OF_FIELDS = 3;
-    protected AMutableInt32 fileNumber = new AMutableInt32(0);
-    protected AMutableInt64 offset = new AMutableInt64(0);
+    protected final AMutableInt32 fileNumber = new AMutableInt32(0);
+    protected final AMutableInt64 offset = new AMutableInt64(0);
     protected long nextOffset;
-    protected AMutableInt32 rowNumber = new AMutableInt32(0);
+    protected final AMutableInt32 rowNumber = new AMutableInt32(0);
     protected RecordReader<?, Writable> recordReader;
 
     @SuppressWarnings("unchecked")
@@ -51,29 +52,38 @@ public class RecordColumnarIndexer implements IExternalIndexer {
             .getSerializerDeserializer(BuiltinType.AINT64);
 
     @Override
-    public void reset(IRecordReader<?> reader) throws IOException {
-        //TODO: Make this more generic. right now, it works because we only index hdfs files.
-        @SuppressWarnings("unchecked")
-        HDFSRecordReader<?, Writable> hdfsReader = (HDFSRecordReader<?, Writable>) reader;
-        fileNumber.setValue(hdfsReader.getSnapshot().get(hdfsReader.getCurrentSplitIndex()).getFileNumber());
-        recordReader = hdfsReader.getReader();
-        offset.setValue(recordReader.getPos());
-        nextOffset = offset.getLongValue();
-        rowNumber.setValue(0);
+    public void reset(IRecordReader<?> reader) throws HyracksDataException {
+        try {
+            //TODO: Make this more generic. right now, it works because we only index hdfs files.
+            @SuppressWarnings("unchecked")
+            HDFSRecordReader<?, Writable> hdfsReader = (HDFSRecordReader<?, Writable>) reader;
+            fileNumber.setValue(hdfsReader.getSnapshot().get(hdfsReader.getCurrentSplitIndex()).getFileNumber());
+            recordReader = hdfsReader.getReader();
+            offset.setValue(recordReader.getPos());
+
+            nextOffset = offset.getLongValue();
+            rowNumber.setValue(0);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
     }
 
     @Override
-    public void index(ArrayTupleBuilder tb) throws IOException {
-        if (recordReader.getPos() != nextOffset) {
-            // start of a new group
-            offset.setValue(nextOffset);
-            nextOffset = recordReader.getPos();
-            rowNumber.setValue(0);
+    public void index(ArrayTupleBuilder tb) throws HyracksDataException {
+        try {
+            if (recordReader.getPos() != nextOffset) {
+                // start of a new group
+                offset.setValue(nextOffset);
+                nextOffset = recordReader.getPos();
+                rowNumber.setValue(0);
+            }
+            tb.addField(intSerde, fileNumber);
+            tb.addField(longSerde, offset);
+            tb.addField(intSerde, rowNumber);
+            rowNumber.setValue(rowNumber.getIntegerValue() + 1);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
         }
-        tb.addField(intSerde, fileNumber);
-        tb.addField(longSerde, offset);
-        tb.addField(intSerde, rowNumber);
-        rowNumber.setValue(rowNumber.getIntegerValue() + 1);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 6e3ead2..f44d7bc 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -18,10 +18,12 @@
  */
 package org.apache.asterix.external.input;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IIndexibleExternalDataSource;
 import org.apache.asterix.external.api.IInputStreamProvider;
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
@@ -59,7 +61,7 @@ public class HDFSDataSourceFactory
     protected static Scheduler hdfsScheduler;
     protected static IndexingScheduler indexingScheduler;
     protected static Boolean initialized = false;
-    protected static final Object initLock = new Object();
+    protected static final Object initLock = new Object();
     protected List<ExternalFile> files;
     protected Map<String, String> configuration;
     protected Class<?> recordClass;
@@ -69,37 +71,41 @@ public class HDFSDataSourceFactory
     private String nodeName;
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        initialize();
-        this.configuration = configuration;
-        JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
-        confFactory = new ConfFactory(conf);
-        clusterLocations = getPartitionConstraint();
-        int numPartitions = clusterLocations.getLocations().length;
-        // if files list was set, we restrict the splits to the list
-        InputSplit[] inputSplits;
-        if (files == null) {
-            inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
-        } else {
-            inputSplits = HDFSUtils.getSplits(conf, files);
-        }
-        if (indexingOp) {
-            readSchedule = indexingScheduler.getLocationConstraints(inputSplits);
-        } else {
-            readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
-        }
-        inputSplitsFactory = new InputSplitsFactory(inputSplits);
-        read = new boolean[readSchedule.length];
-        Arrays.fill(read, false);
-        if (!ExternalDataUtils.isDataSourceStreamProvider(configuration)) {
-            RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
-            this.recordClass = reader.createValue().getClass();
-            reader.close();
+    public void configure(Map<String, String> configuration) throws AsterixException {
+        try {
+            init();
+            this.configuration = configuration;
+            JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+            clusterLocations = getPartitionConstraint();
+            int numPartitions = clusterLocations.getLocations().length;
+            // if files list was set, we restrict the splits to the list
+            InputSplit[] inputSplits;
+            if (files == null) {
+                inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            } else {
+                inputSplits = HDFSUtils.getSplits(conf, files);
+            }
+            if (indexingOp) {
+                readSchedule = indexingScheduler.getLocationConstraints(inputSplits);
+            } else {
+                readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
+            }
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+            read = new boolean[readSchedule.length];
+            Arrays.fill(read, false);
+            if (!ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM)) {
+                RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
+                this.recordClass = reader.createValue().getClass();
+                reader.close();
+            }
+        } catch (IOException e) {
+            throw new AsterixException(e);
         }
     }
 
-    // Used to tell the factory to restrict the splits to the intersection between this list and the
-    // actual files on hdfs side
+    // Used to tell the factory to restrict the splits to the intersection between this list and the
+    // actual files on hdfs side
     @Override
     public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
         this.files = files;
@@ -110,8 +116,7 @@ public class HDFSDataSourceFactory
      * The method below was modified to take care of the following
      * 1. when target files are not null, it generates a file aware input stream that validate
      * against the files
-     * 2. if the data is binary, it returns a generic reader
-     */
+     * 2. if the data is binary, it returns a generic reader */
     @Override
     public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
@@ -133,6 +138,7 @@ public class HDFSDataSourceFactory
      * Get the cluster locations for this input stream factory. This method specifies on which asterix nodes the
      * external
      * adapter will run and how many threads per node.
+     *
      * @return
      */
     @Override
@@ -145,7 +151,7 @@ public class HDFSDataSourceFactory
      * This method initialize the scheduler which assigns responsibility of reading different logical input splits from
      * HDFS
      */
-    private static void initialize() {
+    private static void init() {
         if (!initialized) {
             synchronized (initLock) {
                 if (!initialized) {
@@ -163,24 +169,21 @@ public class HDFSDataSourceFactory
 
     @Override
     public DataSourceType getDataSourceType() {
-        return (ExternalDataUtils.isDataSourceStreamProvider(configuration)) ? DataSourceType.STREAM
-                : DataSourceType.RECORDS;
+        return ExternalDataUtils.getDataSourceType(configuration);
     }
 
     @Override
     public IRecordReader<? extends Writable> createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws Exception {
-        JobConf conf = confFactory.getConf();
-        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
-        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
-        HDFSRecordReader<Object, Writable> recordReader = new HDFSRecordReader<Object, Writable>(read, inputSplits,
-                readSchedule, nodeName, conf);
-        if (files != null) {
-            recordReader.setSnapshot(files);
-            recordReader.setIndexer(ExternalIndexerProvider.getIndexer(configuration));
+            throws HyracksDataException {
+        try {
+            JobConf conf = confFactory.getConf();
+            InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+            String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+            return new HDFSRecordReader<Object, Writable>(read, inputSplits, readSchedule, nodeName, conf, files,
+                    files == null ? null : ExternalIndexerProvider.getIndexer(configuration));
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
         }
-        recordReader.configure(configuration);
-        return recordReader;
     }
 
     @Override
@@ -195,6 +198,6 @@ public class HDFSDataSourceFactory
 
     @Override
     public boolean isIndexingOp() {
-        return (files != null && indexingOp);
+        return ((files != null) && indexingOp);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
index c4b37f1..affdc84 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -84,10 +84,6 @@ public class CharArrayRecord implements IRawRecord<char[]> {
         return String.valueOf(value, 0, size);
     }
 
-    public void setValue(char[] value) {
-        this.value = value;
-    }
-
     public void endRecord() {
         if (value[size - 1] != ExternalDataConstants.LF) {
             appendChar(ExternalDataConstants.LF);
@@ -117,4 +113,10 @@ public class CharArrayRecord implements IRawRecord<char[]> {
         this.value = value;
         this.size = value.length;
     }
+
+    public void set(StringBuilder builder) {
+        ensureCapacity(builder.length());
+        builder.getChars(0, builder.length(), value, 0);
+        this.size = builder.length();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
deleted file mode 100644
index d5640a6..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.asterix.external.api.IDataParser;
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.ABoolean;
-import org.apache.asterix.om.base.ADouble;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-
-public class RecordWithMetadata<T> {
-
-    private ArrayBackedValueStorage[] fieldValueBuffers;
-    private DataOutput[] fieldValueBufferOutputs;
-    private IValueParserFactory[] valueParserFactories;
-    private byte[] fieldTypeTags;
-    private IRawRecord<T> record;
-
-    // Serializers
-    @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-    private AMutableDouble mutableDouble = new AMutableDouble(0);
-    @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ASTRING);
-    private AMutableString mutableString = new AMutableString(null);
-    @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT32);
-    private AMutableInt32 mutableInt = new AMutableInt32(0);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-    private AMutableInt64 mutableLong = new AMutableInt64(0);
-    @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
-
-    public RecordWithMetadata(Class<? extends T> recordClass) {
-    }
-
-    public RecordWithMetadata(IAType[] metaTypes, Class<? extends T> recordClass) {
-        int n = metaTypes.length;
-        this.fieldValueBuffers = new ArrayBackedValueStorage[n];
-        this.fieldValueBufferOutputs = new DataOutput[n];
-        this.valueParserFactories = new IValueParserFactory[n];
-        this.fieldTypeTags = new byte[n];
-        for (int i = 0; i < n; i++) {
-            ATypeTag tag = metaTypes[i].getTypeTag();
-            fieldTypeTags[i] = tag.serialize();
-            fieldValueBuffers[i] = new ArrayBackedValueStorage();
-            fieldValueBufferOutputs[i] = fieldValueBuffers[i].getDataOutput();
-            valueParserFactories[i] = ExternalDataUtils.getParserFactory(tag);
-        }
-    }
-
-    public IRawRecord<T> getRecord() {
-        return record;
-    }
-
-    public ArrayBackedValueStorage getMetadata(int index) {
-        return fieldValueBuffers[index];
-    }
-
-    public void setRecord(IRawRecord<T> record) {
-        this.record = record;
-    }
-
-    public void reset() {
-        record.reset();
-        for (ArrayBackedValueStorage fieldBuffer : fieldValueBuffers) {
-            fieldBuffer.reset();
-        }
-    }
-
-    public void setMetadata(int index, int value) throws IOException {
-        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
-        mutableInt.setValue(value);
-        IDataParser.toBytes(mutableInt, fieldValueBuffers[index], int32Serde);
-    }
-
-    public void setMetadata(int index, long value) throws IOException {
-        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
-        mutableLong.setValue(value);
-        IDataParser.toBytes(mutableLong, fieldValueBuffers[index], int64Serde);
-    }
-
-    public void setMetadata(int index, String value) throws IOException {
-        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
-        mutableString.setValue(value);
-        IDataParser.toBytes(mutableString, fieldValueBuffers[index], stringSerde);
-    }
-
-    public void setMeta(int index, boolean value) throws IOException {
-        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
-        IDataParser.toBytes(value ? ABoolean.TRUE : ABoolean.FALSE, fieldValueBuffers[index], booleanSerde);
-    }
-
-    public void setMeta(int index, double value) throws IOException {
-        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
-        mutableDouble.setValue(value);
-        IDataParser.toBytes(mutableDouble, fieldValueBuffers[index], doubleSerde);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
new file mode 100644
index 0000000..ca6725f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
+
+public class RecordWithMetadataAndPK<T> extends RecordWithPK<T> {
+
+    private final ArrayBackedValueStorage[] fieldValueBuffers;
+    private final DataOutput[] fieldValueBufferOutputs;
+    private final IValueParser[] valueParsers;
+    private final byte[] fieldTypeTags;
+    private final IAType[] metaTypes;
+
+    // Serializers
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+    private final AMutableDouble mutableDouble = new AMutableDouble(0);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+    private final AMutableString mutableString = new AMutableString(null);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    private final AMutableInt32 mutableInt = new AMutableInt32(0);
+    @SuppressWarnings("unchecked")
+    protected ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    private final AMutableInt64 mutableLong = new AMutableInt64(0);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+    private final int[] keyIndicator;
+
+    public RecordWithMetadataAndPK(final IRawRecord<T> record, final IAType[] metaTypes, final ARecordType recordType,
+            final int[] keyIndicator, final int[] pkIndexes, final IAType[] keyTypes) {
+        super(record, keyTypes, pkIndexes);
+        this.metaTypes = metaTypes;
+        this.fieldValueBuffers = new ArrayBackedValueStorage[metaTypes.length];
+        this.fieldValueBufferOutputs = new DataOutput[metaTypes.length];
+        this.valueParsers = new IValueParser[metaTypes.length];
+        this.fieldTypeTags = new byte[metaTypes.length];
+        for (int i = 0; i < metaTypes.length; i++) {
+            final ATypeTag tag = metaTypes[i].getTypeTag();
+            fieldTypeTags[i] = tag.serialize();
+            fieldValueBuffers[i] = new ArrayBackedValueStorage();
+            fieldValueBufferOutputs[i] = fieldValueBuffers[i].getDataOutput();
+            valueParsers[i] = ExternalDataUtils.getParserFactory(tag).createValueParser();
+        }
+        this.keyIndicator = keyIndicator;
+    }
+
+    @Override
+    public IRawRecord<T> getRecord() {
+        return record;
+    }
+
+    public ArrayBackedValueStorage getMetadata(final int index) {
+        return fieldValueBuffers[index];
+    }
+
+    @Override
+    public void reset() {
+        record.reset();
+        for (final ArrayBackedValueStorage fieldBuffer : fieldValueBuffers) {
+            fieldBuffer.reset();
+        }
+    }
+
+    public void setMetadata(final int index, final int value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableInt.setValue(value);
+        IDataParser.toBytes(mutableInt, fieldValueBuffers[index], int32Serde);
+    }
+
+    public void setMetadata(final int index, final long value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableLong.setValue(value);
+        IDataParser.toBytes(mutableLong, fieldValueBuffers[index], int64Serde);
+    }
+
+    public void setMetadata(final int index, final String value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableString.setValue(value);
+        IDataParser.toBytes(mutableString, fieldValueBuffers[index], stringSerde);
+    }
+
+    public void setMetadata(final int index, final boolean value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        IDataParser.toBytes(value ? ABoolean.TRUE : ABoolean.FALSE, fieldValueBuffers[index], booleanSerde);
+    }
+
+    public void setMetadata(final int index, final double value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableDouble.setValue(value);
+        IDataParser.toBytes(mutableDouble, fieldValueBuffers[index], doubleSerde);
+    }
+
+    public void setRawMetadata(final int index, final char[] src, final int offset, final int length)
+            throws IOException {
+        if (length == 0) {
+            if (!NonTaggedFormatUtil.isOptional(metaTypes[index])) {
+                throw new HyracksDataException(
+                        "Field " + index + " of meta record is not an optional type so it cannot accept null value. ");
+            }
+            fieldValueBufferOutputs[index].writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+        } else {
+            fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+            valueParsers[index].parse(src, offset, length, fieldValueBufferOutputs[index]);
+        }
+    }
+
+    @Override
+    public void appendPk(final ArrayTupleBuilder tb) throws IOException {
+        for (int i = 0; i < pkIndexes.length; i++) {
+            if (keyIndicator[i] == 1) {
+                tb.addField(getMetadata(pkIndexes[i]));
+            } else {
+                throw new HyracksDataException("Can't get PK from record part");
+            }
+        }
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return record.getBytes();
+    }
+
+    @Override
+    public T get() {
+        return record.get();
+    }
+
+    @Override
+    public int size() {
+        return record.size();
+    }
+
+    @Override
+    public void set(final T t) {
+        record.set(t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java
new file mode 100644
index 0000000..b99d4d5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * A raw record decorated with the serialized values of its primary-key fields.
+ * The wrapped {@link IRawRecord} carries the value payload; the PK fields are kept
+ * separately as {@link ArrayBackedValueStorage} buffers so they can be appended to a
+ * tuple without re-parsing the record.
+ */
+public class RecordWithPK<T> implements IRawRecord<T> {
+
+    // serialized value of each primary-key field; null when no keys were configured
+    protected final ArrayBackedValueStorage[] pkFieldValueBuffers;
+    // index of each primary-key field within its source record; null when keys come pre-serialized
+    protected final int[] pkIndexes;
+    // declared type of each primary-key field; null when keys come pre-serialized
+    protected final IAType[] keyTypes;
+    protected IRawRecord<T> record;
+
+    /**
+     * Wraps {@code record} and prepares storage for the given primary keys.
+     *
+     * @param record    the value record
+     * @param pkTypes   declared key types, or null when the record has no configured keys
+     * @param pkIndexes field index of each key within the record
+     */
+    public RecordWithPK(final IRawRecord<T> record, final IAType[] pkTypes, final int[] pkIndexes) {
+        this.record = record;
+        this.keyTypes = pkTypes;
+        this.pkIndexes = pkIndexes;
+        if (keyTypes != null) {
+            this.pkFieldValueBuffers = new ArrayBackedValueStorage[pkTypes.length];
+            // eagerly allocate per-key storage: the original left the slots null, which made
+            // reset()/appendPk() throw NullPointerException on first use of this constructor
+            for (int i = 0; i < pkTypes.length; i++) {
+                this.pkFieldValueBuffers[i] = new ArrayBackedValueStorage();
+            }
+        } else {
+            this.pkFieldValueBuffers = null;
+        }
+    }
+
+    /**
+     * Wraps {@code rawRecord} with externally managed, pre-serialized key buffers.
+     */
+    public RecordWithPK(final IRawRecord<T> rawRecord, final ArrayBackedValueStorage[] pkFieldValueBuffers) {
+        this.record = rawRecord;
+        this.keyTypes = null;
+        this.pkIndexes = null;
+        this.pkFieldValueBuffers = pkFieldValueBuffers;
+    }
+
+    /** Returns the serialized primary-key buffers (may be null when no keys are configured). */
+    public ArrayBackedValueStorage[] getPKs() {
+        return pkFieldValueBuffers;
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return record.getBytes();
+    }
+
+    @Override
+    public T get() {
+        return record.get();
+    }
+
+    /** Returns the wrapped value record. */
+    public IRawRecord<? extends T> getRecord() {
+        return record;
+    }
+
+    /** Resets the wrapped record and all key buffers for reuse. */
+    @Override
+    public void reset() {
+        record.reset();
+        // guard: the no-key constructor path leaves pkFieldValueBuffers null
+        if (pkFieldValueBuffers != null) {
+            for (final ArrayBackedValueStorage pkStorage : pkFieldValueBuffers) {
+                pkStorage.reset();
+            }
+        }
+    }
+
+    @Override
+    public int size() {
+        return record.size();
+    }
+
+    @Override
+    public void set(final T t) {
+        record.set(t);
+    }
+
+    /**
+     * Appends each serialized primary-key buffer as a field of the given tuple builder.
+     * NOTE(review): assumes keys were configured; must not be called when
+     * pkFieldValueBuffers is null — confirm against callers.
+     *
+     * @param tb tuple builder receiving one field per key
+     * @throws IOException if a field cannot be appended
+     */
+    public void appendPk(final ArrayTupleBuilder tb) throws IOException {
+        for (final ArrayBackedValueStorage pkStorage : pkFieldValueBuffers) {
+            tb.addField(pkStorage);
+        }
+    }
+
+    /** Replaces the payload of the wrapped record with the given record's payload. */
+    public void set(final IRawRecord<? extends T> record) {
+        this.record.set(record.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
new file mode 100644
index 0000000..8255ebb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.converter;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser;
+
+/**
+ * Converts a delimited (CSV) raw record into a {@link RecordWithMetadataAndPK}: one
+ * designated field becomes the value record, every other field becomes a metadata field.
+ */
+public class CSVToRecordWithMetadataAndPKConverter
+        implements IRecordToRecordWithMetadataAndPKConverter<char[], char[]> {
+
+    // cursor over the delimited input; reused across records
+    private final FieldCursorForDelimitedDataParser cursor;
+    // index (within the CSV fields) of the field that holds the value record
+    private final int valueIndex;
+    private final RecordWithMetadataAndPK<char[]> recordWithMetadata;
+    private final CharArrayRecord record;
+
+    /**
+     * @param valueIndex   CSV field index carrying the record payload
+     * @param delimiter    field delimiter character
+     * @param metaType     type of the metadata record built from the remaining fields
+     * @param recordType   type of the value record
+     * @param keyIndicator per-key part indicator (0 = record part, 1 = meta part)
+     * @param keyIndexes   field index of each primary key
+     * @param keyTypes     declared type of each primary key
+     */
+    public CSVToRecordWithMetadataAndPKConverter(final int valueIndex, final char delimiter, final ARecordType metaType,
+            final ARecordType recordType, final int[] keyIndicator, final int[] keyIndexes, final IAType[] keyTypes) {
+        this.cursor = new FieldCursorForDelimitedDataParser(null, delimiter, ExternalDataConstants.QUOTE);
+        this.record = new CharArrayRecord();
+        this.valueIndex = valueIndex;
+        this.recordWithMetadata = new RecordWithMetadataAndPK<char[]>(record, metaType.getFieldTypes(), recordType,
+                keyIndicator, keyIndexes, keyTypes);
+    }
+
+    /**
+     * Splits the input into fields; field {@code valueIndex} becomes the value record,
+     * the others are stored (in order) as raw metadata.
+     */
+    @Override
+    public RecordWithMetadataAndPK<char[]> convert(final IRawRecord<? extends char[]> input) throws IOException {
+        record.reset();
+        recordWithMetadata.reset();
+        cursor.nextRecord(input.get(), input.size());
+        int i = 0; // index over all CSV fields
+        int j = 0; // index over metadata fields only (skips the value field)
+        while (cursor.nextField()) {
+            if (cursor.isDoubleQuoteIncludedInThisField) {
+                // collapse escaped quotes in place and shrink the field accordingly;
+                // the flag must be cleared so the next field starts clean
+                cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+                cursor.fEnd -= cursor.doubleQuoteCount;
+                cursor.isDoubleQuoteIncludedInThisField = false;
+            }
+            if (i == valueIndex) {
+                record.setValue(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+                record.endRecord();
+            } else {
+                recordWithMetadata.setRawMetadata(j, cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+                j++;
+            }
+            i++;
+        }
+        return recordWithMetadata;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
new file mode 100644
index 0000000..ee16228
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.converter;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordConverter;
+import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+
+public class CSVWithRecordConverterFactory implements IRecordConverterFactory<char[], RecordWithMetadataAndPK<char[]>> {
+
+    private static final long serialVersionUID = 1L;
+    private int recordIndex;
+    private char delimiter;
+    private ARecordType metaType;
+    private ARecordType recordType;
+    private int[] keyIndicators;
+    private int[] keyIndexes;
+    private IAType[] keyTypes;
+
+    @Override
+    public IRecordConverter<char[], RecordWithMetadataAndPK<char[]>> createConverter() {
+        return new CSVToRecordWithMetadataAndPKConverter(recordIndex, delimiter, metaType, recordType, keyIndicators,
+                keyIndexes, keyTypes);
+    }
+
+    @Override
+    public void configure(final Map<String, String> configuration) throws AsterixException {
+        //validate and set
+        String property = configuration.get(ExternalDataConstants.KEY_RECORD_INDEX);
+        if (property == null) {
+            throw new AsterixException(
+                    "Unspecified " + ExternalDataConstants.KEY_RECORD_INDEX + " for csv to csv with record converter");
+        }
+        recordIndex = Integer.parseInt(property);
+        property = configuration.get(ExternalDataConstants.KEY_DELIMITER);
+        if (property == null) {
+            throw new AsterixException(
+                    "Unspecified " + ExternalDataConstants.KEY_DELIMITER + " for csv to csv with record converter");
+        }
+        if (property.trim().length() > 1) {
+            throw new AsterixException("Large delimiter. The maximum delimiter size = 1");
+        }
+        delimiter = property.trim().charAt(0);
+        // only works for top level keys
+        property = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
+        if (property == null) {
+            keyIndexes = null;
+            keyIndicators = null;
+            keyTypes = null;
+        } else {
+            final String[] indexes = property.split(",");
+            keyIndexes = new int[indexes.length];
+            for (int i = 0; i < keyIndexes.length; i++) {
+                keyIndexes[i] = Integer.parseInt(indexes[i].trim());
+            }
+            // default key indicators point to meta part
+            property = configuration.get(ExternalDataConstants.KEY_KEY_INDICATORS);
+            if (property == null) {
+                keyIndicators = new int[keyIndexes.length];
+                Arrays.fill(keyIndicators, 1);
+            } else {
+                keyIndicators = new int[keyIndexes.length];
+                final String[] indicators = property.split(",");
+                for (int i = 0; i < keyIndicators.length; i++) {
+                    keyIndicators[i] = Integer.parseInt(indicators[i].trim());
+                    if ((keyIndicators[i] > 1) || (keyIndicators[i] < 0)) {
+                        throw new AsterixException("Invalid " + ExternalDataConstants.KEY_KEY_INDICATORS
+                                + " value. Allowed values are only 0 and 1.");
+                    }
+                }
+                keyTypes = new IAType[keyIndexes.length];
+                for (int i = 0; i < keyIndicators.length; i++) {
+                    if (keyIndicators[i] == 0) {
+                        keyTypes[i] = recordType.getFieldTypes()[keyIndexes[i]];
+                    } else {
+                        keyTypes[i] = metaType.getFieldTypes()[keyIndexes[i]];
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public Class<?> getInputClass() {
+        return char[].class;
+    }
+
+    @Override
+    public Class<?> getOutputClass() {
+        return char[].class;
+    }
+
+    @Override
+    public void setRecordType(final ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    @Override
+    public void setMetaType(final ARecordType metaType) {
+        this.metaType = metaType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
new file mode 100644
index 0000000..1d9311e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.converter;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordConverter;
+import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
+import org.apache.asterix.om.types.ARecordType;
+
+import com.couchbase.client.core.message.dcp.DCPRequest;
+
+/**
+ * Factory for converters that turn Couchbase DCP requests into records with
+ * metadata and a primary key. The DCP metadata layout is fixed, so neither
+ * configuration nor the record/meta types influence the created converter.
+ */
+public class DCPConverterFactory implements IRecordConverterFactory<DCPRequest, RecordWithMetadataAndPK<char[]>> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IRecordConverter<DCPRequest, RecordWithMetadataAndPK<char[]>> createConverter() {
+        return new DCPRequestToRecordWithMetadataAndPKConverter();
+    }
+
+    @Override
+    public void configure(final Map<String, String> configuration) throws AsterixException {
+        // nothing to configure: the DCP metadata schema is hard-wired in the converter
+    }
+
+    @Override
+    public Class<?> getInputClass() {
+        return DCPRequest.class;
+    }
+
+    @Override
+    public Class<?> getOutputClass() {
+        return char[].class;
+    }
+
+    @Override
+    public void setRecordType(final ARecordType recordType) {
+        // ignored: converter output is schema-independent
+    }
+
+    @Override
+    public void setMetaType(final ARecordType metaType) {
+        // ignored: converter output is schema-independent
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
new file mode 100644
index 0000000..1f82e85
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.converter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.couchbase.client.core.message.dcp.DCPRequest;
+import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.RemoveMessage;
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+
+/**
+ * Converts Couchbase DCP mutation/removal messages into records with metadata and a
+ * primary key. The document key is the PK; the fixed Couchbase metadata fields are
+ * stored as the meta record, and the document body (for mutations) becomes the value.
+ */
+public class DCPRequestToRecordWithMetadataAndPKConverter
+        implements IRecordToRecordWithMetadataAndPKConverter<DCPRequest, char[]> {
+
+    private final RecordWithMetadataAndPK<char[]> recordWithMetadata;
+    private final CharArrayRecord value;
+    // decoder and staging buffers are reused across records
+    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+    private final ByteBuffer bytes = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    private final CharBuffer chars = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    // metaTypes = {key(string), bucket(string), vbucket(int32), seq(long), cas(long),
+    // creationTime(long),expiration(int32),flags(int32),revSeqNumber(long),lockTime(int32)}
+    private static final IAType[] CB_META_TYPES = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
+            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT32,
+            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT32 };
+    // the single PK is the document key: meta part (indicator 1), field 0, string-typed
+    private static final int[] PK_INDICATOR = { 1 };
+    private static final int[] PK_INDEXES = { 0 };
+    private static final IAType[] PK_TYPES = { BuiltinType.ASTRING };
+
+    public DCPRequestToRecordWithMetadataAndPKConverter() {
+        this.value = new CharArrayRecord();
+        this.recordWithMetadata = new RecordWithMetadataAndPK<char[]>(value, CB_META_TYPES,
+                ARecordType.FULLY_OPEN_RECORD_TYPE, PK_INDICATOR, PK_INDEXES, PK_TYPES);
+    }
+
+    /**
+     * Maps a mutation to a full record (metadata + body) and a removal to a
+     * metadata-only record carrying just the key.
+     *
+     * @throws HyracksDataException on an unrecognized DCP request type
+     */
+    @Override
+    public RecordWithMetadataAndPK<char[]> convert(final IRawRecord<? extends DCPRequest> input) throws IOException {
+        final DCPRequest dcpRequest = input.get();
+        if (dcpRequest instanceof MutationMessage) {
+            final MutationMessage message = (MutationMessage) dcpRequest;
+            final String key = message.key();
+            final int vbucket = message.partition();
+            final long seq = message.bySequenceNumber();
+            final String bucket = message.bucket();
+            final long cas = message.cas();
+            final long creationTime = message.creationTime();
+            final int expiration = message.expiration();
+            final int flags = message.flags();
+            final long revSeqNumber = message.revisionSequenceNumber();
+            final int lockTime = message.lockTime();
+            recordWithMetadata.reset();
+            // metadata slots follow the CB_META_TYPES layout declared above
+            recordWithMetadata.setMetadata(0, key);
+            recordWithMetadata.setMetadata(1, bucket);
+            recordWithMetadata.setMetadata(2, vbucket);
+            recordWithMetadata.setMetadata(3, seq);
+            recordWithMetadata.setMetadata(4, cas);
+            recordWithMetadata.setMetadata(5, creationTime);
+            recordWithMetadata.setMetadata(6, expiration);
+            recordWithMetadata.setMetadata(7, flags);
+            recordWithMetadata.setMetadata(8, revSeqNumber);
+            recordWithMetadata.setMetadata(9, lockTime);
+            DCPRequestToRecordWithMetadataAndPKConverter.set(message.content(), decoder, bytes, chars, value);
+        } else if (dcpRequest instanceof RemoveMessage) {
+            final RemoveMessage message = (RemoveMessage) dcpRequest;
+            final String key = message.key();
+            recordWithMetadata.reset();
+            recordWithMetadata.setMetadata(0, key);
+        } else {
+            throw new HyracksDataException("Unknown DCP request: " + dcpRequest);
+        }
+        return recordWithMetadata;
+    }
+
+    /**
+     * Decodes the readable region of {@code content} as UTF-8 into {@code record},
+     * chunking through the reusable {@code bytes}/{@code chars} buffers.
+     */
+    public static void set(final ByteBuf content, final CharsetDecoder decoder, final ByteBuffer bytes,
+            final CharBuffer chars, final CharArrayRecord record) {
+        int position = content.readerIndex();
+        final int limit = content.writerIndex();
+        // the decoder instance is reused across records; drop any stale internal state
+        decoder.reset();
+        while (position < limit) {
+            bytes.clear();
+            chars.clear();
+            // remaining bytes in this buffer are limit - position; the original used
+            // readableBytes() - position, which is wrong whenever readerIndex > 0 and
+            // could produce a non-positive limit (exception or infinite loop)
+            if ((limit - position) < bytes.capacity()) {
+                bytes.limit(limit - position);
+            }
+            content.getBytes(position, bytes);
+            position += bytes.position();
+            bytes.flip();
+            // NOTE(review): a multi-byte UTF-8 sequence split across chunk boundaries is
+            // dropped by the subsequent clear(); only an issue for bodies larger than the
+            // staging buffer — confirm whether that case occurs in practice
+            decoder.decode(bytes, chars, false);
+            chars.flip();
+            record.append(chars);
+        }
+        record.endRecord();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
new file mode 100644
index 0000000..4990527
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.converter;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordConverter;
+import org.apache.asterix.om.types.ARecordType;
+
+/**
+ * Serializable factory for {@link IRecordConverter} instances that transform raw
+ * records of type {@code I} into records of type {@code O}.
+ *
+ * @param <I> input record payload type
+ * @param <O> output record payload type
+ */
+public interface IRecordConverterFactory<I, O> extends Serializable {
+
+    /** Creates a new converter instance using the configured state of this factory. */
+    public IRecordConverter<I, O> createConverter();
+
+    /**
+     * Validates and applies the given configuration.
+     *
+     * @throws AsterixException if the configuration is missing or invalid
+     */
+    public void configure(Map<String, String> configuration) throws AsterixException;
+
+    /** Returns the runtime class of the converter's input payload. */
+    public Class<?> getInputClass();
+
+    /** Returns the runtime class of the converter's output payload. */
+    public Class<?> getOutputClass();
+
+    /** Supplies the type of the value record, if the converter needs it. */
+    public void setRecordType(ARecordType recordType);
+
+    /** Supplies the type of the meta record, if the converter needs it. */
+    public void setMetaType(ARecordType metaType);
+
+}



Mime
View raw message