apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [22/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.
Date Tue, 07 Mar 2017 06:58:27 GMT
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/pom.xml
----------------------------------------------------------------------
diff --git a/demos/mobile/pom.xml b/demos/mobile/pom.xml
deleted file mode 100644
index 1d2c66c..0000000
--- a/demos/mobile/pom.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>mobile-demo</artifactId>
-  <packaging>jar</packaging>
-
-  <name>Apache Apex Malhar Mobile Demo</name>
-  <description></description>
-
-  <parent>
-    <groupId>org.apache.apex</groupId>
-    <artifactId>malhar-demos</artifactId>
-    <version>3.7.0-SNAPSHOT</version>
-  </parent>
-
-  <properties>
-    <skipTests>true</skipTests>
-  </properties>
-
-  <dependencies>
-    <!-- add your dependencies here -->
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-servlet</artifactId>
-      <version>8.1.10.v20130312</version>
-      <scope>test</scope>
-      <type>jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-websocket</artifactId>
-      <version>8.1.10.v20130312</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <version>3.1</version>
-      <type>jar</type>
-    </dependency>
-  </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/mobile/src/assemble/appPackage.xml b/demos/mobile/src/assemble/appPackage.xml
deleted file mode 100644
index 4138cf2..0000000
--- a/demos/mobile/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>appPackage</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>${basedir}/target/</directory>
-      <outputDirectory>/app</outputDirectory>
-      <includes>
-        <include>${project.artifactId}-${project.version}.jar</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/target/deps</directory>
-      <outputDirectory>/lib</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/site/conf</directory>
-      <outputDirectory>/conf</outputDirectory>
-      <includes>
-        <include>*.xml</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/META-INF</directory>
-      <outputDirectory>/META-INF</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/app</directory>
-      <outputDirectory>/app</outputDirectory>
-    </fileSet>
-  </fileSets>
-
-</assembly>
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
deleted file mode 100644
index 30d7281..0000000
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mobile;
-
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Random;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.commons.lang3.Range;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StatsListener;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.counters.BasicCounters;
-import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
-import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
-import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-
-/**
- * Mobile Demo Application:
- * <p>
- * This demo simulates a large number of cell phones, in the range of 40K to 200K,
- * and tracks a given cell number across cell towers. It also displays the changing locations of the cell number on a Google map.
- *
- * This demo demonstrates the scalability feature of the Apex platform.
- * It showcases the ability of the platform to scale up and down as the phone numbers generated increase and decrease respectively.
- * If the tuples processed per second by the pmove operator increase beyond 30,000, more partitions of the pmove operator get deployed until
- * each partition processes around 10,000 to 30,000 tuples per second.
- * If the tuples processed per second drop below 10,000, the platform merges the operators until the partition count drops down to the original.
- * The load can be varied using the tuplesBlast property.
- * If the tuplesBlast is set to 200, 40K cell phones are generated.
- * If the tuplesBlast is set to 1000, 200K cell phones are generated.
- * The tuplesBlast property can be set using dtcli command: 'set-operator-property pmove tuplesBlast 1000'.
- *
- *
- * The specs are as such<br>
- * Depending on the tuplesBlast property, a large number of cell phone numbers are generated.
- * They jump between cell towers frequently: sometimes
- * within a second, sometimes in 10 seconds. The aim is to demonstrate the
- * following abilities:<br>
- * <ul>
- * <li>Entering queries dynamically: Phone numbers are added at run time to locate
- * their GPS positions.</li>
- * <li>Changing functionality dynamically: The load is changed by making
- * functional changes on the load generator operator (phonegen)</li>
- * <li>Auto Scale up/Down with load: Operator pmove increases and decreases
- * partitions as per load</li>
- * <li></li>
- * </ul>
- *
- * Refer to demos/docs/MobileDemo.md for more information.
- *
- * <p>
- *
- * Running Java Test or Main app in IDE:
- *
- * <pre>
- * LocalMode.runApp(new Application(), 600000); // 10 min run
- * </pre>
- *
- * Run Success : <br>
- * For successful deployment and run, user should see following output on
- * console: <br>
- *
- * <pre>
- * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
- * phoneLocationQueryResult: {phone=5554995, location=(10,4), queryId=q1}
- * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
- * phoneLocationQueryResult: {phone=5554995, location=(10,4), queryId=q1}
- * phoneLocationQueryResult: {phone=5554995, location=(10,5), queryId=q1}
- * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
- * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1}
- * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
- * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
- * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1}
- * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1}
- * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
- * </pre>
- *
- * <b>Application DAG : </b><br>
- * <img src="doc-files/Mobile.png" width=600px > <br>
- *
- * @since 0.3.2
- */
-@ApplicationAnnotation(name = "MobileDemo")
-public class Application implements StreamingApplication
-{
-  public static final String PHONE_RANGE_PROP = "dt.application.MobileDemo.phoneRange";
-  public static final String TOTAL_SEED_NOS = "dt.application.MobileDemo.totalSeedNumbers";
-  public static final String COOL_DOWN_MILLIS = "dt.application.MobileDemo.coolDownMillis";
-  public static final String MAX_THROUGHPUT = "dt.application.MobileDemo.maxThroughput";
-  public static final String MIN_THROUGHPUT = "dt.application.MobileDemo.minThroughput";
-  private static final Logger LOG = LoggerFactory.getLogger(Application.class);
-  private Range<Integer> phoneRange = Range.between(5550000, 5559999);
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    String lPhoneRange = conf.get(PHONE_RANGE_PROP, null);
-    if (lPhoneRange != null) {
-      String[] tokens = lPhoneRange.split("-");
-      if (tokens.length != 2) {
-        throw new IllegalArgumentException("Invalid range: " + lPhoneRange);
-      }
-      this.phoneRange = Range.between(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1]));
-    }
-    LOG.debug("Phone range {}", this.phoneRange);
-
-    RandomEventGenerator phones = dag.addOperator("Receiver", RandomEventGenerator.class);
-    phones.setMinvalue(this.phoneRange.getMinimum());
-    phones.setMaxvalue(this.phoneRange.getMaximum());
-
-    PhoneMovementGenerator movementGen = dag.addOperator("LocationFinder", PhoneMovementGenerator.class);
-    dag.setAttribute(movementGen, OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
-
-    StatelessThroughputBasedPartitioner<PhoneMovementGenerator> partitioner = new StatelessThroughputBasedPartitioner<PhoneMovementGenerator>();
-    partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 45000));
-    partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
-    partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
-    dag.setAttribute(movementGen, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
-    dag.setAttribute(movementGen, OperatorContext.PARTITIONER, partitioner);
-
-    // generate seed numbers
-    Random random = new Random();
-    int maxPhone = phoneRange.getMaximum() - phoneRange.getMinimum();
-    int phonesToDisplay = conf.getInt(TOTAL_SEED_NOS, 10);
-    for (int i = phonesToDisplay; i-- > 0; ) {
-      int phoneNo = phoneRange.getMinimum() + random.nextInt(maxPhone + 1);
-      LOG.info("seed no: " + phoneNo);
-      movementGen.phoneRegister.add(phoneNo);
-    }
-    // done generating data
-    LOG.info("Finished generating seed data.");
-
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
-    PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator("LocationResults", new PubSubWebSocketOutputOperator<Object>());
-    wsOut.setUri(uri);
-    PubSubWebSocketInputOperator<Map<String, String>> wsIn = dag.addOperator("QueryLocation", new PubSubWebSocketInputOperator<Map<String, String>>());
-    wsIn.setUri(uri);
-    // default partitioning: first connected stream to movementGen will be partitioned
-    dag.addStream("Phone-Data", phones.integer_data, movementGen.data);
-    dag.addStream("Results", movementGen.locationQueryResult, wsOut.input);
-    dag.addStream("Query", wsIn.outputPort, movementGen.phoneQuery);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
deleted file mode 100644
index f6708ba..0000000
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mobile;
-
-import java.util.Map;
-import java.util.Random;
-import javax.validation.constraints.Min;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Generates mobile numbers that will be displayed in mobile demo just after launch.<br>
- * Operator attributes:<b>
- * <ul>
- *   <li>initialDisplayCount: No. of seed phone numbers that will be generated.</li>
- *   <li>maxSeedPhoneNumber: The largest seed phone number.</li>
- * </ul>
- * </b>
- *
- * @since 0.3.5
- */
-public class PhoneEntryOperator extends BaseOperator
-{
-  private static Logger LOG = LoggerFactory.getLogger(PhoneEntryOperator.class);
-
-  private boolean seedGenerationDone = false;
-
-  @Min(0)
-  private int initialDisplayCount = 0;
-
-  private int maxSeedPhoneNumber = 0;
-  private int rangeLowerEndpoint;
-  private int rangeUpperEndpoint;
-
-  /**
-   * Sets the initial number of phones to display on the google map.
-   *
-   * @param i the count of initial phone numbers to display
-   */
-  public void setInitialDisplayCount(int i)
-  {
-    initialDisplayCount = i;
-  }
-
-  /**
-   * Sets the range for the phone numbers generated by the operator.
-   *
-   * @param phoneRange the range within which the phone numbers are randomly generated.
-   */
-  public void setPhoneRange(Range<Integer> phoneRange)
-  {
-    this.rangeLowerEndpoint = phoneRange.lowerEndpoint();
-    this.rangeUpperEndpoint = phoneRange.upperEndpoint();
-  }
-
-  /**
-   * Sets the max seed for random phone number generation
-   *
-   * @param number the largest seed phone number that may be generated.
-   */
-  public void setMaxSeedPhoneNumber(int number)
-  {
-    this.maxSeedPhoneNumber = number;
-  }
-
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<Map<String, String>> locationQuery = new DefaultInputPort<Map<String, String>>()
-  {
-    @Override
-    public void process(Map<String, String> tuple)
-    {
-      seedPhones.emit(tuple);
-    }
-  };
-
-  public final transient DefaultOutputPort<Map<String, String>> seedPhones = new DefaultOutputPort<Map<String, String>>();
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    if (!seedGenerationDone) {
-      Random random = new Random();
-      int maxPhone = (maxSeedPhoneNumber <= rangeUpperEndpoint && maxSeedPhoneNumber >= rangeLowerEndpoint) ? maxSeedPhoneNumber : rangeUpperEndpoint;
-      maxPhone -= 5550000;
-      int phonesToDisplay = initialDisplayCount > maxPhone ? maxPhone : initialDisplayCount;
-      for (int i = phonesToDisplay; i-- > 0; ) {
-        int phoneNo = 5550000 + random.nextInt(maxPhone + 1);
-        LOG.info("seed no: " + phoneNo);
-        Map<String, String> valueMap = Maps.newHashMap();
-        valueMap.put(PhoneMovementGenerator.KEY_COMMAND, PhoneMovementGenerator.COMMAND_ADD);
-        valueMap.put(PhoneMovementGenerator.KEY_PHONE, Integer.toString(phoneNo));
-        seedPhones.emit(valueMap);
-      }
-      // done generating data
-      seedGenerationDone = true;
-      LOG.info("Finished generating seed data.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
deleted file mode 100644
index a46e6d4..0000000
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mobile;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import javax.validation.constraints.Min;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang.mutable.MutableLong;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.counters.BasicCounters;
-import com.datatorrent.lib.util.HighLow;
-
-/**
- * <p>
- * This operator generates the GPS locations for the phone numbers specified.
- * The range of phone numbers or a specific phone number can be set for which the GPS locations will be generated.
- * It supports querying the locations of a given phone number.
- * This is a partitionable operator that can partition as the tuplesBlast increases.
- * </p>
- *
- * @since 0.3.2
- */
-public class PhoneMovementGenerator extends BaseOperator
-{
-  public final transient DefaultInputPort<Integer> data = new DefaultInputPort<Integer>()
-  {
-    @Override
-    public void process(Integer tuple)
-    {
-      HighLow<Integer> loc = gps.get(tuple);
-      if (loc == null) {
-        loc = new HighLow<Integer>(random.nextInt(range), random.nextInt(range));
-        gps.put(tuple, loc);
-      }
-      int xloc = loc.getHigh();
-      int yloc = loc.getLow();
-      int state = rotate % 4;
-
-      // Compute new location
-      int delta = random.nextInt(100);
-      if (delta >= threshold) {
-        if (state < 2) {
-          xloc++;
-        } else {
-          xloc--;
-        }
-        if (xloc < 0) {
-          xloc += range;
-        }
-      }
-      delta = random.nextInt(100);
-      if (delta >= threshold) {
-        if ((state == 1) || (state == 3)) {
-          yloc++;
-        } else {
-          yloc--;
-        }
-        if (yloc < 0) {
-          yloc += range;
-        }
-      }
-      xloc %= range;
-      yloc %= range;
-
-      // Set new location
-      HighLow<Integer> nloc = newgps.get(tuple);
-      if (nloc == null) {
-        newgps.put(tuple, new HighLow<Integer>(xloc, yloc));
-      } else {
-        nloc.setHigh(xloc);
-        nloc.setLow(yloc);
-      }
-      rotate++;
-    }
-  };
-
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<Map<String,String>> phoneQuery = new DefaultInputPort<Map<String,String>>()
-  {
-    @Override
-    public void process(Map<String,String> tuple)
-    {
-      LOG.info("new query {}", tuple);
-      String command = tuple.get(KEY_COMMAND);
-      if (command != null) {
-        if (command.equals(COMMAND_ADD)) {
-          commandCounters.getCounter(CommandCounters.ADD).increment();
-          String phoneStr = tuple.get(KEY_PHONE);
-          registerPhone(phoneStr);
-        } else if (command.equals(COMMAND_ADD_RANGE)) {
-          commandCounters.getCounter(CommandCounters.ADD_RANGE).increment();
-          registerPhoneRange(tuple.get(KEY_START_PHONE), tuple.get(KEY_END_PHONE));
-        } else if (command.equals(COMMAND_DELETE)) {
-          commandCounters.getCounter(CommandCounters.DELETE).increment();
-          String phoneStr = tuple.get(KEY_PHONE);
-          deregisterPhone(phoneStr);
-        } else if (command.equals(COMMAND_CLEAR)) {
-          commandCounters.getCounter(CommandCounters.CLEAR).increment();
-          clearPhones();
-        }
-      }
-    }
-  };
-
-  public static final String KEY_COMMAND = "command";
-  public static final String KEY_PHONE = "phone";
-  public static final String KEY_LOCATION = "location";
-  public static final String KEY_REMOVED = "removed";
-  public static final String KEY_START_PHONE = "startPhone";
-  public static final String KEY_END_PHONE = "endPhone";
-
-  public static final String COMMAND_ADD = "add";
-  public static final String COMMAND_ADD_RANGE = "addRange";
-  public static final String COMMAND_DELETE = "del";
-  public static final String COMMAND_CLEAR = "clear";
-
-  final Set<Integer> phoneRegister = Sets.newHashSet();
-
-  private final transient HashMap<Integer, HighLow<Integer>> gps = new HashMap<Integer, HighLow<Integer>>();
-  private final Random random = new Random();
-  private int range = 50;
-  private int threshold = 80;
-  private int rotate = 0;
-
-  protected BasicCounters<MutableLong> commandCounters;
-
-  private transient OperatorContext context;
-  private final transient HashMap<Integer, HighLow<Integer>> newgps = new HashMap<Integer, HighLow<Integer>>();
-
-  public PhoneMovementGenerator()
-  {
-    this.commandCounters = new BasicCounters<MutableLong>(MutableLong.class);
-  }
-
-  /**
-   * @return the range of the phone numbers
-   */
-  @Min(0)
-  public int getRange()
-  {
-    return range;
-  }
-
-  /**
-   * Sets the range of phone numbers for which the GPS locations need to be generated.
-   *
-   * @param i the range of phone numbers to set
-   */
-  public void setRange(int i)
-  {
-    range = i;
-  }
-
-  /**
-   * @return the threshold
-   */
-  @Min(0)
-  public int getThreshold()
-  {
-    return threshold;
-  }
-
-  /**
-   * Sets the threshold that decides how frequently the GPS locations are updated.
-   *
-   * @param i the value that decides how frequently the GPS locations change.
-   */
-  public void setThreshold(int i)
-  {
-    threshold = i;
-  }
-
-  private void registerPhone(String phoneStr)
-  {
-    // register the phone channel
-    if (Strings.isNullOrEmpty(phoneStr)) {
-      return;
-    }
-    try {
-      Integer phone = new Integer(phoneStr);
-      registerSinglePhone(phone);
-    } catch (NumberFormatException nfe) {
-      LOG.warn("Invalid no {}", phoneStr);
-    }
-  }
-
-  private void registerPhoneRange(String startPhoneStr, String endPhoneStr)
-  {
-    if (Strings.isNullOrEmpty(startPhoneStr) || Strings.isNullOrEmpty(endPhoneStr)) {
-      LOG.warn("Invalid phone range {} {}", startPhoneStr, endPhoneStr);
-      return;
-    }
-    try {
-      Integer startPhone = new Integer(startPhoneStr);
-      Integer endPhone = new Integer(endPhoneStr);
-      if (endPhone < startPhone) {
-        LOG.warn("Invalid phone range {} {}", startPhone, endPhone);
-        return;
-      }
-      for (int i = startPhone; i <= endPhone; i++) {
-        registerSinglePhone(i);
-      }
-    } catch (NumberFormatException nfe) {
-      LOG.warn("Invalid phone range <{},{}>", startPhoneStr, endPhoneStr);
-    }
-  }
-
-  private void registerSinglePhone(int phone)
-  {
-    phoneRegister.add(phone);
-    LOG.debug("Registered query id with phone {}", phone);
-    emitQueryResult(phone);
-  }
-
-  private void deregisterPhone(String phoneStr)
-  {
-    if (Strings.isNullOrEmpty(phoneStr)) {
-      return;
-    }
-    try {
-      Integer phone = new Integer(phoneStr);
-      // remove the channel
-      if (phoneRegister.contains(phone)) {
-        phoneRegister.remove(phone);
-        LOG.debug("Removing query id {}", phone);
-        emitPhoneRemoved(phone);
-      }
-    } catch (NumberFormatException nfe) {
-      LOG.warn("Invalid phone {}", phoneStr);
-    }
-  }
-
-  private void clearPhones()
-  {
-    phoneRegister.clear();
-    LOG.info("Clearing phones");
-  }
-
-  public final transient DefaultOutputPort<Map<String, String>> locationQueryResult = new DefaultOutputPort<Map<String, String>>();
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    this.context = context;
-    commandCounters.setCounter(CommandCounters.ADD, new MutableLong());
-    commandCounters.setCounter(CommandCounters.ADD_RANGE, new MutableLong());
-    commandCounters.setCounter(CommandCounters.DELETE, new MutableLong());
-    commandCounters.setCounter(CommandCounters.CLEAR, new MutableLong());
-  }
-
-  /**
-   * Emit all the data and clear the hash
-   */
-  @Override
-  public void endWindow()
-  {
-    for (Map.Entry<Integer, HighLow<Integer>> e: newgps.entrySet()) {
-      HighLow<Integer> loc = gps.get(e.getKey());
-      if (loc == null) {
-        gps.put(e.getKey(), e.getValue());
-      } else {
-        loc.setHigh(e.getValue().getHigh());
-        loc.setLow(e.getValue().getLow());
-      }
-    }
-    boolean found = false;
-    for (Integer phone: phoneRegister) {
-      emitQueryResult( phone);
-      found = true;
-    }
-    if (!found) {
-      LOG.debug("No phone number");
-    }
-    newgps.clear();
-    context.setCounters(commandCounters);
-  }
-
-  private void emitQueryResult(Integer phone)
-  {
-    HighLow<Integer> loc = gps.get(phone);
-    if (loc != null) {
-      Map<String, String> queryResult = new HashMap<String, String>();
-      queryResult.put(KEY_PHONE, String.valueOf(phone));
-      queryResult.put(KEY_LOCATION, loc.toString());
-      locationQueryResult.emit(queryResult);
-    }
-  }
-
-  private void emitPhoneRemoved(Integer phone)
-  {
-    Map<String,String> removedResult = Maps.newHashMap();
-    removedResult.put(KEY_PHONE, String.valueOf(phone));
-    removedResult.put(KEY_REMOVED,"true");
-    locationQueryResult.emit(removedResult);
-  }
-
-  public static enum CommandCounters
-  {
-    ADD, ADD_RANGE, DELETE, CLEAR
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(PhoneMovementGenerator.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/java/com/datatorrent/demos/mobile/doc-files/Mobile.png
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/doc-files/Mobile.png b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/doc-files/Mobile.png
deleted file mode 100644
index a25da0d..0000000
Binary files a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/doc-files/Mobile.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/java/com/datatorrent/demos/mobile/package-info.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/package-info.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/package-info.java
deleted file mode 100644
index 378e7a0..0000000
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Mobile phones tracking demonstration application.
- */
-package com.datatorrent.demos.mobile;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/resources/META-INF/properties.xml b/demos/mobile/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index 247bd82..0000000
--- a/demos/mobile/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,82 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <property>
-    <name>dt.application.MobileDemo.coolDownMillis</name>
-    <value>45000</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.maxThroughput</name>
-    <value>30000</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.minThroughput</name>
-    <value>10000</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.Receiver.tuplesBlast</name>
-    <value>200</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.Receiver.tuplesBlastIntervalMillis</name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.Receiver.outputport.integer_data.attr.QUEUE_CAPACITY</name>
-    <value>32768</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.LocationFinder.range</name>
-    <value>20</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.LocationFinder.threshold</name>
-    <value>80</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.LocationFinder.inputport.data.attr.QUEUE_CAPACITY</name>
-    <value>32768</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.LocationResults.prop.topic</name>
-    <value>demos.mobile.phoneLocationQueryResult</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.QueryLocation.prop.topic</name>
-    <value>demos.mobile.phoneLocationQuery</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.*.attr.MEMORY_MB</name>
-    <value>768</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.*.attr.JVM_OPTIONS</name>
-    <value>-Xmx128m</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
-    <value>256</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/site/conf/my-app-conf1.xml
----------------------------------------------------------------------
diff --git a/demos/mobile/src/site/conf/my-app-conf1.xml b/demos/mobile/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index f35873b..0000000
--- a/demos/mobile/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
deleted file mode 100644
index 87e40bf..0000000
--- a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mobile;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.servlet.Servlet;
-
-import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-
-import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
-import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
-import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class ApplicationTest
-{
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
-
-  public ApplicationTest()
-  {
-  }
-
-  /**
-   * Test of getApplication method, of class Application.
-   */
-  @Test
-  public void testGetApplication() throws Exception
-  {
-    Configuration conf = new Configuration(false);
-    conf.addResource("dt-site-mobile.xml");
-    Server server = new Server(0);
-    Servlet servlet = new SamplePubSubWebSocketServlet();
-    ServletHolder sh = new ServletHolder(servlet);
-    ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
-    contextHandler.addServlet(sh, "/pubsub");
-    contextHandler.addServlet(sh, "/*");
-    server.start();
-    Connector[] connector = server.getConnectors();
-    conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
-    URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
-
-    PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
-    outputOperator.setUri(uri);
-    outputOperator.setTopic(conf.get("dt.application.MobileDemo.operator.QueryLocation.topic"));
-
-    PubSubWebSocketInputOperator<Map<String, String>> inputOperator = new PubSubWebSocketInputOperator<Map<String, String>>();
-    inputOperator.setUri(uri);
-    inputOperator.setTopic(conf.get("dt.application.MobileDemo.operator.LocationResults.topic"));
-
-    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
-    inputOperator.outputPort.setSink(sink);
-
-    Map<String, String> data = new HashMap<String, String>();
-    data.put("command", "add");
-    data.put("phone", "5559990");
-
-    Application app = new Application();
-    LocalMode lma = LocalMode.newInstance();
-    lma.prepareDAG(app, conf);
-    LocalMode.Controller lc = lma.getController();
-    lc.setHeartbeatMonitoringEnabled(false);
-    lc.runAsync();
-    Thread.sleep(5000);
-    inputOperator.setup(null);
-    outputOperator.setup(null);
-    inputOperator.activate(null);
-    outputOperator.beginWindow(0);
-    outputOperator.input.process(data);
-    outputOperator.endWindow();
-    inputOperator.beginWindow(0);
-    int timeoutMillis = 5000;
-    while (sink.collectedTuples.size() < 5 && timeoutMillis > 0) {
-      inputOperator.emitTuples();
-      timeoutMillis -= 20;
-      Thread.sleep(20);
-    }
-    inputOperator.endWindow();
-    lc.shutdown();
-    inputOperator.teardown();
-    outputOperator.teardown();
-    server.stop();
-    Assert.assertTrue("size of output is 5 ", sink.collectedTuples.size() == 5);
-    for (Object obj : sink.collectedTuples) {
-      Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>)obj).get("phone"));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/test/resources/dt-site-mobile.xml
----------------------------------------------------------------------
diff --git a/demos/mobile/src/test/resources/dt-site-mobile.xml b/demos/mobile/src/test/resources/dt-site-mobile.xml
deleted file mode 100644
index 1759746..0000000
--- a/demos/mobile/src/test/resources/dt-site-mobile.xml
+++ /dev/null
@@ -1,87 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <property>
-    <name>dt.application.MobileDemo.class</name>
-    <value>com.datatorrent.demos.mobile.Application</value>
-    <description>An alias for the application</description>
-  </property>
-  <!--property>
-    <name>dt.attr.GATEWAY_CONNECT_ADDRESS</name>
-    <value>localhost:19090</value>
-  </property-->
-  <property>
-    <name>dt.application.MobileDemo.totalSeedNumbers</name>
-    <value>0</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.coolDownMillis</name>
-    <value>45000</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.maxThroughput</name>
-    <value>30000</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.minThroughput</name>
-    <value>1</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.Receiver.tuplesBlast</name>
-    <value>200</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.Receiver.tuplesBlastIntervalMillis</name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.Receiver.outputport.integer_data.attr.QUEUE_CAPACITY</name>
-    <value>32768</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.LocationFinder.range</name>
-    <value>20</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.LocationFinder.threshold</name>
-    <value>80</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.LocationFinder.inputport.data.attr.QUEUE_CAPACITY</name>
-    <value>32768</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.LocationResults.topic</name>
-    <value>resultTopic</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.QueryLocation.topic</name>
-    <value>queryTopic</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.operator.*.attr.MEMORY_MB</name>
-    <value>2048</value>
-  </property>
-  <property>
-    <name>dt.application.MobileDemo.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/mobile/src/test/resources/log4j.properties b/demos/mobile/src/test/resources/log4j.properties
deleted file mode 100644
index cf0d19e..0000000
--- a/demos/mobile/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/pom.xml
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/pom.xml b/demos/mrmonitor/pom.xml
deleted file mode 100644
index 9373063..0000000
--- a/demos/mrmonitor/pom.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>mrmonitor</artifactId>
-  <packaging>jar</packaging>
-
-  <name>Apache Apex Malhar MR Monitoring Demo</name>
-  <description></description>
-
-  <parent>
-    <groupId>org.apache.apex</groupId>
-    <artifactId>malhar-demos</artifactId>
-    <version>3.7.0-SNAPSHOT</version>
-  </parent>
-
-  <properties>
-    <skipTests>true</skipTests>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-websocket</artifactId>
-      <version>8.1.10.v20130312</version>
-      <scope>test</scope>
-      <type>jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-servlet</artifactId>
-      <version>8.1.10.v20130312</version>
-      <scope>test</scope>
-      <type>jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpclient</artifactId>
-      <version>4.3.5</version>
-      <type>jar</type>
-    </dependency>
-  </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/assemble/appPackage.xml b/demos/mrmonitor/src/assemble/appPackage.xml
deleted file mode 100644
index 4138cf2..0000000
--- a/demos/mrmonitor/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>appPackage</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>${basedir}/target/</directory>
-      <outputDirectory>/app</outputDirectory>
-      <includes>
-        <include>${project.artifactId}-${project.version}.jar</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/target/deps</directory>
-      <outputDirectory>/lib</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/site/conf</directory>
-      <outputDirectory>/conf</outputDirectory>
-      <includes>
-        <include>*.xml</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/META-INF</directory>
-      <outputDirectory>/META-INF</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/app</directory>
-      <outputDirectory>/app</outputDirectory>
-    </fileSet>
-  </fileSets>
-
-</assembly>
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
deleted file mode 100644
index 5625439..0000000
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mrmonitor;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.testbench.SeedEventGenerator;
-
-/**
- * Application
- *
- * @since 2.0.0
- */
-@ApplicationAnnotation(name = "MyFirstApplication")
-public class Application implements StreamingApplication
-{
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    // Sample DAG with 2 operators
-    // Replace this code with the DAG you want to build
-
-    SeedEventGenerator seedGen = dag.addOperator("seedGen", SeedEventGenerator.class);
-    seedGen.setSeedStart(1);
-    seedGen.setSeedEnd(10);
-    seedGen.addKeyData("x", 0, 10);
-    seedGen.addKeyData("y", 0, 100);
-
-    ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
-    cons.setStringFormat("hello: %s");
-
-    dag.addStream("seeddata", seedGen.val_list, cons.input).setLocality(Locality.CONTAINER_LOCAL);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
deleted file mode 100644
index 7930405..0000000
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mrmonitor;
-
-/**
- * <p>Constants class.</p>
- *
- * @since 0.3.4
- */
-public interface Constants
-{
-
-  public static final int MAX_NUMBER_OF_JOBS = 25;
-
-  public static final String REDUCE_TASK_TYPE = "REDUCE";
-  public static final String MAP_TASK_TYPE = "MAP";
-  public static final String TASK_TYPE = "type";
-  public static final String TASK_ID = "id";
-
-  public static final String LEAGACY_TASK_ID = "taskId";
-  public static final int MAX_TASKS = 2000;
-
-  public static final String QUERY_APP_ID = "app_id";
-  public static final String QUERY_JOB_ID = "job_id";
-  public static final String QUERY_HADOOP_VERSION = "hadoop_version";
-  public static final String QUERY_API_VERSION = "api_version";
-  public static final String QUERY_RM_PORT = "rm_port";
-  public static final String QUERY_HS_PORT = "hs_port";
-  public static final String QUERY_HOST_NAME = "hostname";
-  public static final String QUERY_KEY_COMMAND = "command";
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
deleted file mode 100644
index 263a1a7..0000000
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
+++ /dev/null
@@ -1,623 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mrmonitor;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.IdleTimeHandler;
-
-import com.datatorrent.demos.mrmonitor.MRStatusObject.TaskObject;
-
-/**
- * <p>
- * MRJobStatusOperator class.
- * </p>
- *
- * @since 0.3.4
- */
-public class MRJobStatusOperator implements Operator, IdleTimeHandler
-{
-  private static final Logger LOG = LoggerFactory.getLogger(MRJobStatusOperator.class);
-
-  private static final String JOB_PREFIX = "job_";
-  /**
-   * This outputs the meta information of the job
-   */
-  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
-  /**
-   * This outputs the map task information of the job
-   */
-  public final transient DefaultOutputPort<String> mapOutput = new DefaultOutputPort<String>();
-  /**
-   * This outputs the reduce task information of the job
-   */
-  public final transient DefaultOutputPort<String> reduceOutput = new DefaultOutputPort<String>();
-  /**
-   * This outputs the counter information of the job
-   */
-  public final transient DefaultOutputPort<String> counterOutput = new DefaultOutputPort<String>();
-  /**
-   * This is time in Ms before making new request for data
-   */
-  private transient int sleepTime = 100;
-  /**
-   * This is the number of consecutive windows of no change before the job is removed from map
-   */
-  private int maxRetrials = 10;
-  /**
-   * The number of minutes for which the status history of map and reduce tasks is stored
-   */
-  private int statusHistoryTime = 60;
-  private Map<String, MRStatusObject> jobMap = new HashMap<String, MRStatusObject>();
-  /**
-   * This represents the maximum number of jobs the single instance of this operator is going to server at any time
-   */
-  private int maxJobs = Constants.MAX_NUMBER_OF_JOBS;
-  private transient Iterator<MRStatusObject> iterator;
-
-  /*
-   * each input string is a map of the following format {"app_id":<>,"hadoop_version":<>,"api_version":<>,"command":<>,
-   * "hostname":<>,"hs_port":<>,"rm_port":<>,"job_id":<>}
-   */
-  public final transient DefaultInputPort<MRStatusObject> input = new DefaultInputPort<MRStatusObject>()
-  {
-    @Override
-    public void process(MRStatusObject mrStatusObj)
-    {
-
-      if (jobMap == null) {
-        jobMap = new HashMap<String, MRStatusObject>();
-      }
-
-      if (jobMap.size() >= maxJobs) {
-        return;
-      }
-
-      if ("delete".equalsIgnoreCase(mrStatusObj.getCommand())) {
-        removeJob(mrStatusObj.getJobId());
-        JSONObject outputJsonObject = new JSONObject();
-        try {
-          outputJsonObject.put("id", mrStatusObj.getJobId());
-          outputJsonObject.put("removed", "true");
-          output.emit(outputJsonObject.toString());
-        } catch (JSONException e) {
-          LOG.warn("Error creating JSON: {}", e.getMessage());
-        }
-        return;
-      }
-      if ("clear".equalsIgnoreCase(mrStatusObj.getCommand())) {
-        clearMap();
-        return;
-      }
-
-      if (jobMap.get(mrStatusObj.getJobId()) != null) {
-        mrStatusObj = jobMap.get(mrStatusObj.getJobId());
-      }
-      if (mrStatusObj.getHadoopVersion() == 2) {
-        getJsonForJob(mrStatusObj);
-      } else if (mrStatusObj.getHadoopVersion() == 1) {
-        getJsonForLegacyJob(mrStatusObj);
-      }
-      mrStatusObj.setStatusHistoryCount(statusHistoryTime);
-      iterator = jobMap.values().iterator();
-      emitHelper(mrStatusObj);
-    }
-  };
-
-  public int getStatusHistoryTime()
-  {
-    return statusHistoryTime;
-  }
-
-  public void setStatusHistoryTime(int statusHistoryTime)
-  {
-    this.statusHistoryTime = statusHistoryTime;
-    if (jobMap != null && jobMap.size() > 0) {
-      for (Entry<String, MRStatusObject> entry : jobMap.entrySet()) {
-        entry.getValue().setStatusHistoryCount(statusHistoryTime);
-      }
-    }
-
-  }
-
-  /**
-   * Refreshes the status of a job submitted on hadoop 2.x. The job JSON is requested through the Resource Manager
-   * proxy URL first; when that yields no JSON object, the history server URL is tried instead.
-   *
-   * If the fetched JSON equals what the job map already holds for this job id, only the tasks and the counters are
-   * refreshed. Otherwise the job is flagged as modified, its JSON is replaced, counters and tasks are refreshed and
-   * the job is stored in the job map.
-   *
-   * @param statusObj the job whose status is refreshed
-   */
-  private void getJsonForJob(MRStatusObject statusObj)
-  {
-    String proxyUrl = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort()
-        + "/proxy/application_" + statusObj.getAppId()
-        + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId();
-    JSONObject fetched = MRUtil.getJsonObject(MRUtil.getJsonForURL(proxyUrl));
-
-    if (fetched == null) {
-      // no JSON from the proxy: ask the history server for the same job
-      String historyUrl = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort()
-          + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId();
-      fetched = MRUtil.getJsonObject(MRUtil.getJsonForURL(historyUrl));
-    }
-
-    if (fetched == null) {
-      return;
-    }
-
-    MRStatusObject known = jobMap.get(statusObj.getJobId());
-    if (known != null && known.getJsonObject().toString().equals(fetched.toString())) {
-      // job level JSON is unchanged; tasks and counters may still have moved
-      getJsonsForTasks(statusObj);
-      getCounterInfoForJob(statusObj);
-      return;
-    }
-
-    statusObj.setModified(true);
-    statusObj.setJsonObject(fetched);
-    getCounterInfoForJob(statusObj);
-    getJsonsForTasks(statusObj);
-    jobMap.put(statusObj.getJobId(), statusObj);
-  }
-
-  /**
-   * Collects the counter (metric) information of a job. The counters are requested through the Resource Manager
-   * proxy URL first and, when that yields no JSON object, from the history server URL.
-   *
-   * The first successful fetch creates the metric object of the job; later fetches replace its JSON and flag it as
-   * modified only when the new JSON text differs (compared case-insensitively) from the stored one.
-   *
-   * @param statusObj the job whose counters are refreshed
-   */
-  private void getCounterInfoForJob(MRStatusObject statusObj)
-  {
-    String counterUrl = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort()
-        + "/proxy/application_" + statusObj.getAppId()
-        + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId() + "/counters";
-    JSONObject counters = MRUtil.getJsonObject(MRUtil.getJsonForURL(counterUrl));
-
-    if (counters == null) {
-      counterUrl = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort()
-          + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId() + "/counters";
-      counters = MRUtil.getJsonObject(MRUtil.getJsonForURL(counterUrl));
-    }
-
-    if (counters == null) {
-      return;
-    }
-
-    if (statusObj.getMetricObject() == null) {
-      statusObj.setMetricObject(new TaskObject(counters));
-      return;
-    }
-
-    boolean unchanged = statusObj.getMetricObject().getJsonString().equalsIgnoreCase(counters.toString());
-    if (!unchanged) {
-      statusObj.getMetricObject().setJson(counters);
-      statusObj.getMetricObject().setModified(true);
-    }
-  }
-
-  /**
-   * Refreshes the per-task status of a job submitted on hadoop 2.x. The task list is requested through the Resource
-   * Manager proxy URL first and, when that yields no JSON object, from the history server URL.
-   *
-   * Every reported task is filed into the reduce task map when its type matches
-   * {@code Constants.REDUCE_TASK_TYPE} (ignoring case) and into the map task map otherwise. Unknown tasks are added,
-   * known tasks whose JSON text changed are updated and flagged as modified, unchanged tasks are left alone.
-   * Any exception while walking the response is logged and ends the refresh.
-   *
-   * @param statusObj the job whose tasks are refreshed
-   */
-  private void getJsonsForTasks(MRStatusObject statusObj)
-  {
-    String tasksUrl = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort()
-        + "/proxy/application_" + statusObj.getAppId()
-        + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId() + "/tasks/";
-    JSONObject tasksJson = MRUtil.getJsonObject(MRUtil.getJsonForURL(tasksUrl));
-
-    if (tasksJson == null) {
-      tasksUrl = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort()
-          + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId() + "/tasks/";
-      tasksJson = MRUtil.getJsonObject(MRUtil.getJsonForURL(tasksUrl));
-    }
-
-    if (tasksJson == null) {
-      return;
-    }
-
-    try {
-      Map<String, TaskObject> mapTasks = statusObj.getMapJsonObject();
-      Map<String, TaskObject> reduceTasks = statusObj.getReduceJsonObject();
-      JSONArray reported = tasksJson.getJSONObject("tasks").getJSONArray("task");
-
-      for (int idx = 0; idx < reported.length(); idx++) {
-        JSONObject reportedTask = reported.getJSONObject(idx);
-        boolean isReduce = Constants.REDUCE_TASK_TYPE.equalsIgnoreCase(reportedTask.getString(Constants.TASK_TYPE));
-        Map<String, TaskObject> target = isReduce ? reduceTasks : mapTasks;
-        String taskId = reportedTask.getString(Constants.TASK_ID);
-
-        TaskObject tracked = target.get(taskId);
-        if (tracked == null) {
-          // first time this task shows up
-          target.put(taskId, new TaskObject(reportedTask));
-        } else if (!tracked.getJsonString().equals(reportedTask.toString())) {
-          tracked.setJson(reportedTask);
-          tracked.setModified(true);
-          target.put(taskId, tracked);
-        }
-      }
-
-      statusObj.setMapJsonObject(mapTasks);
-      statusObj.setReduceJsonObject(reduceTasks);
-    } catch (Exception e) {
-      LOG.info("exception: {}", e.getMessage());
-    }
-  }
-
-  /**
-   * Refreshes the status of a job submitted on hadoop 1.x by reading the JSON form of its {@code jobdetails.jsp}
-   * page. Nothing happens when no JSON object could be obtained.
-   *
-   * The map and reduce tasks are refreshed in every case. The job itself is flagged as modified, gets the new JSON
-   * and is stored in the job map only when the job map has no entry for it yet or the fetched JSON text differs
-   * from the stored one.
-   *
-   * @param statusObj the job whose status is refreshed
-   */
-  private void getJsonForLegacyJob(MRStatusObject statusObj)
-  {
-    String detailsUrl = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort()
-        + "/jobdetails.jsp?format=json&jobid=job_" + statusObj.getJobId();
-    JSONObject fetched = MRUtil.getJsonObject(MRUtil.getJsonForURL(detailsUrl));
-    if (fetched == null) {
-      return;
-    }
-
-    MRStatusObject known = jobMap.get(statusObj.getJobId());
-    boolean changed = known == null || !known.getJsonObject().toString().equals(fetched.toString());
-
-    if (changed) {
-      // the task refresh below reads the job JSON, so install the new one first
-      statusObj.setModified(true);
-      statusObj.setJsonObject(fetched);
-    }
-
-    getJsonsForLegacyTasks(statusObj, "map");
-    getJsonsForLegacyTasks(statusObj, "reduce");
-
-    if (changed) {
-      jobMap.put(statusObj.getJobId(), statusObj);
-    }
-  }
-
-  /**
-   * Refreshes the per-task status of a job submitted on hadoop 1.x by paging through the JSON form of its
-   * {@code jobtasks.jsp} page.
-   *
-   * The number of pages is derived from {@code numTotalTasks} in the job JSON already stored on the status object
-   * and from {@code Constants.MAX_TASKS}. Unknown tasks are added to the task map, known tasks whose JSON text
-   * changed are updated and flagged as modified, unchanged tasks are left alone. A page that yields no JSON object
-   * ends the refresh immediately, before the task map is set back on the status object. Any exception is logged and
-   * ends the refresh as well.
-   *
-   * @param statusObj the job whose tasks are refreshed
-   * @param type      task kind to refresh; "map" (ignoring case) selects the map task map, anything else the
-   *                  reduce task map
-   */
-  private void getJsonsForLegacyTasks(MRStatusObject statusObj, String type)
-  {
-    try {
-      JSONObject jobJson = statusObj.getJsonObject();
-      int totalTasks = ((JSONObject)((JSONObject)jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks");
-      Map<String, TaskObject> taskMap;
-      if (type.equalsIgnoreCase("map")) {
-        taskMap = statusObj.getMapJsonObject();
-      } else {
-        taskMap = statusObj.getReduceJsonObject();
-      }
-
-      int totalPagenums = (totalTasks / Constants.MAX_TASKS) + 1;
-      String baseUrl = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/jobtasks.jsp?type=" + type + "&format=json&jobid=job_" + statusObj.getJobId() + "&pagenum=";
-
-      for (int pagenum = 1; pagenum <= totalPagenums; pagenum++) {
-
-        String url = baseUrl + pagenum;
-        String responseBody = MRUtil.getJsonForURL(url);
-
-        JSONObject jsonObj = MRUtil.getJsonObject(responseBody);
-        if (jsonObj == null) {
-          return;
-        }
-
-        JSONArray taskJsonArray = jsonObj.getJSONArray("tasksInfo");
-
-        for (int i = 0; i < taskJsonArray.length(); i++) {
-          JSONObject taskObj = taskJsonArray.getJSONObject(i);
-          // Read the id once so that lookup and store are guaranteed to use the same key.
-          String taskId = taskObj.getString(Constants.LEAGACY_TASK_ID);
-          TaskObject knownTask = taskMap.get(taskId);
-
-          if (knownTask == null) {
-            taskMap.put(taskId, new TaskObject(taskObj));
-            continue;
-          }
-          if (knownTask.getJsonString().equals(taskObj.toString())) {
-            continue;
-          }
-
-          knownTask.setJson(taskObj);
-          knownTask.setModified(true);
-          // The previous code stored the updated task under Constants.TASK_ID although it had been looked up under
-          // Constants.LEAGACY_TASK_ID; presumably these name different JSON fields, which would either file the
-          // task under a second key or throw and abort the refresh. Store it under the key it was found with.
-          taskMap.put(taskId, knownTask);
-        }
-      }
-
-      if (type.equalsIgnoreCase("map")) {
-        statusObj.setMapJsonObject(taskMap);
-      } else {
-        statusObj.setReduceJsonObject(taskMap);
-      }
-    } catch (Exception e) {
-      LOG.info(e.getMessage());
-    }
-
-  }
-
-  /**
-   * Idle-time hook: sleeps for {@code sleepTime} milliseconds and then refreshes the status of one tracked job,
-   * advancing through the job map one entry per call and starting over once the iterator is exhausted.
-   *
-   * If the sleep is interrupted, the interrupt status of the thread is restored and the call returns without
-   * polling, so the calling code can observe the interruption.
-   */
-  @Override
-  public void handleIdleTime()
-  {
-    try {
-      Thread.sleep(sleepTime);
-    } catch (InterruptedException ie) {
-      // Do not swallow the interrupt: re-assert the flag for the caller and skip this round.
-      Thread.currentThread().interrupt();
-      return;
-    }
-
-    if (!iterator.hasNext()) {
-      // exhausted (or the map was empty so far): restart from the current contents of the job map
-      iterator = jobMap.values().iterator();
-    }
-
-    if (iterator.hasNext()) {
-      MRStatusObject obj = iterator.next();
-      if (obj.getHadoopVersion() == 2) {
-        getJsonForJob(obj);
-      } else if (obj.getHadoopVersion() == 1) {
-        getJsonForLegacyJob(obj);
-      }
-    }
-  }
-
-  /**
-   * Prepares the operator: positions the job iterator at the start of the job map and takes the idle sleep time
-   * from the {@code SPIN_MILLIS} attribute of the operator context.
-   *
-   * @param context operator context the sleep time is read from
-   */
-  @Override
-  public void setup(OperatorContext context)
-  {
-    this.iterator = this.jobMap.values().iterator();
-    this.sleepTime = context.getValue(OperatorContext.SPIN_MILLIS);
-  }
-
-  /**
-   * Intentionally empty: this operator performs no work on teardown.
-   */
-  @Override
-  public void teardown()
-  {
-  }
-
-  /**
-   * Intentionally empty: nothing is done at the start of a window.
-   *
-   * @param arg0 unused
-   */
-  @Override
-  public void beginWindow(long arg0)
-  {
-  }
-
-  /**
-   * Emits the complete current state of one job, regardless of modification flags: the job JSON and the status
-   * histories on {@code output}, all map tasks on {@code mapOutput} and all reduce tasks on {@code reduceOutput}.
-   * The modified flags of the job and of every emitted task are cleared, the history-changed flag is cleared and
-   * the retrial counter of the job is reset to 0. Any exception is logged as a warning and ends the emission.
-   *
-   * @param obj the job to emit
-   */
-  private void emitHelper(MRStatusObject obj)
-  {
-    try {
-      obj.setModified(false);
-      output.emit(obj.getJsonObject().toString());
-
-      String id = JOB_PREFIX + obj.getJobId();
-
-      // status histories of the job
-      JSONObject historyJson = new JSONObject();
-      historyJson.put("id", id);
-      historyJson.put("mapHistory", new JSONArray(obj.getMapStatusHistory()));
-      historyJson.put("reduceHistory", new JSONArray(obj.getReduceStatusHistory()));
-      historyJson.put("physicalMemoryHistory", new JSONArray(obj.getPhysicalMemeoryStatusHistory()));
-      historyJson.put("virtualMemoryHistory", new JSONArray(obj.getVirtualMemoryStatusHistory()));
-      historyJson.put("cpuHistory", new JSONArray(obj.getCpuStatusHistory()));
-      output.emit(historyJson.toString());
-      obj.setChangedHistoryStatus(false);
-
-      // all map tasks
-      JSONObject mapTasksJson = new JSONObject();
-      mapTasksJson.put("id", id);
-      JSONArray mapTasks = new JSONArray();
-      for (TaskObject task : obj.getMapJsonObject().values()) {
-        task.setModified(false);
-        mapTasks.put(task.getJson());
-      }
-      mapTasksJson.put("tasks", mapTasks);
-      mapOutput.emit(mapTasksJson.toString());
-
-      // all reduce tasks
-      JSONObject reduceTasksJson = new JSONObject();
-      reduceTasksJson.put("id", id);
-      JSONArray reduceTasks = new JSONArray();
-      for (TaskObject task : obj.getReduceJsonObject().values()) {
-        task.setModified(false);
-        reduceTasks.put(task.getJson());
-      }
-      reduceTasksJson.put("tasks", reduceTasks);
-      reduceOutput.emit(reduceTasksJson.toString());
-
-      obj.setRetrials(0);
-    } catch (Exception e) {
-      LOG.warn("error creating json {}", e.getMessage());
-    }
-  }
-
-  /**
-   * At the end of a window, walks every job in the job map and emits only what is flagged as modified:
-   * the job JSON (plus the status histories when those changed) on {@code output}, modified map tasks on
-   * {@code mapOutput}, modified reduce tasks on {@code reduceOutput} and modified counters on
-   * {@code counterOutput}. Each emitted item has its modified flag cleared.
-   *
-   * A job for which nothing was modified has its retrial counter incremented; once that counter has reached
-   * {@code maxRetrials} the job is removed from the job map. Any modification resets the counter to 0.
-   *
-   * An exception inside the loop is logged as a warning and ends the walk over the remaining jobs; jobs already
-   * collected for removal are still removed.
-   */
-  @Override
-  public void endWindow()
-  {
-    // ids of jobs to drop; removal happens after the loop over jobMap.entrySet() has finished
-    List<String> delList = new ArrayList<String>();
-    try {
-      for (Map.Entry<String, MRStatusObject> entry : jobMap.entrySet()) {
-        MRStatusObject obj = entry.getValue();
-
-        JSONObject outputJsonObject = new JSONObject();
-        outputJsonObject.put("id", JOB_PREFIX + obj.getJobId());
-
-        // becomes true as soon as anything about this job was emitted in this window
-        boolean modified = false;
-
-        if (obj.isModified()) {
-          modified = true;
-          obj.setModified(false);
-          output.emit(obj.getJsonObject().toString());
-          // histories are only emitted together with a modified job, and only when they changed
-          if (obj.isChangedHistoryStatus()) {
-            outputJsonObject.put("mapHistory", new JSONArray(obj.getMapStatusHistory()));
-            outputJsonObject.put("reduceHistory", new JSONArray(obj.getReduceStatusHistory()));
-            outputJsonObject.put("physicalMemoryHistory", new JSONArray(obj.getPhysicalMemeoryStatusHistory()));
-            outputJsonObject.put("virtualMemoryHistory", new JSONArray(obj.getVirtualMemoryStatusHistory()));
-            outputJsonObject.put("cpuHistory", new JSONArray(obj.getCpuStatusHistory()));
-            output.emit(outputJsonObject.toString());
-            obj.setChangedHistoryStatus(false);
-          }
-        }
-        // modified map tasks
-        outputJsonObject = new JSONObject();
-        outputJsonObject.put("id", JOB_PREFIX + obj.getJobId());
-        JSONArray arr = new JSONArray();
-
-        for (Map.Entry<String, TaskObject> mapEntry : obj.getMapJsonObject().entrySet()) {
-          TaskObject json = mapEntry.getValue();
-          if (json.isModified()) {
-            modified = true;
-            json.setModified(false);
-            arr.put(json.getJson());
-          }
-        }
-
-        // emit nothing when no map task changed
-        if (arr.length() > 0) {
-          outputJsonObject.put("tasks", arr);
-          mapOutput.emit(outputJsonObject.toString());
-        }
-
-        // modified reduce tasks
-        outputJsonObject = new JSONObject();
-        outputJsonObject.put("id", JOB_PREFIX + obj.getJobId());
-        arr = new JSONArray();
-
-        for (Map.Entry<String, TaskObject> mapEntry : obj.getReduceJsonObject().entrySet()) {
-          TaskObject json = mapEntry.getValue();
-          if (json.isModified()) {
-            modified = true;
-            json.setModified(false);
-            arr.put(json.getJson());
-          }
-        }
-        // emit nothing when no reduce task changed
-        if (arr.length() > 0) {
-          outputJsonObject.put("tasks", arr);
-          reduceOutput.emit(outputJsonObject.toString());
-        }
-
-        // modified counters
-        if (obj.getMetricObject() != null && obj.getMetricObject().isModified()) {
-          modified = true;
-          obj.getMetricObject().setModified(false);
-          counterOutput.emit(obj.getMetricObject().getJsonString());
-        }
-
-        // count windows without any change; schedule the job for removal once the limit is reached
-        if (!modified) {
-          if (obj.getRetrials() >= maxRetrials) {
-            delList.add(obj.getJobId());
-          } else {
-            obj.setRetrials(obj.getRetrials() + 1);
-          }
-        } else {
-          obj.setRetrials(0);
-        }
-      }
-    } catch (Exception ex) {
-      LOG.warn("error creating json {}", ex.getMessage());
-    }
-
-    // removeJob also resets the job iterator after each removal
-    if (!delList.isEmpty()) {
-      Iterator<String> itr = delList.iterator();
-      while (itr.hasNext()) {
-        removeJob(itr.next());
-      }
-    }
-
-  }
-
-  /**
-   * Removes the job with the given id from the job map and restarts the job iterator on the remaining entries.
-   * Does nothing when there is no job map.
-   *
-   * @param jobId id of the job to remove
-   */
-  public void removeJob(String jobId)
-  {
-    if (jobMap == null) {
-      return;
-    }
-    jobMap.remove(jobId);
-    // the old iterator must not be used after the map changed
-    iterator = jobMap.values().iterator();
-  }
-
-  /**
-   * Removes every job from the job map and restarts the job iterator on the now empty map.
-   * Does nothing when there is no job map.
-   */
-  public void clearMap()
-  {
-    if (jobMap == null) {
-      return;
-    }
-    jobMap.clear();
-    // the old iterator must not be used after the map changed
-    iterator = jobMap.values().iterator();
-  }
-
-  /**
-   * Returns the maximum number of jobs a single instance of this operator is going to serve at any time.
-   *
-   * @return the configured maximum number of jobs
-   */
-  public int getMaxJobs()
-  {
-    return maxJobs;
-  }
-
-  /**
-   * Sets the maximum number of jobs a single instance of this operator is going to serve at any time.
-   *
-   * @param maxJobs the maximum number of jobs
-   */
-  public void setMaxJobs(int maxJobs)
-  {
-    this.maxJobs = maxJobs;
-  }
-
-  /**
-   * Returns the number of consecutive windows of no change before the job is removed from the map.
-   *
-   * @return the configured number of windows without change
-   */
-  public int getMaxRetrials()
-  {
-    return maxRetrials;
-  }
-
-  /**
-   * Sets the number of consecutive windows of no change before the job is removed from the map.
-   *
-   * @param maxRetrials the number of windows without change
-   */
-  public void setMaxRetrials(int maxRetrials)
-  {
-    this.maxRetrials = maxRetrials;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
deleted file mode 100644
index 037378a..0000000
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mrmonitor;
-
-import java.net.URI;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
-import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
-
-/**
- * <p>
- * MRMonitoringApplication class.
- * </p>
- * Wires a pub/sub web socket query input through a {@link MapToMRObjectOperator} into a
- * {@link MRJobStatusOperator} and publishes the job, map task, reduce task and counter outputs of that operator
- * through one pub/sub web socket output operator each.
- *
- * @since 0.3.4
- */
-@ApplicationAnnotation(name = "MRMonitoringDemo")
-public class MRMonitoringApplication implements StreamingApplication
-{
-
-  private static final Logger logger = LoggerFactory.getLogger(MRMonitoringApplication.class);
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    MRJobStatusOperator mrJobOperator = dag.addOperator("JobMonitor", new MRJobStatusOperator());
-    URI uri = URI.create("ws://" + daemonAddress + "/pubsub");
-    logger.info("WebSocket with daemon at {}", daemonAddress);
-
-    PubSubWebSocketInputOperator wsIn = dag.addOperator("Query", new PubSubWebSocketInputOperator());
-    wsIn.setUri(uri);
-
-    MapToMRObjectOperator queryConverter = dag.addOperator("QueryConverter", new MapToMRObjectOperator());
-
-    // meta data about the job
-    PubSubWebSocketOutputOperator<Object> wsOut = addWebSocketOutput(dag, "JobOutput", uri);
-    // information of the map tasks of the job
-    PubSubWebSocketOutputOperator<Object> wsMapOut = addWebSocketOutput(dag, "MapJob", uri);
-    // information of the reduce tasks of the job
-    PubSubWebSocketOutputOperator<Object> wsReduceOut = addWebSocketOutput(dag, "ReduceJob", uri);
-    // metric information of the job
-    PubSubWebSocketOutputOperator<Object> wsCounterOut = addWebSocketOutput(dag, "JobCounter", uri);
-
-    dag.addStream("QueryConversion", wsIn.outputPort, queryConverter.input);
-    dag.addStream("QueryProcessing", queryConverter.output, mrJobOperator.input);
-    dag.addStream("JobData", mrJobOperator.output, wsOut.input);
-    dag.addStream("MapData", mrJobOperator.mapOutput, wsMapOut.input);
-    dag.addStream("ReduceData", mrJobOperator.reduceOutput, wsReduceOut.input);
-    dag.addStream("CounterData", mrJobOperator.counterOutput, wsCounterOut.input);
-  }
-
-  /**
-   * Adds a pub/sub web socket output operator under the given name and points it at the given URI.
-   *
-   * @param dag  the DAG the operator is added to
-   * @param name operator name inside the DAG
-   * @param uri  web socket URI the operator publishes to
-   * @return the operator that was added
-   */
-  private static PubSubWebSocketOutputOperator<Object> addWebSocketOutput(DAG dag, String name, URI uri)
-  {
-    PubSubWebSocketOutputOperator<Object> publisher = dag.addOperator(name, new PubSubWebSocketOutputOperator<Object>());
-    publisher.setUri(uri);
-    return publisher;
-  }
-
-}


Mime
View raw message