flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Window Function on AllWindowed Stream - Combining Kafka Topics
Date Tue, 13 Jun 2017 08:13:37 GMT
Hi,

As a simple test, you can move your key extraction logic into a MapFunction, i.e. a
MapFunction<T extends SignalSet<?>, Tuple2<String, T>>, and then simply use the first
tuple field (index 0) as the key:

input
  .map(new MyKeyExtractorMapper())
  .keyBy(0)

If that solves your problem, it means that your key extraction is not deterministic. This is
a problem because getKey() is called for the same element at different points in time, and
if the result is not always the same you will get that error.
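
To illustrate the point, here is a minimal plain-Java sketch without any Flink dependency. It is not the code from this thread: KeyStabilityDemo, Signal, extractKey and withKey are hypothetical names, and the mutable dimensions map is only one assumed way in which a key could change between two calls. It shows that a key extracted twice can differ, while a key computed once and stored next to the element (which is what the map step does) stays the same.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; every name in this class is hypothetical.
class KeyStabilityDemo {

    /** Stand-in for a SignalSet: a namespace plus mutable dimensions. */
    static final class Signal {
        final String namespace;
        final Map<String, String> dimensions = new HashMap<>();

        Signal(String namespace) {
            this.namespace = namespace;
        }
    }

    /** Key extraction in the style of the quoted DimensionKeySelector. */
    static String extractKey(Signal s, String[] dimKeys) {
        StringBuilder group = new StringBuilder(s.namespace);
        for (String dim : dimKeys) {
            String value = s.dimensions.get(dim);
            if (value != null) {
                group.append(value);
            }
        }
        return group.toString();
    }

    /** Mirrors the map step: compute the key once and carry it with the element. */
    static Map.Entry<String, Signal> withKey(Signal s, String[] dimKeys) {
        return new SimpleImmutableEntry<>(extractKey(s, dimKeys), s);
    }

    public static void main(String[] args) {
        String[] dimKeys = {"host", "region"};
        Signal signal = new Signal("cpu");
        signal.dimensions.put("host", "h1");

        String first = extractKey(signal, dimKeys);
        Map.Entry<String, Signal> keyed = withKey(signal, dimKeys);

        // Assumed cause of drift: the element is modified after the first extraction.
        signal.dimensions.put("region", "eu");
        String second = extractKey(signal, dimKeys);

        System.out.println(first.equals(second));          // false: the key drifted
        System.out.println(keyed.getKey().equals(first));  // true: the stored key is stable
    }
}
```

If the job behaves differently with the stored key, the next step is to find out which input of the key extraction changes between calls.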

Best,
Aljoscha
> On 12. Jun 2017, at 22:04, Meera <mvengadasubbu@ebay.com> wrote:
> 
> Did this problem get resolved?
> 
> - I am running into this problem when I parallelize the tasks:
> Unexpected key group index. This indicates a bug.
> 
> - It runs fine with a parallelism of 1. This suggests there is some key grouping
> issue. I checked my Watermark and KeySelector and they look okay.
> 
> Here are the KeySelector and the Watermark attached to the KeyedStream:
> public class DimensionKeySelector<T extends SignalSet<?>> implements
> KeySelector<T, String> {
> 
> 	private static final long serialVersionUID = 7666263008141606451L;
> 	private final String[] dimKeys;
> 
> 	public DimensionKeySelector(Map<String, String> conf) {
> 		if (conf.containsKey("dimKeys") == false) {
> 			throw new RuntimeException("Required 'dimKeys' missing.");
> 		}
> 		this.dimKeys = conf.get("dimKeys").split(",");
> 	}
> 
> 	@Override
> 	public String getKey(T signalSet) throws Exception {
> 		StringBuffer group = new StringBuffer(signalSet.namespace());
> 		if (signalSet.size() != 0) {
> 			for (String dim : dimKeys) {
> 				if (signalSet.dimensions().containsKey(dim)) {
> 					group.append(signalSet.dimensions().get(dim));
> 				}
> 			}
> 		}
> 		return group.toString();
> 	}
> }
> 
> and the Watermark:
> public class WaterMarks extends
> BoundedOutOfOrdernessTimestampExtractor<MetricSignalSet> {
> 
>    public WaterMarks(Time maxOutOfOrderness) {
>        super(maxOutOfOrderness);
>    }
> 
>    private static final long serialVersionUID = 1L;
> 
>    @Override
>    public long extractTimestamp(MetricSignalSet element) {
>        return element.get(0).timestamp().getTime();
>    }
> }
> 
> Any thoughts?
> 
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-Function-on-AllWindowed-Stream-Combining-Kafka-Topics-tp12941p13663.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.

