flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [06/13] flink git commit: [FLINK-6210] [rocksdb] Close RocksDB in ListViaMergeSpeedMiniBenchmark && ListViaRangeSpeedMiniBenchmark
Date Fri, 21 Apr 2017 12:24:20 GMT
[FLINK-6210] [rocksdb] Close RocksDB in ListViaMergeSpeedMiniBenchmark && ListViaRangeSpeedMiniBenchmark

This closes #3652


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea054a7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea054a7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea054a7d

Branch: refs/heads/master
Commit: ea054a7d3bc452d153c070b2789ddbe6a2f080a7
Parents: 7a70524
Author: mengji.fy <mengji.fy@taobao.com>
Authored: Thu Mar 30 11:12:41 2017 +0800
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Apr 21 12:00:32 2017 +0200

----------------------------------------------------------------------
 .../ListViaMergeSpeedMiniBenchmark.java         | 73 ++++++++-------
 .../ListViaRangeSpeedMiniBenchmark.java         | 99 +++++++++++---------
 2 files changed, 93 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea054a7d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
index 2a530e1..f3e084f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
@@ -53,52 +53,59 @@ public class ListViaMergeSpeedMiniBenchmark {
 		final String key = "key";
 		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
 
-		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+		try {
+			final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+			final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
 
-		final int num = 50000;
+			final int num = 50000;
 
-		// ----- insert -----
-		System.out.println("begin insert");
+			// ----- insert -----
+			System.out.println("begin insert");
 
-		final long beginInsert = System.nanoTime();
-		for (int i = 0; i < num; i++) {
-			rocksDB.merge(write_options, keyBytes, valueBytes);
-		}
-		final long endInsert = System.nanoTime();
-		System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000)
+ " ms");
+			final long beginInsert = System.nanoTime();
+			for (int i = 0; i < num; i++) {
+				rocksDB.merge(write_options, keyBytes, valueBytes);
+			}
+			final long endInsert = System.nanoTime();
+			System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000)
+ " ms");
 
-		// ----- read (attempt 1) -----
+			// ----- read (attempt 1) -----
 
-		final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
-		final long beginGet1 = System.nanoTime();
-		rocksDB.get(keyBytes, resultHolder);
-		final long endGet1 = System.nanoTime();
+			final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
+			final long beginGet1 = System.nanoTime();
+			rocksDB.get(keyBytes, resultHolder);
+			final long endGet1 = System.nanoTime();
 
-		System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
+			System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
 
-		// ----- read (attempt 2) -----
+			// ----- read (attempt 2) -----
 
-		final long beginGet2 = System.nanoTime();
-		rocksDB.get(keyBytes, resultHolder);
-		final long endGet2 = System.nanoTime();
+			final long beginGet2 = System.nanoTime();
+			rocksDB.get(keyBytes, resultHolder);
+			final long endGet2 = System.nanoTime();
 
-		System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
+			System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
 
-		// ----- compact -----
-		System.out.println("compacting...");
-		final long beginCompact = System.nanoTime();
-		rocksDB.compactRange();
-		final long endCompact = System.nanoTime();
+			// ----- compact -----
+			System.out.println("compacting...");
+			final long beginCompact = System.nanoTime();
+			rocksDB.compactRange();
+			final long endCompact = System.nanoTime();
 
-		System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000)
+ " ms");
+			System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000)
+ " ms");
 
-		// ----- read (attempt 3) -----
+			// ----- read (attempt 3) -----
 
-		final long beginGet3 = System.nanoTime();
-		rocksDB.get(keyBytes, resultHolder);
-		final long endGet3 = System.nanoTime();
+			final long beginGet3 = System.nanoTime();
+			rocksDB.get(keyBytes, resultHolder);
+			final long endGet3 = System.nanoTime();
 
-		System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
+			System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
+		} finally {
+			rocksDB.close();
+			options.close();
+			write_options.close();
+			FileUtils.deleteDirectory(rocksDir);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea054a7d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
index b427ef1..f46e2cd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
@@ -56,61 +56,68 @@ public class ListViaRangeSpeedMiniBenchmark {
 
 		final String key = "key";
 		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+		
+		try {
 
-		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+			final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+			final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
 
-		final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
+			final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
 
-		final Unsafe unsafe = MemoryUtils.UNSAFE;
-		final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+			final Unsafe unsafe = MemoryUtils.UNSAFE;
+			final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
 
-		final int num = 50000;
-		System.out.println("begin insert");
+			final int num = 50000;
+			System.out.println("begin insert");
 
-		final long beginInsert = System.nanoTime();
-		for (int i = 0; i < num; i++) {
-			unsafe.putInt(keyTemplate, offset, i);
-			rocksDB.put(write_options, keyTemplate, valueBytes);
-		}
-		final long endInsert = System.nanoTime();
-		System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000)
+ " ms");
-
-		final byte[] resultHolder = new byte[num * valueBytes.length];
-
-		final long beginGet = System.nanoTime();
-
-		final RocksIterator iterator = rocksDB.newIterator();
-		int pos = 0;
-
-		// seek to start
-		unsafe.putInt(keyTemplate, offset, 0);
-		iterator.seek(keyTemplate);
-
-		// mark end
-		unsafe.putInt(keyTemplate, offset, -1);
-
-		// iterate
-		while (iterator.isValid()) {
-			byte[] currKey = iterator.key();
-			if (samePrefix(keyBytes, currKey)) {
-				byte[] currValue = iterator.value();
-				System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
-				pos += currValue.length;
-				iterator.next();
+			final long beginInsert = System.nanoTime();
+			for (int i = 0; i < num; i++) {
+				unsafe.putInt(keyTemplate, offset, i);
+				rocksDB.put(write_options, keyTemplate, valueBytes);
 			}
-			else {
-				break;
+			final long endInsert = System.nanoTime();
+			System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000)
+ " ms");
+
+			final byte[] resultHolder = new byte[num * valueBytes.length];
+
+			final long beginGet = System.nanoTime();
+
+			final RocksIterator iterator = rocksDB.newIterator();
+			int pos = 0;
+
+			try {
+				// seek to start
+				unsafe.putInt(keyTemplate, offset, 0);
+				iterator.seek(keyTemplate);
+
+				// mark end
+				unsafe.putInt(keyTemplate, offset, -1);
+
+				// iterate
+				while (iterator.isValid()) {
+					byte[] currKey = iterator.key();
+					if (samePrefix(keyBytes, currKey)) {
+						byte[] currValue = iterator.value();
+						System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
+						pos += currValue.length;
+						iterator.next();
+					} else {
+						break;
+					}
+				}
+			}finally {
+				iterator.close();
 			}
-		}
 
-		final long endGet = System.nanoTime();
+			final long endGet = System.nanoTime();
 
-		System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
-
-		// WriteOptions and RocksDB ultimately extends AbstractNativeReference, so we need to close
resource as well.
-		write_options.close();
-		rocksDB.close();
+			System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
+		} finally {
+			rocksDB.close();
+			options.close();
+			write_options.close();
+			FileUtils.deleteDirectory(rocksDir);
+		}
 	}
 
 	private static boolean samePrefix(byte[] prefix, byte[] key) {


Mime
View raw message