From: Marc MADAULE <marc.madaule@thales-services.fr>
To: user@storm.apache.org
Subject: Multi-branch Trident topology not working
Date: Wed, 16 Oct 2019 11:04:47 +0200

Hello,

I'm facing an issue with Trident and would appreciate your expertise.

I'm separating the tuples output by a spout into two branches, based on a field value. Each branch runs different processing algorithms, and one of the branches contains a State operator (a partitionPersist()). At the end I need to merge the two branches back together. This topology does not work when a partitioning operator is used after the State operator, and I don't understand why (bug? misunderstanding?).

The code at the bottom of this mail is a minimal, self-contained demonstrator of the issue.
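In outline, the topology is built like this (a condensed view of the full code below; the peek() logging calls are omitted and the keepOnlyType / printTuples / addField / reemitTuples helpers are defined in the full code):

    Stream AStream = startStream                        // "A" branch: plain per-tuple computation
            .filter(keepOnlyType("A"))
            .each(addField("computed A value"), new Fields("val"));

    Stream BStream = startStream                        // "B" branch: goes through a State operator
            .filter(keepOnlyType("B"))
            .partitionPersist(
                    new MemoryMapState.Factory(),
                    new Fields("key", "type"),
                    reemitTuples(),                     // state operator which simply re-emits its input
                    new Fields("key", "type"))
            .newValuesStream()
            .shuffle()                                  // the partitioning operator that breaks the topology
            .each(addField("computed B value"), new Fields("val"))
            .parallelismHint(2);

    topology.merge(AStream, BStream);                   // merge the two branches back together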
When the shuffle() line is commented out, I get all the messages I'm expecting (i.e. the topology works as expected):

Stream A after filter : [1, A]
Stream A end of branch : [1, A, computed A value]
Merged stream : [1, A, computed A value]
Stream B after filter : [1, B]
Stream B end of branch : [1, B, computed B value]
Merged stream : [1, B, computed B value]

But when the shuffle() line is present, the topology only processes branch B up to the State operator. No "Stream B end of branch" message ever appears:

Stream A after filter : [1, A]
Stream B after filter : [1, B]
Stream A end of branch : [1, A, computed A value]
Merged stream : [1, A, computed A value]

(1 minute pause, then Trident decides that the micro-batch has failed and replays it)

Stream A after filter : [1, A]
Stream B after filter : [1, B]
Stream A end of branch : [1, A, computed A value]
Merged stream : [1, A, computed A value]

(...)

Tested with the same results under Storm 1.2.3 and 2.0.0.

Thank you,
marc

========================== CODE ================================

import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.state.StateUpdater;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class JoinIssueMinimal {

    public static void main(final String[] args) throws Exception {
        TridentTopology topology = buildTopology();
        final String topologyName = "JoinIssueMinimal";
        final Config conf = new Config();
        conf.setMaxSpoutPending(1);
        final LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyName, conf, topology.build());
        Utils.sleep(700 * 1000L);
        cluster.killTopology(topologyName);
        cluster.shutdown();
    }

    public static TridentTopology buildTopology() {
        final TridentTopology topology = new TridentTopology();

        // this spout will output 2 tuples for the first batch (one A and one B)
        final Stream startStream = topology.newStream("start", new FixedBatchSpout(
                new Fields("key", "type"),
                2,
                new Values(1, "A"),
                new Values(1, "B")));

        // "A" branch
        Stream AStream = startStream
                .filter(keepOnlyType("A"))
                .peek(printTuples("Stream A after filter"))
                .each(addField("computed A value"), new Fields("val"))
                .peek(printTuples("Stream A end of branch"));
                // tuples have "key", "type" and "val" fields

        // "B" branch
        Stream BStream = startStream
                .filter(keepOnlyType("B"))
                .peek(printTuples("Stream B after filter"))
                .partitionPersist(
                        new MemoryMapState.Factory(),
                        new Fields("key", "type"),
                        reemitTuples(), // state operator which simply returns the input tuple
                        new Fields("key", "type"))
                .newValuesStream()
                .shuffle() // <<<<<<<<<<<<<<<<< when this shuffle is present, the topology no longer works
                .each(addField("computed B value"), new Fields("val"))
                .parallelismHint(2) // I would like multiple instances of the above computation (addField), therefore I need to shuffle() earlier
                .peek(printTuples("Stream B end of branch"));
                // tuples have "key", "type" and "val" fields
        // merge the two branches together
        topology.merge(AStream, BStream)
                .peek(printTuples("Merged stream"));

        return topology;
    }

    /** Returns a Filter which keeps only tuples with a given 'type' field. */
    static BaseFilter keepOnlyType(String t) {
        return new BaseFilter() {
            @Override
            public boolean isKeep(TridentTuple tuple) {
                return t.equals(tuple.getStringByField("type"));
            }
        };
    }

    /** Returns a Consumer which prints all tuples, along with a given message. */
    static Consumer printTuples(String msg) {
        return new Consumer() {
            @Override
            public void accept(TridentTuple input) {
                System.out.println(msg + " : " + input);
            }
        };
    }

    /** Returns a Function which adds a field with a given value. */
    static BaseFunction addField(String val) {
        return new BaseFunction() {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                collector.emit(new Values(val));
            }
        };
    }

    /** Returns a StateUpdater which always re-emits all input tuples as-is. */
    static StateUpdater<MemoryMapState<Integer>> reemitTuples() {
        return new StateUpdater<MemoryMapState<Integer>>() {
            @Override
            public void prepare(Map conf, TridentOperationContext context) {}

            @Override
            public void cleanup() {}

            @Override
            public void updateState(MemoryMapState<Integer> state, List<TridentTuple> tuples, TridentCollector collector) {
                for (TridentTuple t : tuples) {
                    collector.emit(t);
                }
            }
        };
    }
}
