asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: Update the Key Value Reader
Date Fri, 15 Apr 2016 13:01:37 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/805

Change subject: Update the Key Value Reader
......................................................................

Update the Key Value Reader

Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699
---
M asterixdb/asterix-external-data/pom.xml
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
4 files changed, 24 insertions(+), 35 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/05/805/1

diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 8c59cc4..3a47ba6 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -287,7 +287,7 @@
         <dependency>
             <groupId>com.couchbase.client</groupId>
             <artifactId>core-io</artifactId>
-            <version>1.2.3</version>
+            <version>1.3.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>io.reactivex</groupId>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
index 185aea0..3f5b531 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
@@ -30,10 +30,7 @@
 import org.apache.log4j.Logger;
 
 import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.dcp.BucketStreamAggregator;
-import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
-import com.couchbase.client.core.dcp.BucketStreamState;
-import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
+import com.couchbase.client.core.endpoint.dcp.DCPConnection;
 import com.couchbase.client.core.env.DefaultCoreEnvironment;
 import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
 import com.couchbase.client.core.message.cluster.CloseBucketRequest;
@@ -41,27 +38,30 @@
 import com.couchbase.client.core.message.cluster.SeedNodesRequest;
 import com.couchbase.client.core.message.dcp.DCPRequest;
 import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
+import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
 import com.couchbase.client.core.message.dcp.RemoveMessage;
 import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
 
+import rx.Observable;
 import rx.functions.Action1;
 
 public class KVReader implements IRecordReader<DCPRequest> {
 
     private static final Logger LOGGER = Logger.getLogger(KVReader.class);
-    private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
-            null);
+    private static final MutationMessage POISON_PILL = new MutationMessage(0, (short) 0, null, null, 0, 0L, 0L, 0, 0,
+            0L, null);
     private final String feedName;
     private final short[] vbuckets;
     private final String bucket;
     private final String password;
     private final String[] sourceNodes;
     private final Builder builder;
-    private final BucketStreamAggregator bucketStreamAggregator;
     private final CouchbaseCore core;
     private final DefaultCoreEnvironment env;
     private final GenericRecord<DCPRequest> record;
     private final ArrayBlockingQueue<DCPRequest> messages;
+    private final DCPConnection connection;
     private AbstractFeedDataFlowController controller;
     private Thread pushThread;
     private boolean done = false;
@@ -78,7 +78,8 @@
                 .autoreleaseAfter(KVReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
         this.env = builder.build();
         this.core = new CouchbaseCore(env);
-        this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket);
+        connection = core.<OpenConnectionResponse> send(new OpenConnectionRequest(feedName, bucket)).toBlocking()
+                .single().connection();
         this.record = new GenericRecord<>();
         connect();
     }
@@ -98,36 +99,24 @@
         this.pushThread = new Thread(new Runnable() {
             @Override
             public void run() {
-                KVReader.this.run(bucketStreamAggregator);
+                KVReader.this.run(connection);
             }
         }, feedName);
         pushThread.start();
     }
 
-    private void run(BucketStreamAggregator bucketStreamAggregator) {
-        BucketStreamAggregatorState state = new BucketStreamAggregatorState();
-        for (int i = 0; i < vbuckets.length; i++) {
-            state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 0xffffffff));
-        }
-        state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() {
-            @Override
-            public void call(BucketStreamStateUpdatedEvent event) {
-                if (event.partialUpdate()) {
-                } else {
-                }
-            }
-        });
+    private void run(DCPConnection connection) {
         try {
-            bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() {
+            for (int i = 0; i < vbuckets.length; i++) {
+                connection.addStream(vbuckets[i]).toBlocking().single();
+            }
+            connection.subject().takeUntil(Observable.never()).toBlocking().forEach(new Action1<DCPRequest>() {
                 @Override
                 public void call(DCPRequest dcpRequest) {
                     try {
                         if (dcpRequest instanceof SnapshotMarkerMessage) {
                             SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
-                            BucketStreamState oldState = state.get(message.partition());
-                            state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(),
-                                    message.endSequenceNumber(), oldState.endSequenceNumber(),
-                                    message.endSequenceNumber(), oldState.snapshotEndSequenceNumber()));
+                            LOGGER.info("snapshot DCP message received: " + message);
                         } else if ((dcpRequest instanceof MutationMessage) || (dcpRequest instanceof RemoveMessage)) {
                             messages.put(dcpRequest);
                         } else {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
index bc2a980..6c85b20 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
@@ -138,8 +138,8 @@
         for (int i = 0; i < vbuckets.length; i++) {
             vbuckets[i] = listOfAssignedVBuckets.get(i);
         }
-        return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes,
-                vbuckets, ExternalDataUtils.getQueueSize(configuration));
+        return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes, vbuckets,
+                ExternalDataUtils.getQueueSize(configuration));
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
index b75f26c..43132b6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
@@ -132,7 +132,7 @@
             if (nextDeleteKey != null) {
                 final String key = nextDeleteKey;
                 nextDeleteKey = null;
-                return new RemoveMessage(nextDeletePartition, key, cas++, seq++, 0L, bucket);
+                return new RemoveMessage(0, nextDeletePartition, key, cas++, seq++, 0L, bucket);
             }
         }
         generateNextDocument();
@@ -141,12 +141,12 @@
                 final String key = nextUpsertKey;
                 nextUpsertKey = null;
                 upsertCounter++;
-                return new MutationMessage(nextUpsertPartition, key, byteBuff, expiration++, seq++, 0, 0, lockTime++,
-                        cas++, bucket);
+                return new MutationMessage(byteBuff.readableBytes(), nextUpsertPartition, key, byteBuff, expiration++,
+                        seq++, 0, 0, lockTime++, cas++, bucket);
             }
         }
-        return new MutationMessage(assigned.get(counter % assigned.size()), generateKey(), byteBuff, expiration++,
-                seq++, 0, 0, lockTime++, cas++, bucket);
+        return new MutationMessage(byteBuff.readableBytes(), assigned.get(counter % assigned.size()), generateKey(),
+                byteBuff, expiration++, seq++, 0, 0, lockTime++, cas++, bucket);
     }
 
     private void generateNextDocument() {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/805
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message