flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [2/5] flink git commit: [FLINK-2105] Add support for sorted but sparse test data generation
Date Tue, 04 Aug 2015 22:41:07 GMT
[FLINK-2105] Add support for sorted but sparse test data generation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db0b0087
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db0b0087
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db0b0087

Branch: refs/heads/master
Commit: db0b0087b02985f55bcc6e65571b11ca33b0886f
Parents: 0dc6849
Author: Johann Kovacs <me@jkovacs.de>
Authored: Fri Jul 10 17:26:05 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Aug 4 21:35:27 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/testutils/TestData.java   | 207 +++++++++++++++++++
 1 file changed, 207 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db0b0087/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index fd34a3b..8688d4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators.testutils;
 import java.util.Comparator;
 import java.util.Random;
 
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
@@ -279,6 +281,169 @@ public final class TestData {
 			this.counter = 0;
 		}
 	}
+
+	/**
+	 * Generator of {@code Tuple2<Integer, String>} test records.
+	 *
+	 * <p>Keys follow the configured {@link KeyMode} and values the configured {@link ValueMode}.
+	 * All random choices come from a {@link Random} seeded with the constructor's seed, so the
+	 * produced sequence is reproducible and {@link #reset()} replays it from the start.
+	 */
+	public static class TupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+		/** How keys are generated. */
+		public enum KeyMode {
+			/** Consecutive keys 1, 2, 3, ... */
+			SORTED,
+			/** Random keys in the range [1, keyMax]. */
+			RANDOM,
+			/** Strictly increasing keys; each gap is drawn from [1, max(1, (int) (1 / keyDensity))]. */
+			SORTED_SPARSE
+		}
+
+		/** How values are generated. */
+		public enum ValueMode {
+			/** Random strings of exactly valueLength characters. */
+			FIX_LENGTH,
+			/**
+			 * Random strings whose length lies in [valueLength - valueLength / 3 + 1, valueLength]
+			 * (exactly valueLength when valueLength < 3).
+			 */
+			RANDOM_LENGTH,
+			/** The constant string passed to the constructor, emitted for every record. */
+			CONSTANT
+		}
+
+		/** Alphabet for random values; the letters a-m appear twice. */
+		private static final char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
+				'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' };
+
+		private final long seed;
+
+		private final int keyMax;
+
+		private final float keyDensity;
+
+		/** Largest gap between consecutive keys in SORTED_SPARSE mode; always at least 1. */
+		private final int maxGap;
+
+		private final int valueLength;
+
+		private final KeyMode keyMode;
+
+		private final ValueMode valueMode;
+
+		private Random random;
+
+		/** Last key emitted in the SORTED and SORTED_SPARSE modes. */
+		private int counter;
+
+		/** Current value; in CONSTANT mode this holds the constant for the generator's lifetime. */
+		private String value;
+
+		/**
+		 * Creates a generator with random keys and fixed-length random values.
+		 *
+		 * @param seed seed for all random choices
+		 * @param keyMax inclusive upper bound of the generated keys
+		 * @param valueLength length of the generated values
+		 */
+		public TupleGenerator(long seed, int keyMax, int valueLength) {
+			this(seed, keyMax, valueLength, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+		}
+
+		/**
+		 * Creates a generator without a constant value.
+		 *
+		 * @param seed seed for all random choices
+		 * @param keyMax inclusive upper bound of keys in {@link KeyMode#RANDOM} mode
+		 * @param valueLength (maximum) length of the generated values
+		 * @param keyMode how keys are generated
+		 * @param valueMode how values are generated
+		 */
+		public TupleGenerator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode) {
+			this(seed, keyMax, valueLength, keyMode, valueMode, null);
+		}
+
+		/**
+		 * Creates a generator with a key density of 1.
+		 *
+		 * @param seed seed for all random choices
+		 * @param keyMax inclusive upper bound of keys in {@link KeyMode#RANDOM} mode
+		 * @param valueLength (maximum) length of the generated values
+		 * @param keyMode how keys are generated
+		 * @param valueMode how values are generated
+		 * @param constant value emitted in {@link ValueMode#CONSTANT} mode; may be {@code null}
+		 */
+		public TupleGenerator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode, String constant) {
+			this(seed, keyMax, 1.0f, valueLength, keyMode, valueMode, constant);
+		}
+
+		/**
+		 * Creates a generator.
+		 *
+		 * @param seed seed for all random choices
+		 * @param keyMax inclusive upper bound of keys in {@link KeyMode#RANDOM} mode
+		 * @param keyDensity controls the maximum key gap in {@link KeyMode#SORTED_SPARSE} mode
+		 *                   (gaps are at most max(1, (int) (1 / keyDensity))); must be positive there
+		 * @param valueLength (maximum) length of the generated values
+		 * @param keyMode how keys are generated
+		 * @param valueMode how values are generated
+		 * @param constant value emitted in {@link ValueMode#CONSTANT} mode; may be {@code null}
+		 * @throws IllegalArgumentException if keyMode is SORTED_SPARSE and keyDensity is not positive
+		 */
+		public TupleGenerator(long seed, int keyMax, float keyDensity, int valueLength, KeyMode keyMode, ValueMode valueMode, String constant) {
+			// A non-positive (or NaN) density would yield a zero or Integer.MAX_VALUE gap bound,
+			// which either crashes Random.nextInt or overflows the key counter almost immediately.
+			if (keyMode == KeyMode.SORTED_SPARSE && !(keyDensity > 0.0f)) {
+				throw new IllegalArgumentException(
+						"keyDensity must be positive for SORTED_SPARSE keys, but was " + keyDensity);
+			}
+			this.seed = seed;
+			this.keyMax = keyMax;
+			this.keyDensity = keyDensity;
+			// Densities above 1 truncate to 0; clamp so Random.nextInt always gets a positive bound.
+			this.maxGap = Math.max(1, (int) (1 / keyDensity));
+			this.valueLength = valueLength;
+			this.keyMode = keyMode;
+			this.valueMode = valueMode;
+
+			this.random = new Random(seed);
+			this.counter = 0;
+
+			this.value = constant;
+		}
+
+		@Override
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
+			int key = nextKey();
+			if (this.valueMode != ValueMode.CONSTANT) {
+				this.value = randomString();
+			}
+			reuse.setFields(key, this.value);
+			return reuse;
+		}
+
+		@Override
+		public Tuple2<Integer, String> next() {
+			return next(new Tuple2<Integer, String>());
+		}
+
+		/**
+		 * Writes the next key into {@code target[0]} and a random int into {@code target[1]};
+		 * the configured value mode is not used here.
+		 *
+		 * @return always {@code true}
+		 */
+		public boolean next(org.apache.flink.types.Value[] target) {
+			int key = nextKey();
+			// TODO change this to something proper
+			((IntValue)target[0]).setValue(key);
+			((IntValue)target[1]).setValue(random.nextInt());
+			return true;
+		}
+
+		/** Produces the next key according to {@link #keyMode}; any mode other than the sorted ones is random. */
+		private int nextKey() {
+			if (keyMode == KeyMode.SORTED) {
+				return ++counter;
+			} else if (keyMode == KeyMode.SORTED_SPARSE) {
+				// Gap to the previous key is drawn uniformly from [1, maxGap].
+				counter += random.nextInt(maxGap) + 1;
+				return counter;
+			} else {
+				return Math.abs(random.nextInt() % keyMax) + 1;
+			}
+		}
+
+		/** Restores the initial random state and key counter so the same sequence is produced again. */
+		public void reset() {
+			this.random = new Random(seed);
+			this.counter = 0;
+		}
+
+		/** Builds a random string over {@link #alpha} whose length depends on {@link #valueMode}. */
+		private String randomString() {
+			int length = valueLength;
+			if (valueMode != ValueMode.FIX_LENGTH) {
+				// Random.nextInt rejects a bound <= 0, so values shorter than 3 keep their full length.
+				int maxShrink = valueLength / 3;
+				if (maxShrink > 0) {
+					length -= random.nextInt(maxShrink);
+				}
+			}
+
+			StringBuilder sb = new StringBuilder(Math.max(length, 0));
+			for (int i = 0; i < length; i++) {
+				sb.append(alpha[random.nextInt(alpha.length)]);
+			}
+			return sb.toString();
+		}
+
+	}
+
+
+	/**
+	 * Bounded iterator that hands out a fixed number of records from a {@link TupleGenerator}.
+	 *
+	 * <p>The generator is reset when this iterator is constructed. {@link #reset()} only rewinds
+	 * the record count; it does not reset the generator, so records returned after a reset
+	 * continue the generator's sequence instead of repeating it.
+	 */
+	public static class TupleGeneratorIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+		private final TupleGenerator generator;
+
+		private final int numberOfRecords;
+
+		/** Records returned since construction or the last {@link #reset()}. */
+		private int counter;
+
+		/**
+		 * @param generator source of the records; it is reset by this constructor
+		 * @param numberOfRecords how many records to return before signalling exhaustion with {@code null}
+		 */
+		public TupleGeneratorIterator(TupleGenerator generator, int numberOfRecords) {
+			this.generator = generator;
+			this.generator.reset();
+			this.numberOfRecords = numberOfRecords;
+			this.counter = 0;
+		}
+
+		/** Returns whether another record may be handed out. */
+		private boolean hasRemaining() {
+			return counter < numberOfRecords;
+		}
+
+		@Override
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> target) {
+			if (!hasRemaining()) {
+				return null;
+			}
+			counter++;
+			return generator.next(target);
+		}
+
+		@Override
+		public Tuple2<Integer, String> next() {
+			if (!hasRemaining()) {
+				return null;
+			}
+			counter++;
+			return generator.next();
+		}
+
+		/** Rewinds the record count to zero, leaving the generator's state untouched. */
+		public void reset() {
+			this.counter = 0;
+		}
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -325,4 +490,46 @@ public final class TestData {
 			this.pos = 0;
 		}
 	}
+
+	public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer,
String>> {
+
+		private int key;
+		private String value;
+
+		private final String valueValue;
+
+
+		private final int numPairs;
+
+		private int pos;
+
+
+		public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) {
+			this.key = keyValue;
+			this.valueValue = valueValue;
+			this.numPairs = numPairs;
+		}
+
+		@Override
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
+			if (pos < this.numPairs) {
+				this.value = this.valueValue + ' ' + pos;
+				reuse.setFields(this.key, this.value);
+				pos++;
+				return reuse;
+			}
+			else {
+				return null;
+			}
+		}
+
+		@Override
+		public Tuple2<Integer, String> next() {
+			return next(new Tuple2<Integer, String>());
+		}
+
+		public void reset() {
+			this.pos = 0;
+		}
+	}
 }


Mime
View raw message