asterixdb-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Raman Grover (Code Review)" <do-not-re...@asterix-gerrit.ics.uci.edu>
Subject Change in asterixdb[master]: Fix for Issue 929
Date Fri, 14 Aug 2015 19:54:09 GMT
Raman Grover has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/354

Change subject: Fix for Issue 929
......................................................................

Fix for Issue 929

commit 58317b37a3a7b2546b4780f4427ae1ed21a4ece9
Author: Ubuntu <raman@ramangro.ramangro.d3.internal.cloudapp.net>
Date:   Fri Aug 14 10:47:06 2015 +0000

    Fix for Issue 929:
      a) Added documentation on the use of OAuth keys and tokens with the built-in Twitter
adaptor
      b) Modified RSS feed adaptor and added documentation

Change-Id: I5521287a4fa1818c78a4f83b1a3cabeea8e6096d
---
M asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
M asterix-doc/src/site/markdown/feeds/tutorial.md
M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
M asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
M asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
6 files changed, 155 insertions(+), 16 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/54/354/1

diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index dad5975..08af842 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -187,6 +187,7 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -2032,6 +2033,7 @@
 
             FeedId feedId = new FeedId(dataverseName, feedName);
             List<FeedConnectionId> activeConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(feedId);
+            System.out.println("numnber of active connections " + activeConnections.size());
             if (activeConnections != null && !activeConnections.isEmpty()) {
                 StringBuilder builder = new StringBuilder();
                 for (FeedConnectionId connectionId : activeConnections) {
@@ -2041,6 +2043,7 @@
                 throw new AlgebricksException("Feed " + feedId
                         + " is currently active and connected to the following dataset(s)
\n" + builder.toString());
             } else {
+                System.out.println("Attempt to drop feed");
                 MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
             }
 
@@ -2129,10 +2132,10 @@
             // All Metadata checks have passed. Feed connect request is valid. //
 
             FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties());
-            Pair<FeedConnectionRequest, Boolean> p = getFeedConnectionRequest(dataverseName,
feed,
+            Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple =
getFeedConnectionRequest(dataverseName, feed,
                     cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
-            FeedConnectionRequest connectionRequest = p.first;
-            boolean createFeedIntakeJob = p.second;
+            FeedConnectionRequest connectionRequest = triple.first;
+            boolean createFeedIntakeJob = triple.second;
 
             FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(feedConnId, eventSubscriber);
             subscriberRegistered = true;
@@ -2142,6 +2145,11 @@
                         feedId.getDataverse(), feedId.getFeedName());
                 Pair<JobSpecification, IFeedAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
                         metadataProvider, policyAccessor);
+                // adapter configuration are valid at this stage
+                // register the feed joints (these are auto-de-registered)
+                for (IFeedJoint fj : triple.third){
+                    FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);   
+                }
                 runJob(hcc, pair.first, false);
                 IFeedAdapterFactory adapterFactory = pair.second;
                 if (adapterFactory.isRecordTrackingEnabled()) {
@@ -2149,6 +2157,10 @@
                             adapterFactory.createIntakeProgressTracker());
                 }
                 eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+            } else {
+                for (IFeedJoint fj : triple.third){
+                    FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);   
+                }
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2193,7 +2205,7 @@
      * @return
      * @throws MetadataException
      */
-    private Pair<FeedConnectionRequest, Boolean> getFeedConnectionRequest(String dataverse,
Feed feed, String dataset,
+    private Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String
dataverse, Feed feed, String dataset,
             FeedPolicy feedPolicy, MetadataTransactionContext mdTxnCtx) throws MetadataException
{
         IFeedJoint sourceFeedJoint = null;
         FeedConnectionRequest request = null;
@@ -2235,9 +2247,6 @@
                         ConnectionLocation.SOURCE_FEED_COMPUTE_STAGE, FeedJointType.COMPUTE,
connectionId);
                 jointsToRegister.add(computeFeedJoint);
             }
-            for (IFeedJoint joint : jointsToRegister) {
-                FeedLifecycleListener.INSTANCE.registerFeedJoint(joint);
-            }
         } else {
             sourceFeedJoint = FeedLifecycleListener.INSTANCE.getFeedJoint(feedJointKey);
             connectionLocation = sourceFeedJoint.getConnectionLocation();
@@ -2250,7 +2259,7 @@
                 dataset, feedPolicy.getPolicyName(), feedPolicy.getProperties(), feed.getFeedId());
 
         sourceFeedJoint.addConnectionRequest(request);
-        return new Pair<FeedConnectionRequest, Boolean>(request, needIntakeJob);
+        return new Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>>(request,
needIntakeJob, jointsToRegister);
     }
     
     /*
diff --git a/asterix-doc/src/site/markdown/feeds/tutorial.md b/asterix-doc/src/site/markdown/feeds/tutorial.md
index 886d29d..047e482 100644
--- a/asterix-doc/src/site/markdown/feeds/tutorial.md
+++ b/asterix-doc/src/site/markdown/feeds/tutorial.md
@@ -6,7 +6,7 @@
   * [Data Feed Basics](#DataFeedBasics)
     * [Collecting Data: Feed Adaptors](#FeedAdaptors)
     * [Preprocessing Collected Data](#PreprocessingCollectedData)
-  * [Creating an External Dataset](#IntroductionCreatingAnExternalDataset)
+  * [Creating an External UDF](#IntroductionCreatingAnExternalUDF)
 
 ## <a id="Introduction">Introduction</a> <font size="4"><a href="#toc">[Back
to TOC]</a></font> ##
 In this document, we describe the support for data ingestion in AsterixDB, an open-source
Big Data Management System (BDMS) that provides a platform for storage and analysis of large
volumes of semi-structured data. Data feeds are a new mechanism for having
@@ -62,7 +62,7 @@
 Next we make use of the create feed AQL statement to define our example data feed. 
 
         create feed TwitterFeed if not exists using "push_twitter"
-        (("type-name"="Tweet"),("location"="US"));
+        (("type-name"="Tweet"));
 
 Note that the create feed statement does not initiate the flow of data from Twitter into
our AsterixDB instance. Instead, the create feed statement only results in registering the
feed with AsterixDB. The flow of data along a feed is initiated when it is connected
 to a target dataset using the connect feed statement (which we shall revisit later).
@@ -106,7 +106,7 @@
 
         create feed ProcessedTwitterFeed if not exists
         using "push_twitter"
-        (("type-name"="Tweet"),("location"="US"));
+        (("type-name"="Tweet"));
         apply function testlib#processRawTweet;
 
 Note that a feed adaptor and a UDF act as pluggable components. These
@@ -216,7 +216,7 @@
 excess.records.throttle  | Set to true if rate of arrival of records is required to be reduced
in an adaptive manner to prevent having any excess records.                              
                                | false         |
 excess.records.elastic   | Set to true if the system should attempt to resolve resource bottlenecks
by re-structuring and/or rescheduling the feed ingestion pipeline.                       
                           | false         |
 recover.soft.failure     | Set to true if the feed must attempt to survive any runtime exception.
A false value permits an early termination of a feed in such an event.                   
                             | true          |
-recover.hard.failure     | Set to true if the feed must attempt to survive a hardware failures
(loss of AsterixDB node(s)). A false value permits the early termination of a feed in the
event of a hardware failure     |               |
+recover.hard.failure     | Set to true if the feed must attempt to survive hardware failures
(loss of AsterixDB node(s)). A false value permits the early termination of a feed in the
event of a hardware failure     | false         |
 
 Note that the end user may choose to form a custom policy. E.g.
 it is possible in AsterixDB to create a custom policy that spills excess
@@ -284,5 +284,88 @@
         functionHelper.setResult(result);
     }
 
+#### Installing an External Library ####
 
+**Creating an AsterixDB Library.** We need to install our Java UDF so that we
may use it in AQL statements/queries. An AsterixDB library has a pre-defined structure,
which is as follows.
 
+**Jar file** containing all class files. This is the jar file that contains
the class files for your UDF source code. In the case of our application, it
includes the class files for the function and the associated factory.
+
+**Library descriptor XML file.** This is a descriptor that provides meta-information
about the library.
+
+
+        <externalLibrary xmlns="library">
+            <language>JAVA</language>
+            <libraryFunctions>
+                <libraryFunction>
+                    <function_type>SCALAR</function_type>
+                    <name>hashTags</name>
+                    <arguments>Tweet</arguments>
+                    <return_type>ProcessedTweet</return_type>
+                    <definition>edu.uci.ics.asterix.external.udf.HashTagsFunctionFactory
+                    </definition>
+                </libraryFunction>
+            </libraryFunctions>
+        </externalLibrary>
+
+
+**`lib/` folder** containing any other dependency jars (optional).
+
+If the Java UDF requires additional dependency jars, you may add them under a "lib" folder.
The UDF in our application does not have any dependency jars, so we need not
include the lib directory in our library bundle.
+
+We create a zip bundle that contains the jar file and the library descriptor XML file. The
zip has the following structure.
+
+
+        $ unzip -l ./tweetlib.zip
+        Archive:  ./tweetlib.zip
+          Length     Date   Time    Name
+         --------    ----   ----    ----
+           760817  04-23-14 17:16   hash-tags.jar
+              405  04-23-14 17:16   tweet.xml
+         --------                   -------
+           761222                   2 files
+
+**Installing an AsterixDB Library.** We assume you have followed the [instructions](http://asterixdb.ics.uci.edu/documentation/install.html)
to set up a running AsterixDB instance. Let us refer to your AsterixDB instance
by the name "my_asterix".
+
+***Step 1:*** Stop the AsterixDB instance if it is in the ACTIVE state.
+
+
+        $ managix stop -n my_asterix
+
+***Step 2:*** Install the library using the Managix install command. Just to illustrate,
we use the help command to look up the syntax.
+
+
+        $ managix help -cmd install
+        Installs a library to an asterix instance.
+        Arguments/Options
+        -n  Name of Asterix Instance
+        -d  Name of the dataverse under which the library will be installed
+        -l  Name of the library
+        -p  Path to library zip bundle
+
+Above is a sample output that explains the usage and the required parameters. Each library
has a name and is installed under a dataverse. Recall that we had created a dataverse named
"feeds" prior to creating our datatypes and dataset. We shall name our library "tweetlib",
but of course, you may choose another name.
+
+You may download the pre-packaged library [attachment:tweetlib.zip here] and place the downloaded
library (a zip bundle) at a convenient location on your disk. To install the library, use
the Managix install command. An example is shown below.
+
+
+        $ managix install -n my_asterix -d feeds -l tweetlib -p <absolute path of the library zip bundle>
+
+You should see the following message:
+
+
+        INFO: Installed library tweetlib
+
+We shall next start our AsterixDB instance using the start command as shown below.
+
+
+        $ managix start -n my_asterix
+
+You may now use the AsterixDB library in AQL statements and queries. To look at the installed
artifacts, you may execute the following queries at the AsterixDB web console.
+
+
+        for $x in dataset Metadata.Function
+        return $x
+
+        for $x in dataset Metadata.Library
+        return $x
+
+Our library is now installed and is ready to be used. So far we have done the following:
(a) created a dataverse and defined the required datatypes; (b) created a dataset
to persist the ingested tweets; (c) created a Java UDF that provides the pre-processing
logic; and (d) packaged the Java UDF into an AsterixDB library and installed the library.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
index 8af49fb..f77efbf 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
 import edu.uci.ics.asterix.external.dataset.adapter.PushBasedTwitterAdapter;
 import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.external.util.TwitterUtil.AuthenticationConstants;
 import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
@@ -62,6 +63,16 @@
         this.outputType = outputType;
         this.configuration = configuration;
         TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+        boolean requiredParamsSpecified = validateConfiguration(configuration);
+        if(!requiredParamsSpecified){
+           StringBuilder builder = new StringBuilder();
+           builder.append("One or more parameters are missing from adapter configuration\n");
+           builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
+           builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
+           builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
+           builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
+           throw new Exception(builder.toString());
+        }
     }
 
     @Override
@@ -80,4 +91,16 @@
         return null;
     }
 
+    private boolean validateConfiguration(Map<String, String> configuration) {
+        String consumerKey  = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
+        String consumerSecret  = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
+        String accessToken  = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
+        String tokenSecret  = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
+        
+        if(consumerKey == null  || consumerSecret == null || accessToken == null || tokenSecret
== null){
+            return false;
+        }
+        return true;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 4ab7d22..baab239 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -32,7 +32,7 @@
 
     private static final long serialVersionUID = 1L;
 
-    private static final String KEY_RSS_URL = "rss_url";
+    private static final String KEY_RSS_URL = "url";
 
     private List<String> feedURLs = new ArrayList<String>();
     private String id_prefix = "";
@@ -46,6 +46,7 @@
         super(configuration, ctx);
         id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
         this.recordType = recordType;
+        reconfigure(configuration);
     }
 
     private void initializeFeedURLs(String rssURLProperty) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
index 8a74963..2737582 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
@@ -18,6 +18,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.logging.Logger;
+import java.util.logging.Level;
 
 import twitter4j.FilterQuery;
 import twitter4j.Twitter;
@@ -28,6 +30,9 @@
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 
 public class TwitterUtil {
+
+
+    private static Logger LOGGER = Logger.getLogger(TwitterUtil.class.getName());
 
     public static class ConfigurationConstants {
         public static final String KEY_LOCATION = "location";
@@ -77,7 +82,22 @@
 
     public static Twitter getTwitterService(Map<String, String> configuration) {
         ConfigurationBuilder cb = getAuthConfiguration(configuration);
-        TwitterFactory tf = new TwitterFactory(cb.build());
+        TwitterFactory tf = null;
+        try{
+          tf = new TwitterFactory(cb.build());
+        } catch (Exception e){
+         if (LOGGER.isLoggable(Level.WARNING)){
+            StringBuilder builder = new StringBuilder();
+            builder.append("Twitter Adapter requires the following config parameters\n");
+            builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
+            builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
+            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
+            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
+            LOGGER.warning(builder.toString()); 
+            LOGGER.warning("Unable to configure Twitter adapter due to incomplete/incorrect
authentication credentials");
+            LOGGER.warning("For details on how to obtain OAuth authentication token, visit
https://dev.twitter.com/oauth/overview/application-owner-access-tokens");
+         }  
+        }
         Twitter twitter = tf.getInstance();
         return twitter;
     }
@@ -132,8 +152,10 @@
                     break;
             }
         } catch (Exception e) {
-            throw new AsterixException("Incorrect configuration! unable to load authentication
credentials "
-                    + e.getMessage());
+            if(LOGGER.isLoggable(Level.WARNING)){
+                LOGGER.warning("unable to load authentication credentials from auth.properties
file" + 
+             "credential information will be obtained from adapter's configuration");
+            }
         }
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 97c90f8..62688de 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -323,6 +323,7 @@
                 "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory",
                 "edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory",
                 "edu.uci.ics.asterix.external.adapter.factory.PullBasedTwitterAdapterFactory",
+                "edu.uci.ics.asterix.external.adapter.factory.PushBasedTwitterAdapterFactory",
                 "edu.uci.ics.asterix.external.adapter.factory.RSSFeedAdapterFactory",
                 "edu.uci.ics.asterix.external.adapter.factory.CNNFeedAdapterFactory",
                 "edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory",

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/354
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I5521287a4fa1818c78a4f83b1a3cabeea8e6096d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Raman Grover <ramangrover29@gmail.com>

Mime
View raw message