asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Xikui Wang (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: This patch includes the following changes:
Date Fri, 15 Jul 2016 22:10:42 GMT
Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1002

Change subject: This patch includes the following changes:
......................................................................

This patch includes the following changes:

1. Extended TweetParser to parse more than the fixed set of tweet attributes.

2. Changed the Twitter feed's message unit from twitter4j Status to String (the tweet's raw JSON).

3. Fixed bug ASTERIXDB-1471.

4. Fixed bug ASTERIXDB-1352.

Change-Id: I7021e7b779de05b9ec999a8d5f8464fb0ab413c0
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
M asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
12 files changed, 408 insertions(+), 149 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/02/1002/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index e31325a..80e716d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -28,13 +28,9 @@
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-import twitter4j.Query;
-import twitter4j.QueryResult;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
+import twitter4j.*;
 
-public class TwitterPullRecordReader implements IRecordReader<Status> {
+public class TwitterPullRecordReader implements IRecordReader<String> {
 
     private Query query;
     private Twitter twitter;
@@ -42,14 +38,14 @@
     private QueryResult result;
     private int nextTweetIndex = 0;
     private long lastTweetIdReceived = 0;
-    private GenericRecord<Status> record;
+    private GenericRecord<String> record;
 
     public TwitterPullRecordReader(Twitter twitter, String keywords, int requestInterval)
{
         this.twitter = twitter;
         this.requestInterval = requestInterval;
         this.query = new Query(keywords);
         this.query.setCount(100);
-        this.record = new GenericRecord<Status>();
+        this.record = new GenericRecord<>();
     }
 
     @Override
@@ -62,7 +58,7 @@
     }
 
     @Override
-    public IRawRecord<Status> next() throws IOException, InterruptedException {
+    public IRawRecord<String> next() throws IOException, InterruptedException {
         if (result == null || nextTweetIndex >= result.getTweets().size()) {
             Thread.sleep(1000 * requestInterval);
             query.setSinceId(lastTweetIdReceived);
@@ -79,7 +75,8 @@
             if (lastTweetIdReceived < tweet.getId()) {
                 lastTweetIdReceived = tweet.getId();
             }
-            record.set(tweet);
+            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); // transform tweet
obj to json
+            record.set(jsonTweet);
             return record;
         } else {
             return null;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index f04cdb9..02c3963 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -27,30 +27,25 @@
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
 
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterStream;
+import twitter4j.*;
 
-public class TwitterPushRecordReader implements IRecordReader<Status> {
-    private LinkedBlockingQueue<Status> inputQ;
+public class TwitterPushRecordReader implements IRecordReader<String> {
+    private LinkedBlockingQueue<String> inputQ;
     private TwitterStream twitterStream;
-    private GenericRecord<Status> record;
+    private GenericRecord<String> record;
     private boolean closed = false;
 
     public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) {
-        record = new GenericRecord<Status>();
-        inputQ = new LinkedBlockingQueue<Status>();
+        record = new GenericRecord<>();
+        inputQ = new LinkedBlockingQueue<>();
         this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration);
         this.twitterStream.addListener(new TweetListener(inputQ));
         this.twitterStream.filter(query);
     }
 
     public TwitterPushRecordReader(TwitterStream twitterStream) {
-        record = new GenericRecord<Status>();
-        inputQ = new LinkedBlockingQueue<Status>();
+        record = new GenericRecord<>();
+        inputQ = new LinkedBlockingQueue<>();
         this.twitterStream = twitterStream;//
         this.twitterStream.addListener(new TweetListener(inputQ));
         twitterStream.sample();
@@ -72,8 +67,8 @@
     }
 
     @Override
-    public IRawRecord<Status> next() throws IOException, InterruptedException {
-        Status tweet = inputQ.poll();
+    public IRawRecord<String> next() throws IOException, InterruptedException {
+        String tweet = inputQ.poll();
         if (tweet == null) {
             return null;
         }
@@ -93,15 +88,16 @@
 
     private class TweetListener implements StatusListener {
 
-        private LinkedBlockingQueue<Status> inputQ;
+        private LinkedBlockingQueue<String> inputQ;
 
-        public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+        public TweetListener(LinkedBlockingQueue<String> inputQ) {
             this.inputQ = inputQ;
         }
 
         @Override
         public void onStatus(Status tweet) {
-            inputQ.add(tweet);
+            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
+            inputQ.add(jsonTweet);
         }
 
         @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 541737a..172b22b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -37,7 +37,7 @@
 import twitter4j.FilterQuery;
 import twitter4j.Status;
 
-public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
+public class TwitterRecordReaderFactory implements IRecordReaderFactory<String> {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
@@ -114,7 +114,7 @@
     }
 
     @Override
-    public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx,
int partition)
+    public IRecordReader<? extends String> createRecordReader(IHyracksTaskContext ctx,
int partition)
             throws HyracksDataException {
         if (pull) {
             return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
@@ -133,8 +133,8 @@
     }
 
     @Override
-    public Class<? extends Status> getRecordClass() {
-        return Status.class;
+    public Class<? extends String> getRecordClass() {
+        return String.class;
     }
 
     private boolean validateConfiguration(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 5923354..e686b97 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -70,23 +70,12 @@
 import org.apache.asterix.external.library.java.JObjects.JString;
 import org.apache.asterix.external.library.java.JObjects.JTime;
 import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
-import org.apache.asterix.om.base.ACircle;
-import org.apache.asterix.om.base.ADuration;
-import org.apache.asterix.om.base.ALine;
-import org.apache.asterix.om.base.APoint;
-import org.apache.asterix.om.base.APoint3D;
-import org.apache.asterix.om.base.APolygon;
-import org.apache.asterix.om.base.ARectangle;
+import org.apache.asterix.om.base.*;
 import org.apache.asterix.om.pointables.AFlatValuePointable;
 import org.apache.asterix.om.pointables.AListVisitablePointable;
 import org.apache.asterix.om.pointables.ARecordVisitablePointable;
 import org.apache.asterix.om.pointables.base.IVisitablePointable;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AbstractCollectionType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.*;
 import org.apache.asterix.om.util.container.IObjectPool;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.string.UTF8StringReader;
@@ -471,9 +460,14 @@
                 for (IVisitablePointable fieldPointable : fieldPointables) {
                     closedPart = index < recordType.getFieldTypes().length;
                     IVisitablePointable tt = fieldTypeTags.get(index);
-                    IAType fieldType = closedPart ? recordType.getFieldTypes()[index] : null;
                     ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
                             .deserialize(tt.getByteArray()[tt.getStartOffset()]);
+                    IAType fieldType = null;
+                    if(closedPart){
+                        fieldType = recordType.getFieldTypes()[index];
+                    }
+                    else
+                        fieldType = ATypeMachine(typeTag);
                     IVisitablePointable fieldName = fieldNames.get(index);
                     typeInfo.reset(fieldType, typeTag);
                     switch (typeTag) {
@@ -539,12 +533,13 @@
             IJObject listItem = null;
             int index = 0;
             try {
-
                 for (IVisitablePointable itemPointable : items) {
                     IVisitablePointable itemTagPointable = itemTags.get(index);
                     ATypeTag itemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
                             .deserialize(itemTagPointable.getByteArray()[itemTagPointable.getStartOffset()]);
-                    typeInfo.reset(listType.getType(), listType.getTypeTag());
+                    IAType fieldType = null;
+                    fieldType = ATypeMachine(itemTypeTag);
+                    typeInfo.reset(fieldType, itemTypeTag);
                     switch (itemTypeTag) {
                         case RECORD:
                             listItem = pointableVisitor.visit((ARecordVisitablePointable)
itemPointable, typeInfo);
@@ -557,10 +552,7 @@
                             throw new IllegalArgumentException(
                                     "Cannot parse list item of type " + listType.getTypeTag());
                         default:
-                            IAType itemType = ((AbstractCollectionType) listType).getItemType();
-                            typeInfo.reset(itemType, itemType.getTypeTag());
                             listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable,
typeInfo);
-
                     }
                     list.add(listItem);
                 }
@@ -580,4 +572,65 @@
         }
 
     }
+
+    public static IAType ATypeMachine(ATypeTag typeTag){
+        IAType aType = null;
+        switch (typeTag){
+            case BOOLEAN:
+                aType = BuiltinType.ABOOLEAN;
+                break;
+            case INT8:
+                aType = BuiltinType.AINT8;
+                break;
+            case INT16:
+                aType = BuiltinType.AINT16;
+                break;
+            case INT32:
+                aType = BuiltinType.AINT32;
+                break;
+            case INT64:
+                aType = BuiltinType.AINT64;
+                break;
+            case FLOAT:
+                aType = BuiltinType.AFLOAT;
+                break;
+            case DOUBLE:
+                aType = BuiltinType.ADOUBLE;
+                break;
+            case STRING:
+                aType = BuiltinType.ASTRING;
+                break;
+            case POINT:
+                aType = BuiltinType.APOINT;
+                break;
+            case POINT3D:
+                aType = BuiltinType.APOINT3D;
+                break;
+            case LINE:
+                aType = BuiltinType.ALINE;
+                break;
+            case DATE:
+                aType = BuiltinType.ADATE;
+                break;
+            case DATETIME:
+                aType = BuiltinType.ADATETIME;
+                break;
+            case DURATION:
+                aType = BuiltinType.ADURATION;
+                break;
+            case RECORD:
+                aType = ARecordType.FULLY_OPEN_RECORD_TYPE;
+                break;
+            case UNORDEREDLIST:
+                aType = AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE;
+                break;
+            case ORDEREDLIST:
+                aType = AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE;
+                break;
+            default:
+                aType = BuiltinType.ANY;
+                break;
+        }
+        return aType;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
index 522da06..fe3ec65 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -18,111 +18,234 @@
  */
 package org.apache.asterix.external.parser;
 
-import java.io.DataOutput;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.builders.*;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
-import org.apache.asterix.external.library.java.JObjectUtil;
-import org.apache.asterix.external.util.Datatypes.Tweet;
-import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableRecord;
+import org.apache.asterix.om.base.AMutablePoint;
 import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.types.*;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.common.ophelpers.LongArrayList;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+import org.eclipse.jetty.util.ajax.JSON;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 
-import twitter4j.Status;
-import twitter4j.User;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 
-public class TweetParser implements IRecordDataParser<Status> {
+import static org.apache.asterix.om.types.ATypeTag.*;
 
-    private IAObject[] mutableTweetFields;
-    private IAObject[] mutableUserFields;
-    private AMutableRecord mutableRecord;
-    private AMutableRecord mutableUser;
-    private final Map<String, Integer> userFieldNameMap = new HashMap<>();
-    private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
-    private RecordBuilder recordBuilder = new RecordBuilder();
+public class TweetParser extends AbstractDataParser implements IRecordDataParser<String>
{
+    private final IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool = new ListObjectPool<IARecordBuilder,
ATypeTag>(
+            new RecordBuilderFactory());
+    private final IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool = new
ListObjectPool<IAsterixListBuilder, ATypeTag>(
+            new ListBuilderFactory());
+    private final IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool = new
ListObjectPool<IMutableValueStorage, ATypeTag>(
+            new AbvsBuilderFactory());
+    private ARecordType recordType;
+    private UTF8StringWriter utf8Writer = new UTF8StringWriter();
 
     public TweetParser(ARecordType recordType) {
-        initFieldNames(recordType);
-        mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null),
new AMutableInt32(0),
-                new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
-        mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)],
-                mutableUserFields);
-
-        mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new
AMutableDouble(0),
-                new AMutableDouble(0), new AMutableString(null), new AMutableString(null)
};
-        mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
+        this.recordType = recordType;
     }
 
-    // Initialize the hashmap values for the field names and positions
-    private void initFieldNames(ARecordType recordType) {
-        String tweetFields[] = recordType.getFieldNames();
-        for (int i = 0; i < tweetFields.length; i++) {
-            tweetFieldNameMap.put(tweetFields[i], i);
-            if (tweetFields[i].equals(Tweet.USER)) {
-                IAType fieldType = recordType.getFieldTypes()[i];
-                if (fieldType.getTypeTag() == ATypeTag.RECORD) {
-                    String userFields[] = ((ARecordType) fieldType).getFieldNames();
-                    for (int j = 0; j < userFields.length; j++) {
-                        userFieldNameMap.put(userFields[j], j);
-                    }
-                }
+    private void parseUnorderedList(JSONArray jArray, DataOutput output) throws IOException,
JSONException {
+        ArrayBackedValueStorage itemBuffer = getTempBuffer();
+        UnorderedListBuilder unorderedListBuilder = (UnorderedListBuilder) getUnorderedListBuilder();
 
+        unorderedListBuilder.reset(null);
+        for (int iter1 = 0; iter1 < jArray.length(); iter1++) {
+            itemBuffer.reset();
+            if(writeField(jArray.get(iter1),null,itemBuffer.getDataOutput()))
+                unorderedListBuilder.addItem(itemBuffer);
+        }
+        unorderedListBuilder.write(output, true);
+    }
+
+    private boolean writeField(Object fieldObj, IAType fieldType, DataOutput out)
+            throws IOException, JSONException {
+        // save fieldType for closed type check
+        String nstt;
+        boolean writeResult = true;
+        if(fieldType!=null){
+            if(fieldType.getTypeTag() == BuiltinType.ASTRING.getTypeTag()){
+                out.write(BuiltinType.ASTRING.getTypeTag().serialize());
+                utf8Writer.writeUTF8(fieldObj.toString(), out);
+            }
+            else if (fieldType.getTypeTag() == INT64){
+                aInt64.setValue((long) fieldObj);
+                int64Serde.serialize(aInt64, out);
+            }
+            else if(fieldType.getTypeTag() == INT32){
+                out.write(BuiltinType.AINT32.getTypeTag().serialize());
+                out.writeInt((Integer) fieldObj);
+            }
+            else if(fieldType.getTypeTag() == DOUBLE){
+                out.write(BuiltinType.ADOUBLE.getTypeTag().serialize());
+                out.writeDouble((Double) fieldObj);
+            }
+            else if(fieldType.getTypeTag() == BOOLEAN){
+                out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize());
+                out.writeBoolean((Boolean) fieldObj);
+            }
+            else if(fieldType.getTypeTag() ==  RECORD){
+                writeRecord((JSONObject)fieldObj, out,(ARecordType) fieldType);
+            }
+            else
+                writeResult = false;
+        }
+        else {
+            try {
+                if (fieldObj == JSONObject.NULL) {
+                    nullSerde.serialize(ANull.NULL, out);
+                } else if (fieldObj instanceof Integer) {
+                    out.write(BuiltinType.AINT32.getTypeTag().serialize());
+                    out.writeInt((Integer) fieldObj);
+                } else if (fieldObj instanceof Boolean) {
+                    out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize());
+                    out.writeBoolean((Boolean) fieldObj);
+                } else if (fieldObj instanceof Double) {
+                    out.write(BuiltinType.ADOUBLE.getTypeTag().serialize());
+                    out.writeDouble((Double) fieldObj);
+                } else if (fieldObj instanceof Long) {
+                    out.write(BuiltinType.AINT64.getTypeTag().serialize());
+                    out.writeLong((Long) fieldObj);
+                } else if (fieldObj instanceof String) {
+                    out.write(BuiltinType.ASTRING.getTypeTag().serialize());
+                    utf8Writer.writeUTF8((String) fieldObj, out);
+                } else if (fieldObj instanceof JSONArray) {
+                    if (((JSONArray) fieldObj).length() != 0)
+                        parseUnorderedList((JSONArray) fieldObj, out);
+                    else
+                        writeResult = false;
+                } else if (fieldObj instanceof JSONObject) {
+                    if (((JSONObject) fieldObj).length() != 0)
+                        writeRecord((JSONObject) fieldObj, out, null);
+                    else
+                        writeResult = false;
+                }
+            } catch (JSONException e) {
+                writeResult = false;
             }
         }
+        return writeResult;
     }
 
+    private int checkAttrNameIdx(String[] nameList, String name){
+        int idx = 0;
+        if(nameList!=null)
+            for(String nln :nameList){
+                if(name.equals(nln))
+                    return idx;
+                idx++;
+            }
+        return -1;
+    }
+
+    public void writeRecord(JSONObject obj, DataOutput out, ARecordType curRecType) throws
IOException, JSONException {
+        IAType[] curTypes = null;
+        String[] curFNames = null;
+
+        ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
+        ArrayBackedValueStorage fieldNameBuffer = getTempBuffer();
+        IARecordBuilder recBuilder = getRecordBuilder();
+
+        int fieldN;
+        int attrIdx;
+
+        if (curRecType != null) {
+            curTypes = curRecType.getFieldTypes();
+            curFNames = curRecType.getFieldNames();
+        }
+
+        recBuilder.reset(curRecType);
+        recBuilder.init();
+
+        if(curRecType!=null && !curRecType.isOpen()){
+            // closed record type
+            fieldN = curFNames.length;
+            for (int iter1 = 0; iter1 < fieldN; iter1++) {
+                fieldValueBuffer.reset();
+                DataOutput fieldOutput = fieldValueBuffer.getDataOutput();
+                if(obj.isNull(curFNames[iter1])){
+                    if(curRecType.isClosedField(curFNames[iter1]))
+                        throw new HyracksDataException("Closed field "+curFNames[iter1]+"
has null value.");
+                    else
+                        continue;
+                }
+                else {
+                 if (writeField(obj.get(curFNames[iter1]), curTypes[iter1], fieldOutput))
+                    recBuilder.addField(iter1, fieldValueBuffer);
+                }
+            }
+        } else{
+            //open record type
+            int closedFieldCount = 0;
+            IAType curFieldType = null;
+            for (String attrName : JSONObject.getNames(obj)){
+                if(obj.isNull(attrName)||obj.length()==0) continue;
+                attrIdx = checkAttrNameIdx(curFNames, attrName);
+                if(curRecType!=null)
+                    curFieldType = curRecType.getFieldType(attrName);
+                fieldValueBuffer.reset();
+                fieldNameBuffer.reset();
+                DataOutput fieldOutput = fieldValueBuffer.getDataOutput();
+                if (writeField(obj.get(attrName), curFieldType, fieldOutput)) {
+                    if(attrIdx == -1){
+                        aString.setValue(attrName);
+                        stringSerde.serialize(aString, fieldNameBuffer.getDataOutput());
+                        recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
+                    }
+                    else {
+                        recBuilder.addField(attrIdx, fieldValueBuffer);
+                        closedFieldCount++;
+                    }
+                }
+            }
+            if(curRecType!=null && closedFieldCount<curFNames.length)
+                throw new HyracksDataException("Non-null field is null");
+        }
+        recBuilder.write(out, true);
+    }
+
+
+    private IARecordBuilder getRecordBuilder() {
+        return recordBuilderPool.allocate(ATypeTag.RECORD);
+    }
+
+    private IAsterixListBuilder getUnorderedListBuilder() {
+        return listBuilderPool.allocate(ATypeTag.UNORDEREDLIST);
+    }
+
+    private ArrayBackedValueStorage getTempBuffer() {
+        return (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.BINARY);
+    }
+
+
     @Override
-    public void parse(IRawRecord<? extends Status> record, DataOutput out) throws HyracksDataException
{
-        Status tweet = record.get();
-        User user = tweet.getUser();
-        // Tweet user data
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)])
-                .setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)])
-                .setValue(JObjectUtil.getNormalizedString(user.getLang()));
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount());
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount());
-        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)])
-                .setValue(JObjectUtil.getNormalizedString(user.getName()));
-        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)])
-                .setValue(user.getFollowersCount());
-
-        // Tweet data
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId()));
-
-        int userPos = tweetFieldNameMap.get(Tweet.USER);
-        for (int i = 0; i < mutableUserFields.length; i++) {
-            ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i, mutableUserFields[i]);
+    public void parse(IRawRecord<? extends String> record, DataOutput out) throws HyracksDataException
{
+        try {
+            //TODO get rid of this temporary json
+            resetPools();
+            JSONObject jsObj = new JSONObject(record.get());
+            writeRecord(jsObj, out, recordType);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
         }
-        if (tweet.getGeoLocation() != null) {
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)])
-                    .setValue(tweet.getGeoLocation().getLatitude());
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)])
-                    .setValue(tweet.getGeoLocation().getLongitude());
-        } else {
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0);
-            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0);
-        }
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)])
-                .setValue(JObjectUtil.getNormalizedString(tweet.getCreatedAt().toString()));
-        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)])
-                .setValue(JObjectUtil.getNormalizedString(tweet.getText()));
+    }
 
-        for (int i = 0; i < mutableTweetFields.length; i++) {
-            mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
-        }
-        recordBuilder.reset(mutableRecord.getType());
-        recordBuilder.init();
-        IDataParser.writeRecord(mutableRecord, out, recordBuilder);
+    private void resetPools() {
+        listBuilderPool.reset();
+        recordBuilderPool.reset();
+        abvsBuilderPool.reset();
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
index d6e536d..7e0bee5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -28,7 +28,7 @@
 
 import twitter4j.Status;
 
-public class TweetParserFactory implements IRecordDataParserFactory<Status> {
+public class TweetParserFactory implements IRecordDataParserFactory<String> {
 
     private static final long serialVersionUID = 1L;
     private ARecordType recordType;
@@ -44,14 +44,14 @@
     }
 
     @Override
-    public IRecordDataParser<Status> createRecordParser(IHyracksTaskContext ctx) {
+    public IRecordDataParser<String> createRecordParser(IHyracksTaskContext ctx) {
         TweetParser dataParser = new TweetParser(recordType);
         return dataParser;
     }
 
     @Override
-    public Class<? extends Status> getRecordClass() {
-        return Status.class;
+    public Class<? extends String> getRecordClass() {
+        return String.class;
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
index e1a7911..42214c8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
@@ -42,21 +42,103 @@
     public static class Tweet {
         public static final String ID = "id";
         public static final String USER = "user";
-        public static final String LATITUDE = "latitude";
-        public static final String LONGITUDE = "longitude";
+        public static final String GEOLOCATION = "geo";
         public static final String CREATED_AT = "created_at";
-        public static final String MESSAGE = "message_text";
-
+        public static final String TEXT = "text";
         public static final String COUNTRY = "country";
+        public static final String PLACE = "place";
+        public static final String SOURCE = "source";
+        public static final String TRUNCATED = "truncated";
+        public static final String IN_REPLY_TO_STATUS_ID = "in_reply_to_status_id";
+        public static final String IN_REPLY_TO_USER_ID = "in_reply_to_user_id";
+        public static final String IN_REPLY_TO_SCREENNAME = "in_reply_to_screen_name";
+        public static final String FAVORITED = "favorited";
+        public static final String RETWEETED = "retweeted";
+        public static final String FAVORITE_COUNT = "favorite_count";
+        public static final String RETWEET_COUNT = "retweet_count";
+        public static final String CONTRIBUTORS = "contributors";
+        public static final String LANGUAGE = "lang";
+        public static final String FILTER_LEVEL = "filter_level";
+        public static final String TIMESTAMP_MS = "timestamp_ms";
+        public static final String IS_QUOTE_STATUS = "is_quote_status";
+        // Exposed by the twitter4j Status API but absent from the JSON payload:
+//        public static final String SENSITIVE = "sensitive";
+//        public static final String RETWEETED_BY_ME = "retweeted_by_me";
+//        public static final String CURRENT_USER_RETWEET_ID = "current_user_retweet_id";
 
+        // Legacy field names of the original flat tweet record, kept unchanged for backward compatibility
+        public static final String MESSAGE = "message_text";
+        public static final String LATITUDE = "latitude";
+        public static final String LONGITUDE = "longitude";
         // User fields (for the sub record "user")
         public static final String SCREEN_NAME = "screen_name";
-        public static final String LANGUAGE = "language";
+        public static final String USER_PREFERRED_LANGUAGE = "user_preferred_language";
         public static final String FRIENDS_COUNT = "friends_count";
         public static final String STATUS_COUNT = "status_count";
         public static final String NAME = "name";
         public static final String FOLLOWERS_COUNT = "followers_count";
 
+    }
+
+    public static final class Tweet_Place {
+        public static final String ID = "id";
+        public static final String URL = "url";
+        public static final String PLACE_TYPE = "place_type";
+        public static final String NAME = "name";
+        public static final String FULL_NAME = "full_name";
+        public static final String COUNTRY_CODE = "country_code";
+        public static final String COUNTRY = "country";
+        public static final String BOUNDING_BOX = "bounding_box";
+        public static final String ATTRIBUTES = "attributes";
+    }
+
+    public static final class Tweet_User {
+        public static final String ID = "id";
+        public static final String NAME = "name";
+        public static final String SCREEN_NAME = "screen_name";
+        public static final String LOCATION = "location";
+        public static final String DESCRIPTION = "description";
+        public static final String CONTRIBUTORS_ENABLED = "contributors_enabled";
+        public static final String PROFILE_IMAGE_URL = "profile_image_url";
+        public static final String PROFILE_IMAGE_URL_HTTPS = "profile_image_url_https";
+        public static final String URL = "url";
+        public static final String PROTECTED = "protected";
+        public static final String FOLLOWERS_COUNT = "followers_count";
+        public static final String PROFILE_BACKGROUND_COLOR = "profile_background_color";
+        public static final String PROFILE_TEXT_COLOR = "profile_text_color";
+        public static final String PROFILE_LINK_COLOR = "profile_link_color";
+        public static final String PROFILE_SIDEBAR_FILL_COLOR = "profile_sidebar_fill_color";
+        public static final String PROFILE_SIDEBAR_BORDER_COLOR = "profile_sidebar_border_color";
+        public static final String PROFILE_USE_BACKGROUND_IMAGE = "profile_use_background_image";
+        public static final String DEFAULT_PROFILE = "default_profile";
+        public static final String DEFAULT_PROFILE_IMAGE = "default_profile_image";
+        public static final String FRIENDS_COUNT = "friends_count";
+        public static final String CREATED_AT = "created_at";
+        public static final String FAVOURITES_COUNT = "favourites_count";
+        public static final String UTC_OFFSET = "utc_offset";
+        public static final String TIME_ZONE = "time_zone";
+        public static final String PROFILE_BACKGROUND_IMAGE_URL = "profile_background_image_url";
+        public static final String PROFILE_BACKGROUND_IMAGE_URL_HTTPS = "profile_background_image_url_https";
+        public static final String PROFILE_BANNER_URL = "profile_banner_url";
+        public static final String LANG = "lang";
+        public static final String STATUSES_COUNT = "statuses_count";
+        public static final String GEO_ENABLED = "geo_enabled";
+        public static final String VERIFIED = "verified";
+        public static final String IS_TRANSLATOR = "is_translator";
+        public static final String LISTED_COUNT = "listed_count";
+        public static final String FOLLOW_REQUEST_SENT = "follow_request_sent";
+        // Entities are skipped. Exposed by the twitter4j User API but absent from the JSON payload:
+//        public static final String WITHHELD_IN_COUNTRIES = "withheld_in_countries";
+//        public static final String BIGGER_PROFILE_IMAGE_URL = "bigger_profile_image_url";
+//        public static final String MINI_PROFILE_IMAGE_URL = "mini_profile_image_url";
+//        public static final String ORIGINAL_PROFILE_IMAGE_URL = "original_profile_image_url";
+//        public static final String SHOW_ALL_INLINE_MEDIA = "show_all_inline_media";
+//        public static final String PROFILE_BANNER_RETINA_URL = "profile_banner_retina_url";
+//        public static final String PROFILE_BANNER_IPAD_URL = "profile_banner_ipad_url";
+//        public static final String PROFILE_BANNER_IPAD_RETINA_URL = "profile_banner_ipad_retina_url";
+//        public static final String PROFILE_BANNER_MOBILE_URL = "profile_banner_mobile_url";
+//        public static final String PROFILE_BANNER_MOBILE_RETINA_URL = "profile_banner_mobile_retina_url";
+//        public static final String PROFILE_BACKGROUND_TILED = "profile_background_tiled";
     }
 
     /*
@@ -77,4 +159,5 @@
         public static final String TOPICS = "topics";
     }
 
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 55dee04..0bc38df 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -221,7 +221,7 @@
     public static final String KEY_STREAM_SOURCE = "stream-source";
     public static final String EXTERNAL = "external";
     public static final String KEY_READER_FACTORY = "reader-factory";
-    public static final String READER_RSS = "rss";
+    public static final String READER_RSS = "rss_feed";
     public static final String FORMAT_CSV = "csv";
 
     public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index 4fb602b..6aac9f1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -205,6 +205,7 @@
     private static ConfigurationBuilder getAuthConfiguration(Map<String, String> configuration) {
         ConfigurationBuilder cb = new ConfigurationBuilder();
         cb.setDebugEnabled(true);
+        cb.setJSONStoreEnabled(true);
         String oAuthConsumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
         String oAuthConsumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
         String oAuthAccessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
diff --git a/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java b/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
index 60e6e89..8dcac13 100644
--- a/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
+++ b/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
@@ -85,7 +85,6 @@
         this.buffer = buffer;
         tokenBegin = bufpos = 0;
         containsEscapes = false;
-        line++;
         tokenBegin = -1;
     }
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
index e16633e..a5af127 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
@@ -26,6 +26,9 @@
 
     private static final long serialVersionUID = 1L;
 
+    /** Fully open ordered list type: its item type is null, so the list is untyped. */
+    public static final AOrderedListType FULL_OPEN_ORDEREDLIST_TYPE = new AOrderedListType(null, "");
+
     /**
      * @param itemType
      *            if null, the list is untyped
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
index 80b13b5..febc6ad 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
@@ -26,6 +26,9 @@
 
     private static final long serialVersionUID = 1L;
 
+    /** Fully open unordered list type: its item type is null, so the collection is untyped. */
+    public static final AUnorderedListType FULLY_OPEN_UNORDEREDLIST_TYPE = new AUnorderedListType(null, "");
+
     /**
      * @param itemType
      *            if null, the collection is untyped

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1002
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7021e7b779de05b9ec999a8d5f8464fb0ab413c0
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xkkwww@gmail.com>


Mime
View raw message