apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-448) Make operator name available in OperatorContext
Date Wed, 03 Aug 2016 23:24:20 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406810#comment-15406810
] 

ASF GitHub Bot commented on APEXCORE-448:
-----------------------------------------

Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73439463
  
    --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
    @@ -0,0 +1,99 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +
    +public class OperatorContextTest
    +{
    +
    +  @Test
    +  public void testInjectionOfOperatorName() throws Exception
    +  {
    +    final LocalMode lma = LocalMode.newInstance();
    +    final CountDownLatch latch = new CountDownLatch(1);
    +    StreamingApplication testApp = new StreamingApplication()
    +    {
    +      @Override
    +      public void populateDAG(DAG dag, Configuration conf)
    +      {
    +        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
    +        input.countDownLatch = latch;
    +        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
    +
    +        dag.addStream("stream", input.output, output.ip1);
    +      }
    +    };
    +
    +    lma.prepareDAG(testApp, new Configuration());
    +    LocalMode.Controller lc = lma.getController();
    +    lc.runAsync();
    +    latch.await(2, TimeUnit.SECONDS);
    +    lc.shutdown();
    +  }
    +
    +  private static class MockInputOperator extends BaseOperator implements InputOperator,
AutoMetric.Aggregator
    +  {
    +    private transient CountDownLatch countDownLatch;
    +
    +    @AutoMetric
    +    String operatorName;
    +
    +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
    +
    +    @Override
    +    public void setup(Context.OperatorContext context)
    +    {
    +      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
    --- End diff --
    
    I agree with @vrozov that implementing aggregator is not required.
    I think the better way would to be define `latch` as static object of the OperatorContextTest
and use that in setup.. so no need to implement Aggregator..
    
    ```java
    public class OperatorContextTest
    {
    
      private static final CountDownLatch latch = new CountDownLatch(1);
    
      @Test
      public void testInjectionOfOperatorName() throws Exception
      {
        final LocalMode lma = LocalMode.newInstance();
    
        StreamingApplication testApp = new StreamingApplication()
        {
          @Override
          public void populateDAG(DAG dag, Configuration conf)
          {
            dag.addOperator("input", new MockInputOperator());
          }
        };
    
        lma.prepareDAG(testApp, new Configuration());
        LocalMode.Controller lc = lma.getController();
        lc.runAsync();
        latch.await();
        lc.shutdown();
      }
    
      private static class MockInputOperator extends BaseOperator implements InputOperator
      {
    
        @Override
        public void setup(Context.OperatorContext context)
        {
          String operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
          Assert.assertEquals("operator name", "input", operatorName);
          latch.countDown();
        }
    
        @Override
        public void emitTuples()
        {
        }
      }
    }
    ```


> Make operator name available in OperatorContext
> -----------------------------------------------
>
>                 Key: APEXCORE-448
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-448
>             Project: Apache Apex Core
>          Issue Type: Improvement
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>
> Need name of the logical operator in the OperatorContext which can be used by WindowDataManager
to create a unique path per logical operator .



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message