Hello,
I'm facing an issue with Trident and would appreciate your expertise.
I'm splitting the tuples emitted by a spout into two branches, based on a field value. Each branch applies different processing, and one of the branches contains a State operator (a partitionPersist()). At the end, I need to merge the two branches back together. This topology does not work when a partitioning operator is used after the State operator, and I don't understand why (a bug, or a misunderstanding on my part?).
The following code is a minimal example that reproduces the issue.
When the shuffle() line is commented out, I get all the messages I expect (i.e. the topology works as expected):
Stream A after filter : [1, A]
Stream A end of branch : [1, A, computed A value]
Merged stream : [1, A, computed A value]
Stream B after filter : [1, B]
Stream B end of branch : [1, B, computed B value]
Merged stream : [1, B, computed B value]
But when the shuffle() line is present, the topology only processes branch B up to the State operator. No "Stream B end of branch" message appears:
Stream A after filter : [1, A]
Stream B after filter : [1, B]
Stream A end of branch : [1, A, computed A value]
Merged stream : [1, A, computed A value]
(1-minute pause, then Trident detects that the micro-batch has failed and replays it)
Stream A after filter : [1, A]
Stream B after filter : [1, B]
Stream A end of branch : [1, A, computed A value]
Merged stream : [1, A, computed A value]
(...)
Tested with the same results under Storm 1.2.3 and 2.0.0.
Thank you,
marc
========================== CODE ================================
import java.util.List;
import
java.util.Map;
import org.apache.storm.Config;
import
org.apache.storm.LocalCluster;
import
org.apache.storm.generated.StormTopology;
import
org.apache.storm.trident.Stream;
import
org.apache.storm.trident.TridentTopology;
import
org.apache.storm.trident.operation.BaseFilter;
import
org.apache.storm.trident.operation.BaseFunction;
import
org.apache.storm.trident.operation.Consumer;
import
org.apache.storm.trident.operation.TridentCollector;
import
org.apache.storm.trident.operation.TridentOperationContext;
import
org.apache.storm.trident.state.StateUpdater;
import
org.apache.storm.trident.testing.FixedBatchSpout;
import
org.apache.storm.trident.testing.MemoryMapState;
import
org.apache.storm.trident.tuple.TridentTuple;
import
org.apache.storm.tuple.Fields;
import
org.apache.storm.tuple.Values;
import
org.apache.storm.utils.Utils;
public class JoinIssueMinimal {
public static void main(final String[] args) throws Exception {
TridentTopology topology = buildTopology();
final String
topologyName = "JoinIssueMinimal";
final Config conf = new
Config();
conf.setMaxSpoutPending(1);
final LocalCluster
cluster = new LocalCluster();
cluster.submitTopology(topologyName, conf, topology.build());
Utils.sleep(700 * 1000L);
cluster.killTopology(topologyName);
cluster.shutdown();
}
public static TridentTopology
buildTopology() {
final TridentTopology topology = new
TridentTopology();
// this spout will output 2 tuples for the
first batch (one A and one B)
final Stream startStream =
topology.newStream("start", new FixedBatchSpout(
new
Fields("key", "type"),
2,
new Values(1,
"A"),
new Values(1, "B")));
// "A" branch
Stream AStream = startStream
.filter(keepOnlyType("A"))
.peek(printTuples("Stream A
after filter"))
.each(addField("computed A value"), new
Fields("val"))
.peek(printTuples("Stream A end of
branch"));
// tuples have "key", "type" and "val"
fields
// "B" branch
Stream BStream = startStream
.filter(keepOnlyType("B"))
.peek(printTuples("Stream B after filter"))
.partitionPersist(
new MemoryMapState.Factory(),
new Fields("key", "type"),
reemitTuples(), // state operator which simply returns the input
tuple
new Fields("key", "type"))
.newValuesStream()
.shuffle() // <<<<<<<<<<<<<<<<<
when this shuffle is present, the topology no longer works
.each(addField("computed B value"), new Fields("val"))
.parallelismHint(2) // I would like multiple instances of the above
computation (addField), therefore I need to shuffle() earlier
.peek(printTuples("Stream B end of branch"));
//
tuples have "key", "type" and "val" fields
// merge the two
branches together
topology.merge(AStream, BStream)
.peek(printTuples("Merged stream"));
return topology;
}
/** Returns a Filter which keeps only tuples with a given 'type'
field */
static BaseFilter keepOnlyType(String t) {
return
new BaseFilter() {
@Override
public boolean
isKeep(TridentTuple tuple) {
return
t.equals(tuple.getStringByField("type"));
}
};
}
/** Returns a Consumer which prints all tuples, along with a
given message */
static Consumer printTuples(String msg) {
return new Consumer() {
@Override
public void
accept(TridentTuple input) {
System.out.println(msg + "
: " + input);
}
};
}
/** Returns a function
which adds a field with a given value */
static BaseFunction
addField(String val) {
return new BaseFunction() {
@Override
public void execute(TridentTuple tuple,
TridentCollector collector) {
collector.emit(new
Values(val));
}
};
}
/** Returns a
StateUpdater which always returns all input tuples as-is */
static
StateUpdater<MemoryMapState<Integer>> reemitTuples() {
return
new StateUpdater<MemoryMapState<Integer>>() {
@Override
public void prepare(Map conf, TridentOperationContext context)
{}
@Override
public void cleanup() {}
@Override
public void updateState(MemoryMapState<Integer>
state, List<TridentTuple> tuples, TridentCollector collector) {
for (TridentTuple t : tuples) {
collector.emit(t);
}
}
};
}
}
|