apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sandeepdeshmukh <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request: APEXMALHAR-2023 Enricher
Date Mon, 25 Apr 2016 07:29:28 GMT
Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60871105
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---
    @@ -0,0 +1,264 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +
    +/**
    + * This class takes a POJO as input and extracts the value of the lookupKey configured
    + * for this operator. It then looks up the matching entry in the file/DB, and all key-value pairs
    + * specified in the file/DB, or selected by the include fieldMap, are added to the tuple.
    + * This operator has App Builder schema support enabled. <br>
    + * <p>
    + * Properties:<br>
    + * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
    + * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
    + * <br>
    + * <p>
    + * Example:
    + * The file contains data in JSON format, one entry per line. During setup, the entire file
    + * is read and kept in memory for quick lookup.
    + * If the file contains the following lines, and the operator is configured with lookup key "productId"
    + * { "productId": 1, "productCategory": 3 }
    + * { "productId": 4, "productCategory": 10 }
    + * { "productId": 3, "productCategory": 1 }
    + * <p>
    + * and the input tuple is
    + * { amount=10.0, channelId=4, productId=3 }
    + * <p>
    + * then the operator emits the following enriched tuple on the output port:
    + * { amount=10.0, channelId=4, productId=3, productCategory=1 }
    + *
    + * @displayName BeanEnrichment
    + * @category Database
    + * @tags enrichment, pojo, schema, lookup
    + */
    +@InterfaceStability.Evolving
    +public class POJOEnricher extends AbstractEnricher<Object, Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class);
    +
    +  /*
    +   * Helper fields. The getter/setter collections start out empty; they are presumably
    +   * populated during operator setup/activation from the configured field names - TODO confirm.
    +   */
    +  /** Tuple class of the input port; assigned from the port's TUPLE_CLASS attribute in input's setup(). */
    +  protected Class<?> inputClass;
    +  /** Tuple class of the output port; assigned in output's setup() and instantiated by convert() per tuple. */
    +  protected Class<?> outputClass;
    +  /** Getter/setter pairs; convert() reads each getter from its {@code in} argument and writes the setter on the new output object. */
    +  private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>();
    +  /** Setters on the new output object, applied in order with the elements of the cached lookup result in convert(). */
    +  private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>();
    +  /** Getters whose values, in list order, form the lookup key built by getKey(). */
    +  private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>();
    +
    +  /**
    +   * Input port receiving the POJOs to enrich. Its tuple class is required to be declared
    +   * (schemaRequired) and is captured into {@code inputClass} when the port is set up.
    +   */
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    +  {
    +    /** Remembers the POJO class configured for this port via its TUPLE_CLASS attribute. */
    +    @Override
    +    public void setup(Context.PortContext portContext)
    +    {
    +      inputClass = portContext.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +
    +    /** Delegates every arriving tuple to the operator's processTuple hook. */
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * Output port for enriched POJOs. Its tuple class is required to be declared
    +   * (schemaRequired) and is captured into {@code outputClass} when the port is set up.
    +   */
    +  @OutputPortFieldAnnotation(schemaRequired = true)
    +  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
    +  {
    +    /** Remembers the POJO class configured for this port via its TUPLE_CLASS attribute. */
    +    @Override
    +    public void setup(Context.PortContext portContext)
    +    {
    +      outputClass = portContext.getValue(Context.PortContext.TUPLE_CLASS);
    +    }
    +  };
    +
    +  /**
    +   * Per-tuple hook called by the input port for each received tuple. The default
    +   * behaviour simply forwards the tuple to {@code enrichTuple}; subclasses may override.
    +   *
    +   * @param tuple the tuple received on the input port
    +   */
    +  protected void processTuple(Object tuple)
    +  {
    +    enrichTuple(tuple);
    +  }
    +
    +  /**
    +   * Builds the lookup key for a tuple by reading each configured lookup field in order.
    +   *
    +   * @param tuple the tuple whose lookup key is extracted
    +   * @return a list containing one value per lookup getter, in the order of {@code lookupGetters}
    +   */
    +  @Override
    +  protected Object getKey(Object tuple)
    +  {
    +    // The key always has exactly one element per lookup getter, so size the list up front.
    +    List<Object> keyList = new ArrayList<>(lookupGetters.size());
    +    for (PojoUtils.Getter lookupGetter : lookupGetters) {
    +      keyList.add(lookupGetter.get(tuple));
    +    }
    +    return keyList;
    +  }
    +
    +  @Override
    +  protected Object convert(Object in, Object cached)
    +  {
    +    Object o;
    +
    +    try {
    +      o = outputClass.newInstance();
    +    } catch (InstantiationException | IllegalAccessException e) {
    +      logger.error("Failed to create new instance of output POJO", e);
    +      return null;
    +    }
    +
    +    for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry : fieldMap.entrySet())
{
    +      entry.getValue().set(o, entry.getKey().get(in));
    +    }
    +
    +    if (cached == null) {
    +      return null;
    +    }
    +
    +    ArrayList<Object> includeObjects = (ArrayList<Object>)cached;
    +    int idx = 0;
    +    for (PojoUtils.Setter includeSetter : includeSetters) {
    +      try {
    +        includeSetter.set(o, includeObjects.get(idx++));
    +      } catch (RuntimeException e) {
    +        logger.error("Failed to set the property. Continuing with default.", e);
    +      }
    --- End diff --
    
    Use an indexed for loop (with idx).
    On error, redirect the tuple to an error port and increment an error metric.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message