apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tus...@apache.org
Subject [1/2] apex-core git commit: APEXCORE-522 - Promote singleton usage pattern for String2String, Long2String and other StringCodecs
Date Thu, 15 Dec 2016 09:07:46 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master a9e4e053b -> b74d68967


APEXCORE-522 - Promote singleton usage pattern for String2String, Long2String and other StringCodecs


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/2e54dd08
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/2e54dd08
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/2e54dd08

Branch: refs/heads/master
Commit: 2e54dd0813ebd8f926fede49aef3ae664c645afd
Parents: 93f790a
Author: Vlad Rozov <v.rozov@datatorrent.com>
Authored: Wed Sep 7 09:12:54 2016 -0700
Committer: Vlad Rozov <v.rozov@datatorrent.com>
Committed: Wed Dec 7 15:43:43 2016 -0800

----------------------------------------------------------------------
 .../java/com/datatorrent/api/Attribute.java     |  18 +-
 .../main/java/com/datatorrent/api/Context.java  |  57 +++---
 .../java/com/datatorrent/api/StringCodec.java   | 178 ++++++++++++++++++-
 .../com/datatorrent/api/Object2StringTest.java  |  22 +--
 .../partitioner/StatelessPartitionerTest.java   |   3 +-
 .../stram/plan/logical/LogicalPlan.java         |   8 +-
 6 files changed, 220 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/api/src/main/java/com/datatorrent/api/Attribute.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index 8dede2f..2efc84f 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -29,12 +29,6 @@ import java.util.Set;
 
 import com.google.common.base.Throwables;
 
-import com.datatorrent.api.StringCodec.Boolean2String;
-import com.datatorrent.api.StringCodec.Enum2String;
-import com.datatorrent.api.StringCodec.Integer2String;
-import com.datatorrent.api.StringCodec.Long2String;
-import com.datatorrent.api.StringCodec.String2String;
-
 /**
  * Attribute represents the attribute which can be set on various components in the system.
  *
@@ -295,17 +289,7 @@ public class Attribute<T> implements Serializable
                 StringCodec<?> codec = null;
                 if (attribute.defaultValue != null) {
                   Class<?> klass = attribute.defaultValue.getClass();
-                  if (klass == String.class) {
-                    codec = new String2String();
-                  } else if (klass == Integer.class) {
-                    codec = new Integer2String();
-                  } else if (klass == Long.class) {
-                    codec = new Long2String();
-                  } else if (klass == Boolean.class) {
-                    codec = new Boolean2String();
-                  } else if (Enum.class.isAssignableFrom(klass)) {
-                    codec = new Enum2String(klass);
-                  }
+                  codec = StringCodec.Factory.getInstance(klass);
                 }
                 if (codec != null) {
                   Field codecField = Attribute.class.getDeclaredField("codec");

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 187bf08..3d3cffe 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -173,13 +173,13 @@ public interface Context
      * that can be received on the port. If it is unspecified the engine may use
      * a generic codec.
      */
-    Attribute<StreamCodec<?>> STREAM_CODEC = new Attribute<StreamCodec<?>>(new Object2String<StreamCodec<?>>());
+    Attribute<StreamCodec<?>> STREAM_CODEC = new Attribute<>(Object2String.<StreamCodec<?>>getInstance());
 
     /**
      * Provides the tuple class which the port receives or emits. While this attribute is
null by default,
      * whether it is needed or not is controlled through the port annotation.
      */
-    Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
+    Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(Class2String.getInstance());
 
     @SuppressWarnings("FieldNameHidesFieldInSuperclass")
     long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
@@ -207,12 +207,12 @@ public interface Context
      * is equivalent to infinity; The operator hence will be attempted to be recovered indefinitely
unless this value
      * is set to anything else.
      */
-    Attribute<Integer> RECOVERY_ATTEMPTS = new Attribute<Integer>(new Integer2String());
+    Attribute<Integer> RECOVERY_ATTEMPTS = new Attribute<>(Integer2String.getInstance());
     /**
      * Specify a listener to process and optionally react to operator status updates.
      * The handler will be called for each physical operator as statistics are updated during
heartbeat processing.
      */
-    Attribute<Collection<StatsListener>> STATS_LISTENERS = new Attribute<Collection<StatsListener>>(new Collection2String<StatsListener>(",", new Object2String<StatsListener>(":")));
+    Attribute<Collection<StatsListener>> STATS_LISTENERS = new Attribute<>(Collection2String.getInstance(",", Object2String.<StatsListener>getInstance(":")));
     /**
      * Conveys whether the Operator is stateful or stateless. If the operator is stateless,
no checkpointing is required
      * by the engine. The attribute is ignored when the operator was already declared stateless
through the
@@ -231,7 +231,7 @@ public interface Context
     /**
      * The options to be pass to JVM when launching the operator. Options such as java maximum
heap size can be specified here.
      */
-    Attribute<String> JVM_OPTIONS = new Attribute<String>(new String2String());
+    Attribute<String> JVM_OPTIONS = new Attribute<>(String2String.getInstance());
     /**
      * Attribute of the operator that tells the platform how many streaming windows make
1 application window.
      */
@@ -241,7 +241,7 @@ public interface Context
      * slided by duration determined using value of this attribute. Default value is null
which is equivalent to that of {@link #APPLICATION_WINDOW_COUNT}.
      * The value should range between  (0 - {@link #APPLICATION_WINDOW_COUNT})
      */
-    Attribute<Integer> SLIDE_BY_WINDOW_COUNT = new Attribute<Integer>(new Integer2String());
+    Attribute<Integer> SLIDE_BY_WINDOW_COUNT = new Attribute<>(Integer2String.getInstance());
 
     /**
      * Attribute of the operator that hints at the optimal checkpoint boundary.
@@ -257,15 +257,15 @@ public interface Context
      * For example, the user may wish to specify a locality constraint for an input operator
relative to its data source.
      * The attribute can then be set to the host name that is specified in the operator specific
connect string property.
      */
-    Attribute<String> LOCALITY_HOST = new Attribute<String>(new String2String());
+    Attribute<String> LOCALITY_HOST = new Attribute<>(String2String.getInstance());
     /**
      * Name of rack to directly control locality of an operator. Complementary to stream
locality (RACK_LOCAL affinity).
      */
-    Attribute<String> LOCALITY_RACK = new Attribute<String>(new String2String());
+    Attribute<String> LOCALITY_RACK = new Attribute<>(String2String.getInstance());
     /**
      * The agent which can be used to checkpoint the windows.
      */
-    Attribute<StorageAgent> STORAGE_AGENT = new Attribute<StorageAgent>(new Object2String<StorageAgent>());
+    Attribute<StorageAgent> STORAGE_AGENT = new Attribute<>(Object2String.<StorageAgent>getInstance());
     /**
      * The payload processing mode for this operator - at most once, exactly once, or default
at least once.
      * If the processing mode for an operator is specified as AT_MOST_ONCE and no processing
mode is specified for the downstream
@@ -293,27 +293,26 @@ public interface Context
      * If the attribute is not set and the operator implements Partitioner interface, then
the instance of the operator
      * is used otherwise default default partitioning is used.
      */
-    Attribute<Partitioner<? extends Operator>> PARTITIONER = new Attribute<Partitioner<? extends Operator>>(new Object2String<Partitioner<? extends Operator>>());
+    Attribute<Partitioner<? extends Operator>> PARTITIONER = new Attribute<>(Object2String.<Partitioner<? extends Operator>>getInstance());
 
     /**
      * Aggregates physical counters to a logical counter.
      * @deprecated  use {@link #METRICS_AGGREGATOR}
      */
     @Deprecated
-    Attribute<CountersAggregator> COUNTERS_AGGREGATOR = new Attribute<CountersAggregator>(new
Object2String<CountersAggregator>());
+    Attribute<CountersAggregator> COUNTERS_AGGREGATOR = new Attribute<>(Object2String.<CountersAggregator>getInstance());
 
     /**
      * Aggregates metrics of physical instances of an operator. This handler is called with
the metrics data of a
      * particular window from all the physical instances so that it can be aggregated into
a logical view.
      */
-    Attribute<AutoMetric.Aggregator> METRICS_AGGREGATOR = new Attribute<AutoMetric.Aggregator>(new
Object2String<AutoMetric.Aggregator>());
+    Attribute<AutoMetric.Aggregator> METRICS_AGGREGATOR = new Attribute<>(Object2String.<AutoMetric.Aggregator>getInstance());
 
     /**
      * Provides dimension aggregations and time buckets information for logical metrics.
The information provided
      * by this construct is conveyed to tracker application and influences the aggregations
done on it by the tracker.
      */
-    Attribute<AutoMetric.DimensionsScheme> METRICS_DIMENSIONS_SCHEME = new Attribute<AutoMetric.DimensionsScheme>(new
-        Object2String<AutoMetric.DimensionsScheme>());
+    Attribute<AutoMetric.DimensionsScheme> METRICS_DIMENSIONS_SCHEME = new Attribute<>(Object2String.<AutoMetric.DimensionsScheme>getInstance());
 
     /**
      * Return the operator runtime id.
@@ -353,7 +352,7 @@ public interface Context
     /**
      * URL to the application's documentation.
      */
-    Attribute<String> APPLICATION_DOC_LINK = new Attribute<String>(new String2String());
+    Attribute<String> APPLICATION_DOC_LINK = new Attribute<>(String2String.getInstance());
 
     /**
      * URL to the application's app data, if any. If not set, an empty string is the default.
@@ -364,12 +363,12 @@ public interface Context
      * <code>"http://mynetwork.net/my/appdata/dashboard?appId=application_1355713111917_0002"</code>.
      * </p>
      */
-    Attribute<String> APPLICATION_DATA_LINK = new Attribute<String>(new String2String());
+    Attribute<String> APPLICATION_DATA_LINK = new Attribute<>(String2String.getInstance());
     /**
      * Transport to push the stats and the metrics.
      * If using the built-in transport, please use an AutoMetricBuiltInTransport object
      */
-    Attribute<AutoMetric.Transport> METRICS_TRANSPORT = new Attribute<>(new Object2String<AutoMetric.Transport>());
+    Attribute<AutoMetric.Transport> METRICS_TRANSPORT = new Attribute<>(Object2String.<AutoMetric.Transport>getInstance());
     /**
      * Application instance identifier. An application with the same name can run in multiple
instances, each with a
      * unique identifier. The identifier is set by the client that submits the application
and can be used in operators
@@ -379,12 +378,12 @@ public interface Context
      * <code>application_1355713111917_0002</code>). Note that only the full
id string uniquely identifies an application,
      * the integer offset will reset on RM restart.
      */
-    Attribute<String> APPLICATION_ID = new Attribute<String>(new String2String());
+    Attribute<String> APPLICATION_ID = new Attribute<>(String2String.getInstance());
     /**
      * Application package source. If the application is launched using an app package, this
attribute contains the
      * information of the app package. It is in the format of {user}|{appPackageName}|{appPackageVersion}
      */
-    Attribute<String> APP_PACKAGE_SOURCE = new Attribute<String>(new String2String());
+    Attribute<String> APP_PACKAGE_SOURCE = new Attribute<>(String2String.getInstance());
     /**
      * Dump extra debug information in launcher, master and containers.
      */
@@ -392,7 +391,7 @@ public interface Context
     /**
      * The options to be pass to JVM when launching the containers. Options such as java
maximum heap size can be specified here.
      */
-    Attribute<String> CONTAINER_JVM_OPTIONS = new Attribute<String>(new String2String());
+    Attribute<String> CONTAINER_JVM_OPTIONS = new Attribute<>(String2String.getInstance());
     /**
      * The amount of memory to be requested for the application master. Not used in local
mode.
      * Default value is 1GB.
@@ -414,7 +413,7 @@ public interface Context
     /**
      * The path to store application dependencies, recording and other generated files for
application master and containers.
      */
-    Attribute<String> APPLICATION_PATH = new Attribute<String>(new String2String());
+    Attribute<String> APPLICATION_PATH = new Attribute<>(String2String.getInstance());
     /**
      * The size limit for a file where tuple recordings are stored. When tuples are being
recorded they are stored
      * in files. When a file size reaches this limit a new file is created and tuples start
getting stored in the new file. Default value is 128k.
@@ -429,7 +428,7 @@ public interface Context
     /**
      * Address to which the application side connects to DT Gateway, in the form of host:port.
This will override "dt.gateway.listenAddress" in the configuration.
      */
-    Attribute<String> GATEWAY_CONNECT_ADDRESS = new Attribute<String>(new String2String());
+    Attribute<String> GATEWAY_CONNECT_ADDRESS = new Attribute<>(String2String.getInstance());
     /**
      * Whether or not gateway is expecting SSL connection.
      */
@@ -437,11 +436,11 @@ public interface Context
     /**
      * The username for logging in to the gateway, if authentication is enabled.
      */
-    Attribute<String> GATEWAY_USER_NAME = new Attribute<String>(new String2String());
+    Attribute<String> GATEWAY_USER_NAME = new Attribute<>(String2String.getInstance());
     /**
      * The password for logging in to the gateway, if authentication is enabled.
      */
-    Attribute<String> GATEWAY_PASSWORD = new Attribute<String>(new String2String());
+    Attribute<String> GATEWAY_PASSWORD = new Attribute<>(String2String.getInstance());
     /**
      * The timeout when connecting to the pubsub service in gateway
      */
@@ -494,18 +493,18 @@ public interface Context
     /**
      * The agent which can be used to find the jvm options for the container.
      */
-    Attribute<ContainerOptConfigurator> CONTAINER_OPTS_CONFIGURATOR = new Attribute<ContainerOptConfigurator>(new
Object2String<ContainerOptConfigurator>());
+    Attribute<ContainerOptConfigurator> CONTAINER_OPTS_CONFIGURATOR = new Attribute<>(Object2String.<ContainerOptConfigurator>getInstance());
     /**
      * The policy for enabling stram web services authentication.<br/>
      * See {@link StramHTTPAuthentication} for the different options.<br/>
      * Default value is StramHTTPAuthentication.FOLLOW_HADOOP_AUTH
      */
-    Attribute<StramHTTPAuthentication> STRAM_HTTP_AUTHENTICATION = new Attribute<>(StramHTTPAuthentication.FOLLOW_HADOOP_AUTH, new StringCodec.Enum2String<>(StramHTTPAuthentication.class));
+    Attribute<StramHTTPAuthentication> STRAM_HTTP_AUTHENTICATION = new Attribute<>(StramHTTPAuthentication.FOLLOW_HADOOP_AUTH, StringCodec.Enum2String.getInstance(StramHTTPAuthentication.class));
     /**
      * The string codec map for classes that are to be set or get through properties as strings.
      * Only supports string codecs that have a constructor with no arguments
      */
-    Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>> STRING_CODECS = new Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>>(new Map2String<Class<?>, Class<? extends StringCodec<?>>>(",", "=", new Class2String<Object>(), new Class2String<StringCodec<?>>()));
+    Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>> STRING_CODECS = new Attribute<>(Map2String.getInstance(",", "=", Class2String.getInstance(), Class2String.<StringCodec<?>>getInstance()));
 
     /**
      * The number of consecutive container failures that should lead to
@@ -522,7 +521,7 @@ public interface Context
     /**
      * Affinity rules for specifying affinity and anti-affinity between logical operators
      */
-    Attribute<AffinityRulesSet> AFFINITY_RULES_SET = new Attribute<AffinityRulesSet>(new
JsonStringCodec<AffinityRulesSet>(AffinityRulesSet.class));
+    Attribute<AffinityRulesSet> AFFINITY_RULES_SET = new Attribute<>(JsonStringCodec.getInstance(AffinityRulesSet.class));
 
     /**
      * Comma separated list of jar file dependencies to be deployed with the application.
@@ -530,7 +529,7 @@ public interface Context
      * that are made available through the distributed file system to application master
      * and child containers.
      */
-    Attribute<String> LIBRARY_JARS = new Attribute<>(new StringCodec.String2String());
+    Attribute<String> LIBRARY_JARS = new Attribute<>(String2String.getInstance());
 
     @SuppressWarnings(value = "FieldNameHidesFieldInSuperclass")
     long serialVersionUID = AttributeMap.AttributeInitializer.initialize(DAGContext.class);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/api/src/main/java/com/datatorrent/api/StringCodec.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java
index 2cd6a4f..d4a0a41 100644
--- a/api/src/main/java/com/datatorrent/api/StringCodec.java
+++ b/api/src/main/java/com/datatorrent/api/StringCodec.java
@@ -62,8 +62,44 @@ public interface StringCodec<T>
    */
   String toString(T pojo);
 
+  class Factory
+  {
+    public static StringCodec<?> getInstance(Class<?> cls)
+    {
+      if (cls == String.class) {
+        return String2String.getInstance();
+      } else if (cls == Integer.class) {
+        return Integer2String.getInstance();
+      } else if (cls == Long.class) {
+        return Long2String.getInstance();
+      } else if (cls == Boolean.class) {
+        return Boolean2String.getInstance();
+      } else if (Enum.class.isAssignableFrom(cls)) {
+        return Enum2String.getInstance(cls);
+      } else {
+        return null;
+      }
+    }
+  }
+
   class String2String implements StringCodec<String>, Serializable
   {
+    @SuppressWarnings("deprecation")
+    private static final String2String instance = new String2String();
+
+    public static StringCodec<String> getInstance()
+    {
+      return instance;
+    }
+
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance()}
+     */
+    @Deprecated
+    public String2String()
+    {
+    }
+
     @Override
     public String fromString(String string)
     {
@@ -81,6 +117,22 @@ public interface StringCodec<T>
 
   class Integer2String implements StringCodec<Integer>, Serializable
   {
+    @SuppressWarnings("deprecation")
+    private static final Integer2String instance = new Integer2String();
+
+    public static StringCodec<Integer> getInstance()
+    {
+      return instance;
+    }
+
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance()}
+     */
+    @Deprecated
+    public Integer2String()
+    {
+    }
+
     @Override
     public Integer fromString(String string)
     {
@@ -98,6 +150,22 @@ public interface StringCodec<T>
 
   class Long2String implements StringCodec<Long>, Serializable
   {
+    @SuppressWarnings("deprecation")
+    private static final Long2String instance = new Long2String();
+
+    public static StringCodec<Long> getInstance()
+    {
+      return instance;
+    }
+
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance()}
+     */
+    @Deprecated
+    public Long2String()
+    {
+    }
+
     @Override
     public Long fromString(String string)
     {
@@ -115,6 +183,22 @@ public interface StringCodec<T>
 
   class Boolean2String implements StringCodec<Boolean>, Serializable
   {
+    @SuppressWarnings("deprecation")
+    private static final Boolean2String instance = new Boolean2String();
+
+    public static StringCodec<Boolean> getInstance()
+    {
+      return instance;
+    }
+
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance()}
+     */
+    @Deprecated
+    public Boolean2String()
+    {
+    }
+
     @Override
     public Boolean fromString(String string)
     {
@@ -148,21 +232,52 @@ public interface StringCodec<T>
    */
   class Object2String<T> implements StringCodec<T>, Serializable
   {
+    @SuppressWarnings("deprecation")
+    private static final Object2String instance = new Object2String();
+
+    public static <T> StringCodec<T> getInstance()
+    {
+      return instance;
+    }
+
+    public static <T> StringCodec<T> getInstance(String separator)
+    {
+      return getInstance(separator, "=");
+    }
+
+    @SuppressWarnings("deprecation")
+    public static <T> StringCodec<T> getInstance(String separator, String propertySeparator)
+    {
+      return new Object2String<>(separator, propertySeparator);
+    }
+
     public final String separator;
     public final String propertySeparator;
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance()}
+     */
+    @SuppressWarnings("deprecation")
+    @Deprecated
     public Object2String()
     {
-      separator = ":";
-      propertySeparator = "=";
+      this(":", "=");
     }
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance(String)}
+     */
+    @SuppressWarnings("deprecation")
+    @Deprecated
     public Object2String(String separator)
     {
-      this.separator = separator;
-      this.propertySeparator = "=";
+      this(separator, "=");
     }
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance(String, String)}
+     */
+    @Deprecated
     public Object2String(String separator, String propertySeparator)
     {
       this.separator = separator;
@@ -219,11 +334,21 @@ public interface StringCodec<T>
 
   class Map2String<K, V> implements StringCodec<Map<K, V>>, Serializable
   {
+    @SuppressWarnings("deprecation")
+    public static <K, V> StringCodec<Map<K, V>> getInstance(String separator, String equal, StringCodec<K> keyCodec, StringCodec<V> valueCodec)
+    {
+      return new Map2String<>(separator, equal, keyCodec, valueCodec);
+    }
+
     private final StringCodec<K> keyCodec;
     private final StringCodec<V> valueCodec;
     private final String separator;
     private final String equal;
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance(String, String, StringCodec, StringCodec)}
+     */
+    @Deprecated
     public Map2String(String separator, String equal, StringCodec<K> keyCodec, StringCodec<V>
valueCodec)
     {
       this.equal = equal;
@@ -276,9 +401,19 @@ public interface StringCodec<T>
 
   class Collection2String<T> implements StringCodec<Collection<T>>, Serializable
   {
+    @SuppressWarnings("deprecation")
+    public static <T> StringCodec<Collection<T>> getInstance(String separator, StringCodec<T> codec)
+    {
+      return new Collection2String<>(separator, codec);
+    }
+
     private final String separator;
     private final StringCodec<T> codec;
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance(String, StringCodec)}
+     */
+    @Deprecated
     public Collection2String(String separator, StringCodec<T> codec)
     {
       this.separator = separator;
@@ -334,6 +469,16 @@ public interface StringCodec<T>
   {
     private final Class<T> clazz;
 
+    @SuppressWarnings("deprecation")
+    public static Enum2String getInstance(Class clazz)
+    {
+      return new Enum2String(clazz);
+    }
+
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance(Class<T>)}
+     */
+    @Deprecated
     public Enum2String(Class<T> clazz)
     {
       this.clazz = clazz;
@@ -356,6 +501,21 @@ public interface StringCodec<T>
 
   class Class2String<T> implements StringCodec<Class<? extends T>>, Serializable
   {
+    @SuppressWarnings("deprecation")
+    private static final StringCodec instance = new Class2String<>();
+
+    public static <T> StringCodec<Class<? extends T>> getInstance()
+    {
+      return (StringCodec<Class<? extends T>>)instance;
+    }
+
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance()}
+     */
+    public Class2String()
+    {
+    }
+
     @Override
     @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch"})
     public Class<? extends T> fromString(String string)
@@ -381,8 +541,18 @@ public interface StringCodec<T>
   class JsonStringCodec<T> implements StringCodec<T>, Serializable
   {
     private static final long serialVersionUID = 2513932518264776006L;
+
+    @SuppressWarnings("deprecation")
+    public static <T> StringCodec<T> getInstance(Class<T> clazz)
+    {
+      return new JsonStringCodec<>(clazz);
+    }
+
     Class<?> clazz;
 
+    /**
+     * @deprecated As of release 3.5.0, replaced by {@link #getInstance(Class)}
+     */
     public JsonStringCodec(Class<T> clazz)
     {
       this.clazz = clazz;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/api/src/test/java/com/datatorrent/api/Object2StringTest.java
----------------------------------------------------------------------
diff --git a/api/src/test/java/com/datatorrent/api/Object2StringTest.java b/api/src/test/java/com/datatorrent/api/Object2StringTest.java
index e42a462..98c584b 100644
--- a/api/src/test/java/com/datatorrent/api/Object2StringTest.java
+++ b/api/src/test/java/com/datatorrent/api/Object2StringTest.java
@@ -20,17 +20,21 @@ package com.datatorrent.api;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * This tests the Object2String codec
  */
 public class Object2StringTest
 {
+  private StringCodec<TestBean> bean2String;
+
   public static class TestBean
   {
     private int intVal;
@@ -127,10 +131,16 @@ public class Object2StringTest
     }
   }
 
+  @Before
+  public void setup()
+  {
+    bean2String = StringCodec.Object2String.getInstance();
+    assertTrue(bean2String instanceof StringCodec.Object2String);
+  }
+
   @Test
   public void testBeanCodecWithoutConstructorWithoutProperty()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName();
     TestBean obj = bean2String.fromString(bean);
     assertEquals("validating the bean",obj,new TestBean());
@@ -139,7 +149,6 @@ public class Object2StringTest
   @Test
   public void testBeanCodecWithConstructorSet()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName() + ":testVal";
     TestBean obj = bean2String.fromString(bean);
     assertEquals("validating the bean", obj, new TestBean("testVal"));
@@ -148,7 +157,6 @@ public class Object2StringTest
   @Test
   public void testBeanCodecWithConstructorPropertySet()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName() + ":testVal:intVal=10:stringVal=strVal";
     TestBean obj = bean2String.fromString(bean);
     TestBean expectedBean = new TestBean("testVal");
@@ -160,7 +168,6 @@ public class Object2StringTest
   @Test
   public void testBeanCodecWithConstructorSetEmptyProperties()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName() + ":testVal:";
     TestBean obj = bean2String.fromString(bean);
     assertEquals("validating the bean",obj,new TestBean("testVal"));
@@ -169,7 +176,6 @@ public class Object2StringTest
   @Test
   public void testBeanCodecOnlyEmptyConstructor()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName() + ":";
     TestBean obj = bean2String.fromString(bean);
     assertEquals("validating the bean",obj,new TestBean());
@@ -178,7 +184,6 @@ public class Object2StringTest
   @Test
   public void testBeanCodecOnlyConstructor()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName() + ": ";
     TestBean obj = bean2String.fromString(bean);
     assertEquals("validating the bean",obj,new TestBean(" "));
@@ -187,7 +192,6 @@ public class Object2StringTest
   @Test
   public void testBeanCodecEmptyConstructorEmptyProperty()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName() + "::";
     TestBean obj = bean2String.fromString(bean);
     assertEquals("validating the bean",obj,new TestBean());
@@ -196,7 +200,6 @@ public class Object2StringTest
   @Test
   public void testBeanCodecWithProperty()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName() + "::intVal=1";
     TestBean obj = bean2String.fromString(bean);
     TestBean expectedBean = new TestBean("");
@@ -207,7 +210,6 @@ public class Object2StringTest
   @Test
   public void testBeanCodecWithAllProperties()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName() + "::intVal=1:stringVal=testStr:longVal=10";
     TestBean obj = bean2String.fromString(bean);
     TestBean expectedBean = new TestBean("testStr");
@@ -219,7 +221,6 @@ public class Object2StringTest
   @Test
   public void testBeanWithWrongClassName()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName() + "1::intVal=1";
     try {
       bean2String.fromString(bean);
@@ -237,7 +238,6 @@ public class Object2StringTest
   @Test
   public void testBeanFailure()
   {
-    StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>();
     String bean = TestBean.class.getName() + "::intVal=1:stringVal=hello:longVal=10";
     TestBean obj = bean2String.fromString(bean);
     TestBean expectedBean = new TestBean("hello");

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
index 687957c..e7c4887 100644
--- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
+++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java
@@ -33,6 +33,7 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.StringCodec;
 import com.datatorrent.api.StringCodec.Object2String;
 
 public class StatelessPartitionerTest
@@ -127,7 +128,7 @@ public class StatelessPartitionerTest
   @Test
   public void objectPropertyTest()
   {
-    Object2String<StatelessPartitioner<DummyOperator>> propertyReader = new Object2String<StatelessPartitioner<DummyOperator>>();
+    StringCodec<StatelessPartitioner<DummyOperator>> propertyReader = Object2String.getInstance();
     StatelessPartitioner<DummyOperator> partitioner = propertyReader.fromString("com.datatorrent.common.partitioner.StatelessPartitioner:3");
     Assert.assertEquals(3, partitioner.getPartitionCount());
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index f1ccaef..1371ce8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -157,8 +157,8 @@ public class LogicalPlan implements Serializable, DAG
   public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute<>(false);
   public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute<>(604800000L);
   public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
-  public static Attribute<String> PRINCIPAL = new Attribute<String>(null, new StringCodec.String2String());
-  public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, new StringCodec.String2String());
+  public static Attribute<String> PRINCIPAL = new Attribute<>(null, StringCodec.String2String.getInstance());
+  public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, StringCodec.String2String.getInstance());
   public static Attribute<Double> TOKEN_REFRESH_ANTICIPATORY_FACTOR = new Attribute<>(0.7);
   /**
    * Comma separated list of archives to be deployed with the application.
@@ -166,13 +166,13 @@ public class LogicalPlan implements Serializable, DAG
    * that are made available through the distributed file system to application master
    * and child containers.
    */
-  public static Attribute<String> ARCHIVES = new Attribute<>(new StringCodec.String2String());
+  public static Attribute<String> ARCHIVES = new Attribute<>(StringCodec.String2String.getInstance());
   /**
    * Comma separated list of files to be deployed with the application. The launcher will
include the files into the
    * final set of resources that are made available through the distributed file system to
application master and child
    * containers.
    */
-  public static Attribute<String> FILES = new Attribute<>(new StringCodec.String2String());
+  public static Attribute<String> FILES = new Attribute<>(StringCodec.String2String.getInstance());
   /**
    * The maximum number of containers (excluding the application master) that the application
is allowed to request.
    * If the DAG plan requires less containers, remaining count won't be allocated from the
resource manager.


Mime
View raw message