From: thw@apache.org
To: commits@apex.incubator.apache.org
Date: Wed, 02 Mar 2016 23:33:02 -0000
Subject: [33/50] incubator-apex-core git commit: SPOI-6737 #resolve Moving operator guides under shared space.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/2cec5265/application_development.md

----------------------------------------------------------------------
diff --git a/application_development.md b/application_development.md
new file mode 100644
index 0000000..ba6670b
--- /dev/null
+++ b/application_development.md
@@ -0,0 +1,3047 @@

Application Developer Guide
===========================

Real-time big data processing is not only important but has become critical for businesses that depend on accurate and timely analysis of their business data. A few businesses have yielded to very expensive solutions, such as building an in-house real-time analytics infrastructure supported by an internal development team, or buying expensive proprietary software. A large number of businesses deal with the requirement simply by making Hadoop run their batch jobs in smaller iterations. Over the last few years, Hadoop has become ubiquitous in the big data processing space, replacing expensive proprietary hardware and software solutions for massive data processing with very cost-effective, fault-tolerant, open-source solutions based on commodity hardware.
While Hadoop has been a game changer for companies, it is primarily a batch-oriented system and does not yet have a viable option for real-time data processing. Most companies with real-time processing needs end up building customized solutions in addition to their Hadoop infrastructure.

The DataTorrent platform is designed to process massive amounts of real-time events natively in Hadoop. This can be event ingestion, processing, and aggregation for real-time data analytics, or real-time business logic decisioning such as cell tower load balancing, real-time ad bidding, or fraud detection. The platform can repair itself in real time (without data loss) if hardware fails, and adapt to changes in load by adding and removing computing resources automatically.

DataTorrent is a native Hadoop application. It runs as a YARN (Hadoop 2.x) application and leverages Hadoop as a distributed operating system. All the basic distributed operating system capabilities of Hadoop, such as resource allocation (Resource Manager), the distributed file system (HDFS), multi-tenancy, security, fault tolerance, and scalability, are supported natively in all streaming applications. Just as Hadoop handles all the details of map-reduce execution, allowing you to focus only on writing the application (the mapper and reducer functions), the platform handles all the details of streaming execution, allowing you to focus only on your business logic. Using the platform removes the need to maintain separate clusters for real-time applications.

In the platform, building a streaming application can be extremely easy and intuitive.
The application is represented as a Directed Acyclic Graph (DAG) of computation units called operators, interconnected by data-flow edges called streams. The operators process input streams and produce output streams. A library of common operators is provided to enable quick application development. In case the desired processing is not available in the operator library, one can easily write a custom operator. We refer those interested in creating their own operators to the [Operator Development Guide](operator_development.md).

Running A Test Application
=======================================

This chapter will help you with a quick start on running an application. If you are starting with the platform for the first time, it is informative to open an existing application and see it run. Do the following steps to run the PI demo, which computes the value of PI in a simple manner:

1. Open the platform files in your IDE (for example NetBeans or Eclipse)
2. Open the Demos project
3. Open Test Packages and run ApplicationTest.java in the pi package
4. See the results in your system console

Congratulations, you just ran your first real-time streaming demo :) This demo is very simple and has four operators. The first operator emits random integers between 0 and 30,000. The second operator receives these values and emits a hashmap with x and y values each time it receives two of them. The third operator takes these values and computes x\*\*2+y\*\*2. The last operator counts how many of the computed values from the previous operator were less than or equal to 30,000\*\*2. Assuming this count is N out of a total of T values received, PI is then computed as 4\*N/T, since the ratio of the quarter circle's area to the square's area is PI/4. Here is the code snippet for the PI application. This code populates the DAG. Do not worry about what each line does; we will cover these concepts later in this document.
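As an aside, the Monte Carlo arithmetic described above can be reproduced standalone in a few lines of plain Java, independent of the platform API. This is a hypothetical sketch (the `MonteCarloPi` class and `estimatePi` method are illustrative names, not part of the demo), separate from the actual DAG code that follows:

```java
import java.util.Random;

// Standalone Monte Carlo estimate of PI, mirroring the demo's logic:
// count random points (x, y) in a square that fall inside the quarter circle.
public class MonteCarloPi
{
  static final int MAX_VALUE = 30000; // same range the demo's random operator uses

  public static double estimatePi(long samples)
  {
    Random rand = new Random();
    long inside = 0;
    for (long n = 0; n < samples; n++) {
      long x = rand.nextInt(MAX_VALUE);
      long y = rand.nextInt(MAX_VALUE);
      if (x * x + y * y <= (long) MAX_VALUE * MAX_VALUE) {
        inside++; // point lies inside the quarter circle
      }
    }
    // area(quarter circle) / area(square) = PI / 4
    return 4.0 * inside / samples;
  }

  public static void main(String[] args)
  {
    System.out.println(estimatePi(1_000_000));
  }
}
```

With one million samples the printed estimate is typically within a few thousandths of PI; the demo computes the same running ratio, only distributed across operators.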
```java
// Generates random numbers
RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
rand.setMinvalue(0);
rand.setMaxvalue(30000);
int maxValue = 30000; // referenced by the script below; must match setMaxvalue

// Generates a round robin HashMap of "x" and "y"
RoundRobinHashMap rrhm = dag.addOperator("rrhm", new RoundRobinHashMap());
rrhm.setKeys(new String[] { "x", "y" });

// Calculates pi from x and y
JavaScriptOperator calc = dag.addOperator("picalc", new JavaScriptOperator());
calc.setPassThru(false);
calc.put("i", 0);
calc.put("count", 0);
calc.addSetupScript("function pi() { if (x*x+y*y <= " + maxValue * maxValue + ") { i++; } count++; return i / count * 4; }");
calc.setInvoke("pi");
dag.addStream("rand_rrhm", rand.integer_data, rrhm.data);
dag.addStream("rrhm_calc", rrhm.map, calc.inBindings);

// Puts results on the system console
ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
dag.addStream("calc_console", calc.result, console.input);
```

You can review the other demos and see what they do. The examples given in the Demos project cover various features of the platform, and we strongly encourage you to read them to familiarize yourself with the platform. In the remaining part of this document we will go through the details needed for you to develop and run streaming applications in Malhar.

Test Application: Yahoo! Finance Quotes
----------------------------------------------------

The PI application was meant to get you started. It is a basic application and does not fully illustrate the features of the platform. For the purpose of describing concepts, we will consider the test application shown in Figure 1. The application downloads tick data from [Yahoo! Finance](http://finance.yahoo.com) and computes the following for four tickers, namely [IBM](http://finance.yahoo.com/q?s=IBM), [GOOG](http://finance.yahoo.com/q?s=GOOG), [AAPL](http://finance.yahoo.com/q?s=AAPL), and [YHOO](http://finance.yahoo.com/q?s=YHOO):

1. 
Quote: Consisting of last trade price, last trade time, and total volume for the day
2. Per-minute chart data: Highest trade price, lowest trade price, and volume during that minute
3. Simple Moving Average: trade price over 5 minutes

Total volume must ensure that all trade volume for that day is added, i.e. data loss would result in wrong results. Charting data needs all the trades in the same minute to go into the same slot, after which it starts afresh, so again data loss would result in wrong results. The aggregation for charting data is done over 1 minute. The simple moving average computes the average price over a 5-minute sliding window; it too would produce wrong results if there is data loss. Figure 1 shows the application with no partitioning.

![](images/application_development/ApplicationDeveloperGuide.html-image00.png)

The operator [StockTickInput](../apidocs/com/datatorrent/demos/yahoofinance/StockTickInput.html) is the input operator that reads live data from Yahoo! Finance once per interval (user configurable in milliseconds), and emits the price, the incremental volume, and the last trade time of each stock symbol, thus emulating real ticks from the exchange. We utilize the Yahoo! Finance CSV web service interface. For example:

```
$ GET 'http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1'
"IBM",203.966,1513041,"1:43pm"
"GOOG",762.68,1879741,"1:43pm"
"AAPL",444.3385,11738366,"1:43pm"
"YHOO",19.3681,14707163,"1:43pm"
```

Among all the operators in Figure 1, StockTickInput is the only operator that requires extra code because it contains a custom mechanism to get the input data. The other operators are used unchanged from the Malhar library.
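Before diving into the operator code, note that the 5-minute simple moving average described above is just a fixed-length sliding window over per-minute prices. A minimal sketch of that bookkeeping follows; the `SlidingAverage` class and its `add` method are hypothetical illustrations, not the Malhar library operator:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sliding-window average over the last N values (e.g. N = 5 one-minute prices)
public class SlidingAverage
{
  private final int windowSize;
  private final Deque<Double> window = new ArrayDeque<Double>();
  private double sum;

  public SlidingAverage(int windowSize)
  {
    this.windowSize = windowSize;
  }

  // Add a new per-minute price; evict the oldest once the window is full
  public double add(double price)
  {
    window.addLast(price);
    sum += price;
    if (window.size() > windowSize) {
      sum -= window.removeFirst();
    }
    return sum / window.size();
  }
}
```

Losing even a single tuple shifts every subsequent window, which is why the paragraph above stresses that data loss produces wrong results for the moving average.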
Here is the class implementation for StockTickInput:

```java
package com.datatorrent.demos.yahoofinance;

import au.com.bytecode.opencsv.CSVReader;
import com.datatorrent.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.lib.util.KeyValPair;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.*;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.cookie.CookiePolicy;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.DefaultHttpParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This operator sends price, volume and time into separate ports and calculates incremental volume.
 */
public class StockTickInput implements InputOperator
{
  private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class);
  /**
   * Interval in milliseconds at which quotes are read from the server.
   */
  public int readIntervalMillis = 500;
  /**
   * The URL of the web service resource for the GET request.
   */
  private String url;
  public String[] symbols;
  private transient HttpClient client;
  private transient GetMethod method;
  private HashMap<String, Long> lastVolume = new HashMap<String, Long>();
  private boolean outputEvenIfZeroVolume = false;
  /**
   * The output port to emit price.
   */
  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<KeyValPair<String, Double>> price = new DefaultOutputPort<KeyValPair<String, Double>>();
  /**
   * The output port to emit incremental volume.
   */
  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<KeyValPair<String, Long>> volume = new DefaultOutputPort<KeyValPair<String, Long>>();
  /**
   * The output port to emit last traded time.
   */
  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<KeyValPair<String, String>> time = new DefaultOutputPort<KeyValPair<String, String>>();

  /**
   * Prepare URL from symbols and parameters. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1
   *
   * @return the URL
   */
  private String prepareURL()
  {
    String str = "http://download.finance.yahoo.com/d/quotes.csv?s=";
    for (int i = 0; i < symbols.length; i++) {
      if (i != 0) {
        str += ",";
      }
      str += symbols[i];
    }
    str += "&f=sl1vt1&e=.csv";
    return str;
  }

  @Override
  public void setup(OperatorContext context)
  {
    url = prepareURL();
    client = new HttpClient();
    method = new GetMethod(url);
    DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY);
  }

  @Override
  public void teardown()
  {
  }

  @Override
  public void emitTuples()
  {
    try {
      int statusCode = client.executeMethod(method);
      if (statusCode != HttpStatus.SC_OK) {
        System.err.println("Method failed: " + method.getStatusLine());
      }
      else {
        InputStream istream = method.getResponseBodyAsStream();
        // Process response
        InputStreamReader isr = new InputStreamReader(istream);
        CSVReader reader = new CSVReader(isr);
        List<String[]> myEntries = reader.readAll();
        for (String[] stringArr: myEntries) {
          ArrayList<String> tuple = new ArrayList<String>(Arrays.asList(stringArr));
          if (tuple.size() != 4) {
            return;
          }
          // input csv is <symbol>,<price>,<volume>,<time>