flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [26/34] incubator-flink git commit: [streaming] Created several examples for the usage of policy based windowing.
Date Fri, 05 Dec 2014 17:26:31 GMT
[streaming] Created several examples for the usage of policy based windowing.

[streaming] Fixed windowing examples by removing manual setting of source parallelism

[streaming] Extended streaming guide with a section about the new policy based windowing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3d11d5ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3d11d5ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3d11d5ba

Branch: refs/heads/master
Commit: 3d11d5baea1513a819f189f199502fc3979336b8
Parents: 3f50683
Author: Jonas Traub (powibol) <jon@s-traub.com>
Authored: Tue Oct 28 11:14:33 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Dec 5 16:45:09 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  19 +++
 .../examples/windowing/BasicExample.java        |  76 ++++++++++++
 .../examples/windowing/DeltaExtractExample.java |  78 ++++++++++++
 .../windowing/MultiplePoliciesExample.java      |  84 +++++++++++++
 .../examples/windowing/SlidingExample.java      | 101 ++++++++++++++++
 .../windowing/TimeWindowingExample.java         | 120 +++++++++++++++++++
 6 files changed, 478 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d11d5ba/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 264140a..a8002ff 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -285,6 +285,25 @@ The transformation calls a `GroupReduceFunction` for each data batch
or data win
 dataStream.batch(1000, 100).reduceGroup(reducer);
 ~~~
 
+#### Policy based windowing
+The policy based windowing is an highly flexible way to specify your stream discretisation
also called windowing semantics. Two types of policies are used for such a specification:
+
+1) `TriggerPolicy` This policy defines when to trigger the reduce UDF on the current window
and emit the result. In the API it completes a window statement such as: `.window(..).every(...)`,
while we pass the triggering policy within `every`. Several predefind policies are provided
in the API, including delta-based, punctuation based, count-based and time-based policies.
Anyhow, policies are in general UDFs and can implement any custom behaviour as well.
+
+2) `Eviction Policy` This policy defines the length of a window as a means of a predicate
for evicting tuples when they are no longer needed. In the API this can be defined by the
`.window(..)` operation on a stream. There are mostely the same predefined policy types provided
as for trigger policies.
+
+For example, if we want a fife tuples long sliding window that triggers every second we can
write the following:
+~~~java
+myStream.window(Count.of(5)).every(Time.of(1,TimeUnit.SECOND))
+~~~
+
+In addition to this, multiple policies can be added to a `LinkedList` and then be used in
parallel by writing the following:
+~~~java
+myStream.window(ListOfTriggerPolicies,ListOfEvictionPolicies)
+~~~
+
+Especially when time based windowing is used, you may want to trigger not only when an element
arrives but also in between. Active policies provide this functionality. The predefined time-based
policies are already implemented in such an active way and can hold as an example in case
you want to implement your own user defined active policy. Anyhow, as our time-based trigger
and eviction policies can work with user defined `TimeStamp` implementations, this policies
already cover most use cases.
+
 ### Co operators
 
 Co operators allow the users to jointly transform two `DataStreams` of different types providing
a simple way to jointly manipulate a shared state. It is designed to support joint stream
transformations where merging is not appropriate due to different data types or the in cases
when user needs explicit track of the datas origin.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d11d5ba/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java
new file mode 100644
index 0000000..c0e768b
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.util.Collector;
+
+/**
+ * A minimal example as introduction to the policy based windowing
+ */
+public class BasicExample {
+
+	private static final int PARALLELISM = 1;
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		// This reduce function does a String concat.
+		ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
+
+			/**
+			 * Auto generates version ID
+			 */
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String reduce(String value1, String value2) throws Exception {
+				return value1 + "|" + value2;
+			}
+
+		};
+
+		DataStream<Tuple2<String, String[]>> stream = env.addSource(new BasicSource())
+				.window(Count.of(5)).every(Count.of(2)).reduce(reduceFunction);
+
+		stream.print();
+
+		env.execute();
+	}
+
+	public static class BasicSource implements SourceFunction<String> {
+
+		private static final long serialVersionUID = 1L;
+		String str = new String("streaming");
+
+		@Override
+		public void invoke(Collector<String> out) throws Exception {
+			// continuous emit
+			while (true) {
+				out.collect(str);
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d11d5ba/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
new file mode 100644
index 0000000..0622dbf
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.deltafunction.EuclideanDistance;
+import org.apache.flink.streaming.api.windowing.extractor.FieldsFromTuple;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.Delta;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example gives an impression about how to use delta policies. It also
+ * shows how extractors can be used.
+ */
+public class DeltaExtractExample {
+
+	private static final int PARALLELISM = 1;
+
+	@SuppressWarnings({ "serial", "rawtypes", "unchecked" })
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		ReduceFunction<Tuple3<Double, Double, String>> concatStrings = new ReduceFunction<Tuple3<Double,
Double, String>>() {
+			@Override
+			public Tuple3 reduce(Tuple3 value1, Tuple3 value2) throws Exception {
+				return new Tuple3(value1.f0, value2.f1, value1.f2 + "|" + value2.f2);
+			}
+		};
+
+		DataStream dstream = env
+				.addSource(new CountingSource())
+				.window(Delta.of(new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(0d,
+						0d, "foo"), 1.2)).every(Count.of(2)).reduce(concatStrings);
+
+		dstream.print();
+		env.execute();
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class CountingSource implements SourceFunction<Tuple3<Double, Double,
String>> {
+
+		private int counter = 0;
+
+		@Override
+		public void invoke(Collector<Tuple3<Double, Double, String>> collector) throws
Exception {
+			while (true) {
+				if (counter > 9999) {
+					counter = 0;
+				}
+				collector.collect(new Tuple3<Double, Double, String>((double) counter,
+						(double) counter + 1, "V" + counter++));
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d11d5ba/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
new file mode 100644
index 0000000..069693f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.util.Collector;
+
+import java.util.LinkedList;
+
+/**
+ * This example uses count based tumbling windowing with multiple eviction
+ * policies at the same time.
+ */
+public class MultiplePoliciesExample {
+
+	private static final int PARALLELISM = 1;
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		LinkedList<TriggerPolicy<String>> policies = new LinkedList<TriggerPolicy<String>>();
+		policies.add(new CountTriggerPolicy<String>(5));
+		policies.add(new CountTriggerPolicy<String>(8));
+
+		// This reduce function does a String concat.
+		ReduceFunction<String> reducer = new ReduceFunction<String>() {
+
+			/**
+			 * Auto generates version ID
+			 */
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String reduce(String value1, String value2) throws Exception {
+				return value1 + "|" + value2;
+			}
+
+		};
+
+		DataStream<Tuple2<String, String[]>> stream = env.addSource(new BasicSource()).window(
+				policies, reducer);
+
+		stream.print();
+
+		env.execute();
+	}
+
+	public static class BasicSource implements SourceFunction<String> {
+
+		private static final long serialVersionUID = 1L;
+		String str = new String("streaming");
+
+		@Override
+		public void invoke(Collector<String> out) throws Exception {
+			// continuous emit
+			while (true) {
+				out.collect(str);
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d11d5ba/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
new file mode 100644
index 0000000..1768480
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example uses count based sliding windows to illustrate different
+ * possibilities for the realization of sliding windows. Take a look on the code
+ * which is commented out to see different setups.
+ */
+public class SlidingExample {
+
+	private static final int PARALLELISM = 1;
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		/*
+		 * SIMPLE-EXAMPLE: Use this to always keep the newest 10 elements in the
+		 * buffer Resulting windows will have an overlap of 5 elements
+		 */
+		// TriggerPolicy<String> triggerPolicy=new
+		// CountTriggerPolicy<String>(5);
+		// EvictionPolicy<String> evictionPolicy=new
+		// CountEvictionPolicy<String>(10);
+
+		/*
+		 * ADVANCED-EXAMPLE: Use this to have the last element of the last
+		 * window as first element of the next window while the window size is
+		 * always 5
+		 */
+		TriggerPolicy<String> triggerPolicy = new CountTriggerPolicy<String>(4, -1);
+		EvictionPolicy<String> evictionPolicy = new CountEvictionPolicy<String>(5,
4);
+
+		// This reduce function does a String concat.
+		ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
+
+			/**
+			 * default version ID
+			 */
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String reduce(String value1, String value2) throws Exception {
+				return value1 + "|" + value2;
+			}
+
+		};
+
+		DataStream<Tuple2<String, String[]>> stream = env.addSource(new CountingSource()).window(
+				triggerPolicy, evictionPolicy, reduceFunction);
+
+		stream.print();
+
+		env.execute();
+	}
+
+	@SuppressWarnings("serial")
+	private static class CountingSource implements SourceFunction<String> {
+
+		private int counter = 0;
+
+		@Override
+		public void invoke(Collector<String> collector) throws Exception {
+			// continuous emit
+			while (true) {
+				if (counter > 9999) {
+					counter = 0;
+				}
+				collector.collect("V" + counter++);
+			}
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3d11d5ba/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
new file mode 100644
index 0000000..115363b
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example shows the functionality of time based windows. It utilizes the
+ * {@link ActiveTriggerPolicy} implementation in the {@link TimeTriggerPolicy}.
+ */
+public class TimeWindowingExample {
+
+	private static final int PARALLELISM = 1;
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		// Prevent output from being blocked
+		env.setBufferTimeout(100);
+
+		// Trigger every 1000ms
+		TriggerPolicy<Integer> triggerPolicy = new TimeTriggerPolicy<Integer>(1000L,
+				new DefaultTimeStamp<Integer>(), new Extractor<Long, Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer extract(Long in) {
+						return in.intValue();
+					}
+
+				});
+
+		// Always keep the newest 100 elements in the buffer
+		EvictionPolicy<Integer> evictionPolicy = new CountEvictionPolicy<Integer>(100);
+
+		// This reduce function does a String concat.
+		ReduceFunction<Integer> reduceFunction = new ReduceFunction<Integer>() {
+
+			/**
+			 * default version ID
+			 */
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+
+		};
+
+		DataStream<Tuple2<Integer, String[]>> stream = env.addSource(new CountingSourceWithSleep()).window(triggerPolicy,
evictionPolicy, reduceFunction);
+
+		stream.print();
+
+		env.execute();
+	}
+
+	/**
+	 * This data source emit one element every 0.001 sec. The output is an
+	 * Integer counting the output elements. As soon as the counter reaches
+	 * 10000 it is reset to 0. On each reset the source waits 5 sec. before it
+	 * restarts to produce elements.
+	 */
+	@SuppressWarnings("serial")
+	private static class CountingSourceWithSleep implements SourceFunction<Integer> {
+
+		private int counter = 0;
+
+		@Override
+		public void invoke(Collector<Integer> collector) throws Exception {
+			// continuous emit
+			while (true) {
+				if (counter > 9999) {
+					System.out.println("Source pauses now!");
+					Thread.sleep(5000);
+					System.out.println("Source continouse with emitting now!");
+					counter = 0;
+				}
+				collector.collect(counter);
+
+				// Wait 0.001 sec. before the next emit. Otherwise the source is
+				// too fast for local tests and you might always see
+				// SUM[k=1..9999](k) as result.
+				Thread.sleep(1);
+
+				counter++;
+			}
+		}
+
+	}
+}


Mime
View raw message