Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 601DB200D34 for ; Fri, 3 Nov 2017 16:17:07 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5EA94160BE9; Fri, 3 Nov 2017 15:17:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 08F3F160C02 for ; Fri, 3 Nov 2017 16:17:04 +0100 (CET) Received: (qmail 51579 invoked by uid 500); 3 Nov 2017 15:17:03 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 49918 invoked by uid 99); 3 Nov 2017 15:17:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Nov 2017 15:17:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CA1C5E04F3; Fri, 3 Nov 2017 15:17:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Fri, 03 Nov 2017 15:17:13 -0000 Message-Id: <409e99f009ca4065a09771946569a2d5@git.apache.org> In-Reply-To: <8c6bf9b1f4a04c2795f029adfdbc9d17@git.apache.org> References: <8c6bf9b1f4a04c2795f029adfdbc9d17@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/51] [partial] hbase-site git commit: Published site at . archived-at: Fri, 03 Nov 2017 15:17:07 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/7d38bdbb/devapidocs/src-html/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.AbstractAggregationCallback.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.AbstractAggregationCallback.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.AbstractAggregationCallback.html index cece735..cb909d9 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.AbstractAggregationCallback.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.AbstractAggregationCallback.html @@ -28,446 +28,458 @@ 020import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; 021import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB; 022 -023import java.io.IOException; -024import java.util.Map; -025import java.util.NavigableMap; -026import java.util.NavigableSet; -027import java.util.NoSuchElementException; -028import java.util.TreeMap; -029import java.util.concurrent.CompletableFuture; -030 -031import org.apache.hadoop.hbase.Cell; -032import org.apache.hadoop.hbase.client.RawAsyncTable; -033import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback; -034import org.apache.hadoop.hbase.client.RawScanResultConsumer; -035import org.apache.hadoop.hbase.client.RegionInfo; -036import org.apache.hadoop.hbase.client.Result; -037import org.apache.hadoop.hbase.client.Scan; -038import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; -039import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; -040import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; -041import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; -042import org.apache.hadoop.hbase.util.Bytes; -043import org.apache.hadoop.hbase.util.ReflectionUtils; -044import org.apache.yetus.audience.InterfaceAudience; -045 -046import com.google.protobuf.Message; -047 -048/** -049 * This client class is for invoking the aggregate functions deployed on the Region Server side via -050 * the AggregateService. This class will implement the supporting functionality for -051 * summing/processing the individual results obtained from the AggregateService for each region. -052 */ -053@InterfaceAudience.Public -054public class AsyncAggregationClient { -055 -056 private static abstract class AbstractAggregationCallback<T> -057 implements CoprocessorCallback<AggregateResponse> { -058 -059 private final CompletableFuture<T> future; -060 -061 protected boolean finished = false; -062 -063 private void completeExceptionally(Throwable error) { -064 if (finished) { -065 return; -066 } -067 finished = true; -068 future.completeExceptionally(error); -069 } -070 -071 protected AbstractAggregationCallback(CompletableFuture<T> future) { -072 this.future = future; -073 } -074 -075 @Override -076 public synchronized void onRegionError(RegionInfo region, Throwable error) { -077 completeExceptionally(error); -078 } -079 -080 @Override -081 public synchronized void onError(Throwable error) { -082 completeExceptionally(error); -083 } -084 -085 protected abstract void aggregate(RegionInfo region, AggregateResponse resp) -086 throws IOException; -087 -088 @Override -089 public synchronized void onRegionComplete(RegionInfo region, AggregateResponse resp) { -090 try { -091 aggregate(region, resp); -092 } catch (IOException e) { -093 completeExceptionally(e); -094 } -095 } -096 -097 protected abstract T getFinalResult(); -098 -099 @Override -100 public synchronized void onComplete() { -101 if (finished) { -102 return; -103 } -104 finished = true; -105 future.complete(getFinalResult()); -106 } -107 } -108 -109 private static <R, S, P extends Message, Q extends Message, T extends Message> R -110 getCellValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp, -111 int firstPartIndex) throws IOException { -112 Q q = getParsedGenericInstance(ci.getClass(), 3, resp.getFirstPart(firstPartIndex)); -113 return ci.getCellValueFromProto(q); -114 } -115 -116 private static <R, S, P extends Message, Q extends Message, T extends Message> S -117 getPromotedValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp, -118 int firstPartIndex) throws IOException { -119 T t = getParsedGenericInstance(ci.getClass(), 4, resp.getFirstPart(firstPartIndex)); -120 return ci.getPromotedValueFromProto(t); -121 } -122 -123 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> -124 max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { -125 CompletableFuture<R> future = new CompletableFuture<>(); -126 AggregateRequest req; -127 try { -128 req = validateArgAndGetPB(scan, ci, false); -129 } catch (IOException e) { -130 future.completeExceptionally(e); -131 return future; -132 } -133 AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) { -134 -135 private R max; -136 -137 @Override -138 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { -139 if (resp.getFirstPartCount() > 0) { -140 R result = getCellValueFromProto(ci, resp, 0); -141 if (max == null || (result != null && ci.compare(max, result) < 0)) { -142 max = result; -143 } -144 } -145 } -146 -147 @Override -148 protected R getFinalResult() { -149 return max; +023import com.google.protobuf.Message; +024 +025import java.io.IOException; +026import java.util.Map; +027import java.util.NavigableMap; +028import java.util.NavigableSet; +029import java.util.NoSuchElementException; +030import java.util.TreeMap; +031import java.util.concurrent.CompletableFuture; +032 +033import org.apache.hadoop.hbase.Cell; +034import org.apache.hadoop.hbase.HConstants; +035import org.apache.hadoop.hbase.client.RawAsyncTable; +036import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback; +037import org.apache.hadoop.hbase.client.RawScanResultConsumer; +038import org.apache.hadoop.hbase.client.RegionInfo; +039import org.apache.hadoop.hbase.client.Result; +040import org.apache.hadoop.hbase.client.Scan; +041import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; +042import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; +043import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; +044import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; +045import org.apache.hadoop.hbase.util.Bytes; +046import org.apache.hadoop.hbase.util.ReflectionUtils; +047import org.apache.yetus.audience.InterfaceAudience; +048 +049/** +050 * This client class is for invoking the aggregate functions deployed on the Region Server side via +051 * the AggregateService. This class will implement the supporting functionality for +052 * summing/processing the individual results obtained from the AggregateService for each region. +053 */ +054@InterfaceAudience.Public +055public class AsyncAggregationClient { +056 +057 private static abstract class AbstractAggregationCallback<T> +058 implements CoprocessorCallback<AggregateResponse> { +059 +060 private final CompletableFuture<T> future; +061 +062 protected boolean finished = false; +063 +064 private void completeExceptionally(Throwable error) { +065 if (finished) { +066 return; +067 } +068 finished = true; +069 future.completeExceptionally(error); +070 } +071 +072 protected AbstractAggregationCallback(CompletableFuture<T> future) { +073 this.future = future; +074 } +075 +076 @Override +077 public synchronized void onRegionError(RegionInfo region, Throwable error) { +078 completeExceptionally(error); +079 } +080 +081 @Override +082 public synchronized void onError(Throwable error) { +083 completeExceptionally(error); +084 } +085 +086 protected abstract void aggregate(RegionInfo region, AggregateResponse resp) +087 throws IOException; +088 +089 @Override +090 public synchronized void onRegionComplete(RegionInfo region, AggregateResponse resp) { +091 try { +092 aggregate(region, resp); +093 } catch (IOException e) { +094 completeExceptionally(e); +095 } +096 } +097 +098 protected abstract T getFinalResult(); +099 +100 @Override +101 public synchronized void onComplete() { +102 if (finished) { +103 return; +104 } +105 finished = true; +106 future.complete(getFinalResult()); +107 } +108 } +109 +110 private static <R, S, P extends Message, Q extends Message, T extends Message> R +111 getCellValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp, +112 int firstPartIndex) throws IOException { +113 Q q = getParsedGenericInstance(ci.getClass(), 3, resp.getFirstPart(firstPartIndex)); +114 return ci.getCellValueFromProto(q); +115 } +116 +117 private static <R, S, P extends Message, Q extends Message, T extends Message> S +118 getPromotedValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp, +119 int firstPartIndex) throws IOException { +120 T t = getParsedGenericInstance(ci.getClass(), 4, resp.getFirstPart(firstPartIndex)); +121 return ci.getPromotedValueFromProto(t); +122 } +123 +124 private static byte[] nullToEmpty(byte[] b) { +125 return b != null ? b : HConstants.EMPTY_BYTE_ARRAY; +126 } +127 +128 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> +129 max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { +130 CompletableFuture<R> future = new CompletableFuture<>(); +131 AggregateRequest req; +132 try { +133 req = validateArgAndGetPB(scan, ci, false); +134 } catch (IOException e) { +135 future.completeExceptionally(e); +136 return future; +137 } +138 AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) { +139 +140 private R max; +141 +142 @Override +143 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { +144 if (resp.getFirstPartCount() > 0) { +145 R result = getCellValueFromProto(ci, resp, 0); +146 if (max == null || (result != null && ci.compare(max, result) < 0)) { +147 max = result; +148 } +149 } 150 } -151 }; -152 table.coprocessorService(channel -> AggregateService.newStub(channel), -153 (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback), -154 scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), -155 callback); -156 return future; -157 } -158 -159 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> -160 min(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { -161 CompletableFuture<R> future = new CompletableFuture<>(); -162 AggregateRequest req; -163 try { -164 req = validateArgAndGetPB(scan, ci, false); -165 } catch (IOException e) { -166 future.completeExceptionally(e); -167 return future; -168 } -169 AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) { -170 -171 private R min; -172 -173 @Override -174 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { -175 if (resp.getFirstPartCount() > 0) { -176 R result = getCellValueFromProto(ci, resp, 0); -177 if (min == null || (result != null && ci.compare(min, result) > 0)) { -178 min = result; -179 } -180 } -181 } -182 -183 @Override -184 protected R getFinalResult() { -185 return min; -186 } -187 }; -188 table.coprocessorService(channel -> AggregateService.newStub(channel), -189 (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback), -190 scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), -191 callback); -192 return future; -193 } -194 -195 public static <R, S, P extends Message, Q extends Message, T extends Message> -196 CompletableFuture<Long> -197 rowCount(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { -198 CompletableFuture<Long> future = new CompletableFuture<>(); -199 AggregateRequest req; -200 try { -201 req = validateArgAndGetPB(scan, ci, true); -202 } catch (IOException e) { -203 future.completeExceptionally(e); -204 return future; -205 } -206 AbstractAggregationCallback<Long> callback = new AbstractAggregationCallback<Long>(future) { -207 -208 private long count; -209 -210 @Override -211 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { -212 count += resp.getFirstPart(0).asReadOnlyByteBuffer().getLong(); -213 } +151 +152 @Override +153 protected R getFinalResult() { +154 return max; +155 } +156 }; +157 table +158 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, +159 (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback), callback) +160 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) +161 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); +162 return future; +163 } +164 +165 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> +166 min(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { +167 CompletableFuture<R> future = new CompletableFuture<>(); +168 AggregateRequest req; +169 try { +170 req = validateArgAndGetPB(scan, ci, false); +171 } catch (IOException e) { +172 future.completeExceptionally(e); +173 return future; +174 } +175 AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) { +176 +177 private R min; +178 +179 @Override +180 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { +181 if (resp.getFirstPartCount() > 0) { +182 R result = getCellValueFromProto(ci, resp, 0); +183 if (min == null || (result != null && ci.compare(min, result) > 0)) { +184 min = result; +185 } +186 } +187 } +188 +189 @Override +190 protected R getFinalResult() { +191 return min; +192 } +193 }; +194 table +195 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, +196 (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback), callback) +197 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) +198 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); +199 return future; +200 } +201 +202 public static <R, S, P extends Message, Q extends Message, T extends Message> +203 CompletableFuture<Long> +204 rowCount(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { +205 CompletableFuture<Long> future = new CompletableFuture<>(); +206 AggregateRequest req; +207 try { +208 req = validateArgAndGetPB(scan, ci, true); +209 } catch (IOException e) { +210 future.completeExceptionally(e); +211 return future; +212 } +213 AbstractAggregationCallback<Long> callback = new AbstractAggregationCallback<Long>(future) { 214 -215 @Override -216 protected Long getFinalResult() { -217 return count; -218 } -219 }; -220 table.coprocessorService(channel -> AggregateService.newStub(channel), -221 (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback), -222 scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), -223 callback); -224 return future; -225 } -226 -227 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<S> -228 sum(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { -229 CompletableFuture<S> future = new CompletableFuture<>(); -230 AggregateRequest req; -231 try { -232 req = validateArgAndGetPB(scan, ci, false); -233 } catch (IOException e) { -234 future.completeExceptionally(e); -235 return future; -236 } -237 AbstractAggregationCallback<S> callback = new AbstractAggregationCallback<S>(future) { -238 -239 private S sum; -240 -241 @Override -242 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { -243 if (resp.getFirstPartCount() > 0) { -244 S s = getPromotedValueFromProto(ci, resp, 0); -245 sum = ci.add(sum, s); -246 } -247 } +215 private long count; +216 +217 @Override +218 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { +219 count += resp.getFirstPart(0).asReadOnlyByteBuffer().getLong(); +220 } +221 +222 @Override +223 protected Long getFinalResult() { +224 return count; +225 } +226 }; +227 table +228 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, +229 (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback), callback) +230 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) +231 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); +232 return future; +233 } +234 +235 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<S> +236 sum(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { +237 CompletableFuture<S> future = new CompletableFuture<>(); +238 AggregateRequest req; +239 try { +240 req = validateArgAndGetPB(scan, ci, false); +241 } catch (IOException e) { +242 future.completeExceptionally(e); +243 return future; +244 } +245 AbstractAggregationCallback<S> callback = new AbstractAggregationCallback<S>(future) { +246 +247 private S sum; 248 249 @Override -250 protected S getFinalResult() { -251 return sum; -252 } -253 }; -254 table.coprocessorService(channel -> AggregateService.newStub(channel), -255 (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback), -256 scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), -257 callback); -258 return future; -259 } -260 -261 public static <R, S, P extends Message, Q extends Message, T extends Message> -262 CompletableFuture<Double> -263 avg(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { -264 CompletableFuture<Double> future = new CompletableFuture<>(); -265 AggregateRequest req; -266 try { -267 req = validateArgAndGetPB(scan, ci, false); -268 } catch (IOException e) { -269 future.completeExceptionally(e); -270 return future; -271 } -272 AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) { -273 -274 private S sum; -275 -276 long count = 0L; -277 -278 @Override -279 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { -280 if (resp.getFirstPartCount() > 0) { -281 sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0)); -282 count += resp.getSecondPart().asReadOnlyByteBuffer().getLong(); -283 } -284 } -285 -286 @Override -287 protected Double getFinalResult() { -288 return ci.divideForAvg(sum, count); -289 } -290 }; -291 table.coprocessorService(channel -> AggregateService.newStub(channel), -292 (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback), -293 scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), -294 callback); -295 return future; -296 } -297 -298 public static <R, S, P extends Message, Q extends Message, T extends Message> -299 CompletableFuture<Double> -300 std(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { -301 CompletableFuture<Double> future = new CompletableFuture<>(); -302 AggregateRequest req; -303 try { -304 req = validateArgAndGetPB(scan, ci, false); -305 } catch (IOException e) { -306 future.completeExceptionally(e); -307 return future; -308 } -309 AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) { -310 -311 private S sum; -312 -313 private S sumSq; -314 -315 private long count; -316 -317 @Override -318 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { -319 if (resp.getFirstPartCount() > 0) { -320 sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0)); -321 sumSq = ci.add(sumSq, getPromotedValueFromProto(ci, resp, 1)); -322 count += resp.getSecondPart().asReadOnlyByteBuffer().getLong(); -323 } -324 } -325 -326 @Override -327 protected Double getFinalResult() { -328 double avg = ci.divideForAvg(sum, count); -329 double avgSq = ci.divideForAvg(sumSq, count); -330 return Math.sqrt(avgSq - avg * avg); -331 } -332 }; -333 table.coprocessorService(channel -> AggregateService.newStub(channel), -334 (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback), -335 scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), -336 callback); -337 return future; -338 } -339 -340 // the map key is the startRow of the region -341 private static <R, S, P extends Message, Q extends Message, T extends Message> -342 CompletableFuture<NavigableMap<byte[], S>> -343 sumByRegion(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { -344 CompletableFuture<NavigableMap<byte[], S>> future = -345 new CompletableFuture<NavigableMap<byte[], S>>(); -346 AggregateRequest req; -347 try { -348 req = validateArgAndGetPB(scan, ci, false); -349 } catch (IOException e) { -350 future.completeExceptionally(e); -351 return future; -352 } -353 int firstPartIndex = scan.getFamilyMap().get(scan.getFamilies()[0]).size() - 1; -354 AbstractAggregationCallback<NavigableMap<byte[], S>> callback = -355 new AbstractAggregationCallback<NavigableMap<byte[], S>>(future) { -356 -357 private final NavigableMap<byte[], S> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); -358 -359 @Override -360 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { -361 if (resp.getFirstPartCount() > 0) { -362 map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex)); -363 } -364 } -365 -366 @Override -367 protected NavigableMap<byte[], S> getFinalResult() { -368 return map; -369 } -370 }; -371 table.coprocessorService(channel -> AggregateService.newStub(channel), -372 (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), -373 scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), -374 callback); -375 return future; -376 } -377 -378 private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian( -379 CompletableFuture<R> future, RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, -380 Scan scan, NavigableMap<byte[], S> sumByRegion) { -381 double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L); -382 S movingSum = null; -383 byte[] startRow = null; -384 for (Map.Entry<byte[], S> entry : sumByRegion.entrySet()) { -385 startRow = entry.getKey(); -386 S newMovingSum = ci.add(movingSum, entry.getValue()); -387 if (ci.divideForAvg(newMovingSum, 1L) > halfSum) { -388 break; -389 } -390 movingSum = newMovingSum; -391 } -392 if (startRow != null) { -393 scan.withStartRow(startRow); -394 } -395 // we can not pass movingSum directly to an anonymous class as it is not final. -396 S baseSum = movingSum; -397 byte[] family = scan.getFamilies()[0]; -398 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family); -399 byte[] weightQualifier = qualifiers.last(); -400 byte[] valueQualifier = qualifiers.first(); -401 table.scan(scan, new RawScanResultConsumer() { -402 -403 private S sum = baseSum; -404 -405 private R value = null; -406 -407 @Override -408 public void onNext(Result[] results, ScanController controller) { -409 try { -410 for (Result result : results) { -411 Cell weightCell = result.getColumnLatestCell(family, weightQualifier); -412 R weight = ci.getValue(family, weightQualifier, weightCell); -413 sum = ci.add(sum, ci.castToReturnType(weight)); -414 if (ci.divideForAvg(sum, 1L) > halfSum) { -415 if (value != null) { -416 future.complete(value); -417 } else { -418 future.completeExceptionally(new NoSuchElementException()); -419 } -420 controller.terminate(); -421 return; -422 } -423 Cell valueCell = result.getColumnLatestCell(family, valueQualifier); -424 value = ci.getValue(family, valueQualifier, valueCell); -425 } -426 } catch (IOException e) { -427 future.completeExceptionally(e); -428 controller.terminate(); -429 } -430 } -431 -432 @Override -433 public void onError(Throwable error) { -434 future.completeExceptionally(error); -435 } -436 -437 @Override -438 public void onComplete() { -439 if (!future.isDone()) { -440 // we should not reach here as the future should be completed in onNext. -441 future.completeExceptionally(new NoSuchElementException()); -442 } -443 } -444 }); -445 } -446 -447 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> -448 median(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { -449 CompletableFuture<R> future = new CompletableFuture<>(); -450 sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> { -451 if (error != null) { -452 future.completeExceptionally(error); -453 } else if (sumByRegion.isEmpty()) { -454 future.completeExceptionally(new NoSuchElementException()); -455 } else { -456 findMedian(future, table, ci, ReflectionUtils.newInstance(scan.getClass(), scan), -457 sumByRegion); -458 } -459 }); -460 return future; -461 } -462} +250 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { +251 if (resp.getFirstPartCount() > 0) { +252 S s = getPromotedValueFromProto(ci, resp, 0); +253 sum = ci.add(sum, s); +254 } +255 } +256 +257 @Override +258 protected S getFinalResult() { +259 return sum; +260 } +261 }; +262 table +263 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, +264 (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback), callback) +265 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) +266 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); +267 return future; +268 } +269 +270 public static <R, S, P extends Message, Q extends Message, T extends Message> +271 CompletableFuture<Double> +272 avg(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { +273 CompletableFuture<Double> future = new CompletableFuture<>(); +274 AggregateRequest req; +275 try { +276 req = validateArgAndGetPB(scan, ci, false); +277 } catch (IOException e) { +278 future.completeExceptionally(e); +279 return future; +280 } +281 AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) { +282 +283 private S sum; +284 +285 long count = 0L; +286 +287 @Override +288 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { +289 if (resp.getFirstPartCount() > 0) { +290 sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0)); +291 count += resp.getSecondPart().asReadOnlyByteBuffer().getLong(); +292 } +293 } +294 +295 @Override +296 protected Double getFinalResult() { +297 return ci.divideForAvg(sum, count); +298 } +299 }; +300 table +301 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, +302 (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback), callback) +303 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) +304 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); +305 return future; +306 } +307 +308 public static <R, S, P extends Message, Q extends Message, T extends Message> +309 CompletableFuture<Double> +310 std(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { +311 CompletableFuture<Double> future = new CompletableFuture<>(); +312 AggregateRequest req; +313 try { +314 req = validateArgAndGetPB(scan, ci, false); +315 } catch (IOException e) { +316 future.completeExceptionally(e); +317 return future; +318 } +319 AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) { +320 +321 private S sum; +322 +323 private S sumSq; +324 +325 private long count; +326 +327 @Override +328 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { +329 if (resp.getFirstPartCount() > 0) { +330 sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0)); +331 sumSq = ci.add(sumSq, getPromotedValueFromProto(ci, resp, 1)); +332 count += resp.getSecondPart().asReadOnlyByteBuffer().getLong(); +333 } +334 } +335 +336 @Override +337 protected Double getFinalResult() { +338 double avg = ci.divideForAvg(sum, count); +339 double avgSq = ci.divideForAvg(sumSq, count); +340 return Math.sqrt(avgSq - avg * avg); +341 } +342 }; +343 table +344 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, +345 (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback), callback) +346 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) +347 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); +348 return future; +349 } +350 +351 // the map key is the startRow of the region +352 private static <R, S, P extends Message, Q extends Message, T extends Message> +353 CompletableFuture<NavigableMap<byte[], S>> +354 sumByRegion(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { +355 CompletableFuture<NavigableMap<byte[], S>> future = +356 new CompletableFuture<NavigableMap<byte[], S>>(); +357 AggregateRequest req; +358 try { +359 req = validateArgAndGetPB(scan, ci, false); +360 } catch (IOException e) { +361 future.completeExceptionally(e); +362 return future; +363 } +364 int firstPartIndex = scan.getFamilyMap().get(scan.getFamilies()[0]).size() - 1; +365 AbstractAggregationCallback<NavigableMap<byte[], S>> callback = +366 new AbstractAggregationCallback<NavigableMap<byte[], S>>(future) { +367 +368 private final NavigableMap<byte[], S> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); +369 +370 @Override +371 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { +372 if (resp.getFirstPartCount() > 0) { +373 map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex)); +374 } +375 } +376 +377 @Override +378 protected NavigableMap<byte[], S> getFinalResult() { +379 return map; +380 } +381 }; +382 table +383 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, +384 (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), callback) +385 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) +386 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); +387 return future; +388 } +389 +390 private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian( +391 CompletableFuture<R> future, RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, +392 Scan scan, NavigableMap<byte[], S> sumByRegion) { +393 double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L); +394 S movingSum = null; +395 byte[] startRow = null; +396 for (Map.Entry<byte[], S> entry : sumByRegion.entrySet()) { +397 startRow = entry.getKey(); +398 S newMovingSum = ci.add(movingSum, entry.getValue()); +399 if (ci.divideForAvg(newMovingSum, 1L) > halfSum) { +400 break; +401 } +402 movingSum = newMovingSum; +403 } +404 if (startRow != null) { +405 scan.withStartRow(startRow); +406 } +407 // we can not pass movingSum directly to an anonymous class as it is not final. +408 S baseSum = movingSum; +409 byte[] family = scan.getFamilies()[0]; +410 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family); +411 byte[] weightQualifier = qualifiers.last(); +412 byte[] valueQualifier = qualifiers.first(); +413 table.scan(scan, new RawScanResultConsumer() { +414 +415 private S sum = baseSum; +416 +417 private R value = null; +418 +419 @Override +420 public void onNext(Result[] results, ScanController controller) { +421 try { +422 for (Result result : results) { +423 Cell weightCell = result.getColumnLatestCell(family, weightQualifier); +424 R weight = ci.getValue(family, weightQualifier, weightCell); +425 sum = ci.add(sum, ci.castToReturnType(weight)); +426 if (ci.divideForAvg(sum, 1L) > halfSum) { +427 if (value != null) { +428 future.complete(value); +429 } else { +430 future.completeExceptionally(new NoSuchElementException()); +431 } +432 controller.terminate(); +433 return; +434 } +435 Cell valueCell = result.getColumnLatestCell(family, valueQualifier); +436 value = ci.getValue(family, valueQualifier, valueCell); +437 } +438 } catch (IOException e) { +439 future.completeExceptionally(e); +440 controller.terminate(); +441 } +442 } +443 +444 @Override +445 public void onError(Throwable error) { +446 future.completeExceptionally(error); +447 } +448 +449 @Override +450 public void onComplete() { +451 if (!future.isDone()) { +452 // we should not reach here as the future should be completed in onNext. +453 future.completeExceptionally(new NoSuchElementException()); +454 } +455 } +456 }); +457 } +458 +459 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> +460 median(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { +461 CompletableFuture<R> future = new CompletableFuture<>(); +462 sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> { +463 if (error != null) { +464 future.completeExceptionally(error); +465 } else if (sumByRegion.isEmpty()) { +466 future.completeExceptionally(new NoSuchElementException()); +467 } else { +468 findMedian(future, table, ci, ReflectionUtils.newInstance(scan.getClass(), scan), +469 sumByRegion); +470 } +471 }); +472 return future; +473 } +474}