storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Peter Neubauer <>
Subject Querying partitioned state from within the same topology
Date Wed, 10 Feb 2016 08:43:00 GMT
Hi there,
I'm trying to wrap my head around the best way to query state that has
been partitioned from other parts of the topology.

Basically, I have something like the state factory

StateFactory imageMapStateFactory = ELSIndexMapState.transactional(
        new ClientFactory.Transport(),
        new TopLevelTxIDSerializer(topoName, MapImageBasicData.class)

and then a stream doing persistentAggregate using it:

TridentState imageMapState = kafkaStream
        .groupBy(new Fields(INDEX, TYPE, ID))
        .persistentAggregate(imageMapStateFactory, new
Fields(IMAGE_ID, EVENT), new MapImageUpdateAggregator(), new

Now, in some other part of the topo, I need to ask the imageMapState
for images (which might be cached and updated by the modifying
stream). Since this state is partitioned by INDEX, TYPE, ID, I'm not
sure how to do it. Should I reuse the imageMapState from above and the
grouping is automatically correctly discovered and routed to the
appropriate partitioned instance, or do I have to do something else in
order to ask the same partitioned state instances that hold the
updated data?

//Validate and Persist ChangesetCreated Events into ELS
Stream validChangesetsStream = metaCheckedStream
        .groupBy(new Fields(INDEX, TYPE, ID))
        .stateQuery(imageMapState, new Fields(IMAGE_ID), new
ELSQueryMapImageByIdQueryFunction(), new Fields(IMAGE))
        .each(new Fields(IMAGE_ID, EVENT, IMAGE,
PREVIOUS_STATE_UPDATE), new ValidateImageLocationChangesFunction(),

Thanks for any hints!


G:  neubauer.peter
S:  peter.neubauer
P:  +46 704 106975
T:   @peterneubauer

Mapillary - Join the greatest expedition of our time.

View raw message