flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Date Thu, 24 Aug 2017 21:54:43 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r135128323
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java ---
    @@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
     			accumulator.count -= iWeight;
     		}
     	}
    +
    +	/**
    +	 * CountDistinct accumulator.
    +	 */
    +	public static class CountDistinctAccum {
    +		public MapView<String, Integer> map;
    +		public long count;
    +	}
    +
    +	/**
    +	 * CountDistinct aggregate.
    +	 */
    +	public static class CountDistinct extends AggregateFunction<Long, CountDistinctAccum>
{
    +
    +		@Override
    +		public CountDistinctAccum createAccumulator() {
    +			CountDistinctAccum accum = new CountDistinctAccum();
    +			accum.map = new MapView<>(Types.STRING, Types.INT);
    +			accum.count = 0L;
    +			return accum;
    +		}
    +
    +		//Overloaded accumulate method
    +		public void accumulate(CountDistinctAccum accumulator, String id) {
    +			try {
    +				if (!accumulator.map.contains(id)) {
    +					accumulator.map.put(id, 1);
    +					accumulator.count += 1;
    +				}
    +			} catch (Exception e) {
    +				e.printStackTrace();
    +			}
    +		}
    +
    +		//Overloaded accumulate method
    +		public void accumulate(CountDistinctAccum accumulator, long id) {
    +			try {
    +				if (!accumulator.map.contains(String.valueOf(id))) {
    +					accumulator.map.put(String.valueOf(id), 1);
    +					accumulator.count += 1;
    +				}
    +			} catch (Exception e) {
    +				e.printStackTrace();
    +			}
    +		}
    +
    +		@Override
    +		public Long getValue(CountDistinctAccum accumulator) {
    +			return accumulator.count;
    +		}
    +	}
    +
    +	/**
    +	 * CountDistinct aggregate with merge.
    +	 */
    +	public static class CountDistinctWithMerge extends CountDistinct {
    +
    +		//Overloaded merge method
    +		public void merge(CountDistinctAccum acc, Iterable<CountDistinctAccum> it) {
    +			Iterator<CountDistinctAccum> iter = it.iterator();
    +			while (iter.hasNext()) {
    +				CountDistinctAccum mergeAcc = iter.next();
    +				acc.count += mergeAcc.count;
    +
    +				try {
    +					Iterator<String> mapItr = mergeAcc.map.keys().iterator();
    +					while (mapItr.hasNext()) {
    +						String key = mapItr.next();
    +						if (!acc.map.contains(key)) {
    +							acc.map.put(key, 1);
    +						}
    +					}
    +				} catch (Exception e) {
    +					e.printStackTrace();
    +				}
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * CountDistinct aggregate with merge and reset.
    +	 */
    +	public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge {
    +
    +		//Overloaded retract method
    +		public void resetAccumulator(CountDistinctAccum acc) {
    +			acc.map.clear();
    +			acc.count = 0;
    +		}
    +	}
    +
    +	/**
    +	 * CountDistinct aggregate with retract.
    +	 */
    +	public static class CountDistinctWithRetractAndReset extends CountDistinct {
    +
    +		//Overloaded retract method
    +		public void retract(CountDistinctAccum accumulator, long id) {
    +			try {
    +				if (!accumulator.map.contains(String.valueOf(id))) {
    +					accumulator.map.remove(String.valueOf(id));
    --- End diff --
    
    Shouldn't a count distinct with retraction increment the counter value in the MapView
in `accumulate` and decrement it in `retract`? The map entry should be removed, and
`accumulator.count` decremented, only when that counter reaches 0.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message