apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [1/6] apex-malhar git commit: Added Beam Examples and Implementations of Accumulation.
Date Thu, 25 Aug 2016 16:43:17 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 17f6c5523 -> dcca7752a


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java
deleted file mode 100644
index 77a08a6..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.impl.accumulation;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * TopN accumulation
- */
-public class TopN<T> implements Accumulation<T, List<T>, List<T>>
-{
-
-  int n;
-
-  Comparator<T> comparator;
-
-  public void setN(int n)
-  {
-    this.n = n;
-  }
-
-  public void setComparator(Comparator<T> comparator)
-  {
-    this.comparator = comparator;
-  }
-
-  @Override
-  public List<T> defaultAccumulatedValue()
-  {
-    return new LinkedList<>();
-  }
-
-  @Override
-  public List<T> accumulate(List<T> accumulatedValue, T input)
-  {
-    int k = 0;
-    for (T inMemory : accumulatedValue) {
-      if (comparator != null) {
-        if (comparator.compare(inMemory, input) < 0) {
-          break;
-        }
-      } else if (input instanceof Comparable) {
-        if (((Comparable<T>)input).compareTo(inMemory) > 0) {
-          break;
-        }
-      } else {
-        throw new RuntimeException("Tuple cannot be compared");
-      }
-      k++;
-    }
-    accumulatedValue.add(k, input);
-    if (accumulatedValue.size() > n) {
-      accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1));
-    }
-    return accumulatedValue;
-  }
-
-  @Override
-  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
-  {
-    accumulatedValue1.addAll(accumulatedValue2);
-    if (comparator != null) {
-      Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator));
-    } else {
-      Collections.sort(accumulatedValue1, Collections.reverseOrder());
-    }
-    if (accumulatedValue1.size() > n) {
-      return accumulatedValue1.subList(0, n);
-    } else {
-      return accumulatedValue1;
-    }
-  }
-
-  @Override
-  public List<T> getOutput(List<T> accumulatedValue)
-  {
-    return accumulatedValue;
-  }
-
-  @Override
-  public List<T> getRetraction(List<T> accumulatedValue)
-  {
-    return new LinkedList<>();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/test/resources/words/word.txt
----------------------------------------------------------------------
diff --git a/stream/src/test/resources/words/word.txt b/stream/src/test/resources/words/word.txt
new file mode 100644
index 0000000..a8e8c35
--- /dev/null
+++ b/stream/src/test/resources/words/word.txt
@@ -0,0 +1,2 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error bye


Mime
View raw message