streams-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (STREAMS-273) Support POST endpoints in streams-http
Date Tue, 31 Mar 2015 23:24:54 GMT

    [ https://issues.apache.org/jira/browse/STREAMS-273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14389658#comment-14389658
] 

ASF GitHub Bot commented on STREAMS-273:
----------------------------------------

Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/206#discussion_r27532980
  
    --- Diff: streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
---
    @@ -0,0 +1,278 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.components.http.processor;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import com.google.common.base.Strings;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.commons.codec.binary.Base64;
    +import org.apache.http.HttpEntity;
    +import org.apache.http.client.methods.CloseableHttpResponse;
    +import org.apache.http.client.methods.HttpGet;
    +import org.apache.http.client.methods.HttpPost;
    +import org.apache.http.client.utils.URIBuilder;
    +import org.apache.http.entity.ContentType;
    +import org.apache.http.entity.StringEntity;
    +import org.apache.http.impl.client.CloseableHttpClient;
    +import org.apache.http.impl.client.HttpClients;
    +import org.apache.http.util.EntityUtils;
    +import org.apache.streams.components.http.HttpConfigurator;
    +import org.apache.streams.components.http.HttpProcessorConfiguration;
    +import org.apache.streams.config.StreamsConfigurator;
    +import org.apache.streams.core.StreamsDatum;
    +import org.apache.streams.core.StreamsProcessor;
    +import org.apache.streams.jackson.StreamsJacksonMapper;
    +import org.apache.streams.pojo.extensions.ExtensionUtil;
    +import org.apache.streams.pojo.json.ActivityObject;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Processor retrieves contents from an known url and stores the resulting object in
an extension field
    + */
    +public class SimpleHTTPPostProcessor implements StreamsProcessor {
    +
    +    private final static String STREAMS_ID = "SimpleHTTPPostProcessor";
    +
    +    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostProcessor.class);
    +
    +    protected ObjectMapper mapper;
    +
    +    protected URIBuilder uriBuilder;
    +
    +    protected CloseableHttpClient httpclient;
    +
    +    protected HttpProcessorConfiguration configuration;
    +
    +    protected String authHeader;
    +
    +    public SimpleHTTPPostProcessor() {
    +        this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("http")));
    +    }
    +
    +    public SimpleHTTPPostProcessor(HttpProcessorConfiguration processorConfiguration)
{
    +        LOGGER.info("creating SimpleHTTPPostProcessor");
    +        LOGGER.info(processorConfiguration.toString());
    +        this.configuration = processorConfiguration;
    +    }
    +
    +
    +    /**
    +     Override this to store a result other than exact json representation of response
    +     */
    +    protected ObjectNode prepareExtensionFragment(String entityString) {
    +
    +        try {
    +            return mapper.readValue(entityString, ObjectNode.class);
    +        } catch (IOException e) {
    +            LOGGER.warn(e.getMessage());
    +            return null;
    +        }
    +    }
    +
    +    /**
    +     Override this to place result in non-standard location on document
    +     */
    +    protected ObjectNode getRootDocument(StreamsDatum datum) {
    +
    +        try {
    +            String json = datum.getDocument() instanceof String ?
    +                    (String) datum.getDocument() :
    +                    mapper.writeValueAsString(datum.getDocument());
    +            return mapper.readValue(json, ObjectNode.class);
    +        } catch (JsonProcessingException e) {
    +            LOGGER.warn(e.getMessage());
    +            return null;
    +        } catch (IOException e) {
    +            LOGGER.warn(e.getMessage());
    +            return null;
    +        }
    +
    +    }
    +
    +    /**
    +     Override this to place result in non-standard location on document
    +     */
    +    protected ActivityObject getEntityToExtend(ObjectNode rootDocument) {
    +
    +        if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
    +            return mapper.convertValue(rootDocument, ActivityObject.class);
    +        else
    +            return mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()),
ActivityObject.class);
    +
    +    }
    +
    +    /**
    +     Override this to place result in non-standard location on document
    +     */
    +    protected ObjectNode setEntityToExtend(ObjectNode rootDocument, ActivityObject activityObject)
{
    +
    +        if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
    +            return mapper.convertValue(activityObject, ObjectNode.class);
    +        else
    +            rootDocument.set(this.configuration.getEntity().toString(), mapper.convertValue(activityObject,
ObjectNode.class));
    +
    +        return rootDocument;
    +
    +    }
    +
    +    @Override
    +    public List<StreamsDatum> process(StreamsDatum entry) {
    +
    +        List<StreamsDatum> result = Lists.newArrayList();
    +
    +        ObjectNode rootDocument = getRootDocument(entry);
    +
    +        Map<String, String> params = prepareParams(entry);
    +
    +        URI uri;
    +        for( Map.Entry<String,String> param : params.entrySet()) {
    +            uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
    +        }
    +        try {
    +            uri = uriBuilder.build();
    +        } catch (URISyntaxException e) {
    +            LOGGER.error("URI error {}", uriBuilder.toString());
    --- End diff --
    
    fixed.


> Support POST endpoints in streams-http
> --------------------------------------
>
>                 Key: STREAMS-273
>                 URL: https://issues.apache.org/jira/browse/STREAMS-273
>             Project: Streams
>          Issue Type: Improvement
>            Reporter: Steve Blackmon
>            Assignee: Steve Blackmon
>
> Endpoints with require POST of a document rather than GET using parameters should be
supported in streams-http.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message