apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2547) Add ride data processing example to Apex Library
Date Thu, 02 Nov 2017 17:41:00 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236234#comment-16236234 ]

ASF GitHub Bot commented on APEXMALHAR-2547:
--------------------------------------------

davidyan74 closed pull request #675: APEXMALHAR-2547 nyc taxi example checkin
URL: https://github.com/apache/apex-malhar/pull/675
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/examples/nyctaxi/README.md b/examples/nyctaxi/README.md
new file mode 100644
index 0000000000..3aa6868e21
--- /dev/null
+++ b/examples/nyctaxi/README.md
@@ -0,0 +1,87 @@
+# Apache Apex Example (NYC Taxi Data)
+
+## Overview
+
+This example demonstrates how Apex can be used to process ride service data, using the freely available
+historical Yellow Cab trip data from the New York City government's website.
+
+It uses concepts of event-time windowing, out-of-order processing and streaming windows.
+
+## Instructions
+
+### Data preparation
+Download some Yellow Cab trip data CSV files from the nyc.gov website.
+
+Let's say the data is saved as yellow_tripdata_2016-01.csv.
+
+Because the trip data source is wildly unordered, first sort the data by pickup time (the second CSV column):
+```bash
+bash> sort -t, -k2 yellow_tripdata_2016-01.csv > yellow_tripdata_sorted_2016-01.csv
+```
+
+Then add some random deviation to the sorted data:
+
+```bash
+bash> cat yellow_tripdata_sorted_2016-01.csv | perl -e '@lines = (); while (<>) { if (@lines && rand(10) < 1) { print shift @lines; } if (rand(50) < 1) { push @lines, $_; } else { print $_; } } print @lines;' > yellow_tripdata_sorted_random_2016-01.csv
+```
+
+Then create an HDFS directory and copy the csv file there:
+
+```bash
+bash> hdfs dfs -mkdir nyctaxidata
+bash> hdfs dfs -copyFromLocal yellow_tripdata_sorted_random_2016-01.csv nyctaxidata/
+```
+
+### Setting up pubsub server
+
+bash> git clone https://github.com/atrato/pubsub-server
+
+Then build and run the pubsub server (the message broker):
+
+bash> cd pubsub-server; mvn compile exec:java
+
+The pubsub server is now running and listening on the default port 8890 on localhost.
+
+### Running the application
+
+Open the Apex CLI command prompt and run the application:
+
+```bash
+bash> apex
+apex> launch target/malhar-examples-nyc-taxi-3.8.0-SNAPSHOT.apa
+```
+
+After the application has been running for 5 minutes, we can start querying the data. We need to wait
+5 minutes because the first window must pass the watermark before the WindowedOperator fires its triggers.
+Subsequent triggers fire every minute because the slideBy is one minute.
+
+We can use the Simple WebSocket Client Google Chrome extension to query the data. Open the extension in Chrome and
+connect to "ws://localhost:8890/pubsub". Subscribe to the query result topic first, because the results of every query
+are delivered to that topic. To subscribe, send this to the websocket connection:
+
+```json
+{"type":"subscribe","topic":"nyctaxi.result"}
+```
+
+Issue a query with latitude/longitude somewhere in Manhattan:
+
+```json
+{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
+```
+
+You should get back something like the following:
+
+```json
+{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10011"},"timestamp":1500769034523}
+```
+
+The result of the same query changes over time because "real-time" ride data keeps coming in:
+```json
+{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
+{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10003"},"timestamp":1500769158530}
+{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
+{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10011"},"timestamp":1500769827538}
+{"type":"publish","topic":"nyctaxi.query","data":{"lat":40.731829, "lon":-73.989181}}
+{"type":"data","topic":"nyctaxi.result","data":{"currentZip":"10003","driveToZip":"10012"},"timestamp":1500770540527}
+```
+
diff --git a/examples/nyctaxi/pom.xml b/examples/nyctaxi/pom.xml
new file mode 100644
index 0000000000..990718aa4e
--- /dev/null
+++ b/examples/nyctaxi/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-nyc-taxi</artifactId>
+  <packaging>jar</packaging>
+
+  <name>NYC Taxi Data Example for Apache Apex</name>
+  <description>Apex example application that processes NYC taxi data.</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.8.0-SNAPSHOT</version>
+  </parent>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.1</version>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.9.1</version>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/examples/nyctaxi/src/assemble/appPackage.xml b/examples/nyctaxi/src/assemble/appPackage.xml
new file mode 100644
index 0000000000..4138cf201e
--- /dev/null
+++ b/examples/nyctaxi/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/Application.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/Application.java
new file mode 100644
index 0000000000..63abe0b9da
--- /dev/null
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/Application.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.nyctaxi;
+
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.accumulation.SumDouble;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.hadoop.conf.Configuration;
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+
+/**
+ * The DAG definition of the example that illustrates New York City taxi ride data processing.
+ */
+@ApplicationAnnotation(name = "NycTaxiExample")
+public class Application implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
+    NycTaxiDataReader inputOperator = new NycTaxiDataReader();
+    inputOperator.setDirectory("/user/" + System.getProperty("user.name") + "/nyctaxidata");
+    inputOperator.getScanner().setFilePatternRegexp(".*\\.csv$");
+    dag.addOperator("NycTaxiDataReader", inputOperator);
+    NycTaxiCsvParser parser = dag.addOperator("NycTaxiCsvParser", new NycTaxiCsvParser());
+    NycTaxiZipFareExtractor extractor = dag.addOperator("NycTaxiZipFareExtractor", new NycTaxiZipFareExtractor());
+
+    KeyedWindowedOperatorImpl<String, Double, MutableDouble, Double> windowedOperator = new KeyedWindowedOperatorImpl<>();
+
+    // 5-minute windows slide by 1 minute
+    windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(5)).slideBy(Duration.standardMinutes(1)));
+
+    // Because we only care about the last 5 minutes, and the watermark is set at t-1 minutes, the lateness horizon is set to 4 minutes.
+    windowedOperator.setAllowedLateness(Duration.standardMinutes(4));
+    windowedOperator.setAccumulation(new SumDouble());
+    windowedOperator.setTriggerOption(TriggerOption.AtWatermark());
+    windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableDouble>());
+    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+
+    dag.addOperator("WindowedOperator", windowedOperator);
+
+    NycTaxiDataServer dataServer = dag.addOperator("NycTaxiDataServer", new NycTaxiDataServer());
+    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+    dag.addStream("input_to_parser", inputOperator.output, parser.input);
+    dag.addStream("parser_to_extractor", parser.output, extractor.input);
+    dag.addStream("extractor_to_windowed", extractor.output, windowedOperator.input);
+    dag.addStream("extractor_watermark", extractor.watermarkOutput, windowedOperator.controlInput);
+    dag.addStream("windowed_to_console", windowedOperator.output, dataServer.input, console.input);
+
+    PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
+    wsQuery.enableEmbeddedMode();
+    wsQuery.setTopic("nyctaxi.query");
+    try {
+      wsQuery.setUri(new URI("ws://localhost:8890/pubsub"));
+    } catch (URISyntaxException ex) {
+      throw Throwables.propagate(ex);
+    }
+    dataServer.setEmbeddableQueryInfoProvider(wsQuery);
+    PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
+    wsResult.setTopic("nyctaxi.result");
+    try {
+      wsResult.setUri(new URI("ws://localhost:8890/pubsub"));
+    } catch (URISyntaxException ex) {
+      throw Throwables.propagate(ex);
+    }
+    dag.addStream("server_to_query_output", dataServer.queryResult, wsResult.input);
+  }
+}
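
To make the window configuration above concrete: with 5-minute windows that slide by 1 minute, every event falls into five overlapping windows. The sketch below computes the start times of those windows for an epoch-millisecond timestamp. It is a standalone illustration and not Malhar's implementation; `SlidingWindowSketch` and `windowStartsFor` are hypothetical names, and the code assumes window starts are aligned to multiples of the slide.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch
{
  static final long WINDOW_MILLIS = 5 * 60 * 1000L; // window length: 5 minutes
  static final long SLIDE_MILLIS = 60 * 1000L;      // slideBy: 1 minute

  /**
   * Returns the start times (newest first) of all half-open windows
   * [start, start + WINDOW_MILLIS) that contain the given timestamp.
   * Assumption: window starts are aligned to multiples of SLIDE_MILLIS.
   */
  static List<Long> windowStartsFor(long timestamp)
  {
    List<Long> starts = new ArrayList<>();
    // Latest aligned start that is not after the timestamp.
    long latestStart = timestamp - Math.floorMod(timestamp, SLIDE_MILLIS);
    // Walk backwards while the window still covers the timestamp.
    for (long start = latestStart; start > timestamp - WINDOW_MILLIS; start -= SLIDE_MILLIS) {
      starts.add(start);
    }
    return starts;
  }

  public static void main(String[] args)
  {
    long t = 7 * 60 * 1000L + 30 * 1000L; // 7 minutes 30 seconds after the epoch
    for (long start : windowStartsFor(t)) {
      System.out.println("[" + start + ", " + (start + WINDOW_MILLIS) + ")");
    }
  }
}
```

Because each fare is added to five windows, a total served for a zip code always covers a full 5 minutes of event time, refreshed once per slide.
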
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycLocationUtils.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycLocationUtils.java
new file mode 100644
index 0000000000..08f59b48ce
--- /dev/null
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycLocationUtils.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.nyctaxi;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Provides utilities for zip codes and lat-lon coordinates in New York City.
+ */
+public class NycLocationUtils
+{
+  public static class ZipRecord
+  {
+    public final String zip;
+    public final double lat;
+    public final double lon;
+    public String[] neighboringZips;
+
+    public ZipRecord(String zip, double lat, double lon)
+    {
+      this.zip = zip;
+      this.lat = lat;
+      this.lon = lon;
+    }
+  }
+
+  private static Map<String, ZipRecord> zipRecords = new HashMap<>();
+
+  static {
+    // setup of NYC zip data.
+    try (BufferedReader br = new BufferedReader(
+        new InputStreamReader(NycLocationUtils.class.getResourceAsStream("/nyc_zip_codes.csv")))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        String[] s = line.split(",");
+        String zip = s[0].trim();
+        double lat = Double.valueOf(s[1].trim());
+        double lon = Double.valueOf(s[2].trim());
+        zipRecords.put(zip, new ZipRecord(zip, lat, lon));
+      }
+    } catch (IOException ex) {
+      throw Throwables.propagate(ex);
+    }
+    for (Map.Entry<String, ZipRecord> entry : zipRecords.entrySet()) {
+      final ZipRecord entryValue = entry.getValue();
+      List<String> zips = new ArrayList<>(zipRecords.keySet());
+
+      Collections.sort(zips, new Comparator<String>()
+      {
+        @Override
+        public int compare(String s1, String s2)
+        {
+          ZipRecord z1 = zipRecords.get(s1);
+          ZipRecord z2 = zipRecords.get(s2);
+          double dist1 = Math.pow(z1.lat - entryValue.lat, 2) + Math.pow(z1.lon - entryValue.lon, 2);
+          double dist2 = Math.pow(z2.lat - entryValue.lat, 2) + Math.pow(z2.lon - entryValue.lon, 2);
+          return Double.compare(dist1, dist2);
+        }
+      });
+      entryValue.neighboringZips = zips.subList(0, Math.min(8, zips.size())).toArray(new String[0]);
+    }
+  }
+
+  public static String getZip(double lat, double lon)
+  {
+    // Brute force to get the nearest zip centroid. Should be able to optimize this.
+    double minDist = Double.MAX_VALUE;
+    String zip = null;
+    for (Map.Entry<String, ZipRecord> entry : zipRecords.entrySet()) {
+      ZipRecord zipRecord = entry.getValue();
+      double dist = Math.pow(zipRecord.lat - lat, 2) + Math.pow(zipRecord.lon - lon, 2);
+      if (dist < minDist) {
+        zip = entry.getKey();
+        minDist = dist;
+      }
+    }
+    return zip;
+  }
+
+  public static String[] getNeighboringZips(String zip)
+  {
+    ZipRecord zipRecord = zipRecords.get(zip);
+    if (zipRecord != null) {
+      return zipRecord.neighboringZips;
+    } else {
+      return null;
+    }
+  }
+}
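
The lookup above compares squared differences in raw degrees. As a possible refinement (not what the PR does), the sketch below scales the longitude difference by the cosine of the latitude, since a degree of longitude is shorter than a degree of latitude at New York's latitude. `NearestZipSketch` and `nearestZip` are hypothetical names; the three centroids are copied from nyc_zip_codes.csv in this diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NearestZipSketch
{
  // A few centroids copied from nyc_zip_codes.csv: zip -> {lat, lon}.
  private static final Map<String, double[]> CENTROIDS = new LinkedHashMap<>();

  static {
    CENTROIDS.put("10003", new double[] {40.731829, -73.989181});
    CENTROIDS.put("10011", new double[] {40.742039, -74.000620});
    CENTROIDS.put("10012", new double[] {40.725581, -73.998078});
  }

  /** Brute-force nearest centroid using a longitude-scaled squared distance. */
  static String nearestZip(double lat, double lon)
  {
    // Assumption: a flat-earth approximation is good enough at city scale.
    double lonScale = Math.cos(Math.toRadians(lat));
    String best = null;
    double bestDist = Double.MAX_VALUE;
    for (Map.Entry<String, double[]> entry : CENTROIDS.entrySet()) {
      double dLat = entry.getValue()[0] - lat;
      double dLon = (entry.getValue()[1] - lon) * lonScale;
      double dist = dLat * dLat + dLon * dLon;
      if (dist < bestDist) {
        bestDist = dist;
        best = entry.getKey();
      }
    }
    return best;
  }

  public static void main(String[] args)
  {
    System.out.println(nearestZip(40.731829, -73.989181));
  }
}
```

With roughly 200 zip codes a linear scan per tuple is cheap; a spatial index would only be worth considering for a much larger table.
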
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
new file mode 100644
index 0000000000..f65e816058
--- /dev/null
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.nyctaxi;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Operator that parses historical New York City Yellow Cab ride data
+ * from http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml.
+ */
+public class NycTaxiCsvParser extends BaseOperator
+{
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String tuple)
+    {
+      String[] values = tuple.split(",");
+      Map<String, String> outputTuple = new HashMap<>();
+      if (values.length > 18 && StringUtils.isNumeric(values[0])) {
+        outputTuple.put("pickup_time", values[1]);
+        outputTuple.put("pickup_lon", values[5]);
+        outputTuple.put("pickup_lat", values[6]);
+        outputTuple.put("total_fare", values[18]);
+        output.emit(outputTuple);
+      }
+    }
+  };
+
+  public final transient DefaultOutputPort<Map<String, String>> output = new DefaultOutputPort<>();
+}
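
The parser relies on fixed column positions (1, 5, 6 and 18). A minimal standalone sketch of the same extraction, with a guard for header lines and short rows, could look like the following. `TripLineSketch`, `parse` and the `SAMPLE` row are hypothetical; the sample values are made up for illustration and are not taken from the real data set.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class TripLineSketch
{
  // Made-up row with 19 comma-separated fields, shaped like the columns the parser reads.
  static final String SAMPLE = "2,2016-01-01 00:00:00,2016-01-01 00:10:00,1,1.10,-73.990372,40.734695,"
      + "1,N,-73.981842,40.732407,2,7.5,0.5,0.5,0,0,0.3,8.8";

  /** Returns the extracted fields, or empty for header lines and rows that are too short. */
  static Optional<Map<String, String>> parse(String line)
  {
    String[] values = line.split(",", -1); // -1 keeps trailing empty fields
    if (values.length <= 18 || !isDigits(values[0])) {
      return Optional.empty();
    }
    Map<String, String> out = new HashMap<>();
    out.put("pickup_time", values[1]);
    out.put("pickup_lon", values[5]);
    out.put("pickup_lat", values[6]);
    out.put("total_fare", values[18]);
    return Optional.of(out);
  }

  // Plain-JDK stand-in for StringUtils.isNumeric: non-empty and digits only.
  private static boolean isDigits(String s)
  {
    return !s.isEmpty() && s.chars().allMatch(Character::isDigit);
  }

  public static void main(String[] args)
  {
    System.out.println(parse(SAMPLE));
    System.out.println(parse("VendorID,tpep_pickup_datetime"));
  }
}
```

Returning an empty `Optional` keeps malformed rows out of the stream without throwing from inside the operator.
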
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
new file mode 100644
index 0000000000..01313cad8f
--- /dev/null
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.nyctaxi;
+
+import java.io.IOException;
+
+import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
+
+/**
+ * Operator that reads historical New York City Yellow Cab ride data
+ * from http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml.
+ *
+ * Note that unlike the raw LineByLineFileInputOperator, we advance the streaming
+ * window whenever we see a difference in the timestamp in the data.
+ */
+public class NycTaxiDataReader extends LineByLineFileInputOperator
+{
+  private String currentTimestamp;
+  private transient boolean suspendEmit = false;
+
+  public NycTaxiDataReader()
+  {
+    // Whether or not to advance the window does not depend on the size. It solely
+    // depends on the timestamp of the data. This is why we are setting this to Integer.MAX_VALUE.
+    // See below for "suspendEmit".
+    emitBatchSize = Integer.MAX_VALUE;
+  }
+
+  @Override
+  protected boolean suspendEmit()
+  {
+    return suspendEmit;
+  }
+
+  @Override
+  protected String readEntity() throws IOException
+  {
+    String line = super.readEntity();
+    String[] fields = line.split(",");
+    String timestamp = fields[1];
+    if (currentTimestamp == null) {
+      currentTimestamp = timestamp;
+    } else if (!timestamp.equals(currentTimestamp)) {
+      // suspend emit until the next streaming window when timestamp is different from the current timestamp.
+      suspendEmit = true;
+      currentTimestamp = timestamp;
+    }
+    return line;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    // Resume emit since we now have a new streaming window.
+    suspendEmit = false;
+  }
+}
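
The reader's idea is to let the data's own timestamps drive the streaming windows: lines are emitted until the timestamp column changes. The sketch below groups consecutive lines by that column, comparing strings with `equals` rather than by reference. `TimestampBatchSketch` and `batchByTimestamp` are hypothetical names; the sketch only shows the grouping idea and does not reproduce exactly where the operator places window boundaries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TimestampBatchSketch
{
  /** Groups consecutive CSV lines that share the same value in column 1 (the timestamp). */
  static List<List<String>> batchByTimestamp(List<String> lines)
  {
    List<List<String>> batches = new ArrayList<>();
    String current = null;
    for (String line : lines) {
      String[] fields = line.split(",");
      if (fields.length < 2) {
        continue; // skip rows without a timestamp column
      }
      String timestamp = fields[1];
      // Value comparison: strings produced by split() are distinct objects,
      // so a reference comparison would report a change on every line.
      if (batches.isEmpty() || !timestamp.equals(current)) {
        batches.add(new ArrayList<>());
        current = timestamp;
      }
      batches.get(batches.size() - 1).add(line);
    }
    return batches;
  }

  public static void main(String[] args)
  {
    List<String> lines = Arrays.asList(
        "1,2016-01-01 00:00:00,a",
        "2,2016-01-01 00:00:00,b",
        "1,2016-01-01 00:00:01,c");
    System.out.println(batchByTimestamp(lines));
  }
}
```
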
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataServer.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataServer.java
new file mode 100644
index 0000000000..fd94e200da
--- /dev/null
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataServer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.nyctaxi;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+
+
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.appdata.AbstractAppDataServer;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.lib.util.KeyValPair;
+
+
+/**
+ * Operator that reads the KeyValPair tuples from the Windowed Operator and serves live queries.
+ *
+ * The KeyValPair input tuples map a zip code to the total payment in the window. They are collected in an internal map so that
+ * the data can be served.
+ */
+public class NycTaxiDataServer extends AbstractAppDataServer<String>
+{
+  public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, Double>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, Double>>>()
+  {
+    @Override
+    public void process(Tuple.WindowedTuple<KeyValPair<String, Double>> tuple)
+    {
+      if (!currentWindowHasData) {
+        currentData = new HashMap<>();
+        currentWindowHasData = true;
+      }
+      KeyValPair<String, Double> tupleValue = tuple.getValue();
+      currentData.put(tupleValue.getKey(), tupleValue.getValue());
+    }
+  };
+
+  @AppData.ResultPort
+  public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<>();
+
+  private Map<String, Double> servingData = new HashMap<>();
+  private transient Map<String, Double> currentData = new HashMap<>();
+  private transient ArrayDeque<String> resultQueue = new ArrayDeque<>();
+  private boolean currentWindowHasData = false;
+
+  @Override
+  public void beginWindow(long l)
+  {
+    super.beginWindow(l);
+    currentWindowHasData = false;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    while (!resultQueue.isEmpty()) {
+      String result = resultQueue.remove();
+      queryResult.emit(result);
+    }
+    servingData = currentData;
+    super.endWindow();
+  }
+
+
+  @Override
+  protected void processQuery(String queryStr)
+  {
+    try {
+      JSONObject query = new JSONObject(queryStr);
+      JSONObject result = new JSONObject();
+      double lat = query.getDouble("lat");
+      double lon = query.getDouble("lon");
+      Pair<String, String> zips = recommendZip(lat, lon);
+      result.put("currentZip", zips.getLeft());
+      result.put("driveToZip", zips.getRight());
+      resultQueue.add(result.toString());
+    } catch (JSONException e) {
+      LOG.error("Unrecognized query: {}", queryStr);
+    }
+  }
+
+  public Pair<String, String> recommendZip(double lat, double lon)
+  {
+    String currentZip = NycLocationUtils.getZip(lat, lon);
+    String zip = currentZip;
+    String[] neighboringZips = NycLocationUtils.getNeighboringZips(zip);
+    double dollars = servingData.containsKey(zip) ? servingData.get(zip) : 0;
+    LOG.info("Current zip: {}={}", zip, dollars);
+    for (String neighboringZip : neighboringZips) {
+      double tmpDollars = servingData.containsKey(neighboringZip) ? servingData.get(neighboringZip) : 0;
+      LOG.info("Neighboring zip: {}={}", neighboringZip, tmpDollars);
+      if (tmpDollars > dollars) {
+        dollars = tmpDollars;
+        zip = neighboringZip;
+      }
+    }
+    return new ImmutablePair<>(currentZip, zip);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(NycTaxiDataServer.class);
+
+}
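
The recommendation logic boils down to picking the zip code with the highest windowed fare total among the current zip and its neighbors, staying put when no neighbor is strictly better. A minimal sketch without the operator plumbing follows; `RecommendSketch`, `recommend` and `demoFares` are hypothetical names, and the fare totals are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class RecommendSketch
{
  /** Returns the zip with the highest fare total; ties and missing data keep the current zip. */
  static String recommend(String currentZip, String[] neighbors, Map<String, Double> fares)
  {
    String best = currentZip;
    double bestFare = fares.getOrDefault(currentZip, 0.0);
    for (String neighbor : neighbors) {
      double fare = fares.getOrDefault(neighbor, 0.0);
      if (fare > bestFare) {
        bestFare = fare;
        best = neighbor;
      }
    }
    return best;
  }

  // Invented fare totals, used only to exercise the method.
  static Map<String, Double> demoFares()
  {
    Map<String, Double> fares = new HashMap<>();
    fares.put("10003", 50.0);
    fares.put("10011", 120.0);
    fares.put("10012", 80.0);
    return fares;
  }

  public static void main(String[] args)
  {
    String[] neighbors = {"10003", "10012", "10011"};
    System.out.println(recommend("10003", neighbors, demoFares()));
  }
}
```

Using a strict `>` comparison means a driver is only sent elsewhere when a neighboring zip code has clearly earned more in the last window.
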
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiZipFareExtractor.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiZipFareExtractor.java
new file mode 100644
index 0000000000..57b7d458e2
--- /dev/null
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiZipFareExtractor.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.nyctaxi;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.Tuple;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Operator that fills in the zip code based on the lat-lon coordinates in the incoming tuples and prepares
+ * the tuples for the WindowedOperator downstream. It also generates a watermark that is t - 1 minute.
+ */
+public class NycTaxiZipFareExtractor extends BaseOperator
+{
+  public static class Watermark implements ControlTuple.Watermark
+  {
+    private long timestamp;
+
+    private Watermark()
+    {
+      // for kryo
+    }
+
+    public Watermark(long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public long getTimestamp()
+    {
+      return this.timestamp;
+    }
+  }
+
+  public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>()
+  {
+    @Override
+    public void process(Map<String, String> tuple)
+    {
+      try {
+        String zip = NycLocationUtils.getZip(Double.valueOf(tuple.get("pickup_lat")), Double.valueOf(tuple.get("pickup_lon")));
+        Date date = dateFormat.parse(tuple.get("pickup_time"));
+        long timestamp = date.getTime();
+        double fare = Double.valueOf(tuple.get("total_fare"));
+        output.emit(new Tuple.TimestampedTuple<>(timestamp, new KeyValPair<>(zip, fare)));
+        if (timestamp > currentTimestamp) {
+          currentTimestamp = timestamp;
+          watermarkOutput.emit(new Watermark(timestamp - 60 * 1000));
+        }
+      } catch (ParseException ex) {
+        LOG.warn("Ignoring tuple with bad timestamp {}", tuple.get("pickup_time"));
+      }
+    }
+  };
+
+  public final transient DefaultOutputPort<Tuple.TimestampedTuple<KeyValPair<String, Double>>> output = new DefaultOutputPort<>();
+  public final transient DefaultOutputPort<Watermark> watermarkOutput = new DefaultOutputPort<>();
+
+  private transient SimpleDateFormat dateFormat;
+  private long currentTimestamp = -1;
+
+  private static final Logger LOG = LoggerFactory.getLogger(NycTaxiZipFareExtractor.class);
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    dateFormat.setTimeZone(TimeZone.getTimeZone("America/New_York"));
+  }
+
+}
diff --git a/examples/nyctaxi/src/main/resources/nyc_zip_codes.csv b/examples/nyctaxi/src/main/resources/nyc_zip_codes.csv
new file mode 100644
index 0000000000..4eb72b006d
--- /dev/null
+++ b/examples/nyctaxi/src/main/resources/nyc_zip_codes.csv
@@ -0,0 +1,223 @@
+10001,40.750633, -73.997177
+10002,40.715775, -73.986212
+10003,40.731829, -73.989181
+10004,40.688630, -74.018244
+10005,40.706027, -74.008835
+10006,40.709614, -74.012954
+10007,40.713848, -74.007755
+10009,40.726399, -73.978631
+10010,40.739065, -73.982255
+10011,40.742039, -74.000620
+10012,40.725581, -73.998078
+10013,40.720103, -74.004903
+10014,40.734012, -74.006746
+10016,40.745224, -73.978297
+10017,40.752360, -73.972493
+10018,40.755319, -73.993114
+10019,40.765823, -73.987169
+10020,40.758236, -73.978833
+10021,40.769258, -73.958751
+10022,40.758628, -73.967948
+10023,40.775921, -73.982607
+10024,40.798452, -73.974428
+10025,40.798601, -73.966622
+10026,40.802381, -73.952681
+10027,40.811407, -73.953060
+10028,40.776441, -73.953509
+10029,40.791763, -73.943970
+10030,40.818267, -73.942856
+10031,40.825288, -73.950045
+10032,40.838815, -73.942836
+10033,40.850545, -73.933983
+10034,40.867076, -73.924312
+10035,40.795455, -73.929655
+10036,40.759260, -73.989860
+10037,40.812957, -73.937376
+10038,40.709278, -74.002562
+10039,40.830867, -73.936218
+10040,40.858305, -73.930549
+10044,40.761915, -73.949962
+10065,40.764612, -73.963122
+10069,40.775906, -73.990358
+10075,40.773361, -73.956216
+10103,40.760780, -73.977670
+10110,40.754499, -73.982256
+10111,40.759114, -73.977596
+10112,40.759167, -73.979668
+10115,40.810852, -73.963744
+10119,40.750310, -73.992979
+10128,40.781432, -73.950013
+10152,40.758404, -73.972031
+10153,40.763622, -73.972439
+10154,40.757779, -73.972487
+10162,40.769300, -73.949915
+10165,40.752131, -73.978722
+10167,40.754648, -73.974771
+10168,40.751448, -73.977103
+10169,40.754391, -73.976098
+10170,40.752625, -73.975877
+10171,40.755899, -73.973858
+10172,40.755273, -73.974315
+10173,40.754131, -73.979364
+10174,40.751441, -73.975003
+10177,40.755139, -73.975934
+10199,40.751393, -73.997229
+10271,40.708205, -74.010504
+10278,40.715182, -74.003778
+10279,40.712626, -74.008669
+10280,40.708538, -74.016650
+10282,40.716921, -74.015066
+10301,40.627456, -74.094407
+10302,40.630688, -74.137776
+10303,40.629885, -74.174130
+10304,40.609227, -74.092575
+10305,40.596691, -74.074866
+10306,40.571768, -74.125950
+10307,40.509183, -74.237785
+10308,40.551884, -74.147646
+10309,40.531346, -74.219857
+10310,40.632648, -74.116148
+10311,40.605241, -74.179503
+10312,40.545237, -74.180443
+10314,40.599263, -74.165748
+10451,40.820479, -73.925084
+10452,40.837393, -73.923437
+10453,40.852779, -73.912332
+10454,40.805489, -73.916585
+10455,40.814710, -73.908593
+10456,40.829881, -73.908120
+10457,40.847150, -73.898680
+10458,40.862543, -73.888143
+10459,40.825867, -73.892942
+10460,40.841758, -73.879571
+10461,40.847381, -73.840584
+10462,40.843280, -73.860389
+10463,40.880678, -73.906540
+10464,40.867787, -73.799920
+10465,40.822615, -73.822239
+10466,40.890964, -73.846239
+10467,40.869953, -73.865746
+10468,40.868093, -73.899730
+10469,40.868607, -73.848133
+10470,40.889530, -73.872662
+10471,40.898868, -73.903328
+10472,40.829556, -73.869336
+10473,40.818690, -73.858474
+10474,40.810549, -73.884367
+10475,40.875169, -73.823817
+11001,40.723317, -73.704949
+11003,40.699176, -73.706166
+11004,40.746204, -73.711478
+11005,40.756596, -73.714178
+11010,40.700587, -73.675018
+11020,40.771442, -73.714819
+11021,40.784319, -73.731488
+11023,40.798909, -73.733653
+11024,40.816251, -73.742872
+11030,40.793409, -73.688549
+11040,40.745347, -73.680292
+11042,40.758534, -73.697235
+11050,40.839900, -73.693124
+11096,40.621346, -73.756990
+11101,40.747155, -73.939750
+11102,40.772884, -73.926295
+11103,40.762574, -73.913447
+11104,40.744634, -73.920201
+11105,40.778877, -73.906769
+11106,40.762211, -73.931528
+11109,40.745115, -73.956928
+11201,40.693682, -73.989693
+11203,40.649591, -73.934371
+11204,40.618779, -73.984826
+11205,40.694696, -73.966286
+11206,40.701954, -73.942358
+11207,40.670747, -73.894209
+11208,40.669769, -73.871372
+11209,40.621982, -74.030324
+11210,40.628147, -73.946324
+11211,40.712597, -73.953098
+11212,40.662936, -73.913029
+11213,40.671078, -73.936336
+11214,40.599148, -73.996090
+11215,40.662688, -73.986740
+11216,40.680768, -73.949316
+11217,40.682306, -73.978099
+11218,40.643468, -73.976046
+11219,40.632667, -73.996669
+11220,40.641221, -74.016862
+11221,40.691340, -73.927879
+11222,40.727790, -73.947605
+11223,40.597139, -73.973428
+11224,40.577372, -73.988706
+11225,40.663046, -73.954219
+11226,40.646448, -73.956649
+11228,40.616695, -74.013047
+11229,40.601293, -73.944493
+11230,40.622164, -73.965105
+11231,40.677916, -74.005154
+11232,40.656546, -74.007355
+11233,40.678308, -73.919936
+11234,40.605080, -73.911721
+11235,40.583949, -73.949096
+11236,40.639413, -73.900664
+11237,40.704160, -73.921139
+11238,40.679171, -73.963804
+11239,40.647735, -73.879477
+11351,40.780747, -73.825301
+11354,40.768208, -73.827403
+11355,40.751452, -73.821031
+11356,40.784850, -73.841279
+11357,40.786393, -73.810864
+11358,40.760471, -73.796371
+11359,40.791781, -73.776875
+11360,40.780379, -73.781230
+11361,40.764191, -73.772775
+11362,40.756574, -73.737845
+11363,40.772616, -73.746526
+11364,40.745289, -73.760586
+11365,40.739634, -73.794490
+11366,40.728152, -73.785019
+11367,40.730145, -73.827030
+11368,40.751718, -73.851822
+11369,40.763365, -73.872374
+11370,40.765393, -73.893243
+11371,40.773894, -73.873475
+11372,40.751690, -73.883638
+11373,40.738837, -73.878535
+11374,40.726418, -73.861526
+11375,40.720934, -73.846151
+11377,40.744819, -73.905156
+11378,40.724744, -73.909639
+11379,40.716748, -73.879601
+11385,40.700671, -73.889433
+11411,40.694021, -73.736216
+11412,40.698095, -73.758986
+11413,40.671659, -73.752568
+11414,40.657604, -73.844804
+11415,40.707917, -73.828212
+11416,40.684654, -73.849548
+11417,40.676446, -73.844443
+11418,40.700272, -73.835971
+11419,40.688673, -73.822918
+11420,40.673583, -73.817730
+11421,40.694062, -73.858626
+11422,40.660060, -73.736012
+11423,40.715606, -73.768471
+11424,40.714304, -73.827263
+11425,40.607754, -74.023937
+11426,40.736425, -73.722376
+11427,40.730904, -73.745661
+11428,40.721016, -73.742245
+11429,40.709766, -73.738653
+11430,40.646964, -73.784813
+11432,40.715359, -73.793071
+11433,40.698162, -73.786893
+11434,40.676808, -73.776425
+11435,40.701265, -73.809605
+11436,40.675807, -73.796622
+11451,40.701282, -73.795972
+11691,40.601278, -73.761651
+11692,40.594095, -73.792896
+11693,40.590692, -73.809749
+11694,40.578270, -73.844762
+11697,40.555688, -73.920663
diff --git a/examples/pom.xml b/examples/pom.xml
index cfd8431f61..a0b126b45f 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -202,6 +202,7 @@
     <module>s3</module>
     <module>jdbc</module>
     <module>exactly-once</module>
+    <module>nyctaxi</module>
   </modules>
 
   <dependencies>
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index eae9e1231b..e06d411198 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -207,7 +207,6 @@ public Thread newThread(Runnable r)
         @Override
         public void onMessage(String string)
         {
-          LOG.debug("Got: " + string);
           try {
             T o = convertMessage(string);
             if (!(skipNull && o == null)) {
diff --git a/pom.xml b/pom.xml
index 16360deb8d..5101d7aa9a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,13 +65,13 @@
         <configuration>
           <excludes combine.children="append">
             <exclude>**/src/test/resources/**/sample_logs/**</exclude>
-            <exclude>src/test/resources/*.csv</exclude>
             <exclude>src/test/resources/*.xsd</exclude>
             <exclude>**/src/main/resources/com/datatorrent/apps/logstream/**</exclude>
             <exclude>src/main/c/zmq_push/Makefile</exclude>
             <exclude>src/test/resources/com/datatorrent/contrib/romesyndication/*.rss</exclude>
             <exclude>**/*.md</exclude>
             <exclude>**/*.yml</exclude>
+            <exclude>**/*.csv</exclude>
             <exclude>.mvn/jvm.config</exclude>
           </excludes>
           <mapping combine.children="append">
@@ -92,7 +92,6 @@
         <configuration>
           <excludes combine.children="append">
             <exclude>**/src/test/resources/**/sample_logs/**</exclude>
-            <exclude>src/test/resources/*.csv</exclude>
             <exclude>src/test/resources/*.xsd</exclude>
             <exclude>**/src/main/resources/com/datatorrent/apps/logstream/**</exclude>
             <exclude>src/main/c/zmq_push/Makefile</exclude>
@@ -101,6 +100,7 @@
             <exclude>**/*.json</exclude>
             <exclude>**/*.md</exclude>
             <exclude>**/*.yml</exclude>
+            <exclude>**/*.csv</exclude>
             <exclude>.mvn/jvm.config</exclude>
           </excludes>
         </configuration>
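
For readers of the archive, the watermark rule described in the Javadoc of NycTaxiZipFareExtractor ("a watermark that is t - 1 minute") can be sketched in isolation. This is a minimal sketch, not code from the pull request: the class name WatermarkTracker and the method observe are hypothetical, and the one-minute lag is passed in as a parameter. It mirrors only the comparison done in process(): a watermark is produced when an event's timestamp exceeds the largest timestamp seen so far, and its value is that timestamp minus the lag.

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of Apex or of the PR above.
public class WatermarkTracker {
    private final long lagMillis;
    // Largest event timestamp observed so far; -1 means "nothing seen yet".
    private long maxSeenTimestamp = -1;

    public WatermarkTracker(long lagMillis) {
        this.lagMillis = lagMillis;
    }

    // Returns a new watermark (event time minus the lag) only when the
    // event advances the maximum timestamp; late events yield no watermark.
    public OptionalLong observe(long eventTimestamp) {
        if (eventTimestamp > maxSeenTimestamp) {
            maxSeenTimestamp = eventTimestamp;
            return OptionalLong.of(eventTimestamp - lagMillis);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(60 * 1000L);
        // The second event is out of order relative to the first.
        long[] eventTimes = {100_000L, 90_000L, 130_000L};
        for (long t : eventTimes) {
            OptionalLong wm = tracker.observe(t);
            System.out.println(t + " -> "
                + (wm.isPresent() ? "watermark " + wm.getAsLong() : "no watermark"));
        }
    }
}
```

With the three sample timestamps, the out-of-order event (90000) produces no watermark, while the other two produce watermarks of 40000 and 70000. Tuples that arrive up to one minute behind the newest event are therefore still ahead of the watermark when they reach the downstream windowed operator.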


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add ride data processing example to Apex Library
> ------------------------------------------------
>
>                 Key: APEXMALHAR-2547
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2547
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: examples
>            Reporter: David Yan
>            Assignee: David Yan
>            Priority: Major
>
> Since ride data is something people process in real time (think Uber and Lyft), we should add an example of how that data can be processed using Apex.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

