apex-users mailing list archives

From Thomas Weise <thomas.we...@gmail.com>
Subject Re: HDHT question - looking for the datatorrent gurus!
Date Sat, 03 Sep 2016 16:47:26 GMT
Jim,

You need to use the delay operator for iterative processing. Here is an
example:

https://github.com/apache/apex-malhar/blob/master/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
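
The key point is that any stream closing a cycle must pass through a delay
operator. A minimal pass-through version might look like this (a sketch,
assuming the Operator.DelayOperator interface from Apex core 3.3+; the class
name and wiring below are illustrative, not taken from the demo):

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

// Marks the stream it sits on as a feedback edge so DAG validation accepts
// the loop; tuples re-enter the DAG one streaming window later.
public class LoopbackDelay extends BaseOperator implements Operator.DelayOperator {
  public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
  public final transient DefaultInputPort<Long> input = new DefaultInputPort<Long>() {
    @Override
    public void process(Long tuple) {
      output.emit(tuple);
    }
  };

  @Override
  public void firstWindow() {
    // Re-emit tuples checkpointed on the feedback edge here if recovery of
    // in-flight loopback tuples matters; left empty in this sketch.
  }
}

Each loopback then becomes two streams through the delay, e.g. for the 810:

LoopbackDelay delay810 = dag.addOperator("delay810", new LoopbackDelay());
dag.addStream("loopBack810", operator810.loopbackPort810, delay810.input);
dag.addStream("loopBack810Delayed", delay810.output, ediRouter.loopbackPort810);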

Thomas

On Sat, Sep 3, 2016 at 9:05 AM, Jim <jim@facility.supplies> wrote:

> Tushar,
>
> I am trying to implement what you described, but I get a validation
> error, "Loops in graph", and can't seem to find any reference to it.
>
> Below I have pasted my Application.java file; what could be causing this
> error:
>
> java.lang.RuntimeException: Error creating local cluster
>
>         at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78)
>         at supplies.facility.edi.sellars.ApplicationTest.testApplication(ApplicationTest.java:30)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>         at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>         at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>         at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>         at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
>         at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
>         at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
>         at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
>         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
>         at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
>         at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
>         at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
>         at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
> Caused by: javax.validation.ValidationException: Loops in graph:
> [[OperatorMeta{name=operator997, operator=FunctionalAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
> OperatorMeta{name=operator856, operator=ShipNotificationOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
> OperatorMeta{name=operator855, operator=POAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}},
> OperatorMeta{name=ediRouter, operator=EDIRoutingOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=512}},
> OperatorMeta{name=operator810, operator=InvoiceOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}]]
>         at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1775)
>         at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:278)
>         at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
>         ... 23 more
>
> ============================================ Application.java starts here ============================================================
>
> /**
>  * Put your copyright and license info here.
>  */
> package supplies.facility.edi.sellars;
>
> import com.datatorrent.api.DAG;
> import com.datatorrent.api.DAG.Locality;
> import com.datatorrent.api.StreamingApplication;
> import com.datatorrent.api.annotation.ApplicationAnnotation;
> import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
> import com.datatorrent.contrib.kinesis.KinesisConsumer;
> import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
> import com.datatorrent.contrib.kinesis.ShardManager;
> import com.datatorrent.lib.db.jdbc.JdbcStore;
> import com.datatorrent.netlet.util.DTThrowable;
> import org.apache.hadoop.conf.Configuration;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import supplies.facility.apex.contrib.elasticsearch.ElasticSearchMapOutputOperator;
> import supplies.facility.edi.helpers.EdiSmtpOutputOperator;
> import supplies.facility.edi.helpers.KinesisEdiTransactionSetInputOperator;
> import supplies.facility.edi.helpers.SqsTransactionSetInputOperator;
> import supplies.facility.edi.helpers.StatefulShardManager;
>
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
>
> @ApplicationAnnotation(name="FsEdiSellars")
> public class Application implements StreamingApplication {
>   private final Locality locality = null;
>
>   private static final Logger LOG = LoggerFactory.getLogger(Application.class);
>
>   private JdbcStore jdbcStore;
>
>   @Override
>   public void populateDAG(DAG dag, Configuration conf) {
>
>     try {
>       // Read from SQS FS_EDI_SELLARS
>       // SQSConnectionFactory
>       SqsTransactionSetInputOperator sqsReader = dag.addOperator("TransactionsQueue", new SqsTransactionSetInputOperator());
>
>       // Define all the operators
>       EDIRoutingOperator ediRouter = dag.addOperator("ediRouter", new EDIRoutingOperator());
>       InvoiceOperator operator810 = dag.addOperator("operator810", new InvoiceOperator());
>       POAcknowledgmentOperator operator855 = dag.addOperator("operator855", new POAcknowledgmentOperator());
>       ShipNotificationOperator operator856 = dag.addOperator("operator856", new ShipNotificationOperator());
>       FunctionalAcknowledgmentOperator operator997 = dag.addOperator("operator997", new FunctionalAcknowledgmentOperator());
>
>       // Set up the EDIRoutingOperator that takes the inbound EDI data and splits it
>       // into an outbound stream for each of the applicable EDI transactions:
>       //       810 = Invoice
>       //       855 = Purchase Order Acknowledgment
>       //       856 = Advance Ship Notification (ASN)
>       //       997 = Functional Acknowledgment
>       // plus one for an elasticsearch logger that logs all incoming transactions.
>       // NOTE: the four loopback streams below close a cycle back to ediRouter,
>       // which is what DAG validation flags as "Loops in graph".
>       dag.addStream("ediInboundStream", sqsReader.output, ediRouter.inboundEdiPort).setLocality(Locality.CONTAINER_LOCAL);
>       dag.addStream("loopBack855", operator855.loopbackPort855, ediRouter.loopbackPort855).setLocality(Locality.CONTAINER_LOCAL);
>       dag.addStream("loopBack856", operator856.loopbackPort856, ediRouter.loopbackPort856).setLocality(Locality.CONTAINER_LOCAL);
>       dag.addStream("loopBack810", operator810.loopbackPort810, ediRouter.loopbackPort810).setLocality(Locality.CONTAINER_LOCAL);
>       dag.addStream("loopBack997", operator997.loopbackPort997, ediRouter.loopbackPort997).setLocality(Locality.CONTAINER_LOCAL);
>
>       // Set up the Invoice operator and tie it to the output stream created in the EDIRouter
>       dag.addStream("invoices", ediRouter.outputPort810, operator810.InputPort).setLocality(locality);
>
>       // Set up the Purchase Order Acknowledgment operator and tie it to the output stream created in the EDIRouter
>       dag.addStream("PoAcknowledgments", ediRouter.outputPort855, operator855.InputPort).setLocality(locality);
>
>       // Set up the Ship Notification operator and tie it to the output stream created in the EDIRouter
>       dag.addStream("shipNotifications", ediRouter.outputPort856, operator856.InputPort).setLocality(locality);
>
>       // Set up the Functional Acknowledgment operator and tie it to the output stream created in the EDIRouter
>       dag.addStream("functionalAcknowledgments", ediRouter.outputPort997, operator997.InputPort).setLocality(locality);
>
>       // Set up the elasticsearch operator and tie it to the output stream created in the EDIRouter
>       ElasticSearchMapOutputOperator operatorEs = dag.addOperator("operatorEs", new ElasticSearchMapOutputOperator());
>       dag.addStream("esLogger", ediRouter.outputPortEs, operatorEs.input).setLocality(locality);
>
>       // Set up the smtp output operator to use for the Purchase Order Acknowledgment messages
>       EdiSmtpOutputOperator operatorSmtp855 = dag.addOperator("operatorSmtp855", new EdiSmtpOutputOperator());
>       dag.addStream("smtpOutput855", operator855.outputEmails, operatorSmtp855.input).setLocality(locality);
>
>       // Set up the smtp output operator to use for the Ship Notification messages
>       EdiSmtpOutputOperator operatorSmtp856 = dag.addOperator("operatorSmtp856", new EdiSmtpOutputOperator());
>       dag.addStream("smtpOutput856", operator856.outputEmails, operatorSmtp856.input).setLocality(locality);
>
>     } catch (Exception exc) {
>       DTThrowable.rethrow(exc);
>     }
>   }
> }
>
>
> ============================================== here is one of the operator files, stripped of everything but the module declarations =============
>
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.api.annotation.Stateless;
> import org.milyn.payload.StringResult;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import org.w3c.dom.Document;
> import supplies.facility.edi.helpers.AbstractEDIProcessor;
>
> /**
>  * This Operator handles the processing for the EDI 810, Invoice transactions.
>  *
>  * As invoices arrive, they are added to the accounting invoicing system for payment,
>  * and the order status is updated to show that the item is now closed.
>  *
>  * Created by jradke on 2/7/2016.
>  */
> @Stateless
> public class InvoiceOperator extends AbstractEDIProcessor {
>
>     private static final Logger logger = LoggerFactory.getLogger(InvoiceOperator.class);
>
>     public InvoiceOperator() {
>         super("files/smooks-config810.xml");
>     }
>
>     public transient DefaultOutputPort<Long> loopbackPort810 = new DefaultOutputPort<Long>();
>
>     @Override
>     protected void processXML(Document xmlDocument) {
>         logger.trace("Entered InvoiceOperator processXML");
>     }
> }
>
> Thanks!
>
> Jim
>
> -----Original Message-----
> From: Tushar Gosavi [mailto:tushar@datatorrent.com]
> Sent: Thursday, September 1, 2016 1:06 PM
> To: users@apex.apache.org
> Subject: Re: HDHT question - looking for the datatorrent gurus!
>
> Hi Jim,
>
> Yes, this is what I had in mind. The managed state operator needs to have a
> separate input for each of the 5 operators. The platform does not support
> connecting multiple output ports to a single input port, but you can achieve
> a similar effect using the stream merge operator
> (https://github.com/apache/apex-malhar/blob/3ce83708f795b081d564be357a8333928154398e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java).
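>
> For instance, four completion streams could be merged pairwise down to one
> input (a sketch, assuming StreamMerger's two input ports data1/data2 and
> output port out; the operator, stream, and port names from the application
> are illustrative):
>
> StreamMerger<Long> mergeA = dag.addOperator("mergeA", new StreamMerger<Long>());
> StreamMerger<Long> mergeB = dag.addOperator("mergeB", new StreamMerger<Long>());
> StreamMerger<Long> mergeAll = dag.addOperator("mergeAll", new StreamMerger<Long>());
>
> // First level: merge the 810+855 and 856+997 completion records.
> dag.addStream("done810", operator810.loopbackPort810, mergeA.data1);
> dag.addStream("done855", operator855.loopbackPort855, mergeA.data2);
> dag.addStream("done856", operator856.loopbackPort856, mergeB.data1);
> dag.addStream("done997", operator997.loopbackPort997, mergeB.data2);
>
> // Second level: merge the two intermediate streams into one.
> dag.addStream("mergedA", mergeA.out, mergeAll.data1);
> dag.addStream("mergedB", mergeB.out, mergeAll.data2);
>
> // mergeAll.out now carries all completion records on a single stream,
> // suitable for the managed state operator's one loopback input.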
>
> - Tushar.
>
>
> On Thu, Sep 1, 2016 at 10:37 PM, Jim <jim@facility.supplies> wrote:
> > Tushar,
> >
> > Funny that you described it that way, as that is exactly what I was
> > thinking about this morning.
> >
> > So the flow would be:
> >
> >                        Router Operator
> >                               |
> >                      Managed State Operator
> >                               |
> >      +----------------+------+---------+----------------+
> >      |                |                |                |
> >   General          Detailed          Ship            Invoice
> > Acknowledgement  Acknowledgement  Notification
> >      |                |                |                |
> >      +----------------+------+---------+----------------+
> >                               |
> >    (each of the 4 operators, at the end of its processing, emits a
> >     record back to the Managed State Operator)
> >
> > In this scenario, would the managed state operator just have 1 input
> > that all the other operators emit to, or would it need to have separate
> > inputs for each of the 5 operators that would be emitting to it?
> >
> > This is what you were describing too, correct?
> >
> > Thanks,
> >
> > Jim
> >
> > -----Original Message-----
> > From: Tushar Gosavi [mailto:tushar@datatorrent.com]
> > Sent: Thursday, September 1, 2016 11:49 AM
> > To: users@apex.apache.org
> > Subject: Re: HDHT question - looking for the datatorrent gurus!
> >
> > Hi Jim,
> >
> > Currently HDHT is accessible to only a single operator in a DAG. A single
> > HDHT store cannot be managed by two different operators at a time, as that
> > could cause metadata corruption. Theoretically an HDHT bucket could be read
> > from multiple operators, but only one writer is allowed.
> >
> > In your case a stage in the transaction is processed completely by one
> > operator, and only then can the next stage start. This could still be
> > achieved by using a single operator which manages the HDHT state, and
> > having a loop in the DAG to send completed transaction ids back to the
> > sequencer:
> >
> > - The sequence operator will emit transactions to the transaction
> > processing operators.
> > - If it receives an out-of-order transaction, it will note it down in
> > HDHT.
> > - Each processing operator will send the completed transaction id on a
> > port which is connected back to the sequence operator.
> > - On receiving data on this loopback port, the sequence operator will
> > update HDHT, search for the next transaction in order (which could be
> > stored in HDHT), and emit it to the next processing operator. A rough
> > sketch of this loop follows below.
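> >
> > A minimal sketch of that sequencing loop (a plain in-memory Map stands in
> > for the HDHT store here, and the Transaction tuple type, step ordering,
> > and port names are all illustrative):
> >
> > import java.util.HashMap;
> > import java.util.Map;
> > import com.datatorrent.api.DefaultInputPort;
> > import com.datatorrent.api.DefaultOutputPort;
> > import com.datatorrent.common.util.BaseOperator;
> >
> > class Transaction {
> >   String orderNumber;
> >   int type; // 997, 855, 856 or 810
> > }
> >
> > public class SequenceOperator extends BaseOperator {
> >   // Expected order per this thread: general ack, detailed ack,
> >   // ship notification, invoice.
> >   private static final int[] STEPS = {997, 855, 856, 810};
> >
> >   // Stand-ins for HDHT state: next expected step index per order, and
> >   // held out-of-order transactions keyed by order number and type.
> >   private final Map<String, Integer> nextStep = new HashMap<>();
> >   private final Map<String, Transaction> held = new HashMap<>();
> >
> >   public final transient DefaultOutputPort<Transaction> output = new DefaultOutputPort<>();
> >
> >   public final transient DefaultInputPort<Transaction> input = new DefaultInputPort<Transaction>() {
> >     @Override
> >     public void process(Transaction t) {
> >       int idx = nextStep.getOrDefault(t.orderNumber, 0);
> >       if (STEPS[idx] == t.type) {
> >         output.emit(t); // in order: release it for processing
> >       } else {
> >         held.put(t.orderNumber + ":" + t.type, t); // out of order: hold it
> >       }
> >     }
> >   };
> >
> >   // Loopback port: processing operators report completed transactions
> >   // here. Note this edge forms a loop in the DAG, so it must pass
> >   // through a delay operator.
> >   public final transient DefaultInputPort<Transaction> completed = new DefaultInputPort<Transaction>() {
> >     @Override
> >     public void process(Transaction t) {
> >       int idx = nextStep.getOrDefault(t.orderNumber, 0) + 1;
> >       nextStep.put(t.orderNumber, idx);
> >       if (idx < STEPS.length) {
> >         Transaction waiting = held.remove(t.orderNumber + ":" + STEPS[idx]);
> >         if (waiting != null) {
> >           output.emit(waiting); // a held transaction is now in order
> >         }
> >       }
> >     }
> >   };
> > }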
> >
> > - Tushar.
> >
> >
> > On Sat, Aug 27, 2016 at 1:31 AM, Jim <jim@facility.supplies> wrote:
> >> Good afternoon,
> >>
> >>
> >>
> >> I have an Apex application where I receive EDI transactions. Sometimes
> >> they arrive out of order, and I want to hold any out-of-sequence
> >> transactions until the correct point in the flow to process them.
> >>
> >>
> >>
> >> For example, for a standard order we will receive from the remote
> >> vendor:
> >>
> >>
> >>
> >> 1.)    General Acknowledgement
> >>
> >> 2.)    Detailed Acknowledgement
> >>
> >> 3.)    Ship Notification
> >>
> >> 4.)    Invoice
> >>
> >>
> >>
> >> They are supposed to be sent and received in that order.
> >>
> >>
> >>
> >> However, sometimes vendors' systems have problems, and they send all of
> >> these at the same time, so we can receive them out of sequence. Data
> >> packets for these are very small, say from 1 to 512 bytes, and when they
> >> are out of sequence, we will receive them very close together.
> >>
> >>
> >>
> >> I am trying to think of the best way to do this with my DataTorrent /
> >> Hadoop / YARN facilities, instead of creating a table in PostgreSQL and
> >> using that.
> >>
> >>
> >>
> >> Can I create a flow that works like this? (I am not sure if this makes
> >> sense, or is the best way to solve my problem while keeping state, etc.
> >> maintained for all the operators.)
> >>
> >>
> >>
> >> 1.)    In the inbound transaction router, check the HDHT store for the
> >> order number. If it doesn't exist, this is a new order: if the
> >> transaction being processed is the general acknowledgment, emit the data
> >> to the general acknowledgement operator; if it is not, store the
> >> transaction data in the correct bucket, identifying the transaction it
> >> is for, and record the next expected step (the general acknowledgement)
> >> in HDHT by order number.
> >>
> >> 2.)    Say the next transaction is the ship notification. In the router,
> >> we would check the HDHT store and see this is not the next expected
> >> transaction (say it is supposed to be the detail acknowledgement), so we
> >> would just post the data for the ship notification into the HDHT store
> >> and be done.
> >>
> >> 3.)    Say we now receive the detailed acknowledgement for an order
> >> whose next step IS the detailed acknowledgement. We would see this is
> >> the correct next transaction, emit it to the detailed acknowledgement
> >> operator, and update the HDHT store to show that the next transaction
> >> should be the ship notification.  NOTE: we can't emit the ship
> >> notification yet, until we have confirmed that the detailed
> >> acknowledgment has been completed.
> >>
> >> 4.)    In each of the 4 transaction operators, at the end of processing
> >> we would update the HDHT store to show the next expected step, and if we
> >> have already received data for that step, pull it from the HDHT store
> >> and write the transaction into our SQS queue, which is the input to the
> >> inbound transaction router at the beginning of the application, so it
> >> processes through the system (see the re-enqueue sketch after this
> >> list).
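> >>
> >> A sketch of that re-enqueue step (assuming the AWS SDK for Java v1; the
> >> class name, queue URL, and payload handling are illustrative):
> >>
> >> import com.amazonaws.services.sqs.AmazonSQS;
> >> import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
> >> import com.amazonaws.services.sqs.model.SendMessageRequest;
> >>
> >> public class TransactionRequeuer {
> >>     private final transient AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
> >>     // Illustrative queue URL; in practice this would come from configuration.
> >>     private final String queueUrl =
> >>         "https://sqs.us-east-1.amazonaws.com/123456789012/FS_EDI_SELLARS";
> >>
> >>     // Push a held transaction back onto the inbound queue so it flows
> >>     // through the router again, now in the correct sequence.
> >>     public void requeue(String ediPayload) {
> >>         sqs.sendMessage(new SendMessageRequest(queueUrl, ediPayload));
> >>     }
> >> }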
> >>
> >>
> >>
> >> I believe HDHT can be used to pass data throughout an entire
> >> application, and is not limited to a per-operator basis, correct?
> >>
> >>
> >>
> >> Any comments / feedback?
> >>
> >>
> >>
> >> Thanks,
> >>
> >>
> >>
> >> Jim
>
