flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility
Date Mon, 24 Aug 2015 12:46:46 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709201#comment-14709201 ]

ASF GitHub Bot commented on FLINK-2525:

Github user ffbin commented on a diff in the pull request:

    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
    @@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
    +	public void testOpenWithStormConf() throws Exception {
    +		final IRichBolt bolt = mock(IRichBolt.class);
    +		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object,
    +		Configuration jobConfiguration = new Configuration();
    +		jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
    +		jobConfiguration.setInteger(new String("delimitSize"), 1024);
    +		Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +				new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +				mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +				mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +				new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +				mock(TaskManagerRuntimeInfo.class));
    +		StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +				mock(KeySelector.class),
    +				mock(StateHandleProvider.class), mock(Map.class));
    +		wrapper.setup(mock(Output.class), ctx);
    +		wrapper.open(mock(Configuration.class));
    --- End diff --
    open() is usually called by openAllOperators(), and its Configuration parameter
is usually the task Configuration, not the job Configuration, so I mock it.

> Add configuration support in Storm-compatibility
> ------------------------------------------------
>                 Key: FLINK-2525
>                 URL: https://issues.apache.org/jira/browse/FLINK-2525
>             Project: Flink
>          Issue Type: New Feature
>          Components: flink-contrib
>            Reporter: fangfengbin
>            Assignee: fangfengbin
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and `Bolt.prepare()`,
respectively. Both methods have a config `Map` as first parameter. This map is currently not
populated. Thus, Spouts and Bolts cannot be configured with user-defined parameters. In order
to support this feature, spout and bolt wrapper classes need to be extended to create a proper
`Map` object. Furthermore, the clients need to be extended to take a `Map`, translate it into
a Flink `Configuration` that is forwarded to the wrappers for proper initialization of the
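
The round trip described in the issue (client-side `Map` to a flat configuration, then back to a `Map` inside the wrapper) could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: `FlatConfig` is a hypothetical stand-in for Flink's `Configuration` so that the sketch runs without Flink on the classpath, and `toFlatConfig` / `toStormConf` are made-up helper names. Only `String` and `Integer` values are handled specially; everything else falls back to its string form.

```java
import java.util.HashMap;
import java.util.Map;

public class StormConfSketch {

    /** Hypothetical stand-in for Flink's Configuration: a flat store of typed values. */
    static final class FlatConfig {
        private final Map<String, Object> entries = new HashMap<>();

        void setString(String key, String value) { entries.put(key, value); }

        void setInteger(String key, int value) { entries.put(key, value); }

        /** Returns a copy so callers cannot mutate the configuration. */
        Map<String, Object> toMap() { return new HashMap<>(entries); }
    }

    /** Client side (assumed): copy a Storm-style config map into the flat configuration. */
    static FlatConfig toFlatConfig(Map<String, Object> stormConf) {
        FlatConfig config = new FlatConfig();
        for (Map.Entry<String, Object> e : stormConf.entrySet()) {
            Object value = e.getValue();
            if (value instanceof Integer) {
                config.setInteger(e.getKey(), (Integer) value);
            } else {
                // Fallback: any other value is stored as its string form.
                config.setString(e.getKey(), String.valueOf(value));
            }
        }
        return config;
    }

    /** Wrapper side (assumed): rebuild the Map passed to open(...) / prepare(...). */
    static Map<String, Object> toStormConf(FlatConfig config) {
        return config.toMap();
    }

    public static void main(String[] args) {
        Map<String, Object> userConf = new HashMap<>();
        userConf.put("path", "/home/user/file.txt");
        userConf.put("delimitSize", 1024);

        Map<String, Object> rebuilt = toStormConf(toFlatConfig(userConf));
        System.out.println(rebuilt.equals(userConf));
    }
}
```

The keys `path` and `delimitSize` mirror the values used in the test above. In the real wrappers the rebuilt map would come from the job configuration rather than from the `Configuration` argument of `open()`, which is why the test can pass a mock for the latter.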

This message was sent by Atlassian JIRA
