streams-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (STREAMS-134) Complete percolate tagger
Date Tue, 05 Aug 2014 13:13:13 GMT

    [ https://issues.apache.org/jira/browse/STREAMS-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14086242#comment-14086242
] 

ASF GitHub Bot commented on STREAMS-134:
----------------------------------------

Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/61#discussion_r15810538
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
---
    @@ -0,0 +1,384 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.elasticsearch.processor;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ArrayNode;
    +import com.fasterxml.jackson.databind.node.JsonNodeFactory;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +import org.apache.streams.core.StreamsDatum;
    +import org.apache.streams.core.StreamsProcessor;
    +import org.apache.streams.data.util.ActivityUtil;
    +import org.apache.streams.elasticsearch.ElasticsearchClientManager;
    +import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
    +import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
    +import org.apache.streams.jackson.StreamsJacksonMapper;
    +import org.apache.streams.pojo.json.Activity;
    +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
    +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
    +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkRequestBuilder;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.elasticsearch.action.index.IndexRequest;
    +import org.elasticsearch.action.index.IndexRequestBuilder;
    +import org.elasticsearch.action.percolate.PercolateRequestBuilder;
    +import org.elasticsearch.action.percolate.PercolateResponse;
    +import org.elasticsearch.action.search.SearchRequestBuilder;
    +import org.elasticsearch.action.search.SearchResponse;
    +import org.elasticsearch.index.query.QueryBuilders;
    +import org.elasticsearch.index.query.QueryStringQueryBuilder;
    +import org.elasticsearch.search.SearchHit;
    +import org.elasticsearch.search.SearchHits;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.*;
    +
    +/**
    + * References:
    + * Some helpful references to help
    + * Purpose              URL
    + * -------------        ----------------------------------------------------------------
    + * [Status Codes]       http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
    + * [Test Cases]         http://greenbytes.de/tech/tc/httpredirects/
    + * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
    + */
    +
    +public class PercolateTagProcessor implements StreamsProcessor {
    +
    +    public static final String STREAMS_ID = "PercolateTagProcessor";
    +
    +    private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class);
    +
    +    private ObjectMapper mapper;
    +
    +    protected Queue<StreamsDatum> inQueue;
    +    protected Queue<StreamsDatum> outQueue;
    +
    +    public String TAGS_EXTENSION = "tags";
    +
    +    private ElasticsearchWriterConfiguration config;
    +    private ElasticsearchClientManager manager;
    +    private BulkRequestBuilder bulkBuilder;
    +
    +    public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
    +        this.config = config;
    +    }
    +
    +    public ElasticsearchClientManager getManager() {
    +        return manager;
    +    }
    +
    +    public void setManager(ElasticsearchClientManager manager) {
    +        this.manager = manager;
    +    }
    +
    +    public ElasticsearchConfiguration getConfig() {
    +        return config;
    +    }
    +
    +    public void setConfig(ElasticsearchWriterConfiguration config) {
    +        this.config = config;
    +    }
    +
    +    public Queue<StreamsDatum> getProcessorOutputQueue() {
    +        return outQueue;
    +    }
    +
    +    @Override
    +    public List<StreamsDatum> process(StreamsDatum entry) {
    +
    +        List<StreamsDatum> result = Lists.newArrayList();
    +
    +        String json;
    +        ObjectNode node;
    +        // first check for valid json
    +        if (entry.getDocument() instanceof String) {
    +            json = (String) entry.getDocument();
    +            try {
    +                node = (ObjectNode) mapper.readTree(json);
    +            } catch (IOException e) {
    +                e.printStackTrace();
    +                return null;
    +            }
    +        } else {
    --- End diff --
    
    If you are going to check types, you might as well also check that the parsed tree is actually an ObjectNode before casting it (readTree can return an array or other non-object node).
    
    Longer term, we need a way for processors to describe the types they support.


> Complete percolate tagger
> -------------------------
>
>                 Key: STREAMS-134
>                 URL: https://issues.apache.org/jira/browse/STREAMS-134
>             Project: Streams
>          Issue Type: New Feature
>            Reporter: Steve Blackmon
>
> streams-persist-elasticsearch contains an unfinished class for creating and applying
tags to documents in flight.
> Complete, test, and document this feature.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message