apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2006) Stream API Design
Date Tue, 17 May 2016 23:53:13 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15287915#comment-15287915
] 

ASF GitHub Bot commented on APEXMALHAR-2006:
--------------------------------------------

Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/261#discussion_r63624361
  
    --- Diff: stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
---
    @@ -0,0 +1,361 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.stream.api.operator;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.stream.api.function.Function;
    +import org.apache.commons.io.IOUtils;
    +
    +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
    +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
    +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Operators that wrap the functions
    + */
    +public class FunctionOperator<OUT> implements Operator
    +{
    +  private byte[] annonymousFunctionClass;
    +
    +  protected transient Function statelessF;
    +
    +  protected Function statefulF;
    +
    +  protected boolean stateful = false;
    +
    +  protected boolean isAnnonymous = false;
    +
    +  public final transient DefaultOutputPort<OUT> output = new DefaultOutputPort<>();
    +
    +  public FunctionOperator(Function f)
    +  {
    +    isAnnonymous = f.getClass().isAnonymousClass();
    +    if (isAnnonymous) {
    +      annonymousFunctionClass = functionClassData(f);
    +    } else if (f instanceof Function.Stateful) {
    +      statelessF = f;
    +    } else {
    +      statefulF = f;
    +      stateful = true;
    +    }
    +  }
    +
    +  private byte[] functionClassData(Function f)
    +  {
    +    Class<Function> classT = (Class<Function>)f.getClass();
    +
    +    byte[] classBytes = null;
    +    byte[] classNameBytes = null;
    +    String className = classT.getName();
    +    try {
    +      classNameBytes = className.replace('.', '/').getBytes();
    +      classBytes = IOUtils.toByteArray(classT.getClassLoader().getResourceAsStream(className.replace('.',
'/') + ".class"));
    +      int cursor = 0;
    +      for (int j = 0; j < classBytes.length; j++) {
    +        if (classBytes[j] != classNameBytes[cursor]) {
    +          cursor = 0;
    +        } else {
    +          cursor++;
    +        }
    +
    +        if (cursor == classNameBytes.length) {
    +          for (int p = 0; p < classNameBytes.length; p++) {
    +            if (classBytes[j - p] == '$') {
    +              classBytes[j - p] = '_';
    +            }
    +          }
    +          cursor = 0;
    +        }
    +      }
    +      ClassReader cr = new ClassReader(new ByteArrayInputStream(classBytes));
    +      ClassWriter cw = new ClassWriter(0);
    +      AnnonymousClassModifier annonymousClassModifier = new AnnonymousClassModifier(Opcodes.ASM4,
cw);
    +      cr.accept(annonymousClassModifier, 0);
    +      classBytes = cw.toByteArray();
    +    } catch (IOException e) {
    +      throw new RuntimeException(e);
    +    }
    +    int dataLength = classNameBytes.length + 4 + 4;
    +
    +    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(dataLength);
    +    DataOutputStream output = new DataOutputStream(byteArrayOutputStream);
    +
    +    try {
    +      output.writeInt(classNameBytes.length);
    +      output.write(className.replace('$', '_').getBytes());
    +      output.writeInt(classBytes.length);
    +      output.write(classBytes);
    +    } catch (IOException e) {
    +      DTThrowable.rethrow(e);
    +    } finally {
    +      try {
    +        output.flush();
    +        output.close();
    +      } catch (IOException e) {
    +        DTThrowable.rethrow(e);
    +      }
    +    }
    +
    +    return byteArrayOutputStream.toByteArray();
    +
    +  }
    +
    +  /**
    +   * Default constructor to make kryo happy
    +   */
    +  public FunctionOperator()
    +  {
    +
    +  }
    +
    +  @Override
    +  public void beginWindow(long l)
    +  {
    +
    +  }
    +
    +  @Override
    +  public void endWindow()
    +  {
    +
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    readFunction();
    +  }
    +
    +  private void readFunction()
    +  {
    +    try {
    +      if (statelessF != null || statefulF != null) {
    +        return;
    +      }
    +      DataInputStream input = new DataInputStream(new ByteArrayInputStream(annonymousFunctionClass));
    +      byte[] classNameBytes = new byte[input.readInt()];
    +      input.read(classNameBytes);
    +      String className = new String(classNameBytes);
    +      byte[] classData = new byte[input.readInt()];
    +      input.read(classData);
    +      Map<String, byte[]> classBin = new HashMap<>();
    +      classBin.put(className, classData);
    +      ByteArrayClassLoader byteArrayClassLoader = new ByteArrayClassLoader(classBin,
Thread.currentThread().getContextClassLoader());
    +      statelessF = (Function)byteArrayClassLoader.findClass(className).newInstance();
    +    } catch (Exception e) {
    +      DTThrowable.rethrow(e);
    --- End diff --
    
    DTThrowable shouldn't be used


> Stream API Design
> -----------------
>
>                 Key: APEXMALHAR-2006
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2006
>             Project: Apache Apex Malhar
>          Issue Type: Sub-task
>            Reporter: Siyuan Hua
>            Assignee: Siyuan Hua
>             Fix For: 3.4.0
>
>
> Construct DAG in a similar way as Flink/Spark Streaming



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message