asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: Skip Flush messages in KV reader
Date Sat, 26 Mar 2016 17:03:28 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/758

Change subject: Skip Flush messages in KV reader
......................................................................

Skip Flush messages in KV reader

Change-Id: I0e1a714035551771fe8a583a9bbb4098511ff150
---
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
1 file changed, 19 insertions(+), 5 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/58/758/1

diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
index 4e41357..7307727 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.input.record.reader.kv;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ArrayBlockingQueue;
 
 import org.apache.asterix.external.api.IDataFlowController;
@@ -92,10 +93,10 @@
     }
 
     private void connect() {
-        core.send(new SeedNodesRequest(sourceNodes))
-                .timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT).toBlocking().single();
-        core.send(new OpenBucketRequest(bucket, password))
-                .timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT).toBlocking().single();
+        core.send(new SeedNodesRequest(sourceNodes)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
+                .toBlocking().single();
+        core.send(new OpenBucketRequest(bucket, password)).timeout(KVReaderFactory.TIMEOUT,
KVReaderFactory.TIME_UNIT)
+                .toBlocking().single();
         this.pushThread = new Thread(new Runnable() {
             @Override
             public void run() {
@@ -126,11 +127,24 @@
                         if (dcpRequest instanceof SnapshotMarkerMessage) {
                             SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
                             BucketStreamState oldState = state.get(message.partition());
+                            /*
                             state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(),
                                     message.endSequenceNumber(), oldState.endSequenceNumber(),
                                     message.endSequenceNumber(), oldState.snapshotEndSequenceNumber()));
+                                    */
                         } else if ((dcpRequest instanceof MutationMessage) || (dcpRequest
instanceof RemoveMessage)) {
-                            messages.put(dcpRequest);
+                            String key = ((MutationMessage) dcpRequest).key();
+                            if (key.startsWith("__")) {
+                                String content = ((MutationMessage) dcpRequest).content()
+                                        .toString(StandardCharsets.UTF_8);
+                                if (content.startsWith("__")) {
+                                    LOGGER.warn("DCP Control Message Received: " + content);
+                                } else {
+                                    messages.put(dcpRequest);
+                                }
+                            } else {
+                                messages.put(dcpRequest);
+                            }
                         } else {
                             LOGGER.warn("Unknown type of DCP messages: " + dcpRequest);
                         }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/758
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0e1a714035551771fe8a583a9bbb4098511ff150
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message