syncope-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ilgro...@apache.org
Subject [syncope] branch master updated: Fixing tests after last Spring Cloud Gateway version upgrade
Date Wed, 03 Jul 2019 08:49:05 GMT
This is an automated email from the ASF dual-hosted git repository.

ilgrosso pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/syncope.git


The following commit(s) were added to refs/heads/master by this push:
     new d8fd4c9  Fixing tests after last Spring Cloud Gateway version upgrade
d8fd4c9 is described below

commit d8fd4c9cd310909b7bc9e7921c42e1d8a73dbdce
Author: Francesco Chicchiriccò <ilgrosso@apache.org>
AuthorDate: Wed Jul 3 10:48:56 2019 +0200

    Fixing tests after last Spring Cloud Gateway version upgrade
---
 .../BodyPropertyAddingGatewayFilterFactory.java    | 154 ++++++++-------------
 .../BodyPropertyMatchingRoutePredicateFactory.java |  39 ++----
 2 files changed, 62 insertions(+), 131 deletions(-)

diff --git a/sra/src/test/java/org/apache/syncope/sra/BodyPropertyAddingGatewayFilterFactory.java b/sra/src/test/java/org/apache/syncope/sra/BodyPropertyAddingGatewayFilterFactory.java
index aec3dcc..6352425 100644
--- a/sra/src/test/java/org/apache/syncope/sra/BodyPropertyAddingGatewayFilterFactory.java
+++ b/sra/src/test/java/org/apache/syncope/sra/BodyPropertyAddingGatewayFilterFactory.java
@@ -37,13 +37,8 @@ import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
 import org.springframework.core.Ordered;
 import org.springframework.core.io.buffer.DataBuffer;
 import org.springframework.core.io.buffer.DataBufferFactory;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseCookie;
-import org.springframework.http.client.reactive.ClientHttpResponse;
 import org.springframework.http.server.reactive.ServerHttpResponse;
 import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
-import org.springframework.util.MultiValueMap;
 import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -81,74 +76,75 @@ public class BodyPropertyAddingGatewayFilterFactory extends CustomGatewayFilterF
 
         @Override
         public Mono<Void> filter(final ServerWebExchange exchange, final GatewayFilterChain chain) {
+            return chain.filter(exchange.mutate().response(decorate(exchange)).build());
+        }
+
+        private ServerHttpResponse decorate(final ServerWebExchange exchange) {
             ServerHttpResponse originalResponse = exchange.getResponse();
 
             DataBufferFactory bufferFactory = originalResponse.bufferFactory();
-            ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(originalResponse) {
+            return new ServerHttpResponseDecorator(originalResponse) {
 
                 @Override
                 public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
-                    if (body instanceof Flux) {
-                        Flux<? extends DataBuffer> flux = (Flux<? extends DataBuffer>) body;
-
-                        return super.writeWith(flux.buffer().map(dataBuffers -> {
-                            ByteArrayOutputStream payload = new ByteArrayOutputStream();
-                            dataBuffers.forEach(buffer -> {
-                                byte[] array = new byte[buffer.readableByteCount()];
-                                buffer.read(array);
-                                try {
-                                    payload.write(array);
-                                } catch (IOException e) {
-                                    LOG.error("While reading original body content", e);
-                                }
-                            });
-
-                            byte[] input = payload.toByteArray();
-
-                            InputStream is = null;
-                            boolean compressed = false;
-                            byte[] output;
+                    return super.writeWith(Flux.from(body).buffer().map(dataBuffers -> {
+                        ByteArrayOutputStream payload = new ByteArrayOutputStream();
+                        dataBuffers.forEach(buffer -> {
+                            byte[] array = new byte[buffer.readableByteCount()];
+                            buffer.read(array);
                             try {
-                                if (isCompressed(input)) {
-                                    compressed = true;
-                                    is = new GZIPInputStream(new ByteArrayInputStream(input));
-                                } else {
-                                    is = new ByteArrayInputStream(input);
-                                }
-
-                                ObjectNode content = (ObjectNode) MAPPER.readTree(is);
-                                String[] kv = config.getData().split("=");
-                                content.put(kv[0], kv[1]);
-
-                                output = MAPPER.writeValueAsBytes(content);
+                                payload.write(array);
                             } catch (IOException e) {
-                                LOG.error("While (de)serializing as JSON", e);
-                                output = ArrayUtils.clone(input);
-                            } finally {
-                                IOUtils.closeStream(is);
+                                LOG.error("While reading original body content", e);
+                            }
+                        });
+
+                        byte[] input = payload.toByteArray();
+
+                        InputStream is = null;
+                        boolean compressed = false;
+                        byte[] output;
+                        try {
+                            if (isCompressed(input)) {
+                                compressed = true;
+                                is = new GZIPInputStream(new ByteArrayInputStream(input));
+                            } else {
+                                is = new ByteArrayInputStream(input);
                             }
 
-                            if (compressed) {
-                                try (ByteArrayOutputStream baos = new ByteArrayOutputStream(output.length);
-                                        GZIPOutputStream gzipos = new GZIPOutputStream(baos)) {
-
-                                    gzipos.write(output);
-                                    gzipos.close();
-                                    output = baos.toByteArray();
-                                } catch (IOException e) {
-                                    LOG.error("While GZIP-encoding output", e);
-                                }
+                            ObjectNode content = (ObjectNode) MAPPER.readTree(is);
+                            String[] kv = config.getData().split("=");
+                            content.put(kv[0], kv[1]);
+
+                            output = MAPPER.writeValueAsBytes(content);
+                        } catch (IOException e) {
+                            LOG.error("While (de)serializing as JSON", e);
+                            output = ArrayUtils.clone(input);
+                        } finally {
+                            IOUtils.closeStream(is);
+                        }
+
+                        if (compressed) {
+                            try (ByteArrayOutputStream baos = new ByteArrayOutputStream(output.length);
+                                    GZIPOutputStream gzipos = new GZIPOutputStream(baos)) {
+
+                                gzipos.write(output);
+                                gzipos.close();
+                                output = baos.toByteArray();
+                            } catch (IOException e) {
+                                LOG.error("While GZIP-encoding output", e);
                             }
+                        }
 
-                            return bufferFactory.wrap(output);
-                        }));
-                    }
+                        return bufferFactory.wrap(output);
+                    }));
+                }
 
-                    return super.writeWith(body);
+                @Override
+                public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) {
+                    return writeWith(Flux.from(body).flatMapSequential(p -> p));
                 }
             };
-
-            return chain.filter(exchange.mutate().response(responseDecorator).build());
         }
 
         @Override
@@ -156,46 +152,4 @@ public class BodyPropertyAddingGatewayFilterFactory extends CustomGatewayFilterF
             return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
         }
     }
-
-    public class ResponseAdapter implements ClientHttpResponse {
-
-        private final Flux<DataBuffer> flux;
-
-        private final HttpHeaders headers;
-
-        @SuppressWarnings("unchecked")
-        public ResponseAdapter(final Publisher<? extends DataBuffer> body, final HttpHeaders headers) {
-            this.headers = headers;
-            if (body instanceof Flux) {
-                flux = (Flux) body;
-            } else {
-                flux = ((Mono) body).flux();
-            }
-        }
-
-        @Override
-        public Flux<DataBuffer> getBody() {
-            return flux;
-        }
-
-        @Override
-        public HttpHeaders getHeaders() {
-            return headers;
-        }
-
-        @Override
-        public HttpStatus getStatusCode() {
-            return null;
-        }
-
-        @Override
-        public int getRawStatusCode() {
-            return 0;
-        }
-
-        @Override
-        public MultiValueMap<String, ResponseCookie> getCookies() {
-            return null;
-        }
-    }
 }
diff --git a/sra/src/test/java/org/apache/syncope/sra/BodyPropertyMatchingRoutePredicateFactory.java b/sra/src/test/java/org/apache/syncope/sra/BodyPropertyMatchingRoutePredicateFactory.java
index bff3903..9be1ab8 100644
--- a/sra/src/test/java/org/apache/syncope/sra/BodyPropertyMatchingRoutePredicateFactory.java
+++ b/sra/src/test/java/org/apache/syncope/sra/BodyPropertyMatchingRoutePredicateFactory.java
@@ -20,17 +20,12 @@ package org.apache.syncope.sra;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.List;
-import org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter;
 import org.springframework.cloud.gateway.handler.AsyncPredicate;
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
 import org.springframework.http.codec.HttpMessageReader;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
 import org.springframework.web.reactive.function.server.HandlerStrategies;
 import org.springframework.web.reactive.function.server.ServerRequest;
 import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -48,31 +43,13 @@ public class BodyPropertyMatchingRoutePredicateFactory extends CustomRoutePredic
         return exchange -> {
             JsonNode cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
             if (cachedBody == null) {
-                // Join all the DataBuffers so we have a single DataBuffer for the body
-                return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
-                    // Update the retain counts so we can read the body twice, once to parse into an object
-                    // that we can test the predicate against and a second time when the HTTP client sends
-                    // the request downstream 
-                    // Note: if we end up reading the body twice we will run into a problem, but as of right
-                    // now there is no good use case for doing this
-                    DataBufferUtils.retain(dataBuffer);
-                    // Make a slice for each read so each read has its own read/write indexes
-                    Flux<DataBuffer> cachedFlux = Flux.defer(() -> Flux.just(
-                            dataBuffer.slice(0, dataBuffer.readableByteCount())));
-
-                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
-
-                        @Override
-                        public Flux<DataBuffer> getBody() {
-                            return cachedFlux;
-                        }
-                    };
-                    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), MESSAGE_READERS).
-                            bodyToMono(JsonNode.class).doOnNext(value -> {
-                        exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, value);
-                        exchange.getAttributes().put(AdaptCachedBodyGlobalFilter.CACHED_REQUEST_BODY_KEY, cachedFlux);
-                    }).map(objectValue -> objectValue.has(config.getData()));
-                });
+                return ServerWebExchangeUtils.cacheRequestBodyAndRequest(
+                        exchange, serverHttpRequest -> ServerRequest.create(
+                                exchange.mutate().request(serverHttpRequest).build(), MESSAGE_READERS).
+                                bodyToMono(JsonNode.class).
+                                doOnNext(objectValue -> exchange.getAttributes().
+                                put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue)).
+                                map(objectValue -> objectValue.has(config.getData())));
             } else {
                 return Mono.just(cachedBody.has(config.getData()));
             }


Mime
View raw message