spark-user mailing list archives

From JayKay <>
Subject Sharing object/state across transformations
Date Wed, 02 Dec 2015 10:54:40 GMT
I'm new to Apache Spark and an absolute beginner. I'm playing around with
Spark Streaming (API version 1.5.1) in Java and want to implement a
prototype which uses HyperLogLog to estimate distinct elements. I use the
stream-lib from clearspring ( 

I planned to use updateStateByKey to hold a global state over all events.
The problem is that on every call of the update function my HLL returns
a cardinality of 1, as if a new instance of my HLL object were used each
time. The same problem occurs with a simple global integer variable that I
tried to increment in every function call: it always has its initial value.

This is a code snippet where I define the update function:

Function2<List<String>, Optional<Long>, Optional<Long>> hllCountFunction
= new Function2<List<String>, Optional<Long>, Optional<Long>>() {
	public Optional<Long> call(List<String> values, Optional<Long> state)
throws Exception {
		values.forEach(value -> hll.offer(value));
		long newState = state.isPresent() ? hll.cardinality() : 0;
		return Optional.of(newState);
	}
};

And this is how I use the function:

JavaPairDStream<String, Long> hllCounts = fullvisitorids.mapToPair(new
PairFunction<String, String, String>() {
	public Tuple2<String, String> call(String value) {
		return new Tuple2<String, String>("key", value);
	}
}).updateStateByKey(hllCountFunction);

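One possible direction, sketched here without Spark so that it runs on its own: let the state passed through the update function be the sketch itself rather than the Long derived from it, so each call receives whatever the previous call returned. Everything below is an assumption made for illustration: the class name StatefulDistinctCount is hypothetical, java.util.Optional stands in for the Optional type used by the Spark Java API, and a HashSet stands in for the HyperLogLog (exact rather than approximate).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;

// Plain-Java simulation; no Spark or stream-lib involved.
// A HashSet is a stand-in for the HyperLogLog sketch.
class StatefulDistinctCount {

    // Same shape as an update function: (new values, old state) -> new state.
    // The state carried between calls is the sketch, not the count.
    static final BiFunction<List<String>, Optional<Set<String>>, Optional<Set<String>>> UPDATE =
        (values, state) -> {
            // Copy the previous sketch, or start empty on the first batch.
            Set<String> sketch = state.isPresent()
                ? new HashSet<String>(state.get())
                : new HashSet<String>();
            sketch.addAll(values);
            return Optional.of(sketch);
        };

    // Feeds the batches through UPDATE one by one, handing each call
    // the state returned by the previous call.
    static int distinctAfter(List<List<String>> batches) {
        Optional<Set<String>> state = Optional.empty();
        for (List<String> batch : batches) {
            state = UPDATE.apply(batch, state);
        }
        return state.map(Set::size).orElse(0);
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("b", "c"),
            Arrays.asList("a", "d"));
        System.out.println(distinctAfter(batches));
    }
}
```

In a real job the state type would have to be serializable, and the cardinality would be read from the state in a later step instead of inside the update function.
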
After a lot of research I found the concept of Accumulators. Do I need to
specify a custom Accumulator by extending the Accumulator class (in Java)? I
also read that in transformations they should only be used for debugging.

So how can I use one globally defined HLL object in a Spark Streaming
transformation? I also tried to implement a custom Accumulator, but this
failed as well because I don't understand how to use the AccumulableParam
interface. I implemented the Accumulator and overrode the add and value
methods. But what do I have to do in the AccumulableParam with
addAccumulator, addInPlace and zero?
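
As a rough illustration of how those three methods are usually understood: zero gives each task an empty partial result, addAccumulator folds one element into a partial result, and addInPlace merges two partial results. The sketch below does not use Spark. MergeParam is a hypothetical local interface that only mirrors the three method names from the question, SetParam and AccumulatorSketch are made-up names, and a HashSet again stands in for the HLL. With a real HLL, addAccumulator would presumably offer the element to the sketch and addInPlace would merge two sketches.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AccumulatorSketch {

    // Simulates two tasks that each start from zero(), then merges their
    // partial results into the value held on the driver side.
    static int distinctAcross(List<String> partitionA, List<String> partitionB) {
        SetParam param = new SetParam();
        Set<String> driverValue = new HashSet<String>();

        Set<String> partialA = param.zero(driverValue);
        for (String s : partitionA) {
            partialA = param.addAccumulator(partialA, s);
        }
        Set<String> partialB = param.zero(driverValue);
        for (String s : partitionB) {
            partialB = param.addAccumulator(partialB, s);
        }

        driverValue = param.addInPlace(driverValue, partialA);
        driverValue = param.addInPlace(driverValue, partialB);
        return driverValue.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctAcross(
            Arrays.asList("a", "b", "b"),
            Arrays.asList("b", "c")));
    }
}

// Hypothetical interface, declared here only so the sketch compiles
// without Spark; it mirrors the method names mentioned in the question.
interface MergeParam<R, T> {
    R addAccumulator(R acc, T element); // fold one element into a partial result
    R addInPlace(R left, R right);      // merge two partial results
    R zero(R initialValue);             // empty partial result for a new task
}

// HashSet stand-in for the HLL sketch.
class SetParam implements MergeParam<Set<String>, String> {
    public Set<String> addAccumulator(Set<String> acc, String element) {
        acc.add(element);
        return acc;
    }

    public Set<String> addInPlace(Set<String> left, Set<String> right) {
        left.addAll(right);
        return left;
    }

    public Set<String> zero(Set<String> initialValue) {
        return new HashSet<String>();
    }
}
```

The point of the split is that merging must give the same result regardless of how the elements were divided among tasks, which holds for set union and for merging cardinality sketches.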

Thanks in advance for your help and your advice!
