streams-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (STREAMS-221) Simple HTTP Persist Writer
Date Fri, 05 Dec 2014 22:13:13 GMT

    [ https://issues.apache.org/jira/browse/STREAMS-221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236200#comment-14236200 ]

ASF GitHub Bot commented on STREAMS-221:
----------------------------------------

Github user robdouglas commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/153#discussion_r21404184
  
    --- Diff: streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java ---
    @@ -0,0 +1,189 @@
    +package org.apache.streams.components.http.persist;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Maps;
    +import org.apache.http.HttpEntity;
    +import org.apache.http.client.methods.CloseableHttpResponse;
    +import org.apache.http.client.methods.HttpPost;
    +import org.apache.http.client.utils.URIBuilder;
    +import org.apache.http.entity.StringEntity;
    +import org.apache.http.impl.client.CloseableHttpClient;
    +import org.apache.http.impl.client.HttpClients;
    +import org.apache.http.util.EntityUtils;
    +import org.apache.streams.components.http.HttpConfigurator;
    +import org.apache.streams.components.http.HttpPersistWriterConfiguration;
    +import org.apache.streams.config.StreamsConfigurator;
    +import org.apache.streams.core.StreamsDatum;
    +import org.apache.streams.core.StreamsPersistWriter;
    +import org.apache.streams.jackson.StreamsJacksonMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.UnsupportedEncodingException;
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.util.Map;
    +
    +/**
    + * Created by steve on 11/12/14.
    + */
    +public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
    +
    +    private final static String STREAMS_ID = "SimpleHTTPPostPersistWriter";
    +
    +    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostPersistWriter.class);
    +
    +    protected ObjectMapper mapper;
    +
    +    protected URIBuilder uriBuilder;
    +
    +    protected CloseableHttpClient httpclient;
    +
    +    protected HttpPersistWriterConfiguration configuration;
    +
    +    /**
    +     * Creates a writer whose configuration is detected from the "http" section of
    +     * {@code StreamsConfigurator.config}, then delegates to the configuration-taking constructor.
    +     */
    +    public SimpleHTTPPostPersistWriter() {
    +        this(HttpConfigurator.detectPersistWriterConfiguration(StreamsConfigurator.config.getConfig("http")));
    +    }
    +
    +    /**
    +     * Creates a writer that uses the supplied configuration.
    +     *
    +     * <p>Only the configuration is stored here; {@code mapper}, {@code uriBuilder} and
    +     * {@code httpclient} are not assigned by this constructor.
    +     *
    +     * @param configuration settings for this writer (read later, e.g. for the request content type)
    +     */
    +    public SimpleHTTPPostPersistWriter(HttpPersistWriterConfiguration configuration)
{
    +        this.configuration = configuration;
    +    }
    +
    +
    +    /**
    +     * Posts a single datum to the remote end-point.
    +     *
    +     * <p>The request is assembled by the overridable hooks {@link #preparePayload},
    +     * {@link #prepareParams}, {@link #prepareURI} and {@link #prepareHttpPost}, then sent with
    +     * {@link #executePost}. The response is logged at debug level.
    +     *
    +     * @param entry the datum to send
    +     */
    +    @Override
    +    public void write(StreamsDatum entry) {
    +
    +        ObjectNode payload = preparePayload(entry);
    +
    +        Map<String, String> params = prepareParams(entry);
    +
    +        URI uri = prepareURI(params);
    +
    +        HttpPost httppost = prepareHttpPost(uri, payload);
    +
    +        ObjectNode result = executePost(httppost);
    +
    +        try {
    +            LOGGER.debug(mapper.writeValueAsString(result));
    +        } catch (JsonProcessingException e) {
    +            // Pass the exception itself: the message has no "{}" placeholder, so a plain
    +            // String argument (e.getMessage()) would be silently discarded by SLF4J.
    +            LOGGER.warn("Non-json response", e);
    +        }
    +    }
    +
    +    /**
    +     * Builds the request URI from the base {@code uriBuilder} plus the given parameters.
    +     * Override this to alter request URI.
    +     *
    +     * <p>Parameters are applied to a per-call copy of the base builder, so values set for one
    +     * request do not carry over into later requests.
    +     *
    +     * @param params parameters to set on the URI; an existing parameter with the same name is replaced
    +     * @return the request URI, or {@code null} if a valid URI could not be built (the failure is logged)
    +     */
    +    protected URI prepareURI(Map<String, String> params) {
    +        try {
    +            // Copy the shared builder instead of mutating it: the field is reused for every datum.
    +            URIBuilder requestBuilder = new URIBuilder(uriBuilder.build());
    +            for (Map.Entry<String, String> param : params.entrySet()) {
    +                requestBuilder.setParameter(param.getKey(), param.getValue());
    +            }
    +            return requestBuilder.build();
    +        } catch (URISyntaxException e) {
    +            // Trailing throwable keeps the cause and stack trace in the log.
    +            LOGGER.error("URI error {}", uriBuilder.toString(), e);
    +            return null;
    +        }
    +    }
    +
    +    /**
    +     * Supplies the parameters that {@link #write} hands to {@link #prepareURI} for this datum.
    +     * The default contributes none; override this to add parameters to the request.
    +     *
    +     * @param entry the datum being written (unused by the default implementation)
    +     * @return a new, empty, mutable map
    +     */
    +    protected Map<String, String> prepareParams(StreamsDatum entry) {
    +        Map<String, String> noParams = Maps.newHashMap();
    +        return noParams;
    +    }
    +
    +    /**
    +     * Extracts the JSON payload to send for this datum.
    +     * Override this to alter the JSON payload of the request.
    +     *
    +     * @param entry the datum being written
    +     * @return the datum's document, cast to {@code ObjectNode}
    +     */
    +    protected ObjectNode preparePayload(StreamsDatum entry) {
    +        Object document = entry.getDocument();
    +        return (ObjectNode) document;
    +    }
    +
    +    /**
    +     * Builds the POST request carrying one payload.
    +     * Override this to add headers to the request.
    +     *
    +     * @param uri     request target
    +     * @param payload JSON document serialized into the request body
    +     * @return the request; if the payload cannot be serialized the failure is logged and the
    +     *         request is returned without a body
    +     */
    +    public HttpPost prepareHttpPost(URI uri, ObjectNode payload) {
    +        HttpPost httppost = new HttpPost(uri);
    +        httppost.addHeader("content-type", this.configuration.getContentType());
    +// TODO: add support for authentication
    +//        if( !Strings.isNullOrEmpty(authHeader))
    +//            httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
    +        try {
    +            String entity = mapper.writeValueAsString(payload);
    +            // Encode explicitly as UTF-8: the single-argument StringEntity constructor falls back
    +            // to ISO-8859-1, which cannot represent most non-Latin characters in the JSON text.
    +            httppost.setEntity(new StringEntity(entity, "UTF-8"));
    +        } catch (JsonProcessingException e) {
    +            // Report through the class logger rather than stderr; the request is still returned.
    +            LOGGER.error("Unable to serialize payload for {}", uri, e);
    +        }
    +        return httppost;
    +    }
    +
    +    protected ObjectNode executePost(HttpPost httpPost) {
    +
    +        Preconditions.checkNotNull(httpPost);
    +
    +        ObjectNode result = null;
    +
    +        CloseableHttpResponse response = null;
    +
    +        String entityString = null;
    +        try {
    +            response = httpclient.execute(httpPost);
    +            HttpEntity entity = response.getEntity();
    +            // TODO: handle retry
    +            if (response.getStatusLine() != null && response.getStatusLine().getStatusCode()
>= 200 && entity != null) {
    --- End diff --
    
    Is there a response code enum we could use instead of '200'?


> Simple HTTP Persist Writer
> --------------------------
>
>                 Key: STREAMS-221
>                 URL: https://issues.apache.org/jira/browse/STREAMS-221
>             Project: Streams
>          Issue Type: New Feature
>            Reporter: Steve Blackmon
>            Assignee: Steve Blackmon
>
> Add a utility persist writer that posts documents to a remote end-point.
> This writer should be able to pass messages between streams pipelines executing on other hosts in conjunction with STREAMS-222.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message