apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [13/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.
Date Tue, 07 Mar 2017 06:58:18 GMT
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java
new file mode 100644
index 0000000..4a57207
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.text.DecimalFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A bucket-like operator to accept merchant transaction object and dissipate the
+ * transaction amount to the further downstream operator for calculating min, max and std-deviation.
+ *
+ * @since 0.9.0
+ */
+public class MerchantTransactionBucketOperator extends BaseOperator
+{
+  private static final Logger logger = LoggerFactory.getLogger(MerchantTransactionBucketOperator.class);
+  /*
+  public final transient DefaultOutputPort<KeyValPair<MerchantKey, String>> binOutputPort =
+          new DefaultOutputPort<KeyValPair<MerchantKey, String>>();
+  */
+  public final transient DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> binCountOutputPort =
+      new DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>();
+  public final transient DefaultOutputPort<KeyValPair<MerchantKey, Long>> txOutputPort =
+      new DefaultOutputPort<KeyValPair<MerchantKey, Long>>();
+  public final transient DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>> ccAlertOutputPort =
+      new DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>>();
+  public final transient DefaultOutputPort<Map<String, Object>> summaryTxnOutputPort =
+      new DefaultOutputPort<Map<String, Object>>();
+  private MutableLong totalTxns = new MutableLong(0);
+  private MutableLong txnsInLastSecond = new MutableLong(0);
+  private MutableDouble amtInLastSecond = new MutableDouble(0);
+  private transient DecimalFormat amtFormatter = new DecimalFormat("#.##");
+  public transient DefaultInputPort<MerchantTransaction> inputPort = new DefaultInputPort<MerchantTransaction>()
+  {
+    @Override
+    public void process(MerchantTransaction tuple)
+    {
+      processTuple(tuple);
+    }
+
+  };
+  public transient DefaultInputPort<MerchantTransaction> txUserInputPort = new DefaultInputPort<MerchantTransaction>()
+  {
+    @Override
+    public void process(MerchantTransaction tuple)
+    {
+      processTuple(tuple);
+    }
+
+  };
+
+  public void endWindow()
+  {
+    Map<String, Object> summary = new HashMap<String, Object>();
+    double avg;
+    if (txnsInLastSecond.longValue() == 0) {
+      avg = 0;
+    } else {
+      avg = amtInLastSecond.doubleValue() / txnsInLastSecond.longValue();
+    }
+    summary.put("totalTxns", totalTxns);
+    summary.put("txnsInLastSecond", txnsInLastSecond);
+    summary.put("amtInLastSecond", amtFormatter.format(amtInLastSecond));
+    summary.put("avgAmtInLastSecond", amtFormatter.format(avg));
+    summaryTxnOutputPort.emit(summary);
+    txnsInLastSecond.setValue(0);
+    amtInLastSecond.setValue(0);
+  }
+
+  private void processTuple(MerchantTransaction tuple)
+  {
+    emitBankIdNumTuple(tuple, binCountOutputPort);
+    emitMerchantKeyTuple(tuple, txOutputPort);
+    emitCreditCardKeyTuple(tuple, ccAlertOutputPort);
+    totalTxns.increment();
+    txnsInLastSecond.increment();
+    amtInLastSecond.add(tuple.amount);
+  }
+
+  private void emitMerchantKeyTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, Long>> outputPort)
+  {
+    MerchantKey key = getMerchantKey(tuple);
+    KeyValPair<MerchantKey, Long> keyValPair = new KeyValPair<MerchantKey, Long>(key, tuple.amount);
+    outputPort.emit(keyValPair);
+  }
+
+  //private void emitBankIdNumTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, String>> outputPort)
+  private void emitBankIdNumTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> outputPort)
+  {
+    MerchantKey key = getMerchantKey(tuple);
+    KeyValPair<MerchantKey, String> keyValPair = new KeyValPair<MerchantKey, String>(key, tuple.bankIdNum);
+    outputPort.emit(new KeyValPair<KeyValPair<MerchantKey, String>, Integer>(keyValPair, 1));
+  }
+
+  private void emitCreditCardKeyTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>> outputPort)
+  {
+    MerchantKey key = getMerchantKey(tuple);
+
+    CreditCardData data = new CreditCardData();
+    data.fullCcNum = tuple.fullCcNum;
+    data.amount = tuple.amount;
+
+    KeyValPair<MerchantKey, CreditCardData> keyValPair = new KeyValPair<MerchantKey, CreditCardData>(key, data);
+    outputPort.emit(keyValPair);
+  }
+
+  private MerchantKey getMerchantKey(MerchantTransaction tuple)
+  {
+    MerchantKey key = new MerchantKey();
+    key.merchantId = tuple.merchantId;
+    key.terminalId = tuple.terminalId;
+    key.zipCode = tuple.zipCode;
+    key.country = tuple.country;
+    key.merchantType = tuple.merchantType;
+    key.userGenerated = tuple.userGenerated;
+    key.time = tuple.time;
+    return key;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java
new file mode 100644
index 0000000..2327344
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.examples.frauddetect.util.JsonUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Information tuple generator with randomness.
+ *
+ * @since 0.9.0
+ */
+public class MerchantTransactionGenerator extends BaseOperator implements InputOperator
+{
+  private static final Logger logger = LoggerFactory.getLogger(MerchantTransactionGenerator.class);
+
+  private final Random randomNum = new Random();
+  public static final int[] zipCodes = {94086, 94087, 94088, 94089, 94090, 94091, 94092, 94093};
+  public static final String[] merchantIds = {"Wal-Mart", "Target", "Amazon", "Apple", "Sears", "Macys", "JCPenny", "Levis"};
+  private int amountMin = 1;
+  private int amountMax = 400;
+  private int merchantIdMin = 0;
+  private int merchantIdMax = merchantIds.length - 1;
+  private int terminalIdMin = 1;
+  private int terminalIdMax = 8;
+  private int zipMin = 0;
+  private int zipMax = zipCodes.length - 1;
+  /** Upper bound on the number of transactions generated per {@link #emitTuples()} call. */
+  private int tupleBlastSize = 2000;
+  private boolean stopGeneration = false;
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+  }
+
+  /** Emits every generated transaction as an object. */
+  public transient DefaultOutputPort<MerchantTransaction> txOutputPort =
+      new DefaultOutputPort<MerchantTransaction>();
+  /** Emits every generated transaction converted to JSON by {@code JsonUtils.toJson}. */
+  public transient DefaultOutputPort<String> txDataOutputPort =
+      new DefaultOutputPort<String>();
+
+  /**
+   * Generates up to {@link #getTupleBlastSize()} random transactions (none when generation is
+   * stopped), emitting each on {@link #txOutputPort} as it is created and afterwards, in the
+   * same order, as JSON on {@link #txDataOutputPort}. Sleeps 100 ms before returning.
+   */
+  @Override
+  public void emitTuples()
+  {
+    List<MerchantTransaction> txList = new ArrayList<MerchantTransaction>();
+
+    for (int count = 0; !stopGeneration && count < getTupleBlastSize(); count++) {
+      MerchantTransaction tx = nextTransaction();
+      txOutputPort.emit(tx);
+      txList.add(tx);
+    }
+
+    for (MerchantTransaction txData : txList) {
+      try {
+        txDataOutputPort.emit(JsonUtils.toJson(txData));
+      } catch (IOException e) {
+        logger.warn("Exception while converting object to JSON", e);
+      }
+    }
+
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so the calling thread can still observe the interruption.
+      Thread.currentThread().interrupt();
+      logger.debug("Interrupted while pausing between tuple blasts", e);
+    }
+
+  }
+
+  /** Builds one random, non-user-generated POS transaction stamped with the current time. */
+  private MerchantTransaction nextTransaction()
+  {
+    String bankIdNum = genBankIdNum();
+    String ccNum = genCC();
+    int merchant = genMerchantId();
+    int terminal = genTerminalId();
+    int zip = genZip();
+    long amount = genAmount();
+    boolean internet = isInternetMerchant(merchant);
+
+    MerchantTransaction tx = new MerchantTransaction();
+    tx.bankIdNum = bankIdNum;
+    tx.ccNum = ccNum;
+    tx.fullCcNum = tx.bankIdNum + " " + tx.ccNum;
+    tx.amount = amount;
+    tx.merchantId = merchantIds[merchant];
+    tx.merchantType = internet
+                      ? MerchantTransaction.MerchantType.INTERNET
+                      : MerchantTransaction.MerchantType.BRICK_AND_MORTAR;
+    tx.transactionType = MerchantTransaction.TransactionType.POS;
+
+    // set terminal only for a BRICK_AND_MORTAR merchant
+    if (!internet) {
+      tx.terminalId = terminal;
+    }
+    tx.zipCode = zipCodes[zip];
+    tx.country = "USA";
+    tx.time = System.currentTimeMillis();
+    tx.userGenerated = false;
+    return tx;
+  }
+
+  /**
+   * Tells whether the given index into {@link #merchantIds} is treated as an INTERNET merchant:
+   * index 2 ("Amazon") and index 3 ("Apple").
+   */
+  private static boolean isInternetMerchant(int merchant)
+  {
+    return merchant == 2 || merchant == 3;
+  }
+
+  /**
+   * @return a bank id number between "1000 0000" and "1990 0000" (100 distinct values)
+   */
+  public String genBankIdNum()
+  {
+    // base is in [100, 199]; appending "0 0000" yields "1000 0000" .. "1990 0000"
+    int base = randomNum.nextInt(100) + 100;
+    return base + "0 0000";
+  }
+
+  /**
+   * @return a card number between "1000 0000" and "1009 9999" (100,000 distinct values)
+   */
+  public String genCC()
+  {
+    // base is in [10000000, 10099999], i.e. always eight digits, split as "dddd dddd"
+    int base = (randomNum.nextInt(100000) + 10000000);
+    String baseString = Integer.toString(base);
+    return baseString.substring(0, 4) + " " + baseString.substring(4);
+  }
+
+  /**
+   * @return a random amount starting at {@code amountMin}; the span is
+   *         {@code amountMax - amountMin} widened by a random 0..49, so the result
+   *         can exceed {@code amountMax} by a few dozen
+   */
+  public int genAmount()
+  {
+    int lowRange = 50;
+    int range = amountMax - amountMin + randomNum.nextInt(lowRange);
+    return amountMin + randomNum.nextInt(range);
+  }
+
+  /**
+   * @return a random index into {@link #merchantIds} within [merchantIdMin, merchantIdMax]
+   */
+  public int genMerchantId()
+  {
+    int range = merchantIdMax - merchantIdMin + 1;
+    return merchantIdMin + randomNum.nextInt(range);
+  }
+
+  /**
+   * @return a random terminal id within [terminalIdMin, terminalIdMax]
+   */
+  public int genTerminalId()
+  {
+    int range = terminalIdMax - terminalIdMin + 1;
+    return terminalIdMin + randomNum.nextInt(range);
+  }
+
+  /**
+   * @return a random index into {@link #zipCodes} within [zipMin, zipMax]
+   */
+  public int genZip()
+  {
+    int range = zipMax - zipMin + 1;
+    return zipMin + randomNum.nextInt(range);
+  }
+
+  /**
+   * @param stopGeneration when true, {@link #emitTuples()} generates no further transactions
+   */
+  public void setStopGeneration(boolean stopGeneration)
+  {
+    this.stopGeneration = stopGeneration;
+  }
+
+  /**
+   * @return the tupleBlastSize
+   */
+  public int getTupleBlastSize()
+  {
+    return tupleBlastSize;
+  }
+
+  /**
+   * @param tupleBlastSize the tupleBlastSize to set
+   */
+  public void setTupleBlastSize(int tupleBlastSize)
+  {
+    this.tupleBlastSize = tupleBlastSize;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java
new file mode 100644
index 0000000..6dd9c2f
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Common utility class that can be used by all other operators to handle user input
+ * captured from the Web socket input port.
+ *
+ * @since 0.9.0
+ */
+public class MerchantTransactionInputHandler extends BaseOperator
+{
+  private static final Logger logger = LoggerFactory.getLogger(MerchantTransactionInputHandler.class);
+
+  public static final String KEY_BANK_ID_NUMBER = "bankIdNum"; // first 12 digits
+  public static final String KEY_CREDIT_CARD_NUMBER = "ccNum"; // last 4 digits
+  public static final String KEY_MERCHANT_ID = "merchantId";
+  public static final String KEY_TERMINAL_ID = "terminalId";
+  public static final String KEY_ZIP_CODE = "zipCode";
+  public static final String KEY_AMOUNT = "amount";
+
+  /** Emits the transaction built from each valid user input map. */
+  public transient DefaultOutputPort<MerchantTransaction> txOutputPort =
+      new DefaultOutputPort<MerchantTransaction>();
+  /**
+   * Receives user input as a key/value map. Input that cannot be converted is logged and
+   * dropped instead of failing the operator.
+   */
+  public transient DefaultInputPort<Map<String, String>> userTxInputPort = new DefaultInputPort<Map<String, String>>()
+  {
+    @Override
+    public void process(Map<String, String> tuple)
+    {
+      try {
+        txOutputPort.emit(processInput(tuple));
+      } catch (Exception exc) {
+        logger.error("Exception while handling the input", exc);
+      }
+    }
+
+  };
+
+  /**
+   * Converts a user input map into a user-generated POS transaction stamped with the current
+   * time. The map must contain all six {@code KEY_*} entries; other entries are ignored.
+   *
+   * @param tuple the raw input, keyed by the {@code KEY_*} constants of this class
+   * @return the populated transaction
+   * @throws IllegalArgumentException if a required entry is missing
+   * @throws NumberFormatException if terminal id, zip code or amount is not a valid number
+   */
+  public MerchantTransaction processInput(Map<String, String> tuple)
+  {
+    String bankIdNum = requireValue(tuple, KEY_BANK_ID_NUMBER);
+    String ccNum = requireValue(tuple, KEY_CREDIT_CARD_NUMBER);
+    String merchantId = requireValue(tuple, KEY_MERCHANT_ID);
+    Integer terminalId = Integer.valueOf(requireValue(tuple, KEY_TERMINAL_ID));
+    Integer zipCode = Integer.valueOf(requireValue(tuple, KEY_ZIP_CODE));
+    Long amount = Long.valueOf(requireValue(tuple, KEY_AMOUNT));
+
+    MerchantTransaction tx = new MerchantTransaction();
+    tx.bankIdNum = bankIdNum;
+    tx.ccNum = ccNum;
+    tx.fullCcNum = bankIdNum + " " + ccNum;
+    tx.merchantId = merchantId;
+    tx.terminalId = terminalId;
+    tx.zipCode = zipCode;
+    tx.country = "USA";
+    tx.amount = amount;
+    // merchantIds[2] and merchantIds[3] are the merchants the generator treats as INTERNET
+    tx.merchantType = tx.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[2])
+            || tx.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[3])
+                      ? MerchantTransaction.MerchantType.INTERNET
+                      : MerchantTransaction.MerchantType.BRICK_AND_MORTAR;
+    tx.transactionType = MerchantTransaction.TransactionType.POS;
+
+    tx.userGenerated = true;
+    tx.time = System.currentTimeMillis();
+
+    return tx;
+
+  }
+
+  /**
+   * Looks up a mandatory entry. Only the key name is put into the error message, never a value,
+   * because the values include card data.
+   */
+  private static String requireValue(Map<String, String> tuple, String key)
+  {
+    String value = tuple.get(key);
+    if (value == null) {
+      throw new IllegalArgumentException("Missing required input! (" + key + ")");
+    }
+    return value;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java
new file mode 100644
index 0000000..dc2c942
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.util.ArrayList;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+import com.datatorrent.lib.multiwindow.AbstractSlidingWindowKeyVal;
+import com.datatorrent.lib.util.KeyValPair;
+
+
+/**
+ * Sliding window sum operator
+ *
+ * @since 0.9.0
+ */
+public class SlidingWindowSumKeyVal<K, V extends Number> extends AbstractSlidingWindowKeyVal<K, V, SlidingWindowSumObject>
+{
+
+  /**
+   * Optional output port emitting, per key, the sum over all state objects of the window as Double.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleSum = new DefaultOutputPort<KeyValPair<K, Double>>();
+  /**
+   * Optional output port emitting the same sum narrowed to Float.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSum = new DefaultOutputPort<KeyValPair<K, Float>>();
+  /**
+   * Optional output port emitting the same sum cast to Long (fraction dropped).
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Long>> longSum = new DefaultOutputPort<KeyValPair<K, Long>>();
+  /**
+   * Optional output port emitting the same sum cast to Integer (fraction dropped).
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Integer>> integerSum = new DefaultOutputPort<KeyValPair<K, Integer>>();
+
+
+  /**
+   * Adds the tuple's value to the state object at index {@code currentstate} for the tuple's
+   * key. The first tuple seen for a key allocates {@code windowSize} empty state objects.
+   *
+   * @param tuple the key/value pair to account for
+   */
+  @Override
+  public void processDataTuple(KeyValPair<K, V> tuple)
+  {
+    K key = tuple.getKey();
+    ArrayList<SlidingWindowSumObject> slots = buffer.get(key);
+    if (slots == null) {
+      slots = new ArrayList<SlidingWindowSumObject>();
+      while (slots.size() < windowSize) {
+        slots.add(new SlidingWindowSumObject());
+      }
+      buffer.put(key, slots);
+    }
+    slots.get(currentstate).add(tuple.getValue());
+  }
+
+  /**
+   * Totals {@code getSum()} over the given state objects and emits the total on every
+   * connected sum port, converted to that port's numeric type.
+   *
+   * @param key the key the state objects belong to
+   * @param obj the state objects to total
+   */
+  @Override
+  public void emitTuple(K key, ArrayList<SlidingWindowSumObject> obj)
+  {
+    double total = 0;
+    for (SlidingWindowSumObject slot : obj) {
+      total += slot.getSum();
+    }
+
+    if (doubleSum.isConnected()) {
+      doubleSum.emit(new KeyValPair<K, Double>(key, total));
+    }
+    if (floatSum.isConnected()) {
+      floatSum.emit(new KeyValPair<K, Float>(key, (float)total));
+    }
+    if (longSum.isConnected()) {
+      longSum.emit(new KeyValPair<K, Long>(key, (long)total));
+    }
+    if (integerSum.isConnected()) {
+      integerSum.emit(new KeyValPair<K, Integer>(key, (int)total));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java
new file mode 100644
index 0000000..fc5f95d
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+
+import org.apache.commons.lang.mutable.MutableDouble;
+
+import com.datatorrent.lib.multiwindow.SimpleMovingAverageObject;
+
+/**
+ * State object for sliding window sum
+ *
+ * @since 0.9.0
+ */
+public class SlidingWindowSumObject extends SimpleMovingAverageObject
+{
+
+  /** Running total of every value passed to {@link #add(Number)} since the last {@link #clear()}. */
+  MutableDouble sum = new MutableDouble(0);
+
+  /**
+   * Adds the double value of {@code n} to the running total.
+   *
+   * @param n the number to add
+   */
+  public void add(Number n)
+  {
+    sum.add(n.doubleValue());
+  }
+
+  /**
+   * @return the current running total
+   */
+  @Override
+  public double getSum()
+  {
+    return sum.doubleValue();
+  }
+
+  /**
+   * Resets the running total to zero.
+   */
+  @Override
+  public void clear()
+  {
+    sum.setValue(0.0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java
new file mode 100644
index 0000000..71f3035
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.examples.frauddetect.util.JsonUtils;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.HighLow;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Operator to aggregate the min, max, sma, std-dev and variance for the given key.
+ *
+ * @since 0.9.0
+ */
+public class TransactionStatsAggregator extends BaseOperator
+{
+  private static final Logger logger = LoggerFactory.getLogger(TransactionStatsAggregator.class);
+
+  /** Statistics collected per merchant key during the current window; emptied in {@link #endWindow()}. */
+  public Map<MerchantKey, TransactionStatsData> aggrgateMap =
+      new HashMap<MerchantKey, TransactionStatsData>();
+  /** Emits each collected statistics record as JSON at the end of the window. */
+  public final transient DefaultOutputPort<String> txDataOutputPort = new DefaultOutputPort<String>();
+  /** Receives the low/high pair for a key and stores it as that key's min and max. */
+  public final transient DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>> rangeInputPort =
+      new DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>>()
+  {
+    @Override
+    public void process(KeyValPair<MerchantKey, HighLow<Long>> tuple)
+    {
+      HighLow<Long> range = tuple.getValue();
+      TransactionStatsData stats = getDataObjectFromMap(tuple.getKey());
+      stats.min = range.getLow();
+      stats.max = range.getHigh();
+    }
+
+  };
+  /** Receives a value for a key and stores it as that key's sma. */
+  public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> smaInputPort =
+      new DefaultInputPort<KeyValPair<MerchantKey, Long>>()
+  {
+    @Override
+    public void process(KeyValPair<MerchantKey, Long> tuple)
+    {
+      getDataObjectFromMap(tuple.getKey()).sma = tuple.getValue();
+    }
+
+  };
+
+  /**
+   * Returns the statistics record for the key, creating and registering one on first use.
+   * A new record is stamped with the current time and copies the merchant fields of the key;
+   * a null terminal id is stored as 0.
+   */
+  private TransactionStatsData getDataObjectFromMap(MerchantKey key)
+  {
+    TransactionStatsData existing = aggrgateMap.get(key);
+    if (existing != null) {
+      return existing;
+    }
+
+    TransactionStatsData created = new TransactionStatsData();
+    created.time = System.currentTimeMillis();
+    created.merchantId = key.merchantId;
+    created.terminalId = key.terminalId == null ? 0 : key.terminalId;
+    created.zipCode = key.zipCode;
+    created.merchantType = key.merchantType;
+    aggrgateMap.put(key, created);
+    return created;
+  }
+
+  /**
+   * Emits every collected record as JSON, logging and skipping any record that fails to
+   * convert, then discards all records.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (TransactionStatsData stats : aggrgateMap.values()) {
+      try {
+        txDataOutputPort.emit(JsonUtils.toJson(stats));
+      } catch (IOException e) {
+        logger.warn("Exception while converting object to JSON", e);
+      }
+    }
+    aggrgateMap.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java
new file mode 100644
index 0000000..f0b2b86
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * POJO to capture transaction data related to min, max, sma, std-dev, variance.
+ *
+ * @since 0.9.0
+ */
+// Plain data holder with public fields; TransactionStatsAggregator fills one instance per
+// merchant key and passes it to JsonUtils.toJson. Only min, max and sma are carried here.
+public class TransactionStatsData
+{
+  // Merchant identifier, copied from the merchant key by the aggregator.
+  public String merchantId;
+  // Terminal identifier; the aggregator stores 0 when the key has no terminal id.
+  public int terminalId;
+  // Zip code, copied from the merchant key by the aggregator.
+  public int zipCode;
+  // Merchant type, copied from the merchant key by the aggregator.
+  public MerchantTransaction.MerchantType merchantType;
+  // Low value of the range received for this key.
+  public long min;
+  // High value of the range received for this key.
+  public long max;
+  // Value received on the aggregator's sma input port (presumably a simple moving average - confirm upstream).
+  public double sma;
+  // System.currentTimeMillis() at the moment the aggregator created this record.
+  public long time;
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java
new file mode 100644
index 0000000..89c4bcd
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect.operator;
+
+import java.io.File;
+
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
+/**
+ * Adapter for writing Strings to HDFS
+ * <p>
+ * Serializes tuples into a HDFS file.<br/>
+ * </p>
+ *
+ * @since 0.9.4
+ */
+public class HdfsStringOutputOperator extends AbstractFileOutputOperator<String>
+{
+  /** Target file name; transient and recomputed from the application name in {@link #setup}. */
+  private transient String outputFileName;
+  /** Application name read from the operator context in {@link #setup}. */
+  private transient String contextId;
+  // Not referenced anywhere in this class. Kept as-is because it is a
+  // non-transient field - NOTE(review): may be part of saved operator state.
+  private int index = 0;
+
+  /**
+   * Creates the operator and passes {@code 1024 * 1024} to {@code setMaxLength}.
+   * NOTE(review): presumably the per-file size limit of the base class - confirm.
+   */
+  public HdfsStringOutputOperator()
+  {
+    setMaxLength(1024 * 1024);
+  }
+
+  /**
+   * Derives the output file name from the application name before delegating
+   * to the base class setup.
+   * <p>
+   * The resulting name is
+   * {@code <separator><application name><separator>transactions.out.part}.
+   *
+   * @param context operator context supplying {@code DAGContext.APPLICATION_NAME}
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    contextId = context.getValue(DAGContext.APPLICATION_NAME);
+    outputFileName = File.separator + contextId +
+                     File.separator + "transactions.out.part";
+    // The file name must be known before the base class initializes itself.
+    super.setup(context);
+  }
+
+  /**
+   * Encodes a tuple as UTF-8 bytes.
+   * <p>
+   * The charset is explicit so the bytes written do not depend on the platform
+   * default encoding of the node running the operator.
+   *
+   * @param t tuple to encode; must not be null
+   * @return UTF-8 encoding of {@code t}
+   */
+  @Override
+  public byte[] getBytesForTuple(String t)
+  {
+    return t.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Returns the same output file name for every tuple.
+   *
+   * @param tuple ignored
+   * @return the file name computed in {@link #setup}
+   */
+  @Override
+  protected String getFileName(String tuple)
+  {
+    return outputFileName;
+  }
+
+  /**
+   * Builds a part file name by appending the part number directly to the base
+   * name, e.g. {@code transactions.out.part} and {@code 3} give
+   * {@code transactions.out.part3}.
+   *
+   * @param fileName base file name
+   * @param part part number
+   * @return {@code fileName} followed by {@code part}
+   */
+  @Override
+  public String getPartFileName(String fileName, int part)
+  {
+    return fileName + part;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java
new file mode 100644
index 0000000..e059c03
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect.operator;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteResult;
+import com.mongodb.util.JSON;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+
+/**
+ * Operator to write data into MongoDB
+ *
+ * @since 0.9.0
+ */
+public class MongoDBOutputOperator extends BaseOperator
+{
+  private static final Logger logger = LoggerFactory.getLogger(MongoDBOutputOperator.class);
+
+  /** MongoDB host passed to the {@link MongoClient} constructor; required. */
+  @NotNull
+  protected String hostName;
+  /** Name of the database to open; required. */
+  @NotNull
+  protected String dataBase;
+  /** Name of the collection that receives the inserted documents; required. */
+  @NotNull
+  protected String collection;
+
+  /** Write concern used for every batch insert. */
+  protected WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED;
+
+  /** Optional credentials; authentication is attempted only when both are set. */
+  protected String userName;
+  protected String passWord;
+
+  // Connection handles are transient and re-created in setup().
+  protected transient MongoClient mongoClient;
+  protected transient DB db;
+  protected transient DBCollection dbCollection;
+
+  /**
+   * Documents received since the last flush in {@link #endWindow()}.
+   * Non-transient, unlike the connection handles above - NOTE(review):
+   * presumably so pending documents are part of saved operator state; confirm.
+   */
+  protected List<DBObject> dataList = new ArrayList<DBObject>();
+
+  public MongoDBOutputOperator()
+  {
+  }
+
+  /**
+   * Receives JSON formatted strings, parses each into a {@link DBObject} and
+   * buffers it until the end of the window.
+   * The parse result is cast to {@code DBObject} without a type check.
+   */
+  public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String tuple)
+    {
+      dataList.add((DBObject)JSON.parse(tuple));
+    }
+  };
+
+  /**
+   * Opens the MongoDB connection, authenticates when credentials are
+   * configured, and resolves the target collection.
+   *
+   * @param context operator context, forwarded to the base class
+   * @throws IllegalArgumentException if credentials are set and authentication fails
+   * @throws RuntimeException if the configured host cannot be resolved
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    try {
+      mongoClient = new MongoClient(hostName);
+      db = mongoClient.getDB(dataBase);
+      if (userName != null && passWord != null) {
+        if (!db.authenticate(userName, passWord.toCharArray())) {
+          throw new IllegalArgumentException("MongoDB authentication failed. Illegal username and password for MongoDB!!");
+        }
+      }
+      dbCollection = db.getCollection(collection);
+    } catch (UnknownHostException ex) {
+      // Fail fast: continuing would leave dbCollection null and surface later
+      // as a NullPointerException on the first non-empty endWindow().
+      throw new RuntimeException("Unable to resolve MongoDB host: " + hostName, ex);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    // nothing
+  }
+
+  /**
+   * Inserts all buffered documents in one call using the configured write
+   * concern, then clears the buffer. Does nothing when the buffer is empty.
+   */
+  @Override
+  public void endWindow()
+  {
+    logger.debug("mongo datalist size: {}", dataList.size());
+    if (!dataList.isEmpty()) {
+      WriteResult result = dbCollection.insert(dataList, writeConcern);
+      logger.debug("Result for MongoDB insert: {}", result);
+      dataList.clear();
+    }
+  }
+
+  /**
+   * Closes the MongoDB client if one was created.
+   */
+  @Override
+  public void teardown()
+  {
+    if (mongoClient != null) {
+      mongoClient.close();
+    }
+  }
+
+  public String getHostName()
+  {
+    return hostName;
+  }
+
+  public void setHostName(String hostName)
+  {
+    this.hostName = hostName;
+  }
+
+  public String getDataBase()
+  {
+    return dataBase;
+  }
+
+  public void setDataBase(String dataBase)
+  {
+    this.dataBase = dataBase;
+  }
+
+  public String getCollection()
+  {
+    return collection;
+  }
+
+  public void setCollection(String collection)
+  {
+    this.collection = collection;
+  }
+
+  public String getUserName()
+  {
+    return userName;
+  }
+
+  public void setUserName(String userName)
+  {
+    this.userName = userName;
+  }
+
+  public String getPassWord()
+  {
+    return passWord;
+  }
+
+  public void setPassWord(String passWord)
+  {
+    this.passWord = passWord;
+  }
+
+  public WriteConcern getWriteConcern()
+  {
+    return writeConcern;
+  }
+
+  public void setWriteConcern(WriteConcern writeConcern)
+  {
+    this.writeConcern = writeConcern;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java
new file mode 100644
index 0000000..932ef64
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect.util;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Utility class to deal with JSON and Object
+ *
+ * @since 0.9.0
+ */
+public class JsonUtils
+{
+  /** Single mapper instance shared by every call to {@link #toJson(Object)}. */
+  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
+  /**
+   * Serializes an object to a JSON string using the shared mapper.
+   *
+   * @param obj object to serialize
+   * @return the string returned by {@code ObjectMapper.writeValueAsString}
+   * @throws IOException if the mapper fails to write the value
+   */
+  public static String toJson(Object obj) throws IOException
+  {
+    final String json = JSON_MAPPER.writeValueAsString(obj);
+    return json;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/resources/META-INF/properties.xml b/examples/frauddetect/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..7a42ac4
--- /dev/null
+++ b/examples/frauddetect/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,167 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+<property>
+	<name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
+	<value>1000</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.ccUserAlertQueryOutput.topic</name>
+	<value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.binUserAlertOutput.topic</name>
+	<value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.avgUserAlertQueryOutput.topic</name>
+	<value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.txSummaryWsOutput.topic</name>
+	<value>examples.app.frauddetect.txSummary</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>10</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.threshold
+	</name>
+	<value>20</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.rangePerMerchant.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.txReceiver.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property>
+<!-- property>
+	<name>dt.application.frauddetect.operator.smaPerMerchant.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property-->
+<property>
+	<name>dt.application.FraudDetectExample.operator.smaPerMerchant.windowSize
+	</name>
+	<value>30</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.movingSum.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>10</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.movingSum.windowSize
+	</name>
+	<value>3</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.avgAlerter.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.avgAlerter.threshold
+	</name>
+	<value>1200</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.amountFraudDetector.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.amountFraudDetector.threshold
+	</name>
+	<value>420</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.hostName</name>
+	<value>localhost</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.dataBase</name>
+	<value>frauddetect</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.collection</name>
+	<value>txStats</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.hostName</name>
+	<value>localhost</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.dataBase</name>
+	<value>frauddetect</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.collection</name>
+	<value>binAlerts</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.hostName</name>
+	<value>localhost</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.dataBase</name>
+	<value>frauddetect</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.collection</name>
+	<value>ccAlerts</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.hostName</name>
+	<value>localhost</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.dataBase</name>
+	<value>frauddetect</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.collection</name>
+	<value>avgAlerts</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.txStatsAggregator.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property>
+
+<property>
+     <name>dt.application.FraudDetectExample.port.*.attr.QUEUE_CAPACITY</name>
+     <value>32000</value>
+  </property>
+ <property>
+    <name>dt.application.FraudDetectExample.operator.*.attr.MEMORY_MB</name>
+    <value>2048</value>
+  </property>
+
+</configuration>
+
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java b/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java
new file mode 100644
index 0000000..ffb1cf2
--- /dev/null
+++ b/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Fraud detection application test
+ */
+public class FrauddetectApplicationTest
+{
+
+  /**
+   * Builds the fraud detection application with the settings from
+   * {@code dt-site-frauddetect.xml} and runs it in local mode, passing
+   * {@code 120000} as the run duration.
+   * <p>
+   * Exceptions are deliberately not caught: any failure while preparing or
+   * running the DAG must fail the test instead of only being printed.
+   *
+   * @throws Exception if the DAG cannot be prepared or the local run fails
+   */
+  @Test
+  public void testApplication() throws Exception
+  {
+    Application application = new Application();
+    // "false" skips loading the default Hadoop resources; only the test file is used.
+    Configuration conf = new Configuration(false);
+    conf.addResource("dt-site-frauddetect.xml");
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(application, conf);
+    lma.getController().run(120000);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml b/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml
new file mode 100644
index 0000000..7a404c4
--- /dev/null
+++ b/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml
@@ -0,0 +1,173 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+ <property>
+	<name>dt.application.FraudDetectExample.class</name>
+	<value>org.apache.apex.examples.frauddetect.Application</value>
+	<description>An alias for the application</description>
+</property>
+<property>
+	<name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
+	<value>1000</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.ccUserAlertQueryOutput.topic</name>
+	<value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.binUserAlertOutput.topic</name>
+	<value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.avgUserAlertQueryOutput.topic</name>
+	<value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.txSummaryWsOutput.topic</name>
+	<value>examples.app.frauddetect.txSummary</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>10</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.threshold
+	</name>
+	<value>20</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.rangePerMerchant.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.txReceiver.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property>
+<!-- property>
+	<name>dt.application.frauddetect.operator.smaPerMerchant.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property-->
+<property>
+	<name>dt.application.FraudDetectExample.operator.smaPerMerchant.windowSize
+	</name>
+	<value>30</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.movingSum.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>10</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.movingSum.windowSize
+	</name>
+	<value>3</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.avgAlerter.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.avgAlerter.threshold
+	</name>
+	<value>1200</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.amountFraudDetector.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.amountFraudDetector.threshold
+	</name>
+	<value>420</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.hostName</name>
+	<value>localhost</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.dataBase</name>
+	<value>frauddetect</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.collection</name>
+	<value>txStats</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.hostName</name>
+	<value>localhost</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.dataBase</name>
+	<value>frauddetect</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.collection</name>
+	<value>binAlerts</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.hostName</name>
+	<value>localhost</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.dataBase</name>
+	<value>frauddetect</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.collection</name>
+	<value>ccAlerts</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.hostName</name>
+	<value>localhost</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.dataBase</name>
+	<value>frauddetect</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.collection</name>
+	<value>avgAlerts</value>
+</property>
+<property>
+	<name>dt.application.FraudDetectExample.operator.txStatsAggregator.attr.APPLICATION_WINDOW_COUNT
+	</name>
+	<value>1</value>
+</property>
+
+<property>
+     <name>dt.application.FraudDetectExample.port.*.attr.QUEUE_CAPACITY</name>
+     <value>32000</value>
+  </property>
+<property>
+        <name>dt.application.FraudDetectExample.operator.*.attr.MEMORY_MB</name>
+        <value>2048</value>
+</property>
+
+
+</configuration>
+
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/test/resources/log4j.properties b/examples/frauddetect/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/frauddetect/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/pom.xml
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/pom.xml b/examples/highlevelapi/pom.xml
new file mode 100644
index 0000000..da843b7
--- /dev/null
+++ b/examples/highlevelapi/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-highlevelapi</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar High-Level API Example</name>
+  <description>Apex example applications that use the High-level API to construct a DAG</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.9.1</version>
+        <executions>
+          <execution>
+            <id>attach-artifacts</id>
+            <phase>package</phase>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>target/${project.artifactId}-${project.version}.apa</file>
+                  <type>apa</type>
+                </artifact>
+              </artifacts>
+              <skipAttach>false</skipAttach>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.10</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <!-- required by twitter example -->
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-core</artifactId>
+      <version>4.0.4</version>
+    </dependency>
+    <dependency>
+      <!-- required by twitter example -->
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-stream</artifactId>
+      <version>4.0.4</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-contrib</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-stream</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <version>2.3.1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>1.4.192</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.9.1</version>
+    </dependency>
+    <dependency>
+      <!--This dependency is needed for StreamingWordExtractTest-->
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>commons-compiler</artifactId>
+      <version>2.7.8</version>
+      <type>jar</type>
+    </dependency>
+    <dependency>
+      <!--This dependency is needed for StreamingWordExtractTest-->
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/assemble/appPackage.xml b/examples/highlevelapi/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/highlevelapi/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory> <!-- entries go at the archive root, not under a base directory -->
+  <fileSets>
+    <fileSet> <!-- the project's own jar from target/, placed under /app -->
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet> <!-- everything in target/deps (presumably the copied dependency jars; confirm against the pom), placed under /lib -->
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet> <!-- XML files from src/site/conf, placed under /conf -->
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet> <!-- META-INF resources, placed under /META-INF -->
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet> <!-- resources under src/main/resources/app, placed under /app next to the project jar -->
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
new file mode 100644
index 0000000..327c882
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam MinimalWordCount Example
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "MinimalWordCount")
+public class MinimalWordCount implements StreamingApplication
+{
+  /**
+   * Terminal operator that keeps the latest count received for every word in a static map, so
+   * that code outside the DAG can inspect the outcome.
+   *
+   * <p>All state lives in static fields and is reset each time {@code setup} runs.
+   */
+  public static class Collector extends BaseOperator
+  {
+    static Map<String, Long> result;
+    // volatile: written in process() and exposed through the static isDone() accessor, which may
+    // be polled from a thread other than the one running the operator.
+    private static volatile boolean done = false;
+
+    /**
+     * @return true once a tuple whose key is "bye" has been received since the last setup
+     */
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    /**
+     * Clears the completion flag and starts over with an empty result map.
+     *
+     * @param context the operator context (not used)
+     */
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      done = false;
+      result = new HashMap<>();
+    }
+
+    /**
+     * Input port: stores the count of each incoming word, replacing any earlier count for it.
+     */
+    public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>()
+    {
+      @Override
+      public void process(KeyValPair<String, Long> tuple)
+      {
+        // Constant-first comparison so that a tuple with a null key cannot throw here.
+        if ("bye".equals(tuple.getKey())) {
+          done = true;
+        }
+        result.put(tuple.getKey(), tuple.getValue());
+      }
+    };
+  }
+
+  /**
+   * Builds the word-count pipeline with the high-level stream API and populates the given DAG with it.
+   * @param dag the DAG that the stream's operators are added to
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    Collector collector = new Collector();
+    // Create a stream that reads the folder ./src/test/resources/wordcount line by line using StreamFactory.
+    StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
+        // Split each line on runs of characters other than ASCII letters and apostrophes; a line that starts with such a run yields a leading empty word.
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split("[^a-zA-Z']+"));
+
+          }
+        }, name("ExtractWords"))
+        // Put all words into one global window; the trigger accumulates fired panes and fires early (withEarlyFiringsAtEvery(1)).
+        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+        // Count the appearances of every word: each word is mapped to a (word, 1) pair.
+        .countByKey(new Function.ToKeyValue<String, String, Long>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, Long>> f(String input)
+          {
+            return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(input, 1L));
+          }
+        }, name("countByKey"))
+        // Unwrap each windowed tuple so that only the (word, count) pair is passed downstream.
+        .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>()
+        {
+          @Override
+          public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+          {
+            return input.getValue();
+          }
+        }, name("FormatResults"))
+        // Print the result.
+        .print(name("console"))
+        // Attach the collector's input port to the end of the stream to collect results.
+        .endWith(collector, collector.input, name("Collector"))
+        // Populate the dag using the stream.
+        .populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
new file mode 100644
index 0000000..5b83bd0
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.Duration;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam WindowedWordCount Example.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "WindowedWordCount")
+public class WindowedWordCount implements StreamingApplication
+{
+  static final int WINDOW_SIZE = 1;  // Default window duration in minutes
+
+  /**
+   * An input operator that reads the classpath resource {@code /wordcount/word.txt} and emits it
+   * downstream line by line, pausing briefly after every read.
+   */
+  public static class TextInput extends BaseOperator implements InputOperator
+  {
+    private static final String RESOURCE = "/wordcount/word.txt";
+
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+    private boolean done = false;
+
+    private transient BufferedReader reader;
+
+    /**
+     * Clears the end-of-input flag and opens the resource from its beginning.
+     *
+     * @param context the operator context (not used)
+     */
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      done = false;
+      initReader();
+    }
+
+    /**
+     * Opens the bundled text resource for reading.
+     *
+     * @throws IllegalStateException if the resource is not on the classpath
+     */
+    private void initReader()
+    {
+      InputStream resourceStream = this.getClass().getResourceAsStream(RESOURCE);
+      if (resourceStream == null) {
+        // getResourceAsStream reports a missing resource by returning null, not by throwing.
+        throw new IllegalStateException("Resource not found on classpath: " + RESOURCE);
+      }
+      // Decode explicitly (assumes the resource is UTF-8/ASCII text) rather than relying on the
+      // platform default charset.
+      reader = new BufferedReader(
+          new InputStreamReader(resourceStream, java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    /** Closes the reader if it is still open; failures while closing are ignored. */
+    @Override
+    public void teardown()
+    {
+      IOUtils.closeQuietly(reader);
+    }
+
+    /**
+     * Emits the next line of the resource, if there is one, and then sleeps for 50 ms. When the
+     * end of the resource is reached the reader is closed, and later calls return immediately.
+     */
+    @Override
+    public void emitTuples()
+    {
+      if (done) {
+        return;
+      }
+      try {
+        String line = reader.readLine();
+        if (line == null) {
+          done = true;
+          reader.close();
+        } else {
+          this.output.emit(line);
+        }
+        Thread.sleep(50);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status before rethrowing so code further up can still observe it.
+        Thread.currentThread().interrupt();
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+
+  /**
+   * Terminal operator that records every received {@code PojoEvent} in a static map keyed by
+   * (timestamp, word), so that code outside the DAG can inspect the counts.
+   */
+  public static class Collector extends BaseOperator
+  {
+    private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>();
+    // volatile: written in process() and exposed through the static isDone() accessor, which may
+    // be polled from a thread other than the one running the operator.
+    private static volatile boolean done = false;
+
+    /**
+     * Clears the completion flag and drops previously collected results.
+     *
+     * @param context the operator context, passed on to the superclass
+     */
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      done = false;
+      // The map is static and therefore outlives a single run; remove entries left behind by an
+      // earlier run in the same JVM. The map instance itself is kept.
+      result.clear();
+    }
+
+    /**
+     * @return true once an event whose word is "bye" has been received since the last setup
+     */
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    /**
+     * @return the live static result map (not a copy), keyed by (timestamp, word)
+     */
+    public static Map<KeyValPair<Long, String>, Long> getResult()
+    {
+      return result;
+    }
+
+    /**
+     * Input port: stores the count of each event under its (timestamp, word) key, replacing any
+     * earlier count stored under the same key.
+     */
+    public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>()
+    {
+      @Override
+      public void process(PojoEvent tuple)
+      {
+        result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount());
+        // Constant-first comparison so that an event without a word cannot throw here.
+        if ("bye".equals(tuple.getWord())) {
+          done = true;
+        }
+      }
+    };
+  }
+
+  /**
+   * Plain mutable bean carrying a word, its count and a timestamp; it is the tuple type consumed
+   * by the Collector's input port.
+   */
+  public static class PojoEvent
+  {
+    private String word;
+    private long count;
+    private long timestamp;
+
+    /** @return the word */
+    public String getWord()
+    {
+      return this.word;
+    }
+
+    /** @param word the word to store */
+    public void setWord(String word)
+    {
+      this.word = word;
+    }
+
+    /** @return the count stored for the word */
+    public long getCount()
+    {
+      return this.count;
+    }
+
+    /** @param count the count to store */
+    public void setCount(long count)
+    {
+      this.count = count;
+    }
+
+    /** @return the timestamp stored with the word */
+    public long getTimestamp()
+    {
+      return this.timestamp;
+    }
+
+    /** @param timestamp the timestamp to store */
+    public void setTimestamp(long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    /** @return a one-line rendering of the word, count and timestamp */
+    @Override
+    public String toString()
+    {
+      StringBuilder text = new StringBuilder("PojoEvent (word=");
+      text.append(getWord());
+      text.append(", count=").append(getCount());
+      text.append(", timestamp=").append(getTimestamp());
+      return text.append(")").toString();
+    }
+  }
+
+  /**
+   * A map function that wraps the input string with a randomly generated timestamp.
+   */
+  public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
+  {
+    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
+    // Primitive rather than boxed: the value is never null and is used in arithmetic on every call.
+    private final long minTimestamp;
+
+    /** Records the creation time as the lower bound of all generated timestamps. */
+    AddTimestampFn()
+    {
+      this.minTimestamp = System.currentTimeMillis();
+    }
+
+    /**
+     * @param input the string to wrap
+     * @return the input paired with a random timestamp in [minTimestamp, minTimestamp + RAND_RANGE)
+     */
+    @Override
+    public Tuple.TimestampedTuple<String> f(String input)
+    {
+      // Generate a timestamp that falls within RAND_RANGE (ten minutes) after this function was created.
+      long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
+      long randomTimestamp = minTimestamp + randMillis;
+
+      return new Tuple.TimestampedTuple<>(randomTimestamp, input);
+    }
+  }
+
+  /** A MapFunction that copies the word, the count and the timestamp of a windowed tuple into a new PojoEvent. */
+  public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent>
+  {
+    @Override
+    public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+    {
+      // Read the (word, count) pair once instead of fetching it for each field.
+      final KeyValPair<String, Long> wordCount = input.getValue();
+      final PojoEvent event = new PojoEvent();
+      event.setWord(wordCount.getKey());
+      event.setCount(wordCount.getValue());
+      event.setTimestamp(input.getTimestamp());
+      return event;
+    }
+  }
+
+  /**
+   * Builds the windowed word-count pipeline with the high-level stream API and populates the given DAG with it.
+   * @param dag the DAG that the stream's operators are added to
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    TextInput input = new TextInput();
+    Collector collector = new Collector();
+
+    // Create a stream from the output port of the TextInput operator.
+    ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input"))
+
+        // Extract all the words from the input line of text by splitting on runs of punctuation and whitespace.
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+          }
+        }, name("ExtractWords"))
+
+        // Wrap each word with a randomly generated timestamp.
+        .map(new AddTimestampFn(), name("AddTimestampFn"));
+
+
+    // Apply time windows of WINDOW_SIZE minutes with a trigger that accumulates fired panes and fires early.
+    // TODO: change trigger option to atWaterMark when available.
+    WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream
+        .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)),
+        new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1));
+
+
+    WindowedStream<PojoEvent> wordCounts =
+        // Perform a countByKey transformation to count the appearances of each word in every time window.
+        windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
+          {
+            return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(),
+              new KeyValPair<String, Long>(input.getValue(), 1L));
+          }
+        }, name("count words"))
+
+        // Convert each result to a PojoEvent and print it to the console.
+        .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console"));
+
+    wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag);
+  }
+}


Mime
View raw message