apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject incubator-apex-core git commit: APEXCORE-349 return strings for attribute values in REST service
Date Sat, 20 Feb 2016 01:16:43 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.3 c7d27d643 -> fafc4242f


APEXCORE-349 return strings for attribute values in REST service


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/fafc4242
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/fafc4242
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/fafc4242

Branch: refs/heads/release-3.3
Commit: fafc4242fd94714aed4857b552f530cb5f1b84e8
Parents: c7d27d6
Author: David Yan <david@datatorrent.com>
Authored: Wed Feb 17 13:43:35 2016 -0800
Committer: Thomas Weise <thomas@datatorrent.com>
Committed: Fri Feb 19 17:07:36 2016 -0800

----------------------------------------------------------------------
 .../java/com/datatorrent/api/StringCodec.java   |  8 +++++++
 .../metric/AutoMetricBuiltInTransport.java      |  6 +++++
 .../stram/StreamingContainerManager.java        | 23 +++++++++-----------
 .../stram/plan/logical/LogicalPlan.java         |  2 +-
 .../com/datatorrent/stram/webapp/AppInfo.java   |  6 ++---
 .../stram/webapp/StramWebServices.java          | 23 ++++++++++++++------
 .../stram/StreamingContainerManagerTest.java    |  4 ++--
 .../stram/support/StramTestSupport.java         | 10 ++++-----
 .../stram/webapp/StramWebServicesTest.java      | 19 +++++++++++++++-
 9 files changed, 68 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/api/src/main/java/com/datatorrent/api/StringCodec.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java
index 72d5f34..8d39320 100644
--- a/api/src/main/java/com/datatorrent/api/StringCodec.java
+++ b/api/src/main/java/com/datatorrent/api/StringCodec.java
@@ -134,6 +134,11 @@ public interface StringCodec<T>
    * string as an argument.If properties are specified then properties will be set on the
object. The properties
    * are defined in property=value format separated by colon(:)
    *
+   * Note that the {@link #toString(Object) toString} method is by default NOT the proper
reverse of the {@link
+   * #fromString(String) fromString} method. In order for the {@link #toString(Object) toString}
method to become a
+   * proper reverse of the {@link #fromString(String) fromString} method, T's {@link T#toString()
toString} method
+   * must output null or <Constructor_String> or the <Constructor_String>:<Property_String>
format as stated above.
+   *
    * @param <T> Type of the object which is converted to/from String
    */
   public class Object2String<T> implements StringCodec<T>, Serializable
@@ -195,6 +200,9 @@ public interface StringCodec<T>
     @Override
     public String toString(T pojo)
     {
+      if (pojo == null) {
+        return null;
+      }
       String arg = pojo.toString();
       if (arg == null) {
         return pojo.getClass().getCanonicalName();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java
b/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java
index 1a71ee1..ebd1e7f 100644
--- a/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java
+++ b/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java
@@ -48,6 +48,12 @@ public class AutoMetricBuiltInTransport implements AutoMetric.Transport,
Seriali
   }
 
   @Override
+  public String toString()
+  {
+    return this.topic;
+  }
+
+  @Override
   public void push(String jsonData) throws IOException
   {
     throw new UnsupportedOperationException("This class is a placeholder and is supposed
to replaced by internal " +

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index df3bfc4..0e7091b 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2837,7 +2837,7 @@ public class StreamingContainerManager implements PlanContext
     }
   }
 
-  public Map<String, Object> getPortAttributes(String operatorId, String portName)
+  public Attribute.AttributeMap getPortAttributes(String operatorId, String portName)
   {
     OperatorMeta logicalOperator = plan.getLogicalPlan().getOperatorMeta(operatorId);
     if (logicalOperator == null) {
@@ -2848,24 +2848,21 @@ public class StreamingContainerManager implements PlanContext
     Operators.describe(logicalOperator.getOperator(), portMap);
     PortContextPair<InputPort<?>> inputPort = portMap.inputPorts.get(portName);
     if (inputPort != null) {
-      HashMap<String, Object> portAttributeMap = new HashMap<String, Object>();
       InputPortMeta portMeta = logicalOperator.getMeta(inputPort.component);
-      Map<Attribute<Object>, Object> rawAttributes = Attribute.AttributeMap.AttributeInitializer.getAllAttributes(portMeta,
Context.PortContext.class);
-      for (Map.Entry<Attribute<Object>, Object> attEntry : rawAttributes.entrySet())
{
-        portAttributeMap.put(attEntry.getKey().getSimpleName(), attEntry.getValue());
+      try {
+        return portMeta.getAttributes().clone();
+      } catch (CloneNotSupportedException ex) {
+        throw new RuntimeException("Cannot clone port attributes", ex);
       }
-      return portAttributeMap;
-    }
-    else {
+    } else {
       PortContextPair<OutputPort<?>> outputPort = portMap.outputPorts.get(portName);
       if (outputPort != null) {
-        HashMap<String, Object> portAttributeMap = new HashMap<String, Object>();
         OutputPortMeta portMeta = logicalOperator.getMeta(outputPort.component);
-        Map<Attribute<Object>, Object> rawAttributes = Attribute.AttributeMap.AttributeInitializer.getAllAttributes(portMeta,
Context.PortContext.class);
-        for (Map.Entry<Attribute<Object>, Object> attEntry : rawAttributes.entrySet())
{
-          portAttributeMap.put(attEntry.getKey().getSimpleName(), attEntry.getValue());
+        try {
+          return portMeta.getAttributes().clone();
+        } catch (CloneNotSupportedException ex) {
+          throw new RuntimeException("Cannot clone port attributes", ex);
         }
-        return portAttributeMap;
       }
       throw new IllegalArgumentException("Invalid port name " + portName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 6d7ebe1..8fa71bb 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1923,7 +1923,7 @@ public class LogicalPlan implements Serializable, DAG
    * @see <a href="http://en.wikipedia.org/wiki/Tarjan%E2%80%99s_strongly_connected_components_algorithm">http://en.wikipedia.org/wiki/Tarjan%E2%80%99s_strongly_connected_components_algorithm</a>
    *
    * @param om
-   * @param cycles
+   * @param ctx
    */
   public void findStronglyConnected(OperatorMeta om, ValidationContext ctx)
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/main/java/com/datatorrent/stram/webapp/AppInfo.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/AppInfo.java b/engine/src/main/java/com/datatorrent/stram/webapp/AppInfo.java
index 3e6a4af..109a401 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/AppInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/AppInfo.java
@@ -60,7 +60,7 @@ public class AppInfo {
   protected boolean gatewayConnected;
   protected List<AppDataSource> appDataSources;
   protected Map<String, Object> metrics;
-  public Map<String, Object> attributes;
+  public Map<String, String> attributes;
   public String appMasterTrackingUrl;
   public String version;
   public AppStats stats;
@@ -215,9 +215,9 @@ public class AppInfo {
     this.stats = context.getStats();
     this.gatewayAddress = context.getGatewayAddress();
     this.version = VersionInfo.getBuildVersion();
-    this.attributes = new TreeMap<String, Object>();
+    this.attributes = new TreeMap<>();
     for (Map.Entry<Attribute<Object>, Object> entry : AttributeMap.AttributeInitializer.getAllAttributes(context,
DAGContext.class).entrySet()) {
-      this.attributes.put(entry.getKey().getSimpleName(), entry.getValue());
+      this.attributes.put(entry.getKey().getSimpleName(), entry.getKey().codec.toString(entry.getValue()));
     }
     this.gatewayConnected = context.isGatewayConnected();
     this.appDataSources = context.getAppDataSources();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index fd47d35..f735253 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -602,10 +602,11 @@ public class StramWebServices
     if (logicalOperator == null) {
       throw new NotFoundException();
     }
-    HashMap<String, Object> map = new HashMap<String, Object>();
+    HashMap<String, String> map = new HashMap<>();
     for (Entry<Attribute<?>, Object> entry : dagManager.getOperatorAttributes(operatorName).entrySet())
{
-      if (attributeName == null || entry.getKey().name.equals(attributeName)) {
-        map.put(entry.getKey().name, entry.getValue());
+      if (attributeName == null || entry.getKey().getSimpleName().equals(attributeName))
{
+        Entry<Attribute<Object>, Object> entry1 = (Entry<Attribute<Object>,
Object>)(Entry)entry;
+        map.put(entry1.getKey().getSimpleName(), entry1.getKey().codec.toString(entry1.getValue()));
       }
     }
     return new JSONObject(map);
@@ -616,10 +617,11 @@ public class StramWebServices
   @Produces(MediaType.APPLICATION_JSON)
   public JSONObject getApplicationAttributes(@QueryParam("attributeName") String attributeName)
   {
-    HashMap<String, Object> map = new HashMap<String, Object>();
+    HashMap<String, String> map = new HashMap<>();
     for (Entry<Attribute<?>, Object> entry : dagManager.getApplicationAttributes().entrySet())
{
-      if (attributeName == null || entry.getKey().name.equals(attributeName)) {
-        map.put(entry.getKey().name, entry.getValue());
+      if (attributeName == null || entry.getKey().getSimpleName().equals(attributeName))
{
+        Entry<Attribute<Object>, Object> entry1 = (Entry<Attribute<Object>,
Object>)(Entry)entry;
+        map.put(entry1.getKey().getSimpleName(), entry1.getKey().codec.toString(entry1.getValue()));
       }
     }
     return new JSONObject(map);
@@ -736,7 +738,14 @@ public class StramWebServices
     if (logicalOperator == null) {
       throw new NotFoundException();
     }
-    return new JSONObject(dagManager.getPortAttributes(operatorName, portName));
+    HashMap<String, String> map = new HashMap<>();
+    for (Entry<Attribute<?>, Object> entry : dagManager.getPortAttributes(operatorName,
portName).entrySet()) {
+      if (attributeName == null || entry.getKey().getSimpleName().equals(attributeName))
{
+        Entry<Attribute<Object>, Object> entry1 = (Entry<Attribute<Object>,
Object>)(Entry)entry;
+        map.put(entry1.getKey().getSimpleName(), entry1.getKey().codec.toString(entry1.getValue()));
+      }
+    }
+    return new JSONObject(map);
   }
 
   @GET

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 8fc957b..5edc582 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -938,7 +938,7 @@ public class StreamingContainerManagerTest
       dag.setAttribute(LogicalPlan.GATEWAY_CONNECT_ADDRESS, "localhost:" + port);
       StramLocalCluster lc = new StramLocalCluster(dag);
       StreamingContainerManager dnmgr = lc.dnmgr;
-      StramAppContext appContext = new StramTestSupport.TestAppContext();
+      StramAppContext appContext = new StramTestSupport.TestAppContext(dag.getAttributes());
 
       AppDataPushAgent pushAgent = new AppDataPushAgent(dnmgr, appContext);
       pushAgent.init();
@@ -1000,7 +1000,7 @@ public class StreamingContainerManagerTest
     dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new TestMetricTransport("xyz"));
     StramLocalCluster lc = new StramLocalCluster(dag);
     StreamingContainerManager dnmgr = lc.dnmgr;
-    StramAppContext appContext = new StramTestSupport.TestAppContext();
+    StramAppContext appContext = new StramTestSupport.TestAppContext(dag.getAttributes());
 
     AppDataPushAgent pushAgent = new AppDataPushAgent(dnmgr, appContext);
     pushAgent.init();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
index cf2a887..ad66ede 100644
--- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
+++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java
@@ -130,9 +130,7 @@ abstract public class StramTestSupport
   /**
    * Create an appPackage zip using the sample appPackage located in
    * src/test/resources/testAppPackage/testAppPackageSrc.
-   * @param file  The file whose path will be used to create the appPackage zip
    * @return      The File object that can be used in the AppPackage constructor.
-   * @throws net.lingala.zip4j.exception.ZipException
    */
   public static File createAppPackageFile()
   {
@@ -479,16 +477,16 @@ abstract public class StramTestSupport
     final long startTime = System.currentTimeMillis();
     final String gatewayAddress = "localhost:9090";
 
-    public TestAppContext(int appid, int numJobs, int numTasks, int numAttempts)
+    public TestAppContext(Attribute.AttributeMap attributeMap, int appid, int numJobs, int
numTasks, int numAttempts)
     {
-      super(new Attribute.AttributeMap.DefaultAttributeMap(), null); // this needs to be
done in a proper way - may cause application errors.
+      super(attributeMap, null); // this needs to be done in a proper way - may cause application
errors.
       this.appID = ApplicationId.newInstance(0, appid);
       this.appAttemptID = ApplicationAttemptId.newInstance(this.appID, numAttempts);
     }
 
-    public TestAppContext()
+    public TestAppContext(Attribute.AttributeMap attributeMap)
     {
-      this(0, 1, 1, 1);
+      this(attributeMap, 0, 1, 1, 1);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
index d44be87..c083891 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java
@@ -46,6 +46,7 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -60,6 +61,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 
 import com.datatorrent.api.Context;
+import com.datatorrent.common.metric.AutoMetricBuiltInTransport;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.StramAppContext;
 import com.datatorrent.stram.StreamingContainerManager;
@@ -133,9 +135,10 @@ public class StramWebServicesTest extends JerseyTest
         String workingDir = new File("target", StramWebServicesTest.class.getName()).getAbsolutePath();
         dag.setAttribute(LogicalPlan.APPLICATION_PATH, workingDir);
         dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir,
null));
+        dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new AutoMetricBuiltInTransport("xyz"));
         final DummyStreamingContainerManager streamingContainerManager = new DummyStreamingContainerManager(dag);
 
-        appContext = new TestAppContext();
+        appContext = new TestAppContext(dag.getAttributes());
         bind(JAXBContextResolver.class);
         bind(StramWebServices.class);
         bind(GenericExceptionHandler.class);
@@ -332,6 +335,20 @@ public class StramWebServicesTest extends JerseyTest
     }
   }
 
+
+  @Test
+  public void testAttributes() throws Exception
+  {
+    WebResource r = resource();
+    ClientResponse response = r.path(StramWebServices.PATH + "/")
+        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+    JSONObject json = response.getEntity(JSONObject.class);
+    JSONObject attrs = json.getJSONObject("attributes");
+    Assert.assertEquals(AutoMetricBuiltInTransport.class.getName() + ":xyz",
+        attrs.getString(Context.DAGContext.METRICS_TRANSPORT.getSimpleName()));
+  }
+
   @Test
   public void testSubmitLogicalPlanChange() throws JSONException, Exception
   {


Mime
View raw message