apex-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hitesh Goyal <hitesh.go...@nlpcaptcha.com>
Subject RE: Connecting multiple operators
Date Mon, 29 Aug 2016 09:02:55 GMT
  Please find the code below. Also find the Application Logs as an attached file.

MeanOperetor.java

package com.example.myapexapp;

import java.util.ArrayList;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MeanOperator extends BaseOperator {
                private static final Logger LOG = LoggerFactory.getLogger(MeanOperator.class);
                private ArrayList<Double> values;

                /**
                * Input data port that takes a number.
                */
                public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>()
{
                                /**
                                * Computes sum and count with each tuple
                                */
                                @Override
                                public void process(Object tuple) {
                                                if (tuple instanceof DbData) {
                                                                DbData dataTuple = (DbData)
tuple;
                                                                values.add(dataTuple.getAbv());
                                                } else {
                                                                LOG.info("Invalid input format
of tuple: " + tuple.toString());
                                                }

                                }
                };

                /**
                * Output port that emits median of incoming data.
                */
                public final transient DefaultOutputPort<Number> mean = new DefaultOutputPort<Number>();

                @Override
                public void beginWindow(long arg0) {
                                values = new ArrayList<Double>();
                }
                @Override
                public void endWindow() {
                                if (values.size() == 0) {
                                                return;
                                }
                                if (values.size() == 1) {
                                                mean.emit(values.get(0));
                                                return;
                                }
                                      double sum=0;
                                                    for (Double value : values) {
                                                        sum += value;
                                                    }
                                                    mean.emit(sum/values.size());

                }

}



StandardDeviationOperator.java


package com.example.myapexapp;

import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class StandardDeviationOperator extends BaseOperator {
       private static final Logger LOG = LoggerFactory.getLogger(MeanOperator.class);
       private ArrayList<Double> values;

       /**
       * Input data port that takes a number.
       */
       public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>()
{
              /**
              * Computes sum and count with each tuple
              */
              @Override
              public void process(Object tuple) {
                     if (tuple instanceof DbData) {
                           DbData dataTuple = (DbData) tuple;
                           values.add(dataTuple.getAbv());
                     } else {
                           LOG.info("Invalid input format of tuple: " + tuple.toString());
                     }

              }
       };

       /**
       * Output port that emits median of incoming data.
       */
       public final transient DefaultOutputPort<Number> deviation = new DefaultOutputPort<Number>();

       @Override
       public void beginWindow(long arg0) {
              values = new ArrayList<Double>();
       }
       @Override
       public void endWindow() {
              if (values.size() == 0) {
                     return;
              }
              if (values.size() == 1) {
                     deviation.emit(values.get(0));
                     return;
              }
                    double sum=0,meanvalue=0,temp=0;
                         for (int i=0;i<values.size();i++) {
                             sum += values.get(i);
                         }
                        meanvalue=sum/values.size();
                     for(int i=0;i<values.size();i++){
                      Double val = values.get(i);
                      double squrDiffToMean = Math.pow(val - meanvalue, 2);
                      temp += squrDiffToMean;
                     }
                  double meanOfDiffs = (double) temp / (double) (values.size());
                  deviation.emit(Math.sqrt(meanOfDiffs));



       }

}








From: Priyanka Gugale [mailto:priyag@apache.org]
Sent: Monday, August 29, 2016 2:09 PM
To: users@apex.apache.org
Subject: Re: Connecting multiple operators

Hitesh,

Can you please share code of your MeanOperator and StandardDeviationOperator.
Also please share the application logs. After shutting down application you can run "yan logs
-applicationId <appId> to collect logs.

-Priyanka

On Mon, Aug 29, 2016 at 10:39 AM, Hitesh Goyal <hitesh.goyal@nlpcaptcha.com<mailto:hitesh.goyal@nlpcaptcha.com>>
wrote:
Hi team,

I am trying to process some data using Operators.

@SuppressWarnings("unchecked")
       @Override
       public void populateDAG(DAG dag, Configuration conf) {
              System.setProperty("viewmode", "production");
              CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator",
CouchBasePOJOInputOperator.class);
              inputOperator.setStore(new CouchBaseStore());
              MedOperator med = dag.addOperator("median", MedOperator.class);
              MeanOperator mean=dag.addOperator("mean", MeanOperator.class);
              StandardDeviationOperator sdo=dag.addOperator("sdo", StandardDeviationOperator.class);
              ConsoleOutputOperator cons = dag.addOperator("cons", new ConsoleOutputOperator());
              ConsoleOutputOperator cons1 = dag.addOperator("cons1", new ConsoleOutputOperator());
              ConsoleOutputOperator cons2 = dag.addOperator("cons2", new ConsoleOutputOperator());
              dag.addStream("inputFormatter", inputOperator.outputPort, med.data,mean.meandata,sdo.meandata);
              dag.addStream("cons", med.median, cons.input).setLocality(Locality.THREAD_LOCAL);
              dag.addStream("cons1", mean.mean, cons1.input).setLocality(Locality.THREAD_LOCAL);
              dag.addStream("cons2", sdo.deviation, cons2.input).setLocality(Locality.THREAD_LOCAL);
       }
  There is no error in the code but when I launch this application in Data Torrent, the status
of operators remains pending instead of Running/Active. Physical DAG view is clearly showing
right connection of one stream to another.


Regards,
Hitesh Goyal
Simpli5d Technologies
Cont No.: 9599803307


Mime
View raw message