apex-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Priyanka Gugale <priya...@datatorrent.com>
Subject Re: Connecting multiple operators
Date Mon, 29 Aug 2016 11:03:17 GMT
I suspect there is not enough memory to launch the operators. As per the
code, we will need 4 containers; maybe your cluster doesn't have enough
resources. Let's try setting a lower memory limit for the operators, since we
are not storing much in memory anyway. You can configure memory using this
setting:

 <property>
   <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
   <value>256</value>
  </property>

Refer to the troubleshooting guide to learn more:
http://docs.datatorrent.com/troubleshooting/#configuring-memory

Also check the UI for the number of requested vs. allocated containers, and
check the Hadoop memory settings.

-Priyanka



On Mon, Aug 29, 2016 at 2:32 PM, Hitesh Goyal <hitesh.goyal@nlpcaptcha.com>
wrote:

>   Please find the code below. Also find the Application Logs as an
> attached file.
>
>
>
> MeanOperator.java
>
>
>
> package com.example.myapexapp;
>
>
>
> import java.util.ArrayList;
>
>
>
>
>
> import org.slf4j.Logger;
>
> import org.slf4j.LoggerFactory;
>
>
>
> import com.datatorrent.api.DefaultInputPort;
>
> import com.datatorrent.api.DefaultOutputPort;
>
> import com.datatorrent.common.util.BaseOperator;
>
>
>
> public class MeanOperator extends BaseOperator {
>
>                 private static final Logger LOG = LoggerFactory.getLogger(
> MeanOperator.class);
>
>                 private ArrayList<Double> values;
>
>
>
>                 /**
>
>                 * Input data port that takes a number.
>
>                 */
>
>                 public final transient DefaultInputPort<Object> meandata =
> new DefaultInputPort<Object>() {
>
>                                 /**
>
>                                 * Computes sum and count with each tuple
>
>                                 */
>
>                                 @Override
>
>                                 public void process(Object tuple) {
>
>                                                 if (tuple instanceof
> DbData) {
>
>                                                                 DbData
> dataTuple = (DbData) tuple;
>
>
> values.add(dataTuple.getAbv());
>
>                                                 } else {
>
>
> LOG.info("Invalid input format of tuple: " + tuple.toString());
>
>                                                 }
>
>
>
>                                 }
>
>                 };
>
>
>
>                 /**
>
>                 * Output port that emits median of incoming data.
>
>                 */
>
>                 public final transient DefaultOutputPort<Number> mean =
> new DefaultOutputPort<Number>();
>
>
>
>                 @Override
>
>                 public void beginWindow(long arg0) {
>
>                                 values = new ArrayList<Double>();
>
>                 }
>
>                 @Override
>
>                 public void endWindow() {
>
>                                 if (values.size() == 0) {
>
>                                                 return;
>
>                                 }
>
>                                 if (values.size() == 1) {
>
>                                                 mean.emit(values.get(0));
>
>                                                 return;
>
>                                 }
>
>                                       double sum=0;
>
>                                                     for (Double value :
> values) {
>
>                                                         sum += value;
>
>                                                     }
>
>
> mean.emit(sum/values.size());
>
>
>
>                 }
>
>
>
> }
>
>
>
>
>
>
>
> StandardDeviationOperator.java
>
>
>
>
>
> *package* com.example.myapexapp;
>
>
>
> *import* java.util.ArrayList;
>
>
>
> *import* org.slf4j.Logger;
>
> *import* org.slf4j.LoggerFactory;
>
>
>
> *import* com.datatorrent.api.DefaultInputPort;
>
> *import* com.datatorrent.api.DefaultOutputPort;
>
> *import* com.datatorrent.common.util.BaseOperator;
>
>
>
> *public* *class* StandardDeviationOperator *extends* BaseOperator {
>
>        *private* *static* *final* Logger *LOG* = LoggerFactory.*getLogger*
> (MeanOperator.*class*);
>
>        *private* ArrayList<Double> values;
>
>
>
>        /**
>
>        * Input data port that takes a number.
>
>        */
>
>        *public* *final* *transient* DefaultInputPort<Object> meandata =
> *new* DefaultInputPort<Object>() {
>
>               /**
>
>               * Computes sum and count with each tuple
>
>               */
>
>               @Override
>
>               *public* *void* process(Object tuple) {
>
>                      *if* (tuple *instanceof* DbData) {
>
>                            DbData dataTuple = (DbData) tuple;
>
>                            values.add(dataTuple.getAbv());
>
>                      } *else* {
>
>                            *LOG*.info("Invalid input format of tuple: " +
> tuple.toString());
>
>                      }
>
>
>
>               }
>
>        };
>
>
>
>        /**
>
>        * Output port that emits median of incoming data.
>
>        */
>
>        *public* *final* *transient* DefaultOutputPort<Number> deviation =
> *new* DefaultOutputPort<Number>();
>
>
>
>        @Override
>
>        *public* *void* beginWindow(*long* arg0) {
>
>               values = *new* ArrayList<Double>();
>
>        }
>
>        @Override
>
>        *public* *void* endWindow() {
>
>               *if* (values.size() == 0) {
>
>                      *return*;
>
>               }
>
>               *if* (values.size() == 1) {
>
>                      deviation.emit(values.get(0));
>
>                      *return*;
>
>               }
>
>                     *double* sum=0,meanvalue=0,temp=0;
>
>                          *for* (*int* i=0;i<values.size();i++) {
>
>                              sum += values.get(i);
>
>                          }
>
>                         meanvalue=sum/values.size();
>
>                      *for*(*int* i=0;i<values.size();i++){
>
>                       Double val = values.get(i);
>
>                       *double* squrDiffToMean = Math.*pow*(val - meanvalue,
> 2);
>
>                       temp += squrDiffToMean;
>
>                      }
>
>                   *double* meanOfDiffs = (*double*) temp / (*double*) (
> values.size());
>
>                   deviation.emit(Math.*sqrt*(meanOfDiffs));
>
>
>
>
>
>
>
>        }
>
>
>
> }
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *From:* Priyanka Gugale [mailto:priyag@apache.org]
> *Sent:* Monday, August 29, 2016 2:09 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Connecting multiple operators
>
>
>
> Hitesh,
>
>
>
> Can you please share the code of your MeanOperator and
> StandardDeviationOperator?
>
> Also, please share the application logs. After shutting down the application,
> you can run "yarn logs -applicationId <appId>" to collect the logs.
>
>
>
> -Priyanka
>
>
>
> On Mon, Aug 29, 2016 at 10:39 AM, Hitesh Goyal <
> hitesh.goyal@nlpcaptcha.com> wrote:
>
> Hi team,
>
>
>
> I am trying to process some data using Operators.
>
>
>
> @SuppressWarnings("unchecked")
>
>        @Override
>
>        *public* *void* populateDAG(DAG dag, Configuration conf) {
>
>               System.*setProperty*("viewmode", "production");
>
>               CouchBasePOJOInputOperator inputOperator = dag.addOperator("
> inputOperator", CouchBasePOJOInputOperator.*class*);
>
>               inputOperator.setStore(*new* CouchBaseStore());
>
>               MedOperator med = dag.addOperator("median", MedOperator.
> *class*);
>
>               MeanOperator mean=dag.addOperator("mean", MeanOperator.
> *class*);
>
>               StandardDeviationOperator sdo=dag.addOperator("sdo",
> StandardDeviationOperator.*class*);
>
>               ConsoleOutputOperator cons = dag.addOperator("cons", *new*
> ConsoleOutputOperator());
>
>               ConsoleOutputOperator cons1 = dag.addOperator("cons1", *new*
> ConsoleOutputOperator());
>
>               ConsoleOutputOperator cons2 = dag.addOperator("cons2", *new*
> ConsoleOutputOperator());
>
>               dag.addStream("inputFormatter", inputOperator.outputPort,
> med.data,mean.meandata,sdo.meandata);
>
>               dag.addStream("cons", med.median, cons.input).setLocality(
> Locality.*THREAD_LOCAL*);
>
>               dag.addStream("cons1", mean.mean, cons1.input).setLocality(
> Locality.*THREAD_LOCAL*);
>
>               dag.addStream("cons2", sdo.deviation, cons2.input
> ).setLocality(Locality.*THREAD_LOCAL*);
>
>        }
>
>   There is no error in the code, but when I launch this application in
> DataTorrent, the status of the operators remains Pending instead of
> Running/Active. The physical DAG view clearly shows the right connection of
> one stream to another.
>
>
>
>
>
> Regards,
>
> *Hitesh Goyal*
>
> Simpli5d Technologies
>
> Cont No.: 9599803307
>
>
>
>
>

Mime
View raw message