flink-user mailing list archives

From Alex Vinnik <alvinni...@gmail.com>
Subject Re: Cannot configure akka.ask.timeout
Date Thu, 13 Dec 2018 15:56:48 GMT
Qi,

Thanks for the references! How do I enable concurrent S3 file listing? Here
is the code.

// Consume the JSON files
Configuration configuration =
    new Configuration(GlobalConfiguration.loadConfiguration());
configuration.setBoolean(JsonLinesInputFormat.ENUMERATE_NESTED_FILES_FLAG, true);

JsonLinesInputFormat jsonInputFormat =
    new JsonLinesInputFormat(new Path(inputPath), configuration);
jsonInputFormat.setFilesFilter(new BucketingSinkFilter());

DataSet<ObjectNode> input =
    env.readFile(jsonInputFormat, inputPath).withParameters(configuration);
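
For readers wondering what "listing concurrently" could look like, here is a
minimal Java sketch. It assumes the input can be split into independent
prefixes (for example, one per date partition). ConcurrentLister and
listOnePrefix are hypothetical names, not Flink or S3 APIs; the actual
per-prefix listing call is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical helper (not part of Flink): lists several prefixes in parallel. */
public class ConcurrentLister {

    /**
     * Runs listOnePrefix for every prefix on a fixed-size thread pool and
     * concatenates the results in the order of the input prefixes.
     */
    public static List<String> listAll(List<String> prefixes,
                                       Function<String, List<String>> listOnePrefix,
                                       int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<List<String>>> tasks = new ArrayList<>();
            for (String prefix : prefixes) {
                tasks.add(() -> listOnePrefix.apply(prefix));
            }
            List<String> result = new ArrayList<>();
            // invokeAll blocks until every task has completed and returns
            // the futures in the same order as the submitted tasks.
            for (Future<List<String>> future : pool.invokeAll(tasks)) {
                result.addAll(future.get());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }
}
```

This only shortens the listing itself. It does not change the hard-coded
timeout discussed in the quoted messages, and whether the resulting file list
can be fed to a given input format depends on that format.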


On Wed, Dec 12, 2018 at 8:53 PM qi luo <luoqi.bd@gmail.com> wrote:

> Hi Alex,
>
> The hard-coded timeouts I’ve found are at [1] and [2].
>
> We ran into an issue similar to yours (listing a large number of HDFS
> files). We ended up with a newer version of HDFSFileInput which lists files
> concurrently. Another hack we used is to list the files on the client side
> and pass them to the JobManager via serialization (not recommended though,
> as it doesn’t follow the Flink framework mechanism).
>
> You can also try listing S3 files concurrently, or paste your sample code
> here.
>
> [1]
> https://github.com/apache/flink/blob/967b31b333e6f4b014ea3041f420bfaff2484618/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L187
> [2]
> https://github.com/apache/flink/blob/b0496f21d70cc1af15569f3632d7a58fd53b8f95/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L117
>
> On Dec 13, 2018, at 1:09 AM, Alex Vinnik <alvinnik.g@gmail.com> wrote:
>
> Qi,
>
> Job submission timeout is caused by listing too many files in S3
> during env.readFile call to create input DataSet. Is there a way NOT to
> list S3 files during a job submission? It seems like it should help to
> mitigate that timeout problem.
>
> What hardcoded value were you referring to?
>
> Best,
> -Alex
>
> On Wed, Dec 12, 2018 at 7:47 AM Alex Vinnik <alvinnik.g@gmail.com> wrote:
>
>> Hi Qi,
>>
>> Thanks for looking into this. Here is the ticket:
>> https://issues.apache.org/jira/browse/FLINK-11143
>>
>> Best,
>> -Alex
>>
>> On Tue, Dec 11, 2018 at 8:47 PM qi luo <luoqi.bd@gmail.com> wrote:
>>
>>> Hi Alex and Lukas,
>>>
>>> This error is controlled by another RPC timeout (which is hard-coded and
>>> not affected by “akka.ask.timeout”). Could you open a JIRA issue so I can
>>> propose a fix for that?
>>>
>>> Cheers,
>>> Qi
>>>
>>> On Dec 12, 2018, at 7:07 AM, Alex Vinnik <alvinnik.g@gmail.com> wrote:
>>>
>>> Hi there,
>>>
>>> I ran into the same problem running a batch job with Flink 1.6.1/1.6.2.
>>>
>>> akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka://flink/user/dispatcher#202546747]] after [10000 ms].
>>> Sender[null] sent message of type
>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>>
>>> akka.ask.timeout: 600s
>>>
>>> But it looks like the setting is not honored. Any suggestions on what can
>>> be done?
>>>
>>> Thanks
>>>
>>> On 2018/07/13 10:24:16, Lukas Kircher <l...@gmail.com> wrote:
>>> > Hello,
>>> >
>>> > I have problems setting configuration parameters for Akka in Flink
>>> > 1.5.0. When I run a job I get the exception listed below which states
>>> > that Akka timed out after 10000ms. I tried to increase the timeout by
>>> > following the Flink configuration documentation. Specifically I did the
>>> > following:
>>> >
>>> > 1) Passed a configuration to the Flink execution environment with
>>> > `akka.ask.timeout` set to a higher value. I started this in Intellij.
>>> > 2) Passed program arguments via the run configuration in Intellij,
>>> > e.g. `-Dakka.ask.timeout:100s`
>>> > 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a
>>> > local standalone cluster via start-cluster.sh. The setting is reflected
>>> > in Flink's web interface.
>>> >
>>> > However - despite explicit configuration the default setting seems to
>>> > be used. The exception below states in each case that akka ask timed out
>>> > after 10000ms.
>>> >
>>> > As my problem seems very basic I do not include an SSCCE for now but I
>>> > can try to build one if this helps figuring out the issue.
>>> >
>>> > ------
>>> > [...]
>>> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
>>> > [...]
>>> > at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
>>> > at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
>>> > at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>> > at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
>>> > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>>> > [...]
>>> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>> > at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>>> > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>>> > at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>> > at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>> > at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>> > at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>>> > at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>>> > at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>>> > at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>>> > at java.lang.Thread.run(Thread.java:745)
>>> > [...]
>>> > ------
>>> >
>>> >
>>> > Best regards and thanks for your help,
>>> > Lukas
>>> >
>>> >
>>> >
>>> >
>>>
>>>
>>>
>
