apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ROHIT RAJKUMAR GARG (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-718) How to Use one Kafka input operator and Feed to multiple operator
Date Fri, 12 May 2017 07:40:04 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16007749#comment-16007749
] 

ROHIT RAJKUMAR GARG commented on APEXCORE-718:
----------------------------------------------

Thanks [~tushargosavi] 

I will use the email id provided .We even tried as per the solution mentioned ..Please find
the code 

public void populateDAG(DAG dag, Configuration conf)
    {
        KafkaSinglePortStringInputOperator kafkaReader = dag.addOperator("MessageReader",
new KafkaSinglePortStringInputOperator());

 IPVersionKeyValueCreator ipversionKeyValueCreator = dag.addOperator("ipversionKeyValueCreator",new

  IPVersionKeyValueCreator());
        SumKeyVal<String, Long> ipversionSumOperator =  getSumKeyValOperator("ipversionSum",dag);
        IPVersionMapCreator ipversionMapCreator = dag.addOperator("ipversionMapCreator",new
IPVersionMapCreator());

        //Operators for HTTP Response Code Count
        HttpResponseCodeKeyValueCreator httpResponseCodeKeyValueCreator = dag.addOperator("httpResponseCodeKeyValueCreator",new
HttpResponseCodeKeyValueCreator());
        SumKeyVal<String, Long> httpResponseCodeSumOperator =  getSumKeyValOperator("httpResponseCodeSum",dag);
        HttpResponseCodeMapCreator httpResponseCodeMapCreator = dag.addOperator("httpResponseCodeMapCreator",new
HttpResponseCodeMapCreator());

        ElasticSearchMapOutputOperator esOutput= dag.addOperator("elasticSearchOperator",
new ElasticSearchMapOutputOperator());

        try {
          esOutput.setStore(createStore());
        } catch (IOException e) {
          e.printStackTrace();
        }

        esOutput.setIndexName("test_apex_kafka_es");
        esOutput.setType("test_apex_es");
        System.out.println("Wrting to ES");

        dag.addStream("kafkaData3", kafkaReader.outputPort , ipversionKeyValueCreator.input,
httpResponseCodeKeyValueCreator.input).setLocality(Locality.CONTAINER_LOCAL);
        dag.addStream("JsonData3", ipversionKeyValueCreator.output , ipversionSumOperator.data).setLocality(Locality.CONTAINER_LOCAL);
        dag.addStream("ConsumptionData3", ipversionSumOperator.sum , ipversionMapCreator.input).setLocality(Locality.CONTAINER_LOCAL);
        dag.addStream("MapData3", ipversionMapCreator.output , esOutput.input).setLocality(Locality.CONTAINER_LOCAL);

        //Streams for HTTP Response Code Count
        //dag.addStream("kafkaData4", kafkaReader.outputPort , httpResponseCodeKeyValueCreator.input).setLocality(Locality.CONTAINER_LOCAL);
        dag.addStream("JsonData4", httpResponseCodeKeyValueCreator.output , httpResponseCodeSumOperator.data).setLocality(Locality.CONTAINER_LOCAL);
        dag.addStream("ConsumptionData4", httpResponseCodeSumOperator.sum , httpResponseCodeMapCreator.input).setLocality(Locality.CONTAINER_LOCAL);
        dag.addStream("MapData4", httpResponseCodeMapCreator.output , esOutput.input).setLocality(Locality.CONTAINER_LOCAL);
    }

--------------========================================================================================================


java.lang.IllegalArgumentException: Port input already connected to stream LogicalPlan.StreamMeta[id=MapData3]
        at com.datatorrent.stram.plan.logical.LogicalPlan$StreamMeta.addSink(LogicalPlan.java:548)
        at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1429)
        at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1480)
        at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:125)
        at org.jio.media.Application.populateDAG(Application.java:99)
        at com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.prepareDAG(LogicalPlanConfiguration.java:2263)
        at com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.createFromStreamingApplication(LogicalPlanConfiguration.java:2123)
        at org.apache.apex.engine.util.StreamingAppFactory.createApp(StreamingAppFactory.java:47)
        at com.datatorrent.stram.client.StramAppLauncher$1.createApp(StramAppLauncher.java:473)
        at com.datatorrent.stram.client.StramAppLauncher.runLocal(StramAppLauncher.java:518)
        at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:2143)
        at com.datatorrent.stram.cli.ApexCli.launchAppPackage(ApexCli.java:3561)
        at com.datatorrent.stram.cli.ApexCli.access$7400(ApexCli.java:153)
        at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:1980)
        at com.datatorrent.stram.cli.ApexCli$3.run(ApexCli.java:1539)



> How to Use one Kafka input operator and Feed to multiple operator 
> ------------------------------------------------------------------
>
>                 Key: APEXCORE-718
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-718
>             Project: Apache Apex Core
>          Issue Type: Question
>            Reporter: ROHIT RAJKUMAR GARG
>
> I want to read from a topic using the kafka input operator and then feed it to multiple
operator . in short it will be a sort of branch 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message