##### Site index · List index
Message view
Top
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Date Mon, 23 Mar 2015 14:15:12 GMT
```
]

ASF GitHub Bot commented on FLINK-1560:
---------------------------------------

GitHub user mbalassi commented on a diff in the pull request:

---
@@ -103,57 +115,124 @@ public static void main(String[] args) throws Exception {
// *************************************************************************

/**
-	 * Iteration step function which takes an input (Double , Integer) and
-	 * produces an output (Double + random, Integer + 1).
+	 * Generate random integer pairs from the range from 0 to BOUND/2
+	 */
+	private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer,
Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rnd = new Random();
+
+		@Override
+		public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception
{
+			while(true) {
+				int first = rnd.nextInt(BOUND/2 - 1) + 1;
+				int second = rnd.nextInt(BOUND/2 - 1) + 1;
+
+				collector.collect(new Tuple2<Integer, Integer>(first, second));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// no cleanup needed
+		}
+	}
+
+	/**
+	 * Generate random integer pairs from the range from 0 to BOUND/2
*/
-	public static class Step extends
-			RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>>
{
+	private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer,
Integer>> {
private static final long serialVersionUID = 1L;
-		private transient Random rnd;

-		public void open(Configuration parameters) {
-			rnd = new Random();
+		@Override
+		public Tuple2<Integer, Integer> map(String value) throws Exception {
+			String record = value.substring(1, value.length()-1);
+			String[] splitted = record.split(",");
+			return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
}
+	}
+
+	/**
+	 * Map the inputs so that the next Fibonacci numbers can be calculated while preserving
the original input tuple
+	 * A counter is attached to the tuple and incremented in every iteration step
+	 */
+	public static class InputMap implements MapFunction<Tuple2<Integer, Integer>,
Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+
+		@Override
+		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer,
Integer> value) throws
+				Exception {
+			return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1,
value.f0, value.f1, 0);
+		}
+	}
+
+	/**
+	 * Iteration step function that calculates the next Fibonacci number
+	 */
+	public static class Step implements
+			MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer,
Integer, Integer, Integer, Integer>> {
+		private static final long serialVersionUID = 1L;

@Override
-		public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws
Exception {
-			return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1);
+		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer,
Integer, Integer, Integer, Integer> value) throws Exception {
+			return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1,
value.f3, value.f2 + value.f3, ++value.f4);
}
}

/**
* OutputSelector testing which tuple needs to be iterated again.
*/
-	public static class MySelector implements OutputSelector<Tuple2<Double, Integer>>
{
+	public static class MySelector implements OutputSelector<Tuple5<Integer, Integer,
Integer, Integer, Integer>> {
private static final long serialVersionUID = 1L;

@Override
-		public Iterable<String> select(Tuple2<Double, Integer> value) {
+		public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer,
Integer> value) {
List<String> output = new ArrayList<String>();
-			if (value.f0 > 100) {
-			} else {
+			if (value.f2 < BOUND && value.f3 < BOUND) {
+			} else {
}
return output;
}
+	}
+
+	/**
+	 * Giving back the input pair and the counter
+	 */
+	public static class OutputMap implements MapFunction<Tuple5<Integer, Integer,
Integer, Integer, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> {

+		@Override
+		public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer,
Integer, Integer, Integer, Integer> value) throws
+				Exception {
+			return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer,
Integer>(value.f0, value.f1), value.f4);
+		}
}

// *************************************************************************
// UTIL METHODS
// *************************************************************************

+	private static boolean fileInput = true;
private static boolean fileOutput = false;
+	private static String inputPath = "/home/szape/result.txt";
--- End diff --

:)

> Add ITCases for streaming examples
> ----------------------------------
>
>          Issue Type: Test
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Márton Balassi
>            Assignee: Péter Szabó
>
> Currently there are no tests for consistency of the streaming example programs. This
> might be a real show stopper for users who encounter an issue there.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

```
Mime
View raw message