apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [2/6] apex-malhar git commit: Added Beam Examples and Implementations of Accumulation.
Date Thu, 25 Aug 2016 16:43:18 GMT
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/resources/log4j.properties b/demos/highlevelapi/src/test/resources/log4j.properties
new file mode 100644
index 0000000..592eb19
--- /dev/null
+++ b/demos/highlevelapi/src/test/resources/log4j.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=INFO,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=INFO
+#log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org=INFO
+
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/resources/sampletweets.txt
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/resources/sampletweets.txt b/demos/highlevelapi/src/test/resources/sampletweets.txt
new file mode 100644
index 0000000..475fada
--- /dev/null
+++ b/demos/highlevelapi/src/test/resources/sampletweets.txt
@@ -0,0 +1,207 @@
+Tweet content
+"Apple has $233 billion in cash. It could buy all
+
+—@NFL teams
+—@NBA teams
+—@MLB teams
+—@NHL teams
+
+...and still have $80 billion left. $AAPL"
+Read $MRNJ #NEWS, $HEMP & $GRCU r above .01, that's where we r goin🚀 $SPY $MSFT $SBUX $SFOR $VRX $AAPL $TSLA $GOOG $FB $EURUSD $USDJPY $MLCG
+$RXSF is leadin their sector accordin 2 @EdisonMediaCen $AAPL $SPY $TSLA $FB $EURUSD $ALK $IBB $EW $AMZN $GBPUSD $GM https://t.co/LYY2mHn755
+Philstockworld Top Trade Review $AAPL $MSFT #Dividends $USO $HOV $TWTR -- https://t.co/JArXsIm7CI https://t.co/kRR9ezhm9E
+Philstockworld Top Trade Review: $AAPL $ABX $BA $CAKE $CMG $DIS $IBM $GILD $LL $UNG $SPY -- https://t.co/EX5SYjdwBC https://t.co/7FBZwVZ63v
+"Monday’s Oil Mess: Rent-A-Rebel Jacks up Prices into the Holiday $USO $AAPL
+#Earnings -- https://t.co/cGHB3WDKA8 https://t.co/JFZIBcom1n"
+Meaningless Monday Market Movement! $AAPL $SQQQ #oil #Brexit https://t.co/j4Iqg7E1HN
+"S&P Futures Back over 2,050, for Now
+$SPY $AAPL $SQQQ #China #Debt #Hedging -- https://t.co/2dOc5T89S3 https://t.co/TDPVdNRNQF"
+"💥TURN YOUR $500 INTO $5,000+💥
+
+JOIN #TEAMBILLIONAIRE⤵
+📧 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $AAPL $GSAT $MGT
+https://t.co/lwAGjfmIP3"
+Trendless Tuesday - Watch Yesterday’s Fake Gains Disappear $AAPL #China $FXI #Earnings -- https://t.co/GpgGqoOlFn https://t.co/FRuixv5aZF
+"💥TURN YOUR $500 INTO $5,000+💥
+
+JOIN #TEAMBILLIONAIRE⤵
+📧 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $AAPL $UVXY $JDST
+https://t.co/lwAGjfmIP3"
+"Apple has $233 billion in cash. It could buy:
+
+Uber
+Tesla
+Twitter
+Airbnb
+Netflix
+Yahoo
+
+...and still have $18 billion left. $AAPL"
+Option Opportunity Portfolio May Review – Up 19.3% In 30 Days! $ABX $FCX $USO $AAPL $DIS - https://t.co/rp3kMsRZ3E https://t.co/TKkc15pKcR
+Waiting for the Fed – Apple Gives Us Huge Wins: $AAPL $SQQQ #GDP #Nikkei #Futures #Oil -- https://t.co/Al3pkf350V https://t.co/LktIRF4F2b
+Tempting Tuesday - S&P 2,100 is Still the Line to Watch Ahead of the Fed $AAPL $QQQ -- https://t.co/t1eDfKHJnk https://t.co/BAW3RAe7SC
+Our $SQQQ Hedge is Up 314% and Our Futures Are Up $4,850, You're Welcome!  $AAPL -- https://t.co/eUQ2kCkCOY https://t.co/Yk98oyqMZl
+"TURN YOUR 💲500 INTO 💲5,000$💥
+
+JOIN #TEAMBILLIONAIRE ⤵
+📧 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $TWTR $AAPL $LNKD
+https://t.co/euJFNQX1g4"
+"TURN YOUR 💲500 INTO 💲5,000$💥
+
+JOIN #TEAMBILLIONAIRE ⤵
+📧 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $TALK $PPPI $AAPL https://t.co/oSn11kxftM"
+Bears today. We getting paid! $AAPL $TWTR $BWLD $NFLX https://t.co/CCi0S3skJJ
+"Apple has $233 billion in cash. It could buy all
+
+—@NFL teams
+—@NBA teams
+—@MLB teams
+—@NHL teams
+
+...and still have $80 billion left. $AAPL"
+Are you in Sync with the market? https://t.co/ZtHHCrSAf8 #stocks #finance #investing #trading $AAPL $LNKD $NFLX  $GOOGL $FB
+The Last Time These Insiders Purchased This Stock It Sky Rocketed 1000%+ https://t.co/bmNAHBoQBD $DIA $QQQ $SPY $GOOG $AAPL $BAC $TWTR $FB
+"This Hacker Made Amazon’s Alexa, Google Now, and Siri Command One Another
+https://t.co/YXP3yqmf4H $AAPL $AMZN $GOOG https://t.co/NG7r6qgfRt"
+"Over the last 3 years, the top 14 automakers upped their combined R&D spend by $192 million.
+
+$AAPL upped R&D spend by $5 billion.
+
+- MS"
+Volatility can be your friend. https://t.co/aHz2r8HHD2 #stocks #trading #investing #financials #learntodaytrade $FB $AAPL $LNKD $NFLX
+"PERCENTAGE of Apple's Revenues:
+FY 2006:
+iPod 40%
+Mac 38%
+Services 10%
+Others 12%
+
+FY 2015:
+iPhone 66%
+Mac 11%
+iPad 10%
+Others 13%
+
+$AAPL"
+Apple recovered $40 million worth of gold from recycled iPhones, iPads & Macs in 2015. https://t.co/XPBWlM6cBs $AAPL https://t.co/P0LMSRw7Ot
+"Apple's iPhone sales sink for 1st time ever last quarter
+https://t.co/TAKjUwl4Yc  @DavidGoldmanCNN @cnntech  $AAPL https://t.co/OrDp4BDpsD"
+$BAC is down 5% since our article was posted on Friday https://t.co/al8AgaSsiI $DIA $QQQ $SPY $AAPL $GOOG $FB $TWTR $STUDY $NFLX $LNKD $IBM
+Ben Franklin: The First Proponent Of Dividend Growth Investing? https://t.co/dx7FE2G9AH $AAPL $ACN $AL $BEN $CSV $HON $IJR $JNJ $JWN $PEGI
+$5,000 Friday the 13th - Yesterday's Futures Trades Pay Off Nicely $USO $SPY $AAPL  -- https://t.co/3RUEjAq1bO https://t.co/2L7cdebTlT
+I DON'T SEE ANY BUBBLE RIGHT NOW , I SWEAR ! $SPX $SPY $DIA $DJI $AAPL  $VIX $TVIX $C $BAC $GM $GE $FB #STOCKMARKET https://t.co/E5954RIpC7
+Terrible $AAPL quarter, finally. On the way to becoming $NOK. Tech is mean reverting, today's leaders are almost always tomorrow's laggards.
+The iPhone 7S could look radically different from the iPhones of today https://t.co/eQxUMAZ4eM $AAPL https://t.co/HIH3QqKpIC
+"No Bull: The Evidence
+https://t.co/Md2SNpjdwd
+$SPX $MSFT $GOOGL $AAPL $NFLX $AMZN $FB $DIS $V $BAC $GS $WMT $SBUX https://t.co/1oISHNX4cJ"
+The iPhone 7S could look radically different from the iPhones of today https://t.co/KgeVSjmcGe $AAPL https://t.co/7hFtg37oJu
+There was a 3rd Apple founder, Ronald Wayne, who sold his 10% stake for $800 in 1976. Today his share would've been worth $65 Billion. $AAPL
+Twitter Stock Set to Breakout Soon https://t.co/u4V6ChhpOW $TWTR $DIA $QQQ $SPY $AAPL $GLD $GDX $NUGT $DUST $BAC $GOOG $FB $STUDY $NFLX $IBM
+Alibaba Stock Price Breaks The 50 Day Moving Average https://t.co/ABOVWI6j2G $BABA $AAPL $YHOO $COST $UWTI $CSC $MON https://t.co/VlWGDxrQXh
+I still can’t shake the feeling that $AAPL is slowly taking themselves private. https://t.co/XIAMvppDWh https://t.co/kdMGCGbMaJ
+$SPX ROADMAP 2016 #STOCKMARKET $INTC $F $SPY $AAPL $AMZN $C $VIX $FB $TWTR $GOOGL $UVXY $FAZ $FEZ $MSFT $GS $BAC $AA https://t.co/owuQ9awcDw
+"Want to know why $GOOG is so impressive and why $AAPL is so fucked? Read this years founders' letter from $GOOG:
+
+https://t.co/LiBjGZwyKw"
+"GET READY. Here are the companies reporting earnings next week: https://t.co/NXptPkQX70
+
+$AAPL $FB $TWTR $CMG $GILD https://t.co/tcIoCZdOZi"
+$SPX THIS TIME IT'S DIFFERENT! $SPY $DIA $SDOW $S  $FAZ $FEZ $AAPL $MSFT $BAC $C $JPM $GS $SIRI $AMZN $F $VIX $TVIX https://t.co/pkYVgNKv3P
+$SPX ROADMAP 2016 #STOCKS $TVIX $VXX $VALE $AAPL $AKS $FCX $MSFT $AA $MU $VIX $SPX $SPY #TRADING $PCLN $SIRI $ MCD https://t.co/6UH5He38h1
+The iPhone 6S is the first iPhone ever to sell fewer models than its predecessor https://t.co/s8iQOvPQeR $AAPL https://t.co/QmQROtQ9vY
+11/ For example, buy an Echo and see your behavior change. The future is happening, and $AAPL seems, to me, asleep.
+$RLYP $SPY $KORS $WDAY $MSFT $AAPL $QLIK $TIVO $NXPI $CPXX $AVGO $ZOES $LE $TICC $SLB $FCEL $VRA $MLNX $ASNA $ICPT https://t.co/LXMpz4rFG0
+#STOCKMARKET GRAVITY LESSONS: what goes up  must come down $SPX $SPY $DIA $QQQ $TVIX $VIX $AAPL $C $FB $PCLN $BAC $F https://t.co/8HQHBEgSj5
+Should Icahn's exit or Buffett's entry affect your $AAPL judgment? The Big Name Effect. https://t.co/9Z2ok61MUh https://t.co/udAQLfdJFe
+Apple revenue drops 13 percent, ending 13 years of growth. Greater China was especially weak, down 26 percent. $AAPL https://t.co/q4ovXUenBU
+It was a $18 billion day for Apple. https://t.co/iRbGeoTmCJ $AAPL
+"Apple has $233 billion in cash. It could buy:
+
+Uber
+Tesla
+Twitter
+Airbnb
+Netflix
+Yahoo
+
+...and still have $18 billion left. $AAPL"
+#3 TOP 2111.05 #STOCKS #STOCKMARKET #TRADING $SPX $SPY $VIX $TVIX $AAPL $SIRI $C $BAC $JPM $AMZN $MSFT $FB $TWTR $F https://t.co/gSqmN0fVON
+Google #IO16: Android's failure to innovate hands a Apple free run at WWDC $GOOG $AAPL https://t.co/FTs9M8JD5g https://t.co/20uou1gUkW
+$SPX 2134.72..2116.48...2111.05 HOUSTON WE HAVE A PROBLEM ! #STOCKMARKET $VIX $SPY $DIA $AAPL $C $BAC $FB $VXX $MSFT https://t.co/du3QfPUM4Q
+top #earnings $FB $AAPL $AMZN $TWTR $CMG $F $GILD $LNKD $FCX $CELG $SWKS $JBLU $T $NXPI $BA  https://t.co/lObOE0uRjZ https://t.co/94F6GJc3hE
+The iPhone 6S is the first iPhone ever to sell fewer models than its predecessor https://t.co/ZVeQ9a4Yrh $AAPL https://t.co/2Ntpbxwlyo
+You do not want to miss this incredibly candid look into $AAPL w/ @tim_cook! Tune into @MadMoneyOnCNBC on @CNBC now! https://t.co/budv4qfvju
+Foxconn axes 60,000 jobs in one Chinese factory as robots take over: https://t.co/BnFdjGCmLf $AAPL https://t.co/WhRHer8jdN
+Warren Buffett's Berkshire Hathaway reports 9.8M share stake in $AAPL https://t.co/nXmvK6PV7M https://t.co/MAcMz0iTg6
+Apple is about to report its worst quarter in 13 years on Tuesday https://t.co/NJ3hwunHCx $AAPL https://t.co/YLTmnpqNjI
+Everyone who wants an iPhone has one. $AAPL is now a consumer staple stock and will trade on replacement / shareholder yield.
+Financial Armageddon’ is imminent, the next major crash will happen in 2016 $VXX $VIX $TVIX  $SPX $SPY $AAPL $MSFT $BAC $C $FB $DJI $DIA $F
+"Apple is NO longer the largest US stock by market cap. Google is: https://t.co/i81Y83jQJC
+
+$GOOGL $AAPL https://t.co/cRCKRYBICS"
+Exclusive: Apple hires former Tesla VP Chris Porritt for ‘special [car] project’ https://t.co/7knsloxvJW $TSLA $AAPL https://t.co/X8cYztExoP
+$SPX on the top of downtrend Channel Be careful! #STOCKMARKET $SPY $AAPL $AMZN $TSLA $FB $QQQ $DIA $NFLX $PCLN $C $F https://t.co/UKZCyLYuBq
+UPDATE: Apple CEO Cook says in conference call that smartphone marker is 'currently not growing' $AAPL https://t.co/WeECmrdv1j
+In February Charlie Munger was asked why Berkshire owns $GM. The $AAPL stake isn't anymore complicated than this: https://t.co/Rwkb30OEgq
+Talking to @SquawkStreet about $AAPL & more at @NYSE ! https://t.co/m05b68VLMp
+iPhone sales sour #Apple's earning: https://t.co/962fj9SWsc $AAPL https://t.co/nz9FRK6sNK
+People aren’t upgrading smartphones as quickly and that is bad for Apple https://t.co/EOEJPfNR8Z 🔓 $AAPL
+"$NXPI $JBLU $FCX $AAPL $CMG
+$TWTR $EBAY $BWLD $PNRA $CRUS
+$FB $FSLR $UPS $CELG $AMZN
+$LNKD $BIDU $SWKS $GILD $HELE https://t.co/rQUmhHgYn0"
+People mad that Icahn sold $AAPL without giving them the head’s up - How much in commissions did you pay him this year?
+Cool stat: $AAPL's $46 billion loss in market cap overnight is greater than the market cap of 391 S&P 500 companies https://t.co/1ms1YZzTbP
+Apple. You've come a long way... https://t.co/WGvk8K8MYv $AAPL https://t.co/3Wo0hAwRAc
+"Someone is building the Internet's biggest list of stock market cliches and it's amazing: https://t.co/mIV169cF36
+
+$SPY $AAPL $EURUSD"
+JUST IN: Apple delays earnings release by one day, to April 26th after the bell. • $AAPL
+Apple's market value is down nearly three Twitters $AAPL $TWTR
+Trump warns of a tech bubble: https://t.co/6Ks1yTa4Zc $AAPL $FB $AMZN He's 100% right about this. https://t.co/dJgTLk5JOB
+Apple could sell its billionth iPhone in just a few months' time https://t.co/g6VYDFIE3d $AAPL https://t.co/jzucmxDYXe
+$SPX  KEEP BLOWING #STOCKMARKET #BUBBLE #STOCKS $MSFT $GS $AAPL $SPY $DIA $DJI $C $SIRI $PCLN  $BAC $JPM $VIX $TVIX https://t.co/GPFBb0uCLF
+Will Apple $AAPL fall from tree? 12-mo descending triangle. I've no interest to short it, but it will be wild ride https://t.co/AnjsIKmIHI
+Tim Cook shouldn't be doing TV w/out a new product. Looks desperate. Not a consumer-facing guy. $AAPL https://t.co/Z4UFSimTLg
+When will Apple will sell its billionth iPhone? It may be sooner than you think: https://t.co/5IaF018N1p $AAPL https://t.co/cCIgtKqWHA
+#Stockmarket downtrend continues next week $spx  $spy $vix $tvix $dji $aapl $jpm $bac $c $msft $pcln $wmt $ba https://t.co/1TTlgnKnZc
+$AAPL https://t.co/AFANPYHnoq
+40 years ago this month, Apple co-founder Ronald Wayne sold his 10% stake in $AAPL for $800. Current value: $61 billion.
+Warren Buffett's Berkshire Hathaway reports 9.8M share stake in $AAPL https://t.co/rXWwuyIooI https://t.co/TztgKCcWWy
+Apple's iBooks and iTunes Movies in China have been shut down after less than 7 months https://t.co/ZuGXZqSHma $AAPL https://t.co/1OHGC9YiUf
+Possible buy on $AAPL as it drops onto it's 9 DEMA support #1Broker #Bitcoin #Blockchain https://t.co/WWssD01joh https://t.co/jOKJyG9EaJ
+"Apple is down 7% after earnings.
+
+That's about $40 BILLION in market cap gone in 30 minutes. Poof.
+
+$AAPL: https://t.co/ggfmPjJjkW"
+B4 CRASH 2008 - Paulson's speech:" OUR FINANCIAL SYSTEM IS STRONG" $VXX $VIX $TVIX $UVXY $SPX $SPY $AAPL $MSFT $BAC $C $FB $DJI $DIA $F
+$ONCI is ready to RUN this week! #stockmarket #pennystocks #parabolic $CDNL $MGT $GOOGL $AAPL $TSLA $TWTR $ONCI https://t.co/wwqf0RNOix
+Apple could sell its billionth iPhone in just a few months' time https://t.co/u2qFZ440dH $AAPL https://t.co/8cAchiZ0vC
+The iPhone might radically change in 2017 $AAPL https://t.co/IXLdCfEdus https://t.co/GpdMvFZPjE
+"The growth of smartphones. On one graph.
+
+A great share via: https://t.co/2hAJlarjSM
+
+$AAPL $GOOGL $MSFT https://t.co/BAwQRvYzou"
+"$AAPL finished last quarter with $232 billion in cash, meanwhile Kanye running up debts making records for Tidal.
+
+Bro."
+Which is bullish for $AAPL if you know anything about $GS https://t.co/WWssD01joh  https://t.co/CQk8iKMI7w
+"The tech stocks with the MOST revenue
+
+1. $AAPL
+2. $AMZN
+3. $MSFT
+
+Visual by @OphirGottlieb: https://t.co/GpZ5ct2z5r https://t.co/H6sNKdtBHd"
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/resources/wordcount/word.txt
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/resources/wordcount/word.txt b/demos/highlevelapi/src/test/resources/wordcount/word.txt
new file mode 100644
index 0000000..edd0f51
--- /dev/null
+++ b/demos/highlevelapi/src/test/resources/wordcount/word.txt
@@ -0,0 +1,8 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+bye
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index e9f2daf..3528e7a2 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -174,6 +174,7 @@
       <id>all-modules</id>
       <modules>
         <module>distributedistinct</module>
+        <module>highlevelapi</module>
       </modules>
     </profile>
   </profiles>
@@ -192,7 +193,6 @@
     <module>r</module>
     <module>echoserver</module>
     <module>iteration</module>
-    <module>highlevelapi</module>
   </modules>
 
   <dependencies>
@@ -232,17 +232,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.apache.apex</groupId>
-      <artifactId>malhar-stream</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
new file mode 100644
index 0000000..57db6d7
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+/**
+ * Average Accumulation
+ */
+public class Average implements Accumulation<Double, MutablePair<Double, Long>, Double>
+{
+  @Override
+  public MutablePair<Double, Long> defaultAccumulatedValue()
+  {
+    return new MutablePair<>(0.0, 0L);
+  }
+  
+  @Override
+  public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input)
+  {
+    accu.setLeft(accu.getLeft() * ((double)accu.getRight() / (accu.getRight() + 1)) + input / (accu.getRight() + 1));
+    accu.setRight(accu.getRight() + 1);
+    return accu;
+  }
+  
+  @Override
+  public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2)
+  {
+    accu1.setLeft(accu1.getLeft() * ((double)accu1.getRight() / accu1.getRight() + accu2.getRight()) +
+        accu2.getLeft() * ((double)accu2.getRight() / accu1.getRight() + accu2.getRight()));
+    accu1.setRight(accu1.getRight() + accu2.getRight());
+    return accu1;
+  }
+  
+  @Override
+  public Double getOutput(MutablePair<Double, Long> accumulatedValue)
+  {
+    return accumulatedValue.getLeft();
+  }
+  
+  @Override
+  public Double getRetraction(Double value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return 0.0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
new file mode 100644
index 0000000..2c01a0b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang3.mutable.MutableLong;
+
+/**
+ * Count Accumulation
+ */
+public class Count implements Accumulation<Long, MutableLong, Long>
+{
+
+  @Override
+  public MutableLong defaultAccumulatedValue()
+  {
+    return new MutableLong(0);
+  }
+
+  @Override
+  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+
+  @Override
+  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+
+  @Override
+  public Long getOutput(MutableLong accumulatedValue)
+  {
+    return accumulatedValue.getValue();
+  }
+
+  @Override
+  public Long getRetraction(Long value)
+  {
+    return -value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
new file mode 100644
index 0000000..5716cad
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Fold Accumulation Adaptor class
+ */
+public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT>
+{
+
+  public FoldFn()
+  {
+  }
+
+  public FoldFn(OUTPUT initialVal)
+  {
+    this.initialVal = initialVal;
+  }
+
+  private OUTPUT initialVal;
+
+  @Override
+  public OUTPUT defaultAccumulatedValue()
+  {
+    return initialVal;
+  }
+
+  @Override
+  public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input)
+  {
+    return fold(accumulatedValue, input);
+  }
+
+  @Override
+  public OUTPUT getOutput(OUTPUT accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  @Override
+  public OUTPUT getRetraction(OUTPUT value)
+  {
+    return null;
+  }
+
+  abstract OUTPUT fold(OUTPUT result, INPUT input);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
new file mode 100644
index 0000000..632cad5
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Group accumulation.
+ */
+public class Group<T> implements Accumulation<T, List<T>, List<T>>
+{
+  @Override
+  public List<T> defaultAccumulatedValue()
+  {
+    return new ArrayList<>();
+  }
+  
+  @Override
+  public List<T> accumulate(List<T> accumulatedValue, T input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+  
+  @Override
+  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+  {
+    accumulatedValue1.addAll(accumulatedValue2);
+    return accumulatedValue1;
+  }
+  
+  @Override
+  public List<T> getOutput(List<T> accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+  
+  @Override
+  public List<T> getRetraction(List<T> value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return new ArrayList<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
new file mode 100644
index 0000000..1002b49
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.Comparator;
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Max accumulation.
+ */
+public class Max<T> implements Accumulation<T, T, T>
+{
+  
+  Comparator<T> comparator;
+  
+  public void setComparator(Comparator<T> comparator)
+  {
+    this.comparator = comparator;
+  }
+  
+  @Override
+  public T defaultAccumulatedValue()
+  {
+    return null;
+  }
+  
+  @Override
+  public T accumulate(T accumulatedValue, T input)
+  {
+    if (accumulatedValue == null) {
+      return input;
+    } else if (comparator != null) {
+      return (comparator.compare(input, accumulatedValue) > 0) ? input : accumulatedValue;
+    } else if (input instanceof Comparable) {
+      return (((Comparable)input).compareTo(accumulatedValue) > 0) ? input : accumulatedValue;
+    } else {
+      throw new RuntimeException("Tuple cannot be compared");
+    }
+  }
+  
+  @Override
+  public T merge(T accumulatedValue1, T accumulatedValue2)
+  {
+    return accumulate(accumulatedValue1, accumulatedValue2);
+  }
+  
+  @Override
+  public T getOutput(T accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+  
+  @Override
+  public T getRetraction(T value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
new file mode 100644
index 0000000..66248f4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.Comparator;
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Min accumulation
+ */
+public class Min<T> implements Accumulation<T, T, T>
+{
+  
+  Comparator<T> comparator;
+  
+  public void setComparator(Comparator<T> comparator)
+  {
+    this.comparator = comparator;
+  }
+  
+  @Override
+  public T defaultAccumulatedValue()
+  {
+    return null;
+  }
+  
+  @Override
+  public T accumulate(T accumulatedValue, T input)
+  {
+    if (accumulatedValue == null) {
+      return input;
+    } else if (comparator != null) {
+      return (comparator.compare(input, accumulatedValue) < 0) ? input : accumulatedValue;
+    } else if (input instanceof Comparable) {
+      return (((Comparable)input).compareTo(accumulatedValue) < 0) ? input : accumulatedValue;
+    } else {
+      throw new RuntimeException("Tuple cannot be compared");
+    }
+  }
+  
+  @Override
+  public T merge(T accumulatedValue1, T accumulatedValue2)
+  {
+    return accumulate(accumulatedValue1, accumulatedValue2);
+  }
+  
+  @Override
+  public T getOutput(T accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+  
+  @Override
+  public T getRetraction(T value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
new file mode 100644
index 0000000..c21ab32
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * An easy to use reduce Accumulation
+ * @param <INPUT>
+ */
+public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
+{
+  @Override
+  public INPUT defaultAccumulatedValue()
+  {
+    return null;
+  }
+
+  @Override
+  public INPUT accumulate(INPUT accumulatedValue, INPUT input)
+  {
+    if (accumulatedValue == null) {
+      return input;
+    }
+    return reduce(accumulatedValue, input);
+  }
+
+  @Override
+  public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2)
+  {
+    return reduce(accumulatedValue1, accumulatedValue2);
+  }
+
+  @Override
+  public INPUT getOutput(INPUT accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  @Override
+  public INPUT getRetraction(INPUT value)
+  {
+    return null;
+  }
+
+  public abstract INPUT reduce(INPUT input1, INPUT input2);
+
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
new file mode 100644
index 0000000..b7cd770
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * RemoveDuplicates Accumulation.
+ * @param <T>
+ */
+public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>>
+{
+  @Override
+  public Set<T> defaultAccumulatedValue()
+  {
+    return new HashSet<>();
+  }
+  
+  @Override
+  public Set<T> accumulate(Set<T> accumulatedValue, T input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+  
+  @Override
+  public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2)
+  {
+    for (T item : accumulatedValue2) {
+      accumulatedValue1.add(item);
+    }
+    return accumulatedValue1;
+  }
+  
+  @Override
+  public List<T> getOutput(Set<T> accumulatedValue)
+  {
+    if (accumulatedValue == null) {
+      return new ArrayList<>();
+    } else {
+      return new ArrayList<>(accumulatedValue);
+    }
+  }
+  
+  @Override
+  public List<T> getRetraction(List<T> value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return new ArrayList<>(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
new file mode 100644
index 0000000..60b195b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableDouble;
+
+/**
+ * Sum Accumulation for doubles.
+ */
+public class SumDouble implements Accumulation<Double, MutableDouble, Double>
+{
+  @Override
+  public MutableDouble defaultAccumulatedValue()
+  {
+    return new MutableDouble(0.0);
+  }
+  
+  @Override
+  public MutableDouble accumulate(MutableDouble accumulatedValue, Double input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+  
+  @Override
+  public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+  
+  @Override
+  public Double getOutput(MutableDouble accumulatedValue)
+  {
+    return accumulatedValue.doubleValue();
+  }
+  
+  @Override
+  public Double getRetraction(Double value)
+  {
+    return -value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
new file mode 100644
index 0000000..14e69e2
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableFloat;
+
+/**
+ * Sum Accumulation for floats.
+ */
+public class SumFloat implements Accumulation<Float, MutableFloat, Float>
+{
+  @Override
+  public MutableFloat defaultAccumulatedValue()
+  {
+    return new MutableFloat(0.);
+  }
+  
+  @Override
+  public MutableFloat accumulate(MutableFloat accumulatedValue, Float input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+  
+  @Override
+  public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+  
+  @Override
+  public Float getOutput(MutableFloat accumulatedValue)
+  {
+    return accumulatedValue.floatValue();
+  }
+  
+  @Override
+  public Float getRetraction(Float value)
+  {
+    return -value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
new file mode 100644
index 0000000..886a7d0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableInt;
+
+/**
+ * Sum accumulation for integers.
+ */
+public class SumInt implements Accumulation<Integer, MutableInt, Integer>
+{
+  @Override
+  public MutableInt defaultAccumulatedValue()
+  {
+    return new MutableInt(0);
+  }
+  
+  @Override
+  public MutableInt accumulate(MutableInt accumulatedValue, Integer input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+  
+  @Override
+  public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+  
+  @Override
+  public Integer getOutput(MutableInt accumulatedValue)
+  {
+    return accumulatedValue.intValue();
+  }
+  
+  @Override
+  public Integer getRetraction(Integer value)
+  {
+    return -value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
new file mode 100644
index 0000000..469eef9
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableLong;
+
+/**
+ * Sum accumulation for longs.
+ */
+public class SumLong implements Accumulation<Long, MutableLong, Long>
+{
+  @Override
+  public MutableLong defaultAccumulatedValue()
+  {
+    return new MutableLong(0L);
+  }
+  
+  @Override
+  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+  
+  @Override
+  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+  
+  @Override
+  public Long getOutput(MutableLong accumulatedValue)
+  {
+    return accumulatedValue.longValue();
+  }
+  
+  @Override
+  public Long getRetraction(Long value)
+  {
+    return -value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
new file mode 100644
index 0000000..7dad8cc
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * TopN accumulation
+ */
+public class TopN<T> implements Accumulation<T, List<T>, List<T>>
+{
+  int n;
+
+  Comparator<T> comparator;
+
+  public void setN(int n)
+  {
+    this.n = n;
+  }
+
+  public void setComparator(Comparator<T> comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  @Override
+  public List<T> defaultAccumulatedValue()
+  {
+    return new LinkedList<>();
+  }
+
+  @Override
+  public List<T> accumulate(List<T> accumulatedValue, T input)
+  {
+    int k = 0;
+    for (T inMemory : accumulatedValue) {
+      if (comparator != null) {
+        if (comparator.compare(inMemory, input) < 0) {
+          break;
+        }
+      } else if (input instanceof Comparable) {
+        if (((Comparable<T>)input).compareTo(inMemory) > 0) {
+          break;
+        }
+      } else {
+        throw new RuntimeException("Tuple cannot be compared");
+      }
+      k++;
+    }
+    accumulatedValue.add(k, input);
+    if (accumulatedValue.size() > n) {
+      accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1));
+    }
+    return accumulatedValue;
+  }
+
+  @Override
+  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+  {
+    accumulatedValue1.addAll(accumulatedValue2);
+    if (comparator != null) {
+      Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator));
+    } else {
+      Collections.sort(accumulatedValue1, Collections.reverseOrder());
+    }
+    if (accumulatedValue1.size() > n) {
+      return accumulatedValue1.subList(0, n);
+    } else {
+      return accumulatedValue1;
+    }
+  }
+
+  @Override
+  public List<T> getOutput(List<T> accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  @Override
+  public List<T> getRetraction(List<T> accumulatedValue)
+  {
+    return new LinkedList<>();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
new file mode 100644
index 0000000..d9f9cfd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Generalized TopNByKey accumulation
+ */
+public class TopNByKey<K, V> implements
+    Accumulation<KeyValPair<K, V>, Map<K, V>, List<KeyValPair<K, V>>>
+{
+  int n = 10;
+
+  Comparator<V> comparator;
+
+  public void setN(int n)
+  {
+    this.n = n;
+  }
+
+  public void setComparator(Comparator<V> comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  @Override
+  public Map<K, V> defaultAccumulatedValue()
+  {
+    return new HashMap<>();
+  }
+
+  @Override
+  public Map<K, V> accumulate(Map<K, V> accumulatedValue, KeyValPair<K, V> input)
+  {
+    accumulatedValue.put(input.getKey(), input.getValue());
+    return accumulatedValue;
+  }
+
+  @Override
+  public Map<K, V> merge(Map<K, V> accumulatedValue1, Map<K, V> accumulatedValue2)
+  {
+    for (Map.Entry<K, V> entry : accumulatedValue2.entrySet()) {
+      if (!accumulatedValue1.containsKey(entry.getKey())) {
+        accumulatedValue1.put(entry.getKey(), entry.getValue());
+      } else if (comparator != null) {
+        if (comparator.compare(entry.getValue(), accumulatedValue1.get(entry.getKey())) > 0) {
+          accumulatedValue1.put(entry.getKey(), entry.getValue());
+        }
+      } else if (entry.getValue() instanceof Comparable) {
+        if (((Comparable<V>)entry.getValue()).compareTo(accumulatedValue1.get(entry.getKey())) > 0) {
+          accumulatedValue1.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+    return accumulatedValue1;
+  }
+
+  @Override
+  public List<KeyValPair<K, V>> getOutput(Map<K, V> accumulatedValue)
+  {
+    LinkedList<KeyValPair<K, V>> result = new LinkedList<>();
+    for (Map.Entry<K, V> entry : accumulatedValue.entrySet()) {
+      int k = 0;
+      for (KeyValPair<K, V> inMemory : result) {
+        if (comparator != null) {
+          if (comparator.compare(entry.getValue(), inMemory.getValue()) > 0) {
+            break;
+          }
+        } else if (entry.getValue() instanceof Comparable) {
+          if (((Comparable<V>)entry.getValue()).compareTo(inMemory.getValue()) > 0) {
+            break;
+          }
+        }
+        k++;
+      }
+      result.add(k, new KeyValPair<K, V>(entry.getKey(), entry.getValue()));
+      if (result.size() > n) {
+        result.remove(result.get(result.size() - 1));
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public List<KeyValPair<K, V>> getRetraction(List<KeyValPair<K, V>> value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return new LinkedList<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java
new file mode 100644
index 0000000..fb4de3c
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+/**
+ * Test for {@link Average}.
+ */
+public class AverageTest
+{
+  @Test
+  public void AverageTest()
+  {
+    Average ave = new Average();
+    MutablePair<Double, Long> accu = ave.defaultAccumulatedValue();
+    
+    for (int i = 1; i <= 10; i++) {
+      accu = ave.accumulate(accu, (double)i);
+    }
+    Assert.assertTrue(5.5 == accu.getLeft());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java
new file mode 100644
index 0000000..4e6f8f1
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.apex.malhar.lib.window.Tuple;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Test for {@link ReduceFn}.
+ */
+public class FoldFnTest
+{
+  public static class NumGen extends BaseOperator implements InputOperator
+  {
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
+  
+    public static int count = 0;
+    private int i = 0;
+  
+    public NumGen()
+    {
+      count = 0;
+      i = 0;
+    }
+  
+    @Override
+    public void emitTuples()
+    {
+      while (i <= 7) {
+        try {
+          Thread.sleep(50);
+        } catch (InterruptedException e) {
+          // Ignore it.
+        }
+        count++;
+        if (i >= 0) {
+          output.emit(i++);
+        }
+      }
+      i = -1;
+    }
+  }
+  
+  public static class Collector extends BaseOperator
+  {
+    private static int result;
+    
+    public transient DefaultInputPort<Tuple.WindowedTuple<Integer>> input = new DefaultInputPort<Tuple.WindowedTuple<Integer>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<Integer> tuple)
+      {
+        result = tuple.getValue();
+      }
+    };
+    
+    public int getResult()
+    {
+      return result;
+    }
+  }
+  
+  public static class Plus extends FoldFn<Integer, Integer>
+  {
+    @Override
+    public Integer merge(Integer accumulatedValue1, Integer accumulatedValue2)
+    {
+      return fold(accumulatedValue1, accumulatedValue2);
+    }
+    
+    @Override
+    public Integer fold(Integer input1, Integer input2)
+    {
+      if (input1 == null) {
+        return input2;
+      }
+      return input1 + input2;
+    }
+  }
+  
+  @Test
+  public void FoldFnTest()
+  {
+    
+    FoldFn<String, String> concat = new FoldFn<String, String>()
+    {
+      @Override
+      public String merge(String accumulatedValue1, String accumulatedValue2)
+      {
+        return fold(accumulatedValue1, accumulatedValue2);
+      }
+  
+      @Override
+      public String fold(String input1, String input2)
+      {
+        return input1 + ", " + input2;
+      }
+    };
+    
+    String[] ss = new String[]{"b", "c", "d", "e"};
+    String base = "a";
+    
+    for (String s : ss) {
+      base = concat.accumulate(base, s);
+    }
+    Assert.assertEquals("a, b, c, d, e", base);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java
new file mode 100644
index 0000000..a9aac77
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link Group}.
+ */
+public class GroupTest
+{
+  @Test
+  public void GroupTest()
+  {
+    Group<Integer> group = new Group<>();
+    
+    List<Integer> accu = group.defaultAccumulatedValue();
+    Assert.assertEquals(0, accu.size());
+    Assert.assertEquals(1, group.accumulate(accu, 10).size());
+    Assert.assertEquals(2, group.accumulate(accu, 11).size());
+    Assert.assertEquals(3, group.accumulate(accu, 11).size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java
new file mode 100644
index 0000000..c873125
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.Comparator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for Max accumulation
+ */
+public class MaxTest
+{
+  @Test
+  public void MaxTest()
+  {
+    Max<Integer> max = new Max<>();
+    
+    Assert.assertEquals((Integer)5, max.accumulate(5, 3));
+    Assert.assertEquals((Integer)6, max.accumulate(4, 6));
+    Assert.assertEquals((Integer)5, max.merge(5, 2));
+  
+    Comparator<Integer> com = new Comparator<Integer>()
+    {
+      @Override
+      public int compare(Integer o1, Integer o2)
+      {
+        return -(o1.compareTo(o2));
+      }
+    };
+    
+    max.setComparator(com);
+    Assert.assertEquals((Integer)3, max.accumulate(5, 3));
+    Assert.assertEquals((Integer)4, max.accumulate(4, 6));
+    Assert.assertEquals((Integer)2, max.merge(5, 2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java
new file mode 100644
index 0000000..74816b0
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.Comparator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link Min}.
+ */
+public class MinTest
+{
+  @Test
+  public void MinTest()
+  {
+    Min<Integer> min = new Min<>();
+    
+    Assert.assertEquals((Integer)3, min.accumulate(5, 3));
+    Assert.assertEquals((Integer)4, min.accumulate(4, 6));
+    Assert.assertEquals((Integer)2, min.merge(5, 2));
+    
+    Comparator<Integer> com = new Comparator<Integer>()
+    {
+      @Override
+      public int compare(Integer o1, Integer o2)
+      {
+        return -(o1.compareTo(o2));
+      }
+    };
+    
+    min.setComparator(com);
+    Assert.assertEquals((Integer)5, min.accumulate(5, 3));
+    Assert.assertEquals((Integer)6, min.accumulate(4, 6));
+    Assert.assertEquals((Integer)5, min.merge(5, 2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java
new file mode 100644
index 0000000..6b5bbad
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link ReduceFn}.
+ */
+public class ReduceFnTest
+{
+  
+  @Test
+  public void ReduceFnTest()
+  {
+    ReduceFn<String> concat = new ReduceFn<String>()
+    {
+      @Override
+      public String reduce(String input1, String input2)
+      {
+        return input1 + ", " + input2;
+      }
+    };
+    
+    String[] ss = new String[]{"b", "c", "d", "e"};
+    String base = "a";
+    
+    for (String s : ss) {
+      base = concat.accumulate(base, s);
+    }
+    Assert.assertEquals("a, b, c, d, e", base);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java
new file mode 100644
index 0000000..f0196d2
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link RemoveDuplicates}.
+ */
+public class RemoveDuplicatesTest
+{
+  @Test
+  public void RemoveDuplicatesTest()
+  {
+    RemoveDuplicates<Integer> rd = new RemoveDuplicates<>();
+    
+    Set<Integer> accu = rd.defaultAccumulatedValue();
+    Assert.assertEquals(0, accu.size());
+    Assert.assertEquals(1, rd.accumulate(accu, 10).size());
+    Assert.assertEquals(2, rd.accumulate(accu, 11).size());
+    Assert.assertEquals(2, rd.accumulate(accu, 11).size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java
new file mode 100644
index 0000000..65b6480
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableFloat;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang.mutable.MutableLong;
+
+/**
+ * Test for different Sum Accumulations.
+ */
+public class SumTest
+{
+  @Test
+  public void SumTest()
+  {
+    SumInt si = new SumInt();
+    SumLong sl = new SumLong();
+    SumFloat sf = new SumFloat();
+    SumDouble sd = new SumDouble();
+    
+    Assert.assertEquals(new MutableInt(10), si.accumulate(si.defaultAccumulatedValue(), 10));
+    Assert.assertEquals(new MutableInt(11), si.accumulate(new MutableInt(1), 10));
+    Assert.assertEquals(new MutableInt(22), si.merge(new MutableInt(1), new MutableInt(21)));
+    
+    Assert.assertEquals(new MutableLong(10L), sl.accumulate(sl.defaultAccumulatedValue(), 10L));
+    Assert.assertEquals(new MutableLong(22L), sl.accumulate(new MutableLong(2L), 20L));
+    Assert.assertEquals(new MutableLong(41L), sl.merge(new MutableLong(32L), new MutableLong(9L)));
+    
+    Assert.assertEquals(new MutableFloat(9.0F), sf.accumulate(sf.defaultAccumulatedValue(), 9.0F));
+    Assert.assertEquals(new MutableFloat(22.5F), sf.accumulate(new MutableFloat(2.5F), 20F));
+    Assert.assertEquals(new MutableFloat(41.0F), sf.merge(new MutableFloat(33.1F), new MutableFloat(7.9F)));
+    
+    Assert.assertEquals(new MutableDouble(9.0), sd.accumulate(sd.defaultAccumulatedValue(), 9.0));
+    Assert.assertEquals(new MutableDouble(22.5), sd.accumulate(new MutableDouble(2.5), 20.0));
+    Assert.assertEquals(new MutableDouble(41.0), sd.merge(new MutableDouble(33.1), new MutableDouble(7.9)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java
new file mode 100644
index 0000000..3f6ac09
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl.accumulation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Unit test for TopNByKey accumulation
+ */
+public class TopNByKeyTest
+{
+  @Test
+  public void TopNByKeyTest() throws Exception
+  {
+    TopNByKey<String, Integer> topNByKey = new TopNByKey<>();
+    topNByKey.setN(3);
+    Map<String, Integer> accu = topNByKey.defaultAccumulatedValue();
+  
+    Assert.assertEquals(0, accu.size());
+    
+    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("1", 1));
+    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("3", 3));
+    
+    List<KeyValPair<String, Integer>> result1 = new ArrayList<>();
+  
+    result1.add(new KeyValPair<String, Integer>("3", 3));
+    result1.add(new KeyValPair<String, Integer>("1", 1));
+    
+    Assert.assertEquals(result1, topNByKey.getOutput(accu));
+    
+    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("2", 2));
+  
+    List<KeyValPair<String, Integer>> result2 = new ArrayList<>();
+  
+    result2.add(new KeyValPair<String, Integer>("3", 3));
+    result2.add(new KeyValPair<String, Integer>("2", 2));
+    result2.add(new KeyValPair<String, Integer>("1", 1));
+    
+    Assert.assertEquals(result2, topNByKey.getOutput(accu));
+    
+    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("5", 5));
+    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("4", 4));
+  
+    List<KeyValPair<String, Integer>> result3 = new ArrayList<>();
+    
+    result3.add(new KeyValPair<String, Integer>("5", 5));
+    result3.add(new KeyValPair<String, Integer>("4", 4));
+    result3.add(new KeyValPair<String, Integer>("3", 3));
+    
+    Assert.assertEquals(result3, topNByKey.getOutput(accu));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
index bc99035..84f05fc 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
@@ -27,9 +27,9 @@ import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
 import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.accumulation.FoldFn;
+import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
 import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.accumulation.FoldFn;
-import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.lib.util.KeyValPair;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
index a293ea8..ebd5eea 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
@@ -32,15 +32,16 @@ import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
 import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
 import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.accumulation.Count;
+import org.apache.apex.malhar.lib.window.impl.accumulation.FoldFn;
+import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
+import org.apache.apex.malhar.lib.window.impl.accumulation.TopN;
+
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.Option;
 import org.apache.apex.malhar.stream.api.WindowedStream;
 import org.apache.apex.malhar.stream.api.function.Function;
 
-import org.apache.apex.malhar.stream.api.impl.accumulation.Count;
-import org.apache.apex.malhar.stream.api.impl.accumulation.FoldFn;
-import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn;
-import org.apache.apex.malhar.stream.api.impl.accumulation.TopN;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -62,7 +63,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
 
   protected Duration allowedLateness;
 
-  private class ConvertFn<T> implements Function.MapFunction<T, Tuple<T>>
+  private static class ConvertFn<T> implements Function.MapFunction<T, Tuple<T>>
   {
 
     @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java
deleted file mode 100644
index 68f1b9e..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang3.mutable.MutableLong;
-
-/**
- * Count Accumulation
- */
-public class Count implements Accumulation<Long, MutableLong, Long>
-{
-
-  @Override
-  public MutableLong defaultAccumulatedValue()
-  {
-    return new MutableLong(0);
-  }
-
-  @Override
-  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
-  {
-    accumulatedValue.add(input);
-    return accumulatedValue;
-  }
-
-  @Override
-  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
-  {
-    accumulatedValue1.add(accumulatedValue2);
-    return accumulatedValue1;
-  }
-
-  @Override
-  public Long getOutput(MutableLong accumulatedValue)
-  {
-    return accumulatedValue.getValue();
-  }
-
-  @Override
-  public Long getRetraction(Long value)
-  {
-    return -value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java
deleted file mode 100644
index 3ab6892..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Fold Accumulation Adaptor class
- */
-public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT>
-{
-
-  public FoldFn()
-  {
-  }
-
-  public FoldFn(OUTPUT initialVal)
-  {
-    this.initialVal = initialVal;
-  }
-
-  private OUTPUT initialVal;
-
-  @Override
-  public OUTPUT defaultAccumulatedValue()
-  {
-    return initialVal;
-  }
-
-  @Override
-  public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input)
-  {
-    return fold(accumulatedValue, input);
-  }
-
-  @Override
-  public OUTPUT getOutput(OUTPUT accumulatedValue)
-  {
-    return accumulatedValue;
-  }
-
-  @Override
-  public OUTPUT getRetraction(OUTPUT value)
-  {
-    return null;
-  }
-
-  abstract OUTPUT fold(OUTPUT result, INPUT input);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java
deleted file mode 100644
index b4507bc..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * An easy to use reduce Accumulation
- * @param <INPUT>
- */
-public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
-{
-  @Override
-  public INPUT defaultAccumulatedValue()
-  {
-    return null;
-  }
-
-  @Override
-  public INPUT accumulate(INPUT accumulatedValue, INPUT input)
-  {
-    if (accumulatedValue == null) {
-      return input;
-    }
-    return reduce(accumulatedValue, input);
-  }
-
-  @Override
-  public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2)
-  {
-    return reduce(accumulatedValue1, accumulatedValue2);
-  }
-
-  @Override
-  public INPUT getOutput(INPUT accumulatedValue)
-  {
-    return accumulatedValue;
-  }
-
-  @Override
-  public INPUT getRetraction(INPUT value)
-  {
-    return null;
-  }
-
-  public abstract INPUT reduce(INPUT input1, INPUT input2);
-
-
-}


Mime
View raw message