apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-1963) Add abstract operator for Async Processing
Date Sat, 16 Jan 2016 06:34:39 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-1963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15103027#comment-15103027
] 

ASF GitHub Bot commented on APEXMALHAR-1963:
--------------------------------------------

Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/157#discussion_r49928172
  
    --- Diff: library/src/test/java/com/datatorrent/lib/async/AbstractAsyncProcessorTest.java
---
    @@ -0,0 +1,145 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.async;
    +
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.lib.testbench.CollectorTestSink;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.List;
    +
    +public class AbstractAsyncProcessorTest
    +{
    +  public static class TestAsyncProcessor extends AbstractAsyncProcessor<String, String>
    +  {
    +    public transient final DefaultInputPort<String> input = new DefaultInputPort<String>()
    +    {
    +      @Override public void process(String s)
    +      {
    +        enqueueTupleForProcessing(s);
    +      }
    +    };
    +    public transient final DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +    public transient final DefaultOutputPort<String> error = new DefaultOutputPort<>();
    +
    +    @Override protected void handleProcessedTuple(String inpTuple, String resultTuple,
State processState)
    +    {
    +      if (processState == State.SUCCESS) {
    +        output.emit(resultTuple);
    +      } else {
    +        error.emit(inpTuple);
    +      }
    +    }
    +
    +    @Override protected String processTupleAsync(String tuple)
    +    {
    +      int i = Integer.parseInt(tuple) % 3;
    +      switch (i % 3) {
    +        case 1:
    +          return tuple + ";FIRSTPASS";
    +        case 2:
    +          return tuple + ";SECONDPASS";
    +        case 0:
    +        default:
    +          throw new RuntimeException("Expected");
    +      }
    +    }
    +  }
    +
    +  @Test public void maintainOrderTest() throws InterruptedException
    +  {
    +    testOperator(1, true, 5000);
    +    testOperator(1, true, 15000);
    +  }
    +
    +  @Test public void noOrderMaintainedTest() throws InterruptedException
    +  {
    +    testOperator(1, false, 5000);
    +    testOperator(2, false, 15000);
    +  }
    +
    +  private void testOperator(int numThreads, boolean maintainOrder, long waitInterval)
throws InterruptedException
    +  {
    +    CollectorTestSink<Object> sinkOut = new CollectorTestSink<>();
    +    CollectorTestSink<Object> sinkErr = new CollectorTestSink<>();
    +
    +    TestAsyncProcessor async = new TestAsyncProcessor();
    +    async.setNumProcessors(numThreads);
    +    async.setMaintainTupleOrder(maintainOrder);
    +    async.output.setSink(sinkOut);
    +    async.error.setSink(sinkErr);
    +    async.setup(null);
    +
    +    for (int windowId = 0; windowId < 10; windowId++) {
    +      async.beginWindow(windowId);
    +      for (int i = 0; i < 100; i++) {
    +        async.input.put(Integer.toString(windowId * 100 + i));
    +      }
    +      async.endWindow();
    +    }
    +
    +    for (int i = 0; i < 10; i++) {
    +      Thread.sleep(waitInterval / 10);
    +      async.endWindow();
    +    }
    --- End diff --
    
    Got it. I should have added the beginWindow also here. Will update.
    This loop is just to make sure to wait for all the items to finish processing.


> Add abstract operator for Async Processing
> ------------------------------------------
>
>                 Key: APEXMALHAR-1963
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1963
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>    Affects Versions: 3.3.0
>            Reporter: Chinmay Kolhatkar
>            Assignee: Chinmay Kolhatkar
>
> Create an abstract operator which does following:
> 1) Asynchronously processes the tuples
> 2) Have parallel executions
> 3) Make sure the outbound tuples are ordered similar to inbound tuples if configured.
> 4) Have processing timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message