apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-276) Make App Data Push transport pluggable and configurable
Date Thu, 31 Dec 2015 00:34:49 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15075557#comment-15075557
] 

ASF GitHub Bot commented on APEXCORE-276:
-----------------------------------------

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/193#discussion_r48641557
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
---
    @@ -970,4 +969,48 @@ public void onClose(int closeCode, String message)
           server.stop();
         }
       }
    +
    +  public static class TestMetricTransport implements AutoMetric.Transport, Serializable
    +  {
    +    private String prefix;
    +    private static List<String> messages = new ArrayList<>();
    +
    +    public TestMetricTransport(String prefix)
    +    {
    +      this.prefix = prefix;
    +    }
    +
    +    @Override
    +    public void push(String jsonData) throws IOException
    +    {
    +      messages.add(prefix + ":" + jsonData);
    +    }
    +
    +    @Override
    +    public long getSchemaResendInterval()
    +    {
    +      return 0;
    +    }
    +  }
    +
    +  @Test
    +  public void testCustomMetricsTransport() throws Exception
    +  {
    +    TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
    +    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
    +    dag.addStream("o1.outport", o1.outport, o2.inport1);
    +    dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new TestMetricTransport("xyz"));
    +    StramLocalCluster lc = new StramLocalCluster(dag);
    +    StreamingContainerManager dnmgr = lc.dnmgr;
    +    StramAppContext appContext = new StramTestSupport.TestAppContext();
    +
    +    AppDataPushAgent pushAgent = new AppDataPushAgent(dnmgr, appContext);
    +    pushAgent.init();
    +    pushAgent.pushData();
    +    Assert.assertTrue(TestMetricTransport.messages.size() > 0);
    +    pushAgent.close();
    +    String msg = TestMetricTransport.messages.get(0);
    +    System.out.println("Got this message: " + msg);
    --- End diff --
    
    Not needed.


> Make App Data Push transport pluggable and configurable
> -------------------------------------------------------
>
>                 Key: APEXCORE-276
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-276
>             Project: Apache Apex Core
>          Issue Type: New Feature
>            Reporter: David Yan
>            Assignee: David Yan
>
> Currently it's not possible without changing the code to have your own transport.
> Code from AppDataPushAgent.java:
> {code}  
>   public void init()
>   {
>     String appDataPushTransport = dnmgr.getLogicalPlan().getValue(DAGContext.METRICS_TRANSPORT);
>     if (appDataPushTransport.startsWith(APP_DATA_PUSH_TRANSPORT_BUILTIN_VALUE + ":"))
{
>       String topic = appDataPushTransport.substring(APP_DATA_PUSH_TRANSPORT_BUILTIN_VALUE.length()
+ 1);
>       appDataPusher = new WebsocketAppDataPusher(dnmgr.getWsClient(), topic);
>       LOG.info("App Data Push Transport set up for {}", appDataPushTransport);
>     } else {
>       // TBD add kakfa
>       LOG.error("App Data Push Transport not recognized: {}", appDataPushTransport);
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message