asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject [01/24] incubator-asterixdb git commit: Introduces Feeds 2.0
Date Mon, 29 Jun 2015 19:44:58 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master c0c2c1bf4 -> ae85a1dc8


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index e0691a6..b8ef691 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -1,5 +1,5 @@
 /*
-x * Copyright 2009-2013 by The Regents of the University of California
+ * Copyright 2009-2013 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -19,23 +19,24 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
 /**
- * TPS can be configured between 1 and 20,000
- * 
- * @author ramang
+ * A simulator of the Twitter Firehose. Generates meaningful tweets
+ * at a configurable rate
  */
 public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IFeedAdapter
{
 
@@ -52,8 +53,8 @@ public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IF
     private final TwitterServer twitterServer;
 
     public TwitterFirehoseFeedAdapter(Map<String, String> configuration, ITupleParserFactory
parserFactory,
-            ARecordType outputtype, int partition, IHyracksTaskContext ctx) throws Exception
{
-        super(parserFactory, outputtype, ctx);
+            ARecordType outputtype, IHyracksTaskContext ctx, int partition) throws Exception
{
+        super(parserFactory, outputtype, ctx, partition);
         this.twitterServer = new TwitterServer(configuration, partition, outputtype, outputStream,
executorService);
     }
 
@@ -68,7 +69,7 @@ public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IF
         return inputStream;
     }
 
-    public static class TwitterServer {
+    private static class TwitterServer {
         private final DataProvider dataProvider;
         private final ExecutorService executorService;
 
@@ -88,7 +89,7 @@ public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IF
 
     }
 
-    public static class DataProvider implements Runnable {
+    private static class DataProvider implements Runnable {
 
         public static final String KEY_MODE = "mode";
 
@@ -105,8 +106,8 @@ public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IF
 
         public DataProvider(Map<String, String> configuration, ARecordType outputtype,
int partition, OutputStream os)
                 throws Exception {
-            this.tweetGenerator = new TweetGenerator(configuration, partition, TweetGenerator.OUTPUT_FORMAT_ADM_STRING,
-                    os);
+            this.tweetGenerator = new TweetGenerator(configuration, partition);
+            this.tweetGenerator.registerSubscriber(os);
             this.os = os;
             mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
                     : Mode.AGGRESSIVE;
@@ -130,30 +131,29 @@ public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IF
             long startBatch;
             long endBatch;
 
-            try {
-                while (moreData && continuePush) {
-                    switch (mode) {
-                        case AGGRESSIVE:
-                            moreData = tweetGenerator.setNextRecordBatch(batchSize);
-                            break;
-                        case CONTROLLED:
-                            startBatch = System.currentTimeMillis();
-                            moreData = tweetGenerator.setNextRecordBatch(batchSize);
-                            endBatch = System.currentTimeMillis();
-                            if (endBatch - startBatch < 1000) {
-                                Thread.sleep(1000 - (endBatch - startBatch));
-                            } else {
-                                if (LOGGER.isLoggable(Level.WARNING)) {
-                                    LOGGER.warning("Unable to reach the required tps of "
+ batchSize);
+            while (true) {
+                try {
+                    while (moreData && continuePush) {
+                        switch (mode) {
+                            case AGGRESSIVE:
+                                moreData = tweetGenerator.generateNextBatch(batchSize);
+                                break;
+                            case CONTROLLED:
+                                startBatch = System.currentTimeMillis();
+                                moreData = tweetGenerator.generateNextBatch(batchSize);
+                                endBatch = System.currentTimeMillis();
+                                if (endBatch - startBatch < 1000) {
+                                    Thread.sleep(1000 - (endBatch - startBatch));
                                 }
-                            }
-                            break;
+                                break;
+                        }
+                    }
+                    os.close();
+                    break;
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Exception in adaptor " + e.getMessage());
                     }
-                }
-                os.close();
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception in adapter " + e.getMessage());
                 }
             }
         }
@@ -174,4 +174,16 @@ public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IF
         return DataExchangeMode.PUSH;
     }
 
+    @Override
+    public boolean handleException(Exception e) {
+        try {
+            twitterServer.stop();
+        } catch (Exception re) {
+            re.printStackTrace();
+            return false;
+        }
+        twitterServer.start();
+        return true;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index f2f730f..56976d2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -18,41 +18,41 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
 import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.AUnorderedListType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 /**
- * Factory class for creating @see{TwitterFirehoseFeedAdapter}.
- * The adapter simulates a twitter firehose with tweets being "pushed" into Asterix at a
configurable rate
- * measured in terms of TPS (tweets/second). The stream of tweets lasts for a configurable
duration (measured in seconds).
+ * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
+ * simulates a twitter firehose with tweets being "pushed" into Asterix at a
+ * configurable rate measured in terms of TPS (tweets/second). The stream of
+ * tweets lasts for a configurable duration (measured in seconds).
  */
-public class TwitterFirehoseFeedAdapterFactory extends StreamBasedAdapterFactory implements
ITypedAdapterFactory {
+public class TwitterFirehoseFeedAdapterFactory extends StreamBasedAdapterFactory implements
IFeedAdapterFactory {
 
     private static final long serialVersionUID = 1L;
 
-    /*
-     * Degree of parallelism for feed ingestion activity. Defaults to 1.
-     * This builds up the count constraint for the ingestion operator.
-     */
+    /**
+     * Degree of parallelism for feed ingestion activity. Defaults to 1. This
+     * determines the count constraint for the ingestion operator.
+     **/
     private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
 
-    /*
-     * The absolute locations where ingestion operator instances will be places.
-     */
+    /**
+     * The absolute locations where ingestion operator instances will be placed.
+     **/
     private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
 
-    private static final ARecordType outputType = initOutputType();
+    private ARecordType outputType;
 
     @Override
     public String getName() {
@@ -60,20 +60,16 @@ public class TwitterFirehoseFeedAdapterFactory extends StreamBasedAdapterFactory
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return AdapterType.TYPED;
-    }
-
-    @Override
     public SupportedOperation getSupportedOperations() {
         return SupportedOperation.READ;
     }
 
     @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        configuration.put(KEY_FORMAT, FORMAT_ADM);
+    public void configure(Map<String, String> configuration, ARecordType outputType)
throws Exception {
+        configuration.put(AsterixTupleParserFactory.KEY_FORMAT, AsterixTupleParserFactory.FORMAT_ADM);
         this.configuration = configuration;
-        this.configureFormat(initOutputType());
+        this.outputType = outputType;
+        this.configureFormat(outputType);
     }
 
     @Override
@@ -100,7 +96,7 @@ public class TwitterFirehoseFeedAdapterFactory extends StreamBasedAdapterFactory
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws
Exception {
-        return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, partition,
ctx);
+        return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, ctx,
partition);
     }
 
     @Override
@@ -108,27 +104,17 @@ public class TwitterFirehoseFeedAdapterFactory extends StreamBasedAdapterFactory
         return outputType;
     }
 
-    private static ARecordType initOutputType() {
-        ARecordType outputType = null;
-        try {
-            String[] userFieldNames = new String[] { "screen-name", "lang", "friends_count",
"statuses_count", "name",
-                    "followers_count" };
-
-            IAType[] userFieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
BuiltinType.AINT32,
-                    BuiltinType.AINT32, BuiltinType.ASTRING, BuiltinType.AINT32 };
-            ARecordType userRecordType = new ARecordType("TwitterUserType", userFieldNames,
userFieldTypes, false);
-
-            String[] fieldNames = new String[] { "tweetid", "user", "sender-location", "send-time",
"referred-topics",
-                    "message-text" };
+    @Override
+    public InputDataFormat getInputDataFormat() {
+        return InputDataFormat.ADM;
+    }
 
-            AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING,
"referred-topics");
-            IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, userRecordType, BuiltinType.APOINT,
-                    BuiltinType.ADATETIME, unorderedListType, BuiltinType.ASTRING };
-            outputType = new ARecordType("TweetMessageType", fieldNames, fieldTypes, false);
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
 
-        } catch (AsterixException | HyracksDataException e) {
-            throw new IllegalStateException("Unable to initialize output type");
-        }
-        return outputType;
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        throw new UnsupportedOperationException("Tracking of ingested records not enabled");
     }
+
 }
\ No newline at end of file


Mime
View raw message