flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [03/50] [abbrv] flink git commit: [FLINK-3357] [core] Drop AbstractID#toShortString()
Date Fri, 12 Feb 2016 11:29:28 GMT
[FLINK-3357] [core] Drop AbstractID#toShortString()

This closes #1601


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28feede7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28feede7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28feede7

Branch: refs/heads/tableOnCalcite
Commit: 28feede7d40dc73ec861cf93393650b8b10afc3a
Parents: 5c47f38
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Feb 8 16:05:27 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Feb 8 20:18:20 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  4 +--
 .../contrib/streaming/state/DbStateBackend.java | 35 ++++++++++----------
 .../streaming/state/DbStateBackendTest.java     | 10 +++---
 .../java/org/apache/flink/util/AbstractID.java  | 15 ---------
 ...taskExecutionAttemptAccumulatorsHandler.java |  2 +-
 .../InputGateDeploymentDescriptor.java          |  2 +-
 .../io/network/partition/ResultPartitionID.java |  2 +-
 7 files changed, 28 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index aaaeea4..eefa4a9 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -81,11 +81,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	private File getDbPath(String stateName) {
-		return new File(new File(new File(new File(dbBasePath), jobId.toShortString()), operatorIdentifier), stateName);
+		return new File(new File(new File(new File(dbBasePath), jobId.toString()), operatorIdentifier), stateName);
 	}
 
 	private String getCheckpointPath(String stateName) {
-		return checkpointDirectory + "/" + jobId.toShortString() + "/" + operatorIdentifier + "/" + stateName;
+		return checkpointDirectory + "/" + jobId.toString() + "/" + operatorIdentifier + "/" + stateName;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index 1d1ccd7..5162983 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -17,14 +17,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import java.io.Serializable;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Random;
-import java.util.concurrent.Callable;
-
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -42,6 +34,14 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
 import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
 
 /**
@@ -76,6 +76,8 @@ public class DbStateBackend extends AbstractStateBackend {
 
 	private transient Environment env;
 
+	private transient String appId;
+
 	// ------------------------------------------------------
 
 	private final DbBackendConfig dbConfig;
@@ -159,19 +161,14 @@ public class DbStateBackend extends AbstractStateBackend {
 					// store the checkpoint id and timestamp for bookkeeping
 					long handleId = rnd.nextLong();
 
-					// We use the ApplicationID here, because it is restored when
-					// the job is started from a savepoint (whereas the job ID
-					// changes with each submission).
-					String appIdShort = env.getApplicationID().toShortString();
-
 					byte[] serializedState = InstantiationUtil.serializeObject(state);
-					dbAdapter.setCheckpointInsertParams(appIdShort, insertStatement,
+					dbAdapter.setCheckpointInsertParams(appId, insertStatement,
 							checkpointID, timestamp, handleId,
 							serializedState);
 
 					insertStatement.executeUpdate();
 
-					return new DbStateHandle<>(appIdShort, checkpointID, timestamp, handleId,
+					return new DbStateHandle<>(appId, checkpointID, timestamp, handleId,
 							dbConfig, serializedState.length);
 				}
 			}, numSqlRetries, sqlRetrySleep);
@@ -253,6 +250,7 @@ public class DbStateBackend extends AbstractStateBackend {
 
 		this.rnd = new Random();
 		this.env = env;
+		this.appId = env.getApplicationID().toString().substring(0, 16);
 
 		connections = dbConfig.createShardedConnection();
 
@@ -270,8 +268,8 @@ public class DbStateBackend extends AbstractStateBackend {
 		if (nonPartitionedStateBackend == null) {
 			insertStatement = retry(new Callable<PreparedStatement>() {
 				public PreparedStatement call() throws SQLException {
-					dbAdapter.createCheckpointsTable(env.getApplicationID().toShortString(), getConnections().getFirst());
-					return dbAdapter.prepareCheckpointInsert(env.getApplicationID().toShortString(),
+					dbAdapter.createCheckpointsTable(appId, getConnections().getFirst());
+					return dbAdapter.prepareCheckpointInsert(appId,
 							getConnections().getFirst());
 				}
 			}, numSqlRetries, sqlRetrySleep);
@@ -300,9 +298,10 @@ public class DbStateBackend extends AbstractStateBackend {
 	@Override
 	public void disposeAllStateForCurrentJob() throws Exception {
 		if (nonPartitionedStateBackend == null) {
-			dbAdapter.disposeAllStateForJob(env.getApplicationID().toShortString(), connections.getFirst());
+			dbAdapter.disposeAllStateForJob(appId, connections.getFirst());
 		} else {
 			nonPartitionedStateBackend.disposeAllStateForCurrentJob();
 		}
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index d4883dd..91375e4 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -136,14 +136,15 @@ public class DbStateBackendTest {
 
 		Environment env = new DummyEnvironment("test", 1, 0);
 		backend.initializeForJob(env, "dummy-setup-ser", StringSerializer.INSTANCE);
+		String appId = env.getApplicationID().toString().substring(0, 16);
 
 		assertNotNull(backend.getConnections());
 		assertTrue(
-				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId));
 
 		backend.disposeAllStateForCurrentJob();
 		assertFalse(
-				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId));
 		backend.close();
 
 		assertTrue(backend.getConnections().getFirst().isClosed());
@@ -153,6 +154,7 @@ public class DbStateBackendTest {
 	public void testSerializableState() throws Exception {
 		Environment env = new DummyEnvironment("test", 1, 0);
 		DbStateBackend backend = new DbStateBackend(conf);
+		String appId = env.getApplicationID().toString().substring(0, 16);
 
 		backend.initializeForJob(env, "dummy-ser-state", StringSerializer.INSTANCE);
 
@@ -173,12 +175,12 @@ public class DbStateBackendTest {
 		assertEquals(state2, handle2.getState(getClass().getClassLoader()));
 		handle2.discardState();
 
-		assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+		assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId));
 
 		assertEquals(state3, handle3.getState(getClass().getClassLoader()));
 		handle3.discardState();
 
-		assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
+		assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId));
 
 		backend.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
index 5a57900..27276af 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
@@ -51,9 +51,6 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 	/** The memoized value returned by toString() */
 	private String toString;
 
-	/** The memoized value returned by toShortString() */
-	private String toShortString;
-
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -145,7 +142,6 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 		this.upperPart = in.readLong();
 
 		this.toString = null;
-		this.toShortString = null;
 	}
 
 	@Override
@@ -188,17 +184,6 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 
 		return this.toString;
 	}
-
-	public String toShortString() {
-		if (this.toShortString == null) {
-			final byte[] ba = new byte[SIZE_OF_LONG];
-			longToByteArray(upperPart, ba, 0);
-
-			this.toShortString = StringUtils.byteToHexString(ba);
-		}
-
-		return this.toShortString;
-	}
 	
 	@Override
 	public int compareTo(AbstractID o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 0111e8c..14ccc0c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -48,7 +48,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
 		gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex());
 		gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
-		gen.writeStringField("id", execAttempt.getAttemptId().toShortString());
+		gen.writeStringField("id", execAttempt.getAttemptId().toString());
 		
 		gen.writeArrayFieldStart("user-accumulators");
 		for (StringifiedAccumulatorResult acc : accs) {

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index 77b072a..8a753c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -83,7 +83,7 @@ public class InputGateDeploymentDescriptor implements Serializable {
 	public String toString() {
 		return String.format("InputGateDeploymentDescriptor [result id: %s, " +
 						"consumed subpartition index: %d, input channels: %s]",
-				consumedResultId.toShortString(), consumedSubpartitionIndex,
+				consumedResultId.toString(), consumedSubpartitionIndex,
 				Arrays.toString(inputChannels));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
index af2970d..a18abde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -72,6 +72,6 @@ public final class ResultPartitionID implements Serializable {
 
 	@Override
 	public String toString() {
-		return partitionId.toShortString() + "@" + producerId.toShortString();
+		return partitionId.toString() + "@" + producerId.toString();
 	}
 }


Mime
View raw message