streams-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (STREAMS-60) WebHdfsReader should set timestamp when reading documents
Date Mon, 19 May 2014 19:53:40 GMT

    [ https://issues.apache.org/jira/browse/STREAMS-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14002254#comment-14002254 ]

ASF GitHub Bot commented on STREAMS-60:
---------------------------------------

Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/12#discussion_r12812755
  
    --- Diff: streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java ---
    @@ -0,0 +1,149 @@
    +package org.apache.streams.json;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ArrayNode;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +import com.jayway.jsonpath.JsonPath;
    +import net.minidev.json.JSONArray;
    +import net.minidev.json.JSONObject;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.streams.core.StreamsDatum;
    +import org.apache.streams.core.StreamsProcessor;
    +import org.apache.streams.jackson.StreamsJacksonMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Created by sblackmon on 12/10/13.
    + */
    +public class JsonPathFilter implements StreamsProcessor {
    +
    +    public JsonPathFilter() {
    +        System.out.println("creating JsonPathFilter");
    +    }
    +
    +    private final static String STREAMS_ID = "JsonPathFilter";
    +
    +    private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathFilter.class);
    +
    +    private ObjectMapper mapper = new StreamsJacksonMapper();
    +
    +    private String pathExpression;
    +    private JsonPath jsonPath;
    +    private String destNodeName;
    +
    +    @Override
    +    public List<StreamsDatum> process(StreamsDatum entry) {
    +
    +        List<StreamsDatum> result = Lists.newArrayList();
    +
    +        String json = null;
    +
    +        ObjectNode document = null;
    +
    +        LOGGER.debug("{} processing {}", STREAMS_ID);
    +
    +        if( entry.getDocument() instanceof ObjectNode ) {
    +            document = (ObjectNode) entry.getDocument();
    +            try {
    +                json = mapper.writeValueAsString(document);
    +            } catch (JsonProcessingException e) {
    +                e.printStackTrace();
    +            }
    +        } else if( entry.getDocument() instanceof String ) {
    +            json = (String) entry.getDocument();
    +            try {
    +                document = mapper.readValue(json, ObjectNode.class);
    +            } catch (IOException e) {
    +                e.printStackTrace();
    +                return null;
    +            }
    +        }
    +
    +        Preconditions.checkNotNull(document);
    +
    +        if( StringUtils.isNotEmpty(json)) {
    +
    +            Object srcResult = null;
    +            try {
    +                srcResult = jsonPath.read(json);
    +
    +            } catch( Exception e ) {
    +                e.printStackTrace();
    +                LOGGER.warn(e.getMessage());
    +            }
    +
    +            Preconditions.checkNotNull(srcResult);
    +
    +            String[] path = StringUtils.split(pathExpression, '.');
    +            ObjectNode node = document;
    +            for (int i = 1; i < path.length-1; i++) {
    +                node = (ObjectNode) document.get(path[i]);
    +            }
    +
    +            Preconditions.checkNotNull(node);
    +
    +            if( srcResult instanceof JSONArray ) {
    +                try {
    +                    ArrayNode jsonNode = mapper.convertValue(srcResult, ArrayNode.class);
    +                    if( jsonNode.size() == 1 ) {
    --- End diff --
    
    Both use cases are supported and tested: extracting String fields as well as extracting objects.


> WebHdfsReader should set timestamp when reading documents
> ---------------------------------------------------------
>
>                 Key: STREAMS-60
>                 URL: https://issues.apache.org/jira/browse/STREAMS-60
>             Project: Streams
>          Issue Type: Improvement
>            Reporter: Steve Blackmon
>
> WebHdfsReader should set timestamp when reading documents



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message