apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [1/2] incubator-apex-core git commit: APEXCORE-276 made METRICS_TRANSPORT pluggable
Date Thu, 31 Dec 2015 00:52:14 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 587912450 -> 3f76dcbfa


APEXCORE-276 made METRICS_TRANSPORT pluggable


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/845796ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/845796ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/845796ab

Branch: refs/heads/devel-3
Commit: 845796abc5f5d1f92f43b40e105f1b18cb2d8632
Parents: 56b55fe
Author: David Yan <david@datatorrent.com>
Authored: Tue Dec 29 13:46:58 2015 -0800
Committer: David Yan <david@datatorrent.com>
Committed: Wed Dec 30 16:39:40 2015 -0800

----------------------------------------------------------------------
 .../java/com/datatorrent/api/AutoMetric.java    | 19 ++++++
 .../main/java/com/datatorrent/api/Context.java  |  6 +-
 .../metric/AutoMetricBuiltInTransport.java      | 67 +++++++++++++++++++
 .../stram/PubSubWebSocketMetricTransport.java   | 68 ++++++++++++++++++++
 .../stram/StreamingAppMasterService.java        |  2 +-
 .../stram/WebsocketAppDataPusher.java           | 65 -------------------
 .../datatorrent/stram/api/AppDataPusher.java    | 34 ----------
 .../stram/appdata/AppDataPushAgent.java         | 34 +++++-----
 .../stram/StreamingContainerManagerTest.java    | 62 ++++++++++++++----
 9 files changed, 225 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/api/src/main/java/com/datatorrent/api/AutoMetric.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/AutoMetric.java b/api/src/main/java/com/datatorrent/api/AutoMetric.java
index 1c1fb25..74d062a 100644
--- a/api/src/main/java/com/datatorrent/api/AutoMetric.java
+++ b/api/src/main/java/com/datatorrent/api/AutoMetric.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.api;
 
+import java.io.IOException;
 import java.lang.annotation.*;
 import java.util.Collection;
 import java.util.Map;
@@ -106,4 +107,22 @@ public @interface AutoMetric
     String[] getDimensionAggregationsFor(String logicalMetricName);
   }
 
+  /**
+   * Interface of transport for STRAM to push metrics data
+   */
+  interface Transport
+  {
+    /**
+     * Pushes the metrics data (in JSON) to the transport.
+     *
+     * @param jsonData The metric data in JSON to be pushed to this transport
+     */
+    void push(String jsonData) throws IOException;
+
+    /**
+     * Returns the number of milliseconds for resending the metric schema. The schema will
need to be resent for
+     * unreliable transport. Return 0 if the schema does not need to be resent.
+     */
+    long getSchemaResendInterval();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 2054920..6092dce 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -334,10 +334,10 @@ public interface Context
      */
     Attribute<String> APPLICATION_DATA_LINK = new Attribute<String>(new String2String());
     /**
-     * Transport to push the stats and the metrics, "builtin:{topic}" if STRAM should push
the data directly
-     * using websocket with the given topic
+     * Transport to push the stats and the metrics.
+     * If using the built-in transport, please use an AutoMetricBuiltInTransport object
      */
-    Attribute<String> METRICS_TRANSPORT = new Attribute<String>(new String2String());
+    Attribute<AutoMetric.Transport> METRICS_TRANSPORT = new Attribute<>(new Object2String<AutoMetric.Transport>());
     /**
      * Application instance identifier. An application with the same name can run in multiple
instances, each with a
      * unique identifier. The identifier is set by the client that submits the application
and can be used in operators

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java
b/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java
new file mode 100644
index 0000000..ee9cbdd
--- /dev/null
+++ b/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.common.metric;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import com.datatorrent.api.AutoMetric;
+
+/**
+ * AutoMetricBuiltinTransport. This will be replaced by the internal websocket pubsub transport
+ * provided here: {@link com.datatorrent.stram.PubSubWebSocketMetricTransport}.
+ */
+public class AutoMetricBuiltInTransport implements AutoMetric.Transport, Serializable
+{
+  private final String topic;
+  private final long schemaResendInterval;
+  private static final long DEFAULT_SCHEMA_RESEND_INTERVAL = 10000;
+
+  public AutoMetricBuiltInTransport(String topic)
+  {
+    this.topic = topic;
+    this.schemaResendInterval = DEFAULT_SCHEMA_RESEND_INTERVAL;
+  }
+
+  public AutoMetricBuiltInTransport(String topic, long schemaResendInterval)
+  {
+    this.topic = topic;
+    this.schemaResendInterval = schemaResendInterval;
+  }
+
+  @Override
+  public void push(String jsonData) throws IOException
+  {
+    throw new UnsupportedOperationException("This class is a placeholder and is supposed
to replaced by internal " +
+        "implementation.");
+  }
+
+  @Override
+  public long getSchemaResendInterval()
+  {
+    return schemaResendInterval;
+  }
+
+  public String getTopic()
+  {
+    return topic;
+  }
+
+  private static final long serialVersionUID = 201512301009L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/main/java/com/datatorrent/stram/PubSubWebSocketMetricTransport.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/PubSubWebSocketMetricTransport.java
b/engine/src/main/java/com/datatorrent/stram/PubSubWebSocketMetricTransport.java
new file mode 100644
index 0000000..85b8006
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/PubSubWebSocketMetricTransport.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.common.util.PubSubWebSocketClient;
+
+/**
+ * <p>PubSubWebSocketMetricTransport class.</p>
+ *
+ * @since 3.0.0
+ */
+public class PubSubWebSocketMetricTransport implements AutoMetric.Transport, Serializable
+{
+  private final String topic;
+  private final long schemaResendInterval;
+  protected PubSubWebSocketClient client;
+
+  public PubSubWebSocketMetricTransport(PubSubWebSocketClient wsClient, String topic, long
schemaResendInterval)
+  {
+    client = wsClient;
+    this.topic = topic;
+    this.schemaResendInterval = schemaResendInterval;
+  }
+
+  @Override
+  public void push(String msg) throws IOException
+  {
+    try {
+      client.publish(topic, new JSONObject(msg));
+    } catch (JSONException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public long getSchemaResendInterval()
+  {
+    return schemaResendInterval;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketMetricTransport.class);
+  private static final long serialVersionUID = 201512301008L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 5d84e10..db8c255 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -536,7 +536,7 @@ public class StreamingAppMasterService extends CompositeService
     this.heartbeatListener = new StreamingContainerParent(this.getClass().getName(), dnmgr,
delegationTokenManager, rpcListenerCount);
     addService(heartbeatListener);
 
-    String appDataPushTransport = dag.getValue(LogicalPlan.METRICS_TRANSPORT);
+    AutoMetric.Transport appDataPushTransport = dag.getValue(LogicalPlan.METRICS_TRANSPORT);
     if (appDataPushTransport != null) {
       this.appDataPushAgent = new AppDataPushAgent(dnmgr, appContext);
       addService(this.appDataPushAgent);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/main/java/com/datatorrent/stram/WebsocketAppDataPusher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/WebsocketAppDataPusher.java b/engine/src/main/java/com/datatorrent/stram/WebsocketAppDataPusher.java
deleted file mode 100644
index f052ded..0000000
--- a/engine/src/main/java/com/datatorrent/stram/WebsocketAppDataPusher.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.stram;
-
-import com.datatorrent.common.util.PubSubWebSocketClient;
-import com.datatorrent.stram.api.AppDataPusher;
-import java.io.IOException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>WebsocketAppDataPusher class.</p>
- *
- * @since 3.0.0
- */
-public class WebsocketAppDataPusher implements AppDataPusher
-{
-  private final String topic;
-  private long resendSchemaInterval = 10000; // 10 seconds
-  protected PubSubWebSocketClient client;
-
-
-  public WebsocketAppDataPusher(PubSubWebSocketClient wsClient, String topic)
-  {
-    client = wsClient;
-    this.topic = topic;
-  }
-
-  public void setResendSchemaInterval(long resendSchemaInterval)
-  {
-    this.resendSchemaInterval = resendSchemaInterval;
-  }
-
-  @Override
-  public void push(JSONObject msg) throws IOException
-  {
-    client.publish(topic, msg);
-  }
-
-  @Override
-  public long getResendSchemaInterval()
-  {
-    return resendSchemaInterval;
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(WebsocketAppDataPusher.class);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/main/java/com/datatorrent/stram/api/AppDataPusher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/AppDataPusher.java b/engine/src/main/java/com/datatorrent/stram/api/AppDataPusher.java
deleted file mode 100644
index 25cc2cf..0000000
--- a/engine/src/main/java/com/datatorrent/stram/api/AppDataPusher.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.stram.api;
-
-import java.io.IOException;
-import org.codehaus.jettison.json.JSONObject;
-
-/**
- * <p>AppDataPusher interface.</p>
- *
- * @since 3.0.0
- */
-public interface AppDataPusher
-{
-  public void push(JSONObject appData) throws IOException;
-
-  public long getResendSchemaInterval();
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java b/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
index 9389e3c..7fd4c3c 100644
--- a/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
@@ -41,11 +41,12 @@ import com.google.common.collect.Maps;
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context.DAGContext;
 
+import com.datatorrent.api.StringCodec;
+import com.datatorrent.common.metric.AutoMetricBuiltInTransport;
 import com.datatorrent.common.util.Pair;
+import com.datatorrent.stram.PubSubWebSocketMetricTransport;
 import com.datatorrent.stram.StramAppContext;
 import com.datatorrent.stram.StreamingContainerManager;
-import com.datatorrent.stram.WebsocketAppDataPusher;
-import com.datatorrent.stram.api.AppDataPusher;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.webapp.LogicalOperatorInfo;
 
@@ -64,7 +65,7 @@ public class AppDataPushAgent extends AbstractService
   private final StreamingContainerManager dnmgr;
   private final StramAppContext appContext;
   private final AppDataPushThread appDataPushThread = new AppDataPushThread();
-  private AppDataPusher appDataPusher;
+  private AutoMetric.Transport metricsTransport;
   private final Map<Class<?>, List<Field>> cacheFields = new HashMap<Class<?>,
List<Field>>();
   private final Map<Class<?>, Map<String, Method>> cacheGetMethods = new
HashMap<Class<?>, Map<String, Method>>();
 
@@ -81,7 +82,7 @@ public class AppDataPushAgent extends AbstractService
   @Override
   protected void serviceStop() throws Exception
   {
-    if (appDataPusher != null) {
+    if (metricsTransport != null) {
       appDataPushThread.interrupt();
       try {
         appDataPushThread.join();
@@ -95,7 +96,7 @@ public class AppDataPushAgent extends AbstractService
   @Override
   protected void serviceStart() throws Exception
   {
-    if (appDataPusher != null) {
+    if (metricsTransport != null) {
       appDataPushThread.start();
     }
     super.serviceStart();
@@ -110,15 +111,12 @@ public class AppDataPushAgent extends AbstractService
 
   public void init()
   {
-    String appDataPushTransport = dnmgr.getLogicalPlan().getValue(DAGContext.METRICS_TRANSPORT);
-    if (appDataPushTransport.startsWith(APP_DATA_PUSH_TRANSPORT_BUILTIN_VALUE + ":")) {
-      String topic = appDataPushTransport.substring(APP_DATA_PUSH_TRANSPORT_BUILTIN_VALUE.length()
+ 1);
-      appDataPusher = new WebsocketAppDataPusher(dnmgr.getWsClient(), topic);
-      LOG.info("App Data Push Transport set up for {}", appDataPushTransport);
-    } else {
-      // TBD add kakfa
-      LOG.error("App Data Push Transport not recognized: {}", appDataPushTransport);
+    metricsTransport = dnmgr.getLogicalPlan().getValue(DAGContext.METRICS_TRANSPORT);
+    if (metricsTransport instanceof AutoMetricBuiltInTransport) {
+      AutoMetricBuiltInTransport transport = (AutoMetricBuiltInTransport)metricsTransport;
+      metricsTransport = new PubSubWebSocketMetricTransport(dnmgr.getWsClient(), transport.getTopic(),
transport.getSchemaResendInterval());
     }
+    LOG.info("Metrics Transport set up for {}", metricsTransport);
   }
 
   private JSONObject getPushData()
@@ -132,7 +130,6 @@ public class AppDataPushAgent extends AbstractService
       json.put("appUser", appContext.getUser());
       List<LogicalOperatorInfo> logicalOperatorInfoList = dnmgr.getLogicalOperatorInfoList();
       JSONObject logicalOperators = new JSONObject();
-      long resendSchemaInterval = appDataPusher.getResendSchemaInterval();
       for (LogicalOperatorInfo logicalOperator : logicalOperatorInfoList) {
         JSONObject logicalOperatorJson = extractFields(logicalOperator);
         JSONArray metricsList = new JSONArray();
@@ -144,8 +141,9 @@ public class AppDataPushAgent extends AbstractService
             // metric name, aggregated value
             Map<String, Object> aggregates = metrics.second;
             long now = System.currentTimeMillis();
-            if (!operatorsSchemaLastSentTime.containsKey(logicalOperator.name)
-                    || operatorsSchemaLastSentTime.get(logicalOperator.name) < now - resendSchemaInterval)
{
+            if (!operatorsSchemaLastSentTime.containsKey(logicalOperator.name) ||
+                (metricsTransport.getSchemaResendInterval() > 0 &&
+                    operatorsSchemaLastSentTime.get(logicalOperator.name) < now - metricsTransport.getSchemaResendInterval()))
{
               try {
                 pushMetricsSchema(dnmgr.getLogicalPlan().getOperatorMeta(logicalOperator.name),
aggregates);
                 operatorsSchemaLastSentTime.put(logicalOperator.name, now);
@@ -287,12 +285,12 @@ public class AppDataPushAgent extends AbstractService
       schema = getMetricsSchemaData(operatorMeta, aggregates);
       operatorSchemas.put(operatorMeta.getName(), schema);
     }
-    appDataPusher.push(schema);
+    metricsTransport.push(schema.toString());
   }
 
   public void pushData() throws IOException
   {
-    appDataPusher.push(getPushData());
+    metricsTransport.push(getPushData().toString());
   }
 
   public class AppDataPushThread extends Thread

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index f1d6ec4..8fc957b 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -19,6 +19,7 @@
 package com.datatorrent.stram;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.util.*;
 import java.util.concurrent.Future;
@@ -33,6 +34,8 @@ import org.junit.Test;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG.Locality;
@@ -40,6 +43,7 @@ import com.datatorrent.api.Stats.OperatorStats;
 import com.datatorrent.api.Stats.OperatorStats.PortStats;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.metric.AutoMetricBuiltInTransport;
 import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.FSStorageAgent;
@@ -75,7 +79,6 @@ import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 import com.datatorrent.stram.tuple.Tuple;
 
 import org.apache.commons.lang.StringUtils;
-import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.eclipse.jetty.websocket.WebSocket;
 
@@ -904,7 +907,7 @@ public class StreamingContainerManagerTest
   public void testAppDataPush() throws Exception
   {
     final String topic = "xyz";
-    final List<JSONObject> messages = new ArrayList<JSONObject>();
+    final List<String> messages = new ArrayList<>();
     EmbeddedWebSocketServer server = new EmbeddedWebSocketServer(0);
     server.setWebSocket(new WebSocket.OnTextMessage()
     {
@@ -912,11 +915,7 @@ public class StreamingContainerManagerTest
       @Override
       public void onMessage(String data)
       {
-        try {
-          messages.add(new JSONObject(data));
-        } catch (JSONException ex) {
-          throw new RuntimeException(ex);
-        }
+        messages.add(data);
       }
 
       @Override
@@ -935,10 +934,9 @@ public class StreamingContainerManagerTest
       TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
       GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
       dag.addStream("o1.outport", o1.outport, o2.inport1);
-      dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, "builtin:" + topic);
+      dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new AutoMetricBuiltInTransport(topic));
       dag.setAttribute(LogicalPlan.GATEWAY_CONNECT_ADDRESS, "localhost:" + port);
       StramLocalCluster lc = new StramLocalCluster(dag);
-      //lc.runAsync();
       StreamingContainerManager dnmgr = lc.dnmgr;
       StramAppContext appContext = new StramTestSupport.TestAppContext();
 
@@ -948,8 +946,7 @@ public class StreamingContainerManagerTest
       Thread.sleep(1000);
       Assert.assertTrue(messages.size() > 0);
       pushAgent.close();
-      JSONObject message = messages.get(0);
-      System.out.println("Got this message: " + message.toString(2));
+      JSONObject message = new JSONObject(messages.get(0));
       Assert.assertEquals(topic, message.getString("topic"));
       Assert.assertEquals("publish", message.getString("type"));
       JSONObject data = message.getJSONObject("data");
@@ -970,4 +967,47 @@ public class StreamingContainerManagerTest
       server.stop();
     }
   }
+
+  public static class TestMetricTransport implements AutoMetric.Transport, Serializable
+  {
+    private String prefix;
+    private static List<String> messages = new ArrayList<>();
+
+    public TestMetricTransport(String prefix)
+    {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public void push(String jsonData) throws IOException
+    {
+      messages.add(prefix + ":" + jsonData);
+    }
+
+    @Override
+    public long getSchemaResendInterval()
+    {
+      return 0;
+    }
+  }
+
+  @Test
+  public void testCustomMetricsTransport() throws Exception
+  {
+    TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    dag.addStream("o1.outport", o1.outport, o2.inport1);
+    dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new TestMetricTransport("xyz"));
+    StramLocalCluster lc = new StramLocalCluster(dag);
+    StreamingContainerManager dnmgr = lc.dnmgr;
+    StramAppContext appContext = new StramTestSupport.TestAppContext();
+
+    AppDataPushAgent pushAgent = new AppDataPushAgent(dnmgr, appContext);
+    pushAgent.init();
+    pushAgent.pushData();
+    Assert.assertTrue(TestMetricTransport.messages.size() > 0);
+    pushAgent.close();
+    String msg = TestMetricTransport.messages.get(0);
+    Assert.assertTrue(msg.startsWith("xyz:"));
+  }
 }


Mime
View raw message