apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2023) Adding Enrichment Operator to Malhar
Date Mon, 25 Apr 2016 07:53:12 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15256027#comment-15256027
] 

ASF GitHub Bot commented on APEXMALHAR-2023:
--------------------------------------------

Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60873281
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java ---
    @@ -0,0 +1,271 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.commons.collections.CollectionUtils;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.collect.Maps;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.lib.io.ConsoleOutputOperator;
    +import com.datatorrent.lib.testbench.CollectorTestSink;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.TestUtils;
    +
    +public class MapEnricherTest
    +{
    +  /**
    +   * Pushes one map tuple through a {@link MapEnricher} configured with a lookup field
    +   * but no include-field list, then checks the tuple collected on the output port.
    +   */
    +  @Test
    +  public void includeAllKeys()
    +  {
    +    MapEnricher oper = new MapEnricher();
    +    oper.setStore(new MemoryStore());
    +    oper.setLookupFields(Arrays.asList("In1"));
    +    oper.setup(null);
    +
    +    CollectorTestSink sink = new CollectorTestSink();
    +    TestUtils.setSink(oper.output, sink);
    +
    +    Map<String, Object> inMap = Maps.newHashMap();
    +    inMap.put("In1", "Value1");
    +    inMap.put("In2", "Value2");
    +
    +    // Single window carrying exactly one input tuple.
    +    oper.activate(null);
    +    oper.beginWindow(1);
    +    oper.input.process(inMap);
    +    oper.endWindow();
    +    oper.deactivate();
    +
    +    // The label names this test so a failure is not attributed to includeSelectedKeys.
    +    Assert.assertEquals("includeAllKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
    +    // NOTE(review): expected values presumably come from MemoryStore, which is not shown here - confirm.
    +    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, C=Val_C, In2=Value2, In1=Value3}",
    +        sink.collectedTuples.get(0).toString());
    +  }
    +
    +  @Test
    +  public void includeSelectedKeys()
    +  {
    +    MapEnricher oper = new MapEnricher();
    +    oper.setStore(new MemoryStore());
    +    oper.setLookupFields(Arrays.asList("In1"));
    +    oper.setIncludeFields(Arrays.asList("A", "B"));
    +    oper.setup(null);
    +
    +    CollectorTestSink sink = new CollectorTestSink();
    +    TestUtils.setSink(oper.output, sink);
    +
    +    Map<String, Object> inMap = Maps.newHashMap();
    +    inMap.put("In1", "Value1");
    +    inMap.put("In2", "Value2");
    +
    +    oper.activate(null);
    +    oper.beginWindow(1);
    +    oper.input.process(inMap);
    +    oper.endWindow();
    +    oper.deactivate();
    +
    +    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
    +    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, In2=Value2, In1=Value1}",
    +        sink.collectedTuples.get(0).toString());
    +  }
    +
    +  /**
    +   * Prepares {@link EnrichApplication} in local mode and runs it through the local controller.
    +   *
    +   * @throws Exception if any of the local-mode calls fail
    +   */
    +  @Test
    +  public void testApplication() throws Exception
    +  {
    +    LocalMode localMode = LocalMode.newInstance();
    +    Configuration config = new Configuration(false);
    +    localMode.prepareDAG(new EnrichApplication(), config);
    +
    +    // runs for 10 seconds and quits
    +    localMode.getController().run(10000);
    +  }
    +
    +  /**
    +   * Test application that wires a {@link RandomMapGenerator} into a {@link MapEnricher}
    +   * and sends the enricher's output to a silenced {@link ConsoleOutputOperator}.
    +   */
    +  public static class EnrichApplication implements StreamingApplication
    +  {
    +    /**
    +     * Adds the three operators to the DAG, configures the enricher and connects the streams.
    +     *
    +     * @param dag the DAG to populate
    +     * @param configuration not read by this method
    +     */
    +    @Override
    +    public void populateDAG(DAG dag, Configuration configuration)
    +    {
    +      RandomMapGenerator generator = dag.addOperator("Input", RandomMapGenerator.class);
    +      MapEnricher enricher = dag.addOperator("Enrich", MapEnricher.class);
    +      ConsoleOutputOperator consoleOut = dag.addOperator("Console", ConsoleOutputOperator.class);
    +      consoleOut.setSilent(true);
    +
    +      // Copy into ArrayList so the operator still receives ArrayList instances.
    +      List<String> fieldsToInclude = new ArrayList<>(Arrays.asList("A", "B"));
    +      List<String> fieldsToLookup = new ArrayList<>(Arrays.asList("In1"));
    +
    +      enricher.setStore(new MemoryStore());
    +      enricher.setIncludeFields(fieldsToInclude);
    +      enricher.setLookupFields(fieldsToLookup);
    +
    +      dag.addStream("S1", generator.output, enricher.input);
    +      dag.addStream("S2", enricher.output, consoleOut.input);
    +    }
    +  }
    +
    +  public static class RandomMapGenerator implements InputOperator
    --- End diff --
    
    Should this extend BaseOperator and implement InputOperator?


> Adding Enrichment Operator to Malhar
> ------------------------------------
>
>                 Key: APEXMALHAR-2023
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2023
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: adapters database
>    Affects Versions: 3.3.1
>            Reporter: Chinmay Kolhatkar
>            Assignee: Chinmay Kolhatkar
>
> Add Enrichment Operator to Apex Malhar.
> Discussion is happening on the mailing list here:
> http://mail-archives.apache.org/mod_mbox/incubator-apex-dev/201603.mbox/%3CCAKJfLDMo24-Gcvum2ZL8-0JOnE8QLryAy0Zu_R5zhMd_bsJyHw%40mail.gmail.com%3E
> Ponymail permalink:
> https://pony-poc.apache.org/thread.html/Z8t5ut5pu5vprgt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message