asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [10/19] incubator-asterixdb git commit: Support Change Feeds and Ingestion of Records with MetaData
Date Tue, 15 Mar 2016 23:36:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordToRecordWithMetadataAndPKConverter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordToRecordWithMetadataAndPKConverter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordToRecordWithMetadataAndPKConverter.java
new file mode 100644
index 0000000..50fb3cf
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordToRecordWithMetadataAndPKConverter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.converter;
+
+import org.apache.asterix.external.api.IRecordConverter;
+import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
+
+public interface IRecordToRecordWithMetadataAndPKConverter<T, O>
+        extends IRecordConverter<T, RecordWithMetadataAndPK<O>> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java
deleted file mode 100644
index e742b1e..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/EmptyRecordReader.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapred.RecordReader;
-
-public class EmptyRecordReader<K, V> implements RecordReader<K, V> {
-
-    @Override
-    public boolean next(K key, V value) throws IOException {
-        return false;
-    }
-
-    @Override
-    public K createKey() {
-        return null;
-    }
-
-    @Override
-    public V createValue() {
-        return null;
-    }
-
-    @Override
-    public long getPos() throws IOException {
-        return 0;
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-        return 0;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
new file mode 100644
index 0000000..87c187a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.util.Map;
+
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.RecordWithPK;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class RecordWithPKTestReaderFactory implements IRecordReaderFactory<RecordWithPK<char[]>> {
+
+    private static final long serialVersionUID = 1L;
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 1);
+        return clusterLocations;
+    }
+
+    @Override
+    public void configure(final Map<String, String> configuration) {
+    }
+
+    @Override
+    public IRecordReader<? extends RecordWithPK<char[]>> createRecordReader(final IHyracksTaskContext ctx,
+            final int partition) {
+        return new TestAsterixMembersReader();
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return RecordWithPK.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TestAsterixMembersReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TestAsterixMembersReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TestAsterixMembersReader.java
new file mode 100644
index 0000000..916904f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TestAsterixMembersReader.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.RecordWithPK;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class TestAsterixMembersReader implements IRecordReader<RecordWithPK<char[]>> {
+
+    private final CharArrayRecord rawRecord;
+    private final GenericRecord<RecordWithPK<char[]>> record;
+    private final ArrayBackedValueStorage[] pkFieldValueBuffers;
+    private int counter = 0;
+    private final int numOfRecords = 10;
+    private final StringBuilder builder = new StringBuilder();
+    private static final String[] names = { "Abdullah", "Michael", "Till", "Yingyi", "Ildar", "Taewoo", "Young-Seok",
+            "Murtadha", "Ian", "Steven" };
+
+    public TestAsterixMembersReader() {
+        rawRecord = new CharArrayRecord();
+        pkFieldValueBuffers = new ArrayBackedValueStorage[1];
+        pkFieldValueBuffers[0] = new ArrayBackedValueStorage();
+        record = new GenericRecord<RecordWithPK<char[]>>(new RecordWithPK<char[]>(rawRecord, pkFieldValueBuffers));
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return counter < numOfRecords;
+    }
+
+    @Override
+    public IRawRecord<RecordWithPK<char[]>> next() throws IOException, InterruptedException {
+        if (counter < numOfRecords) {
+            record.get().reset();
+            builder.setLength(0);
+            builder.append("{\"id\":" + counter + ",\"name\":\"" + names[counter % names.length] + "\"}");
+            rawRecord.set(builder);
+            rawRecord.endRecord();
+            pkFieldValueBuffers[0].getDataOutput().writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+            pkFieldValueBuffers[0].getDataOutput().writeLong(counter);
+            counter++;
+            return record;
+        }
+        return null;
+    }
+
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    @Override
+    public void setController(final IDataFlowController controller) {
+    }
+
+    @Override
+    public void setFeedLogManager(final FeedLogManager feedLogManager) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
deleted file mode 100644
index fe59aad..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.couchbase;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.record.CharArrayRecord;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.asterix.external.input.record.RecordWithMetadata;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.dcp.BucketStreamAggregator;
-import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
-import com.couchbase.client.core.dcp.BucketStreamState;
-import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
-import com.couchbase.client.core.env.DefaultCoreEnvironment;
-import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
-import com.couchbase.client.core.message.cluster.CloseBucketRequest;
-import com.couchbase.client.core.message.cluster.OpenBucketRequest;
-import com.couchbase.client.core.message.cluster.SeedNodesRequest;
-import com.couchbase.client.core.message.dcp.DCPRequest;
-import com.couchbase.client.core.message.dcp.MutationMessage;
-import com.couchbase.client.core.message.dcp.RemoveMessage;
-import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
-import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
-
-import rx.functions.Action1;
-
-public class CouchbaseReader implements IRecordReader<RecordWithMetadata<char[]>> {
-
-    private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
-            null);
-    private final String feedName;
-    private final short[] vbuckets;
-    private final String bucket;
-    private final String password;
-    private final String[] couchbaseNodes;
-    private AbstractFeedDataFlowController controller;
-    private Builder builder;
-    private BucketStreamAggregator bucketStreamAggregator;
-    private CouchbaseCore core;
-    private DefaultCoreEnvironment env;
-    private Thread pushThread;
-    private ArrayBlockingQueue<MutationMessage> messages;
-    private GenericRecord<RecordWithMetadata<char[]>> record;
-    private RecordWithMetadata<char[]> recordWithMetadata;
-    private boolean done = false;
-    private CharArrayRecord value;
-    private CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
-    private ByteBuffer bytes = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
-    private CharBuffer chars = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
-    // metaTypes = {key(string), bucket(string), vbucket(int32), seq(long), cas(long),
-    // creationTime(long),expiration(int32),flags(int32),revSeqNumber(long),lockTime(int32)}
-    private static final IAType[] metaTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
-            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT32,
-            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT32 };
-    private static final Logger LOGGER = Logger.getLogger(CouchbaseReader.class);
-
-    public CouchbaseReader(String feedName, String bucket, String password, String[] couchbaseNodes, short[] vbuckets,
-            int queueSize) throws HyracksDataException {
-        this.feedName = feedName;
-        this.bucket = bucket;
-        this.password = password;
-        this.couchbaseNodes = couchbaseNodes;
-        this.vbuckets = vbuckets;
-        this.recordWithMetadata = new RecordWithMetadata<char[]>(metaTypes, char[].class);
-        this.messages = new ArrayBlockingQueue<MutationMessage>(queueSize);
-        this.value = new CharArrayRecord();
-        recordWithMetadata.setRecord(value);
-        this.record = new GenericRecord<RecordWithMetadata<char[]>>(recordWithMetadata);
-    }
-
-    @Override
-    public void close() {
-        if (!done) {
-            done = true;
-        }
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        this.builder = DefaultCoreEnvironment.builder().dcpEnabled(CouchbaseReaderFactory.DCP_ENABLED)
-                .autoreleaseAfter(CouchbaseReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
-        this.env = builder.build();
-        this.core = new CouchbaseCore(env);
-        this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket);
-        connect();
-    }
-
-    private void connect() {
-        core.send(new SeedNodesRequest(couchbaseNodes))
-                .timeout(CouchbaseReaderFactory.TIMEOUT, CouchbaseReaderFactory.TIME_UNIT).toBlocking().single();
-        core.send(new OpenBucketRequest(bucket, password))
-                .timeout(CouchbaseReaderFactory.TIMEOUT, CouchbaseReaderFactory.TIME_UNIT).toBlocking().single();
-        this.pushThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                CouchbaseReader.this.run(bucketStreamAggregator);
-            }
-        }, feedName);
-        pushThread.start();
-    }
-
-    private void run(BucketStreamAggregator bucketStreamAggregator) {
-        BucketStreamAggregatorState state = new BucketStreamAggregatorState();
-        for (int i = 0; i < vbuckets.length; i++) {
-            state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 0xffffffff));
-        }
-        state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() {
-            @Override
-            public void call(BucketStreamStateUpdatedEvent event) {
-                if (event.partialUpdate()) {
-                } else {
-                }
-            }
-        });
-        try {
-            bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() {
-                @Override
-                public void call(final DCPRequest dcpRequest) {
-                    try {
-                        if (dcpRequest instanceof SnapshotMarkerMessage) {
-                            SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
-                            final BucketStreamState oldState = state.get(message.partition());
-                            state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(),
-                                    message.endSequenceNumber(), oldState.endSequenceNumber(),
-                                    message.endSequenceNumber(), oldState.snapshotEndSequenceNumber()));
-                        } else if (dcpRequest instanceof MutationMessage) {
-
-                            messages.put((MutationMessage) dcpRequest);
-                        } else if (dcpRequest instanceof RemoveMessage) {
-                            RemoveMessage message = (RemoveMessage) dcpRequest;
-                            LOGGER.info(message.key() + " was deleted.");
-                        }
-                    } catch (Throwable th) {
-                        LOGGER.error(th);
-                    }
-                }
-            });
-        } catch (Throwable th) {
-            if (th.getCause() instanceof InterruptedException) {
-                LOGGER.warn("dcp thread was interrupted", th);
-                synchronized (this) {
-                    CouchbaseReader.this.close();
-                    notifyAll();
-                }
-            }
-            throw th;
-        }
-    }
-
-    @Override
-    public boolean hasNext() throws Exception {
-        return !done;
-    }
-
-    @Override
-    public IRawRecord<RecordWithMetadata<char[]>> next() throws IOException, InterruptedException {
-        if (messages.isEmpty()) {
-            controller.flush();
-        }
-        MutationMessage message = messages.take();
-        if (message == POISON_PILL) {
-            return null;
-        }
-        String key = message.key();
-        int vbucket = message.partition();
-        long seq = message.bySequenceNumber();
-        String bucket = message.bucket();
-        long cas = message.cas();
-        long creationTime = message.creationTime();
-        int expiration = message.expiration();
-        int flags = message.flags();
-        long revSeqNumber = message.revisionSequenceNumber();
-        int lockTime = message.lockTime();
-        recordWithMetadata.reset();
-        recordWithMetadata.setMetadata(0, key);
-        recordWithMetadata.setMetadata(1, bucket);
-        recordWithMetadata.setMetadata(2, vbucket);
-        recordWithMetadata.setMetadata(3, seq);
-        recordWithMetadata.setMetadata(4, cas);
-        recordWithMetadata.setMetadata(5, creationTime);
-        recordWithMetadata.setMetadata(6, expiration);
-        recordWithMetadata.setMetadata(7, flags);
-        recordWithMetadata.setMetadata(8, revSeqNumber);
-        recordWithMetadata.setMetadata(9, lockTime);
-        CouchbaseReader.set(message.content(), decoder, bytes, chars, value);
-        return record;
-    }
-
-    @Override
-    public boolean stop() {
-        done = true;
-        core.send(new CloseBucketRequest(bucket)).toBlocking();
-        try {
-            messages.put(CouchbaseReader.POISON_PILL);
-        } catch (InterruptedException e) {
-            LOGGER.warn(e);
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public void setController(IDataFlowController controller) {
-        this.controller = (AbstractFeedDataFlowController) controller;
-    }
-
-    public static void set(ByteBuf content, CharsetDecoder decoder, ByteBuffer bytes, CharBuffer chars,
-            CharArrayRecord record) {
-        int position = content.readerIndex();
-        int limit = content.writerIndex();
-        int contentSize = content.capacity();
-        while (position < limit) {
-            bytes.clear();
-            chars.clear();
-            if (contentSize - position < bytes.capacity()) {
-                bytes.limit(contentSize - position);
-            }
-            content.getBytes(position, bytes);
-            position += bytes.position();
-            bytes.flip();
-            decoder.decode(bytes, chars, false);
-            chars.flip();
-            record.append(chars);
-        }
-        record.endRecord();
-    }
-
-    @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
deleted file mode 100644
index b715a26..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.couchbase;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.RecordWithMetadata;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.config.CouchbaseBucketConfig;
-import com.couchbase.client.core.env.DefaultCoreEnvironment;
-import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
-import com.couchbase.client.core.message.cluster.CloseBucketRequest;
-import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
-import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
-import com.couchbase.client.core.message.cluster.OpenBucketRequest;
-import com.couchbase.client.core.message.cluster.SeedNodesRequest;
-
-import rx.functions.Func1;
-
-public class CouchbaseReaderFactory implements IRecordReaderFactory<RecordWithMetadata<char[]>> {
-
-    private static final long serialVersionUID = 1L;
-    // Constant fields
-    public static final boolean DCP_ENABLED = true;
-    public static final long AUTO_RELEASE_AFTER_MILLISECONDS = 5000L;
-    public static final int TIMEOUT = 5;
-    public static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
-    // Dynamic fields
-    private Map<String, String> configuration;
-    private String bucket;
-    private String password = "";
-    private String[] couchbaseNodes;
-    private int numOfVBuckets;
-    private int[] schedule;
-    private String feedName;
-    // Transient fields
-    private transient CouchbaseCore core;
-    private transient Builder builder;
-    private transient DefaultCoreEnvironment env;
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.RECORDS;
-    }
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
-        return AsterixClusterProperties.INSTANCE.getClusterLocations();
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        // validate first
-        if (!configuration.containsKey(ExternalDataConstants.KEY_BUCKET)) {
-            throw new AsterixException("Unspecified bucket");
-        }
-        if (!configuration.containsKey(ExternalDataConstants.KEY_NODES)) {
-            throw new AsterixException("Unspecified Couchbase nodes");
-        }
-        if (configuration.containsKey(ExternalDataConstants.KEY_PASSWORD)) {
-            password = configuration.get(ExternalDataConstants.KEY_PASSWORD);
-        }
-        this.configuration = configuration;
-        bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
-        couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(",");
-        feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
-        builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED)
-                .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
-        env = builder.build();
-        core = new CouchbaseCore(env);
-        getNumberOfVbuckets();
-        schedule();
-    }
-
-    /*
-     * We distribute the work of streaming vbuckets between all the partitions in a round robin
-     * fashion.
-     */
-    private void schedule() {
-        schedule = new int[numOfVBuckets];
-        String[] locations = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
-        for (int i = 0; i < numOfVBuckets; i++) {
-            schedule[i] = i % locations.length;
-        }
-    }
-
-    private void getNumberOfVbuckets() {
-        core.send(new SeedNodesRequest(couchbaseNodes)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
-        core.send(new OpenBucketRequest(bucket, password)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
-        numOfVBuckets = core.<GetClusterConfigResponse> send(new GetClusterConfigRequest())
-                .map(new Func1<GetClusterConfigResponse, Integer>() {
-                    @Override
-                    public Integer call(GetClusterConfigResponse response) {
-                        CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
-                        return config.numberOfPartitions();
-                    }
-                }).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
-        core.send(new CloseBucketRequest(bucket)).toBlocking();
-    }
-
-    @Override
-    public IRecordReader<? extends RecordWithMetadata<char[]>> createRecordReader(IHyracksTaskContext ctx,
-            int partition) throws Exception {
-        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
-        ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>();
-        for (int i = 0; i < schedule.length; i++) {
-            if (schedule[i] == partition) {
-                listOfAssignedVBuckets.add((short) i);
-            }
-        }
-        short[] vbuckets = new short[listOfAssignedVBuckets.size()];
-        for (int i = 0; i < vbuckets.length; i++) {
-            vbuckets[i] = listOfAssignedVBuckets.get(i);
-        }
-        CouchbaseReader reader = new CouchbaseReader(feedName + ":" + nodeName + ":" + partition, bucket, password,
-                couchbaseNodes, vbuckets, ExternalDataUtils.getQueueSize(configuration));
-        reader.configure(configuration);
-        return reader;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Class<? extends RecordWithMetadata<char[]>> getRecordClass() {
-        RecordWithMetadata<char[]> record = new RecordWithMetadata<char[]>(char[].class);
-        return (Class<? extends RecordWithMetadata<char[]>>) record.getClass();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/EmptyRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/EmptyRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/EmptyRecordReader.java
new file mode 100644
index 0000000..00e5e71
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/EmptyRecordReader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A no-op Hadoop {@code RecordReader} that never produces any records.
+ * Used as an initial placeholder (e.g. by {@code HDFSRecordReader} before the
+ * first real input split is opened) so callers can invoke reader methods
+ * without null checks.
+ */
+public class EmptyRecordReader<K, V> implements RecordReader<K, V> {
+
+    /** Always reports end-of-input; key and value are left untouched. */
+    @Override
+    public boolean next(K key, V value) throws IOException {
+        return false;
+    }
+
+    /** No keys are ever produced. */
+    @Override
+    public K createKey() {
+        return null;
+    }
+
+    /** No values are ever produced. */
+    @Override
+    public V createValue() {
+        return null;
+    }
+
+    /** Position is always the start, since nothing is read. */
+    @Override
+    public long getPos() throws IOException {
+        return 0;
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /** An empty input is always fully consumed. */
+    @Override
+    public float getProgress() throws IOException {
+        return 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
index 22488f7..bfcacd8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.input.record.reader.hdfs;
 
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -30,14 +31,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 
 public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
 
-    protected static final long serialVersionUID = 1L;
-    protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+    private static final long serialVersionUID = 1L;
     protected ConfFactory confFactory;
     protected Map<String, String> configuration;
+    protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
     public HDFSLookupReaderFactory() {
     }
@@ -48,16 +50,20 @@ public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
         clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
         return clusterLocations;
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) throws AsterixException {
         this.configuration = configuration;
         JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
-        confFactory = new ConfFactory(conf);
+        try {
+            confFactory = new ConfFactory(conf);
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
 
     }
 
@@ -69,10 +75,15 @@ public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
     @SuppressWarnings("unchecked")
     @Override
     public ILookupRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition,
-            ExternalFileIndexAccessor snapshotAccessor) throws Exception {
+            ExternalFileIndexAccessor snapshotAccessor) throws HyracksDataException {
         String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim();
         JobConf conf = confFactory.getConf();
-        FileSystem fs = FileSystem.get(conf);
+        FileSystem fs;
+        try {
+            fs = FileSystem.get(conf);
+        } catch (IOException e) {
+            throw new HyracksDataException("Unable to get filesystem object", e);
+        }
         switch (inputFormatParameter) {
             case ExternalDataConstants.INPUT_FORMAT_TEXT:
                 return (ILookupRecordReader<? extends T>) new TextLookupReader(snapshotAccessor, fs, conf);
@@ -81,7 +92,7 @@ public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
             case ExternalDataConstants.INPUT_FORMAT_RC:
                 return (ILookupRecordReader<? extends T>) new RCLookupReader(snapshotAccessor, fs, conf);
             default:
-                throw new AsterixException("Unrecognised input format: " + inputFormatParameter);
+                throw new HyracksDataException("Unrecognised input format: " + inputFormatParameter);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
index b162a02..5ed6dc5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
@@ -20,7 +20,6 @@ package org.apache.asterix.external.input.record.reader.hdfs;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IExternalIndexer;
@@ -29,7 +28,6 @@ import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.asterix.external.input.record.reader.EmptyRecordReader;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,12 +55,12 @@ public class HDFSRecordReader<K, V extends Writable> implements IRecordReader<Wr
     protected JobConf conf;
     protected GenericRecord<Writable> record;
     // Indexing variables
-    protected IExternalIndexer indexer;
-    protected List<ExternalFile> snapshot;
-    protected FileSystem hdfs;
+    protected final IExternalIndexer indexer;
+    protected final List<ExternalFile> snapshot;
+    protected final FileSystem hdfs;
 
     public HDFSRecordReader(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
-            JobConf conf) {
+            JobConf conf, List<ExternalFile> snapshot, IExternalIndexer indexer) throws IOException {
         this.read = read;
         this.inputSplits = inputSplits;
         this.readSchedule = readSchedule;
@@ -70,6 +68,11 @@ public class HDFSRecordReader<K, V extends Writable> implements IRecordReader<Wr
         this.conf = conf;
         this.inputFormat = conf.getInputFormat();
         this.reader = new EmptyRecordReader<K, Writable>();
+        this.record = new GenericRecord<Writable>();
+        this.indexer = indexer;
+        this.snapshot = snapshot;
+        this.hdfs = FileSystem.get(conf);
+        nextInputSplit();
     }
 
     @Override
@@ -78,12 +81,6 @@ public class HDFSRecordReader<K, V extends Writable> implements IRecordReader<Wr
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        record = new GenericRecord<Writable>();
-        nextInputSplit();
-    }
-
-    @Override
     public boolean hasNext() throws Exception {
         if (reader.next(key, value)) {
             return true;
@@ -163,20 +160,10 @@ public class HDFSRecordReader<K, V extends Writable> implements IRecordReader<Wr
         return indexer;
     }
 
-    @Override
-    public void setIndexer(IExternalIndexer indexer) {
-        this.indexer = indexer;
-    }
-
     public List<ExternalFile> getSnapshot() {
         return snapshot;
     }
 
-    public void setSnapshot(List<ExternalFile> snapshot) throws IOException {
-        this.snapshot = snapshot;
-        hdfs = FileSystem.get(conf);
-    }
-
     public int getCurrentSplitIndex() {
         return currentSplitIndex;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
new file mode 100644
index 0000000..4e41357
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.kv;
+
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
+
+import com.couchbase.client.core.CouchbaseCore;
+import com.couchbase.client.core.dcp.BucketStreamAggregator;
+import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
+import com.couchbase.client.core.dcp.BucketStreamState;
+import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.message.cluster.CloseBucketRequest;
+import com.couchbase.client.core.message.cluster.OpenBucketRequest;
+import com.couchbase.client.core.message.cluster.SeedNodesRequest;
+import com.couchbase.client.core.message.dcp.DCPRequest;
+import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.RemoveMessage;
+import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
+
+import rx.functions.Action1;
+
+/**
+ * Feed record reader that streams key-value change events (DCP mutation and
+ * remove messages) from a Couchbase bucket into a bounded in-memory queue,
+ * from which {@link #next()} hands them to the dataflow controller one at a
+ * time. A background push thread (started from {@link #connect()}) performs
+ * the actual DCP streaming; a poison-pill sentinel unblocks the consumer when
+ * the reader is stopped.
+ */
+public class KVReader implements IRecordReader<DCPRequest> {
+
+    private static final Logger LOGGER = Logger.getLogger(KVReader.class);
+    // Sentinel enqueued by stop() so a take() blocked in next() wakes up.
+    private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
+            null);
+    private final String feedName;
+    private final short[] vbuckets;
+    private final String bucket;
+    private final String password;
+    private final String[] sourceNodes;
+    private final Builder builder;
+    private final BucketStreamAggregator bucketStreamAggregator;
+    private final CouchbaseCore core;
+    private final DefaultCoreEnvironment env;
+    private final GenericRecord<DCPRequest> record;
+    // Bounded hand-off between the DCP push thread (producer) and next() (consumer).
+    private final ArrayBlockingQueue<DCPRequest> messages;
+    private AbstractFeedDataFlowController controller;
+    private Thread pushThread;
+    private boolean done = false;
+
+    /**
+     * Connects to the bucket and starts streaming immediately.
+     *
+     * @param feedName    unique name for this reader; also used as the push thread's name
+     * @param bucket      Couchbase bucket to stream from
+     * @param password    bucket password (may be empty)
+     * @param sourceNodes Couchbase seed node addresses
+     * @param vbuckets    vbucket ids assigned to this partition
+     * @param queueSize   capacity of the internal message queue
+     */
+    public KVReader(String feedName, String bucket, String password, String[] sourceNodes, short[] vbuckets,
+            int queueSize) throws HyracksDataException {
+        this.feedName = feedName;
+        this.bucket = bucket;
+        this.password = password;
+        this.sourceNodes = sourceNodes;
+        this.vbuckets = vbuckets;
+        this.messages = new ArrayBlockingQueue<DCPRequest>(queueSize);
+        this.builder = DefaultCoreEnvironment.builder().dcpEnabled(KVReaderFactory.DCP_ENABLED)
+                .autoreleaseAfter(KVReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
+        this.env = builder.build();
+        this.core = new CouchbaseCore(env);
+        this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket);
+        this.record = new GenericRecord<>();
+        connect();
+    }
+
+    // NOTE(review): close() only flips the done flag; it neither closes the
+    // bucket nor interrupts the push thread (stop() does both) -- confirm this
+    // asymmetry between close() and stop() is intended.
+    @Override
+    public void close() {
+        if (!done) {
+            done = true;
+        }
+    }
+
+    /** Seeds the cluster connection, opens the bucket, and starts the push thread. */
+    private void connect() {
+        core.send(new SeedNodesRequest(sourceNodes))
+                .timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT).toBlocking().single();
+        core.send(new OpenBucketRequest(bucket, password))
+                .timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT).toBlocking().single();
+        this.pushThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                KVReader.this.run(bucketStreamAggregator);
+            }
+        }, feedName);
+        pushThread.start();
+    }
+
+    /**
+     * Body of the push thread: subscribes to the DCP stream for the assigned
+     * vbuckets and forwards mutation/remove messages into the queue. Snapshot
+     * markers update the per-vbucket stream state instead of being enqueued.
+     * On interruption, closes the reader and notifies any waiters before
+     * rethrowing.
+     */
+    private void run(BucketStreamAggregator bucketStreamAggregator) {
+        BucketStreamAggregatorState state = new BucketStreamAggregatorState();
+        for (int i = 0; i < vbuckets.length; i++) {
+            state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 0xffffffff));
+        }
+        state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() {
+            @Override
+            public void call(BucketStreamStateUpdatedEvent event) {
+                // NOTE(review): both branches are empty -- dead code; either
+                // implement the partial/full update handling or drop this callback.
+                if (event.partialUpdate()) {
+                } else {
+                }
+            }
+        });
+        try {
+            bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() {
+                @Override
+                public void call(DCPRequest dcpRequest) {
+                    try {
+                        if (dcpRequest instanceof SnapshotMarkerMessage) {
+                            // Advance the stream state for this vbucket; markers are not data.
+                            SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
+                            BucketStreamState oldState = state.get(message.partition());
+                            state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(),
+                                    message.endSequenceNumber(), oldState.endSequenceNumber(),
+                                    message.endSequenceNumber(), oldState.snapshotEndSequenceNumber()));
+                        } else if ((dcpRequest instanceof MutationMessage) || (dcpRequest instanceof RemoveMessage)) {
+                            // Blocks when the queue is full, applying back-pressure to DCP.
+                            messages.put(dcpRequest);
+                        } else {
+                            LOGGER.warn("Unknown type of DCP messages: " + dcpRequest);
+                        }
+                    } catch (Throwable th) {
+                        LOGGER.error(th);
+                    }
+                }
+            });
+        } catch (Throwable th) {
+            if (th.getCause() instanceof InterruptedException) {
+                LOGGER.warn("dcp thread was interrupted", th);
+                synchronized (this) {
+                    KVReader.this.close();
+                    notifyAll();
+                }
+            }
+            throw th;
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return !done;
+    }
+
+    /**
+     * Returns the next queued DCP message, blocking until one is available.
+     * Flushes the controller before blocking on an empty queue; returns null
+     * once the poison pill is drained (reader was stopped).
+     */
+    @Override
+    public IRawRecord<DCPRequest> next() throws IOException, InterruptedException {
+        if (messages.isEmpty()) {
+            controller.flush();
+        }
+        DCPRequest dcpRequest = messages.take();
+        if (dcpRequest == POISON_PILL) {
+            return null;
+        }
+        record.set(dcpRequest);
+        return record;
+    }
+
+    /**
+     * Closes the bucket and enqueues the poison pill so a consumer blocked in
+     * {@link #next()} is released. Returns false if interrupted while enqueuing.
+     */
+    @Override
+    public boolean stop() {
+        done = true;
+        core.send(new CloseBucketRequest(bucket)).toBlocking();
+        try {
+            messages.put(KVReader.POISON_PILL);
+        } catch (InterruptedException e) {
+            LOGGER.warn(e);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void setController(IDataFlowController controller) {
+        this.controller = (AbstractFeedDataFlowController) controller;
+    }
+
+    /** This reader performs no feed logging. */
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
new file mode 100644
index 0000000..bc2a980
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.kv;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.couchbase.client.core.CouchbaseCore;
+import com.couchbase.client.core.config.CouchbaseBucketConfig;
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.message.cluster.CloseBucketRequest;
+import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
+import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
+import com.couchbase.client.core.message.cluster.OpenBucketRequest;
+import com.couchbase.client.core.message.cluster.SeedNodesRequest;
+import com.couchbase.client.core.message.dcp.DCPRequest;
+
+import rx.functions.Func1;
+
+/**
+ * Factory for {@link KVReader} instances. During {@link #configure(Map)} it
+ * validates the adapter parameters, connects to the Couchbase cluster to
+ * discover the number of vbuckets, and distributes those vbuckets across the
+ * cluster partitions round-robin; {@link #createRecordReader} then gives each
+ * partition its assigned vbucket subset.
+ */
+public class KVReaderFactory implements IRecordReaderFactory<DCPRequest> {
+
+    private static final long serialVersionUID = 1L;
+    // Constant fields
+    public static final boolean DCP_ENABLED = true;
+    public static final long AUTO_RELEASE_AFTER_MILLISECONDS = 5000L;
+    public static final int TIMEOUT = 5;
+    public static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+    // Dynamic fields
+    private Map<String, String> configuration;
+    private String bucket;
+    private String password = "";
+    private String[] couchbaseNodes;
+    private int numOfVBuckets;
+    // schedule[vbucket] = partition index assigned to stream that vbucket.
+    private int[] schedule;
+    private String feedName;
+    // Transient fields
+    private transient CouchbaseCore core;
+    private transient Builder builder;
+    private transient DefaultCoreEnvironment env;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return AsterixClusterProperties.INSTANCE.getClusterLocations();
+    }
+
+    /**
+     * Validates required parameters (bucket, nodes; password optional), marks
+     * the feed as a change feed with record-level metadata and a single key,
+     * then contacts the cluster to discover vbuckets and build the schedule.
+     *
+     * @throws AsterixException if the bucket or node list is missing
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws AsterixException {
+        // validate first
+        if (!configuration.containsKey(ExternalDataConstants.KEY_BUCKET)) {
+            throw new AsterixException("Unspecified bucket");
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_NODES)) {
+            throw new AsterixException("Unspecified Couchbase nodes");
+        }
+        if (configuration.containsKey(ExternalDataConstants.KEY_PASSWORD)) {
+            password = configuration.get(ExternalDataConstants.KEY_PASSWORD);
+        }
+        this.configuration = configuration;
+        ExternalDataUtils.setNumberOfKeys(configuration, 1);
+        ExternalDataUtils.setChangeFeed(configuration, ExternalDataConstants.TRUE);
+        ExternalDataUtils.setRecordWithMeta(configuration, ExternalDataConstants.TRUE);
+        bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
+        couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(",");
+        feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
+        builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED)
+                .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
+        env = builder.build();
+        core = new CouchbaseCore(env);
+        getNumberOfVbuckets();
+        schedule();
+    }
+
+    /*
+     * We distribute the work of streaming vbuckets between all the partitions in a round robin
+     * fashion.
+     */
+    private void schedule() {
+        schedule = new int[numOfVBuckets];
+        String[] locations = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+        for (int i = 0; i < numOfVBuckets; i++) {
+            schedule[i] = i % locations.length;
+        }
+    }
+
+    /**
+     * Opens the bucket just long enough to read the cluster configuration and
+     * record its vbucket (partition) count, then closes it again.
+     */
+    private void getNumberOfVbuckets() {
+        core.send(new SeedNodesRequest(couchbaseNodes)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        core.send(new OpenBucketRequest(bucket, password)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        numOfVBuckets = core.<GetClusterConfigResponse> send(new GetClusterConfigRequest())
+                .map(new Func1<GetClusterConfigResponse, Integer>() {
+                    @Override
+                    public Integer call(GetClusterConfigResponse response) {
+                        CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
+                        return config.numberOfPartitions();
+                    }
+                }).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        core.send(new CloseBucketRequest(bucket)).toBlocking();
+    }
+
+    /**
+     * Creates a reader for one partition, passing it exactly the vbuckets the
+     * schedule assigned to that partition.
+     */
+    @Override
+    public IRecordReader<? extends DCPRequest> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+        ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>();
+        for (int i = 0; i < schedule.length; i++) {
+            if (schedule[i] == partition) {
+                listOfAssignedVBuckets.add((short) i);
+            }
+        }
+        short[] vbuckets = new short[listOfAssignedVBuckets.size()];
+        for (int i = 0; i < vbuckets.length; i++) {
+            vbuckets[i] = listOfAssignedVBuckets.get(i);
+        }
+        return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes,
+                vbuckets, ExternalDataUtils.getQueueSize(configuration));
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return DCPRequest.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
new file mode 100644
index 0000000..9e797e3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.kv;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.log4j.Logger;
+
+import com.couchbase.client.core.message.dcp.DCPRequest;
+import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.RemoveMessage;
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.deps.io.netty.buffer.ByteBufAllocator;
+
+/**
+ * Test-only record reader that deterministically generates synthetic Couchbase
+ * DCP messages (mutations, periodic deletes, and periodic upserts of earlier
+ * keys) without contacting a real cluster. The sequence/cas/expiration counters
+ * start from fixed seeds so repeated runs produce identical streams.
+ */
+public class KVTestReader implements IRecordReader<DCPRequest> {
+
+    private final GenericRecord<DCPRequest> record;
+    private static final Logger LOGGER = Logger.getLogger(KVTestReader.class);
+    // Test variables
+    private final String bucket;
+    // vbucket ids this partition generates data for (round-robin assignment).
+    private final ArrayList<Short> assigned;
+    private final int numberOfMutations;
+    private int counter = 0;
+    private boolean stopped = false;
+    // for deterministic data generation
+    private int expiration = 7999;
+    private long seq = 16L;
+    private int lockTime = 158;
+    private long cas = 0L;
+    // Every deleteCycle-th key is later removed; every upsertCycle-th is re-mutated.
+    private int deleteCycle;
+    private int upsertCycle;
+    private String nextDeleteKey;
+    private short nextDeletePartition;
+    private String nextUpsertKey;
+    private short nextUpsertPartition;
+    // Reused buffer holding the current document's JSON bytes.
+    private final ByteBuf byteBuff;
+    private final StringBuilder strBuilder = new StringBuilder();
+    private int upsertCounter = 0;
+    private final String[] names = { "Michael Carey", "Till Westmann", "Michael Blow", "Chris Hillary", "Yingyi Bu",
+            "Ian Maxon", "Abdullah Alamoudi" };
+
+    /**
+     * @param partition         this reader's partition index
+     * @param bucket            synthetic bucket name stamped on generated messages
+     * @param schedule          vbucket-to-partition assignment (as in KVReaderFactory)
+     * @param numberOfMutations total number of messages to emit before stopping
+     * @param deleteCycle       emit a delete every deleteCycle messages (min 5)
+     * @param upsertCycle       emit an upsert every upsertCycle messages (min 5)
+     */
+    public KVTestReader(final int partition, final String bucket, final int[] schedule,
+            final int numberOfMutations, final int deleteCycle, final int upsertCycle) {
+        this.bucket = bucket;
+        this.numberOfMutations = numberOfMutations;
+        this.assigned = new ArrayList<>();
+        this.deleteCycle = deleteCycle;
+        this.upsertCycle = upsertCycle;
+        // Clamp pathological cycle lengths to fixed small defaults.
+        if ((deleteCycle < 5) || (upsertCycle < 5)) {
+            this.deleteCycle = 5;
+            this.upsertCycle = 6;
+        }
+        for (int i = 0; i < schedule.length; i++) {
+            if (schedule[i] == partition) {
+                assigned.add((short) i);
+            }
+        }
+        this.byteBuff = ByteBufAllocator.DEFAULT.buffer(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+        byteBuff.retain();
+        this.record = new GenericRecord<DCPRequest>();
+    }
+
+    /**
+     * Produces the next unique key ("vbucket-counter"), advancing the counter
+     * and remembering candidate keys for a future delete or upsert.
+     */
+    private String generateKey() {
+        final short vbucket = assigned.get(counter % assigned.size());
+        final String next = vbucket + "-" + counter;
+        counter++;
+        if ((counter % deleteCycle) == 0) {
+            nextDeleteKey = next;
+            nextDeletePartition = vbucket;
+        }
+        if ((counter % upsertCycle) == 3) {
+            nextUpsertKey = next;
+            nextUpsertPartition = vbucket;
+        }
+        return next;
+    }
+
+    @Override
+    public void close() throws IOException {
+        stop();
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return !stopped;
+    }
+
+    /**
+     * Returns the next synthetic DCP message, or null once stopped. Stops
+     * automatically after numberOfMutations messages; generation errors are
+     * logged and the (possibly stale) record is still returned.
+     */
+    @Override
+    public IRawRecord<DCPRequest> next() throws IOException, InterruptedException {
+        if (stopped) {
+            return null;
+        }
+        try {
+            final DCPRequest dcpRequest = generateNextDCPMessage();
+            record.set(dcpRequest);
+            if (counter >= numberOfMutations) {
+                stop();
+            }
+        } catch (final Throwable th) {
+            LOGGER.error(th.getMessage(), th);
+        }
+        return record;
+    }
+
+    /**
+     * Chooses the message type for this step: a remove when a delete is due and
+     * a key was remembered, an upsert (re-mutation of a remembered key) when
+     * due, otherwise a fresh mutation with a newly generated key and document.
+     */
+    private DCPRequest generateNextDCPMessage() {
+        if ((counter % deleteCycle) == (deleteCycle - 1)) {
+            if (nextDeleteKey != null) {
+                final String key = nextDeleteKey;
+                nextDeleteKey = null;
+                return new RemoveMessage(nextDeletePartition, key, cas++, seq++, 0L, bucket);
+            }
+        }
+        generateNextDocument();
+        if ((counter % upsertCycle) == (upsertCycle - 1)) {
+            if (nextUpsertKey != null) {
+                final String key = nextUpsertKey;
+                nextUpsertKey = null;
+                upsertCounter++;
+                return new MutationMessage(nextUpsertPartition, key, byteBuff, expiration++, seq++, 0, 0, lockTime++,
+                        cas++, bucket);
+            }
+        }
+        return new MutationMessage(assigned.get(counter % assigned.size()), generateKey(), byteBuff, expiration++,
+                seq++, 0, 0, lockTime++, cas++, bucket);
+    }
+
+    /** Rewrites the shared buffer with the next deterministic JSON document. */
+    private void generateNextDocument() {
+        // reset the string
+        strBuilder.setLength(0);
+        strBuilder.append("{\"id\":" + (counter + upsertCounter) + ",\"name\":\""
+                + names[(counter + upsertCounter) % names.length] + "\",\"exp\":" + ((counter + upsertCounter) * 3)
+                + "}");
+        byteBuff.clear();
+        byteBuff.writeBytes(strBuilder.toString().getBytes(StandardCharsets.UTF_8));
+    }
+
+    /** Marks the reader stopped and releases the document buffer (idempotent). */
+    @Override
+    public boolean stop() {
+        if (!stopped) {
+            stopped = true;
+            byteBuff.release();
+        }
+        return stopped;
+    }
+
+    /** The test reader ignores the controller. */
+    @Override
+    public void setController(final IDataFlowController controller) {
+    }
+
+    /** The test reader performs no feed logging. */
+    @Override
+    public void setFeedLogManager(final FeedLogManager feedLogManager) {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
new file mode 100644
index 0000000..8242554
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.kv;
+
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.couchbase.client.core.message.dcp.DCPRequest;
+
+/**
+ * A test-only record reader factory for KV (Couchbase DCP) feeds. Instead of
+ * connecting to a real bucket it creates {@link KVTestReader} instances that
+ * generate synthetic {@link DCPRequest} messages.
+ * Recognized configuration parameters:
+ * <ul>
+ * <li>{@code num-of-records}: total records generated across all readers (default 1000)</li>
+ * <li>{@code delete-cycle}: presumably how often a deletion message is emitted; 0 disables
+ * (exact semantics defined by KVTestReader — TODO confirm)</li>
+ * <li>{@code upsert-cycle}: presumably how often an upsert message is emitted; 0 disables
+ * (exact semantics defined by KVTestReader — TODO confirm)</li>
+ * </ul>
+ */
+public class KVTestReaderFactory implements IRecordReaderFactory<DCPRequest> {
+
+    private static final long serialVersionUID = 1L;
+    // Fixed test-bucket name and the standard Couchbase vbucket count.
+    private static final String BUCKET = "TestBucket";
+    private static final int NUM_OF_VBUCKETS = 1024;
+    // schedule[v] = index of the reader (partition) responsible for vbucket v.
+    private final int[] schedule = new int[NUM_OF_VBUCKETS];
+    private int numOfRecords = 1000; // default = 1000 records
+    private int deleteCycle = 0; // 0 = no deletes
+    private int upsertCycle = 0; // 0 = no upserts
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        clusterLocations = AsterixClusterProperties.INSTANCE.getClusterLocations();
+        return clusterLocations;
+    }
+
+    /**
+     * Reads the optional configuration parameters and assigns vbuckets to
+     * readers round-robin.
+     */
+    @Override
+    public void configure(final Map<String, String> configuration) {
+        if (configuration.containsKey("num-of-records")) {
+            numOfRecords = Integer.parseInt(configuration.get("num-of-records"));
+        }
+        final int numOfReaders = getPartitionConstraint().getLocations().length;
+        for (int i = 0; i < NUM_OF_VBUCKETS; i++) {
+            schedule[i] = i % numOfReaders;
+        }
+        if (configuration.containsKey("delete-cycle")) {
+            deleteCycle = Integer.parseInt(configuration.get("delete-cycle"));
+        }
+        if (configuration.containsKey("upsert-cycle")) {
+            upsertCycle = Integer.parseInt(configuration.get("upsert-cycle"));
+        }
+    }
+
+    /**
+     * Creates a reader for one partition; each reader generates its share of the
+     * total records (ceiling of records divided by number of readers).
+     */
+    @Override
+    public IRecordReader<? extends DCPRequest> createRecordReader(final IHyracksTaskContext ctx, final int partition) {
+        final int numOfReaders = getPartitionConstraint().getLocations().length;
+        // integer ceiling division avoids floating-point rounding
+        final int recordsPerReader = (numOfRecords + numOfReaders - 1) / numOfReaders;
+        return new KVTestReader(partition, BUCKET, schedule, recordsPerReader, deleteCycle, upsertCycle);
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return DCPRequest.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
index 1af8695..a78f780 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
@@ -23,7 +23,6 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 
 import org.apache.asterix.external.api.IDataFlowController;
@@ -74,10 +73,6 @@ public class RSSRecordReader implements IRecordReader<SyndEntryImpl> {
     }
 
     @Override
-    public void configure(Map<String, String> configurations) throws Exception {
-    }
-
-    @Override
     public boolean hasNext() throws Exception {
         return !done;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index beceea8..f9eedd1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.input.record.reader.rss;
 
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -28,14 +29,14 @@ import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 import com.sun.syndication.feed.synd.SyndEntryImpl;
 
 public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImpl> {
 
     private static final long serialVersionUID = 1L;
-    private Map<String, String> configuration;
-    private List<String> urls = new ArrayList<String>();
+    private final List<String> urls = new ArrayList<String>();
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
     @Override
@@ -44,15 +45,14 @@ public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImp
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         int count = urls.size();
         clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, count);
         return clusterLocations;
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        this.configuration = configuration;
+    public void configure(Map<String, String> configuration) {
         String url = configuration.get(ExternalDataConstants.KEY_RSS_URL);
         if (url == null) {
             throw new IllegalArgumentException("no RSS URL provided");
@@ -75,10 +75,12 @@ public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImp
 
     @Override
     public IRecordReader<? extends SyndEntryImpl> createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws Exception {
-        RSSRecordReader reader = new RSSRecordReader(urls.get(partition));
-        reader.configure(configuration);
-        return reader;
+            throws HyracksDataException {
+        try {
+            return new RSSRecordReader(urls.get(partition));
+        } catch (MalformedURLException e) {
+            throw new HyracksDataException(e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
index 2d6d8ea..6a5d776 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IExternalIndexer;
@@ -34,15 +33,22 @@ import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 
 public abstract class AbstractStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
-    protected AInputStreamReader reader;
+    protected final AInputStreamReader reader;
     protected CharArrayRecord record;
     protected char[] inputBuffer;
     protected int bufferLength = 0;
     protected int bufferPosn = 0;
-    protected IExternalIndexer indexer;
+    protected final IExternalIndexer indexer;
     protected boolean done = false;
     protected FeedLogManager feedLogManager;
 
+    public AbstractStreamRecordReader(AInputStream inputStream, IExternalIndexer indexer) {
+        this.reader = new AInputStreamReader(inputStream);
+        this.indexer = indexer;
+        record = new CharArrayRecord();
+        inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
+    }
+
     @Override
     public IRawRecord<char[]> next() throws IOException {
         return record;
@@ -56,27 +62,12 @@ public abstract class AbstractStreamRecordReader implements IRecordReader<char[]
         done = true;
     }
 
-    public void setInputStream(AInputStream inputStream) throws IOException {
-        this.reader = new AInputStreamReader(inputStream);
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        record = new CharArrayRecord();
-        inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
-    }
-
     @Override
     public IExternalIndexer getIndexer() {
         return indexer;
     }
 
     @Override
-    public void setIndexer(IExternalIndexer indexer) {
-        this.indexer = indexer;
-    }
-
-    @Override
     public boolean stop() {
         try {
             reader.stop();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
index d02de03..12c0229 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
@@ -21,16 +21,19 @@ package org.apache.asterix.external.input.record.reader.stream;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IIndexibleExternalDataSource;
 import org.apache.asterix.external.api.IIndexingDatasource;
 import org.apache.asterix.external.api.IInputStreamProvider;
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class AbstractStreamRecordReaderFactory<T>
         implements IRecordReaderFactory<T>, IIndexibleExternalDataSource {
@@ -51,26 +54,23 @@ public abstract class AbstractStreamRecordReaderFactory<T>
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
         return inputStreamFactory.getPartitionConstraint();
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) throws AsterixException {
         this.configuration = configuration;
         inputStreamFactory.configure(configuration);
-        configureStreamReaderFactory(configuration);
     }
 
-    protected abstract void configureStreamReaderFactory(Map<String, String> configuration) throws Exception;
-
     @Override
     public boolean isIndexible() {
         return inputStreamFactory.isIndexible();
     }
 
     @Override
-    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) throws Exception {
+    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
         ((IIndexibleExternalDataSource) inputStreamFactory).setSnapshot(files, indexingOp);
     }
 
@@ -82,8 +82,8 @@ public abstract class AbstractStreamRecordReaderFactory<T>
         return false;
     }
 
-    protected IRecordReader<char[]> configureReader(AbstractStreamRecordReader recordReader, IHyracksTaskContext ctx,
-            int partition) throws Exception {
+    protected Pair<AInputStream, IExternalIndexer> getStreamAndIndexer(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
         IInputStreamProvider inputStreamProvider = inputStreamFactory.createInputStreamProvider(ctx, partition);
         IExternalIndexer indexer = null;
         if (inputStreamFactory.isIndexible()) {
@@ -91,9 +91,6 @@ public abstract class AbstractStreamRecordReaderFactory<T>
                 indexer = ((IIndexingDatasource) inputStreamProvider).getIndexer();
             }
         }
-        recordReader.setInputStream(inputStreamProvider.getInputStream());
-        recordReader.setIndexer(indexer);
-        recordReader.configure(configuration);
-        return recordReader;
+        return new Pair<AInputStream, IExternalIndexer>(inputStreamProvider.getInputStream(), indexer);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index ad2d90d..fb56062 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@ -20,14 +20,19 @@ package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
 
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 
 public class EmptyLineSeparatedRecordReader extends AbstractStreamRecordReader {
 
+    public EmptyLineSeparatedRecordReader(AInputStream inputStream, IExternalIndexer indexer) {
+        super(inputStream, indexer);
+    }
+
     private boolean prevCharCR;
     private boolean prevCharLF;
     private int newlineLength;
-    private int recordNumber = 0;
     private int readLength;
 
     @Override
@@ -53,7 +58,6 @@ public class EmptyLineSeparatedRecordReader extends AbstractStreamRecordReader {
                 if (bufferLength <= 0) {
                     if (readLength > 0) {
                         record.endRecord();
-                        recordNumber++;
                         return true;
                     }
                     close();
@@ -93,7 +97,6 @@ public class EmptyLineSeparatedRecordReader extends AbstractStreamRecordReader {
                 record.append(inputBuffer, startPosn, readLength);
             }
         } while (newlineLength < 2);
-        recordNumber++;
         record.endRecord();
         return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
index a1e8f31..75d16c5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
@@ -18,28 +18,26 @@
  */
 package org.apache.asterix.external.input.record.reader.stream;
 
-import java.util.Map;
-
+import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class EmptyLineSeparatedRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
 
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
-        EmptyLineSeparatedRecordReader recordReader = new EmptyLineSeparatedRecordReader();
-        return configureReader(recordReader, ctx, partition);
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        final Pair<AInputStream, IExternalIndexer> streamAndIndexer = getStreamAndIndexer(ctx, partition);
+        return new EmptyLineSeparatedRecordReader(streamAndIndexer.first, streamAndIndexer.second);
     }
 
     @Override
     public Class<? extends char[]> getRecordClass() {
         return char[].class;
     }
-
-    @Override
-    protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
-    }
-
 }


Mime
View raw message