flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Looking for instructions & source for flink-java-examples-0.6-incubating-WebLogAnalysis.jar
Date Mon, 29 Sep 2014 12:22:29 GMT
Hi Anirvan!

Thanks for your observations and questions. Here are some comments:

1)

The Flink resource management is based on slots, like Hadoop had slots for
mappers and reducers (we have only one type of slots, though). The number
of slots that each machine offers is defined in the config under
"taskmanager.numberOfTaskSlots". So mostly, you have #machines x
#slotsPerMachine slots available. You can always check the number of
registered TaskManagers and the number of available slots at the web
frontend (by default http://<jobmanager>:8081)

You can set the parallelism as high as the number of slots you have. No
slot is reserved for the job manager, so maybe one machine did not come up
properly.
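As a back-of-the-envelope sketch (illustrative Python, not Flink code; the function names are made up), the relationship between slots and the maximum parallelism a job can request looks like this:

```python
# Illustrative sketch -- not Flink code. In a homogeneous cluster the total
# slot count is #machines x taskmanager.numberOfTaskSlots.
def total_slots(num_task_managers, slots_per_task_manager):
    return num_task_managers * slots_per_task_manager

def can_schedule(parallelism, available_slots):
    # A job can be scheduled only if its parallelism fits into the free
    # slots; otherwise the scheduler fails with "Not enough slots".
    return 0 < parallelism <= available_slots

slots = total_slots(10, 1)        # 10 TaskManagers, 1 slot each
print(can_schedule(10, slots))    # True
print(can_schedule(20, slots))    # False -- the "insufficient slots" case
```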


2)

We do not reserve heterogeneous resources. A job occupies a number of slots
(some, all, whatever), that is all. The managed memory is distributed by
the TaskManager among its slots.

You can define a heterogeneous cluster, though. The config on some nodes
may say that the TaskManager offers 4 slots, while on other machines, it
offers 8 slots.
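For such a heterogeneous setup, the cluster-wide total is simply the sum of the per-TaskManager slot counts (a minimal illustrative sketch, not Flink code; node names are hypothetical):

```python
# Illustrative sketch: each TaskManager may advertise a different
# taskmanager.numberOfTaskSlots value; the cluster total is their sum.
slots_per_node = {"node-a": 4, "node-b": 4, "node-c": 8}  # hypothetical nodes
total = sum(slots_per_node.values())
print(total)  # 16 -- the highest parallelism this cluster can host
```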


3)

We do not yet have any "straggler" resolution, such as speculative
execution.


Did that answer your questions? Let us know if you have more questions!


Greetings,
Stephan


On Mon, Sep 29, 2014 at 1:51 PM, Anirvan BASU <anirvan.basu@inria.fr> wrote:

> Hello Robert, Stephan and others,
>
> Hope you had a pleasurable weekend!
>
> Thanks for your helpful instructions and guidance on the operational /
> configurational aspects of Flink.
> Based on them, I did several "Flinking" experiments this weekend, and
> here are a few points to note:
>
> 1. For the config parameter "parallelization.degree.default", while
> using a cluster of "n" nodes, it can be set to a maximum of "n-1".
> When we set it to "n", we get the "insufficient slots" error message.
> So perhaps, the nth node is reserved for jobmanager's administrative tasks
> (??)
> (something similar to Hadoop's number of reducers set by the option -D
> mapred.reduce.tasks=n )
> It would be helpful if (following Hadoop MapRed framework), the value
> would cap to n-1 if the above conf parameter was wrongly set to >= n (where
> n is the no. of nodes)
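The capping behaviour proposed in this point could be sketched like this (a hypothetical helper, NOT something Flink implements; note Stephan's answer that no slot is actually reserved for the jobmanager):

```python
# Hypothetical capping suggested in the mail -- not implemented in Flink:
# instead of failing with "insufficient slots", clamp the requested
# parallelism to what the cluster can actually offer.
def effective_parallelism(requested, available_slots):
    return min(max(requested, 1), available_slots)

print(effective_parallelism(10, 9))  # 9 -- capped to the available slots
print(effective_parallelism(5, 9))   # 5 -- small enough, left unchanged
```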
>
> 2. This follows our previous discussion (part of it is already implemented
> as I understand).
> Here in Grid'5000 (as well as in some other grid-based systems which I
> have seen), for each new reservation, different types of nodes are assigned
> (unless otherwise explicitly requested).
> Meaning, in one reservation of 10 nodes, I could have 16-core nodes, while
> in another reservation, a mix of 4-core and 8-core nodes, and so on ...
> In such circumstances, would it be possible to use a conf value OR
> calculate automatically the total no. of cores available?
> If set to a positive value, the Job manager will use these values
> explicitly set by the user.
> If set to -1, the Jobmanager reads the # of nodes used and the # of cores
> in each node; then it does a simple arithmetic to calculate the degree of
> parallelisation, also perhaps the number of Tasks ...
> Of course, I agree with you that you do not want a single Flink job to
> swallow all available resources (processor power). In that case, use a %
> value which limits the max. usage of CPU cores.
> (Again, this idea can also go in circles, because somebody can argue for
> a node-specific % value instead of a global % value for all CPUs. This
> scenario arises when some nodes in the cluster are serving other
> frameworks/jobs - hadoop, spark, etc.)
> I let you debate which would be a quick & easy solution.
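The auto-calculation described in this point might be sketched as follows (illustrative Python only; the function name and the global %-cap parameter are assumptions taken from this mail, not a Flink feature):

```python
import math

def auto_dop(cores_per_node, cpu_fraction=1.0):
    # Hypothetical: sum the cores of all reserved nodes, then cap the degree
    # of parallelisation to a fraction of them (a global %, as debated above).
    total_cores = sum(cores_per_node)
    return max(1, math.floor(total_cores * cpu_fraction))

print(auto_dop([16, 16, 4, 8]))       # 44 -- use every reserved core
print(auto_dop([16, 16, 4, 8], 0.5))  # 22 -- cap usage at 50% of the cores
```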
>
> 3. In the Flink runtime, is there any protocol for backing up "straggler"
> tasks?
> E.g., in a workflow, if subsequent join tasks are waiting due to a
> "straggling" reduce job on a particular node, how (based on what protocol)
> does the Job manager back up the straggler task and free the bottleneck?
> In Hadoop, there exist different policies for culling or backing up (some
> implemented with additional modules like "mantri" - I'm sure you are
> familiar with the literature on this topic).
>
> Thanks in advance and Happy Flinking !
> Anirvan
>
> ------------------------------
>
> *From: *"rmetzger0" <rmetzger@apache.org>
> *To: *user@flink.incubator.apache.org
> *Sent: *Thursday, September 25, 2014 10:28:54 PM
> *Subject: *Re: Looking for instructions & source for
> flink-java-examples-0.6-incubating-WebLogAnalysis.jar
>
>
> Hi,
>
> Flink has so-called "Slots" to execute tasks (smaller parts of a job) in.
> Every TaskManager has n slots. n is set by the configuration value "taskmanager.numberOfTaskSlots".
> In your case, every TaskManager provides one slot. Since you have 10
> TaskManagers, your cluster provides, when started, a total of 10 slots.
>
> The bin/flink tool allows users to pass the "-p" argument, for the
> parallelism. So if you start a WordCount with -p 5, it will use 5 slots. If
> the user does not specify the "-p" argument, we use the "
> parallelization.degree.default" value. In your case, you've set it to 20
> and provided no "-p" argument, hence, your job failed with the exception.
>
> To resolve the issue, either set parallelization.degree.default to "10"
> or pass "-p 10" to bin/flink.
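The resolution order Robert describes can be sketched as follows (illustrative Python, not the actual Flink client code):

```python
# Illustrative sketch of the parallelism resolution described above:
# an explicit "-p" flag wins; otherwise parallelization.degree.default
# from flink-conf.yaml is used.
def resolve_parallelism(p_flag, config_default):
    return p_flag if p_flag is not None else config_default

print(resolve_parallelism(5, 20))     # 5  -- user passed "-p 5"
print(resolve_parallelism(None, 20))  # 20 -- config default; too large for
                                      #       a 10-slot cluster, so job fails
```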
>
>
> Cheers,
> Robert
>
>
> On Thu, Sep 25, 2014 at 9:56 PM, nirvanesque [via Apache Flink (Incubator)
> User Mailing List archive.] <[hidden email]> wrote:
>
>> Thanks Robert for all the helpful tips!
>>
>> Yes, of course, I did restart Flink every time.
>> Generally, in our Grid'5000, we reserve some nodes and then run our
>> experiments (including installing hadoop, flink, etc).
>> During the reservation, all nodes are nfs-ed to the access site, so all
>> files & folders are mirrored on each node, no need to scp or rsync.
>>
>> I usually reserve 10 nodes.
>> Then I install flink on the same nodes as hadoop and the job manager runs
>> on the hadoop namenode (master).
>> Then I ssh to the job manager node to launch my jobs.
>>
>> I'm not sure if there was some issue in configuration after the last
>> overhaul of the Grid'5000 this Monday.
>>
>> Anyway, I cleaned up everything, downloaded flink 0.6 and installed on
>> Hadoop 1.2.1
>> The job manager and task managers start fine.
>>
>> Then the problem starts when I run a simple command as:
>>
>> abasu@adonis-10:~$  ./flink/bin/flink run -v
>> ./flink/examples/flink-java-examples-0.6-incubating-WordCount.jar
>> file:///home/abasu/examples/wordcount/input/gutenberg.txt
>> file:///home/abasu/examples/wordcount/output/
>> Error: The program execution failed:
>> org.apache.flink.runtime.jobmanager.scheduler.SchedulingException: Not
>> enough slots to schedule job 72eda81dfd32200015712eaafd609000
>>         at
>> org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler.scheduleJob(DefaultScheduler.java:155)
>>         at
>> org.apache.flink.runtime.jobmanager.JobManager.submitJob(JobManager.java:510)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>         at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:422)
>>         at
>> org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:958)
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed:
>> org.apache.flink.runtime.jobmanager.scheduler.SchedulingException: Not
>> enough slots to schedule job 72eda81dfd32200015712eaafd609000
>>         at
>> org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler.scheduleJob(DefaultScheduler.java:155)
>>         at
>> org.apache.flink.runtime.jobmanager.JobManager.submitJob(JobManager.java:510)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>         at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:422)
>>         at
>> org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:958)
>>
>>         at org.apache.flink.client.program.Client.run(Client.java:325)
>>         at org.apache.flink.client.program.Client.run(Client.java:291)
>>         at org.apache.flink.client.program.Client.run(Client.java:285)
>>         at
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:54)
>>         at
>> org.apache.flink.example.java.wordcount.WordCount.main(WordCount.java:82)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>         at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>>         at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>>         at org.apache.flink.client.program.Client.run(Client.java:244)
>>         at
>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:332)
>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>>         at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:930)
>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:954)
>>
>>
>> The conf file is as follows (with your suggestions)
>> No default conf files in the conf folder.
>>
>>
>> #==============================================================================
>> # Common
>>
>> #==============================================================================
>> # abasu : modified for master node
>> jobmanager.rpc.address: adonis-10.grenoble.grid5000.fr
>>
>> jobmanager.rpc.port: 6123
>>
>> jobmanager.heap.mb: 1024
>>
>> taskmanager.heap.mb: 8192
>>
>>
>> # abasu : modified from -1 according to configuration pages
>> taskmanager.numberOfTaskSlots: 1
>>
>> # abasu : modified from 1 according to advice of Robert Metzger on
>> configuration
>> parallelization.degree.default: 20
>>
>>
>> #==============================================================================
>> # Web Frontend
>>
>> #==============================================================================
>>
>> jobmanager.web.port: 8081
>>
>> webclient.port: 8080
>>
>>
>> #==============================================================================
>> # Advanced
>>
>> #==============================================================================
>>
>> # The number of buffers for the network stack.
>> #
>> # taskmanager.network.numberOfBuffers: 2048
>>
>> # Directories for temporary files.
>> #
>> # Add a delimited list for multiple directories, using the system
>> directory
>> # delimiter (colon ':' on unix) or a comma, e.g.:
>> #     /data1/tmp:/data2/tmp:/data3/tmp
>> #
>> # Note: Each directory entry is read from and written to by a different
>> I/O
>> # thread. You can include the same directory multiple times in order to
>> create
>> # multiple I/O threads against that directory. This is for example
>> relevant for
>> # high-throughput RAIDs.
>> #
>> # If not specified, the system-specific Java temporary directory
>> (java.io.tmpdir
>> # property) is taken.
>> #
>> # taskmanager.tmp.dirs: /tmp
>>
>> # Path to the Hadoop configuration directory.
>> #
>> # This configuration is used when writing into HDFS. Unless specified
>> otherwise,
>> # HDFS file creation will use HDFS default settings with respect to
>> block-size,
>> # replication factor, etc.
>> #
>> # You can also directly specify the paths to hdfs-default.xml and
>> hdfs-site.xml
>> # via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
>> #
>> # abasu : uncommented
>> fs.hdfs.hadoopconf: /opt/hadoop/conf/
>>
>>
>>
>> Thanks in advance for your help.
>> Seems like I'm going a few steps backward now.
>> FYI, Hadoop runs well, tested Terasort.
>>
>> Best!
>> Anirvan
>>
>>
>>
>>
>>
>>
>>
>> On 25/09/2014 16:46, rmetzger0 [via Apache Flink (Incubator) User Mailing
>> List archive.] wrote:
>>
>> Hi,
>>
>> the configuration file looks correct now.
>>
>> Did you restart Flink after you updated the configuration?
>> Also, the config needs to be changed on all cluster nodes (we use an
>> NFS-shared folder; you can also use rsync to synchronize the files from
>> the "master" (JobManager) to all the "worker" nodes (TaskManagers)).
>> Are there other configuration files in the conf/ directory? I think our
>> code is loading all configuration files by their extension. So if there is
>> a flink-conf-defaults.yaml in the same dir, it might still contain the
>> erroneous values.
>>
>>
>> Regarding your suggestion:
>> We already implemented part of your suggestion: if the user sets the dop
>> to -1, we'll print a warning and set the DOP to 1.
>> We discussed setting the "numberOfTaskSlots" to the number of CPUs by
>> default, but that would basically mean that Flink is grabbing all the
>> resources on the cluster. We therefore decided to be conservative and
>> use only one core by default.
>>
>> By the way, I would recommend giving the Flink TaskManager (and also the
>> jobManager) a bit more memory.
>>
>> Your current settings are:
>>
>> jobmanager.heap.mb: 256
>> taskmanager.heap.mb: 512
>>
>> These are MBs, so the TaskManagers will only have 512 MB. If your cluster
>> nodes have, say, 20 GB, set the TaskManager heap space to 15000. This will
>> lead to huge performance improvements for your jobs!
>>
>> I hope restarting resolves the issue.
>>
>> Best,
>> Robert
>>
>>
>> On Thu, Sep 25, 2014 at 4:26 PM, nirvanesque [via Apache Flink
>> (Incubator) User Mailing List archive.] <[hidden email]> wrote:
>>
>>> Robert, Stephan and the "powers that be" ...
>>>
>>> Thanks for your valuable inputs.
>>> I tried them a few times today and here's the output and the config
>>> infos.
>>>
>>> Please see my suggestions at the end also.
>>>
>>> Looking forward to your helpful replies.
>>>
>>>
>>> The failure output :
>>> [QUOTE]
>>> abasu@edel-59:~$ ./flink/bin/flink run -v
>>> ./flink/examples/flink-java-examples-0.6-incubating-WebLogAnalysis.jar
>>> file:///home/abasu/examples/Weblogs/documents
>>> file:///home/abasu/examples/Weblogs/ranks
>>> file:///home/abasu/examples/Weblogs/visits
>>> file:///home/abasu/examples/Weblogs/result
>>>
>>> Error: The main method caused an error.
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>>         at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>>>         at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>>>         at org.apache.flink.client.program.Client.run(Client.java:244)
>>>         at
>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:332)
>>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>>>         at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:930)
>>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:954)
>>> Caused by: java.lang.IllegalArgumentException
>>>         at
>>> org.apache.flink.compiler.dag.OptimizerNode.setDegreeOfParallelism(OptimizerNode.java:414)
>>>         at
>>> org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.preVisit(PactCompiler.java:772)
>>>         at
>>> org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.preVisit(PactCompiler.java:616)
>>>         at
>>> org.apache.flink.api.common.operators.base.GenericDataSinkBase.accept(GenericDataSinkBase.java:287)
>>>         at org.apache.flink.api.common.Plan.accept(Plan.java:281)
>>>         at
>>> org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:511)
>>>         at
>>> org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:460)
>>>         at
>>> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
>>>         at
>>> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
>>>         at org.apache.flink.client.program.Client.run(Client.java:285)
>>>         at
>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:54)
>>>         at
>>> org.apache.flink.example.java.relational.WebLogAnalysis.main(WebLogAnalysis.java:148)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>         at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>>>         ... 6 more
>>> abasu@edel-59:~$
>>> [/QUOTE]
>>>
>>>
>>> The config file flink-conf.yaml (changed only in lines below "abasu" :
>>> [QUOTE]
>>>
>>> #==============================================================================
>>> # Common
>>>
>>> #==============================================================================
>>> # abasu : modified for master node
>>> jobmanager.rpc.address: edel-59.grenoble.grid5000.fr
>>>
>>> jobmanager.rpc.port: 6123
>>>
>>> jobmanager.heap.mb: 256
>>>
>>> taskmanager.heap.mb: 512
>>>
>>> # abasu : modified from -1 according to configuration pages
>>> taskmanager.numberOfTaskSlots: 1
>>>
>>> # abasu : modified from 1 according to advice of Robert Metzger on
>>> configuration
>>> parallelization.degree.default: 20
>>>
>>>
>>> #==============================================================================
>>> # Web Frontend
>>>
>>> #==============================================================================
>>>
>>> jobmanager.web.port: 8081
>>>
>>> webclient.port: 8080
>>>
>>>
>>> #==============================================================================
>>> # Advanced
>>>
>>> #==============================================================================
>>>
>>> # The number of buffers for the network stack.
>>> #
>>> # taskmanager.network.numberOfBuffers: 2048
>>>
>>> # Directories for temporary files.
>>> #
>>> # Add a delimited list for multiple directories, using the system
>>> directory
>>> # delimiter (colon ':' on unix) or a comma, e.g.:
>>> #     /data1/tmp:/data2/tmp:/data3/tmp
>>> #
>>> # Note: Each directory entry is read from and written to by a different
>>> I/O
>>> # thread. You can include the same directory multiple times in order to
>>> create
>>> # multiple I/O threads against that directory. This is for example
>>> relevant for
>>> # high-throughput RAIDs.
>>> #
>>> # If not specified, the system-specific Java temporary directory
>>> (java.io.tmpdir
>>> # property) is taken.
>>> #
>>> # taskmanager.tmp.dirs: /tmp
>>>
>>> # Path to the Hadoop configuration directory.
>>> #
>>> # This configuration is used when writing into HDFS. Unless specified
>>> otherwise,
>>> # HDFS file creation will use HDFS default settings with respect to
>>> block-size,
>>> # replication factor, etc.
>>> #
>>> # You can also directly specify the paths to hdfs-default.xml and
>>> hdfs-site.xml
>>> # via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
>>> #
>>> # abasu : uncommented
>>> fs.hdfs.hadoopconf: /opt/hadoop/conf/
>>> [/QUOTE]
>>>
>>>
>>> A suggestion:
>>> For reading parameters from conf files, especially for
>>> "parallelization.degree.default" and may be for
>>> "taskmanager.numberOfTaskSlots" would it be possible to have the following
>>> (if not already implemented):
>>> If set to a positive value, the Job manager will use these values
>>> explicitly set by the user.
>>> If set to -1, the Jobmanager reads the # of nodes used and the # of
>>> cores in each node; then it does a simple arithmetic to calculate the
>>> degree of parallelisation, also perhaps the number of Tasks ...
>>> Of course, the user is free to use a different configuration using the
>>> "-m" option.
>>>
>>> What do you think ?
>>>
>>> Thanks in advance,
>>> Anirvan
>>>
>>>
>>>
>>>
>>>
>>> On 24/09/2014 18:59, Stephan Ewen [via Apache Flink (Incubator) User
>>> Mailing List archive.] wrote:
>>>
>>> I get a similar error when I use a negative value for the default
>>> parallelism in the flink-config.yaml
>>>
>>> I'll push a patch that gives better error messages and sanity checks the
>>> config value, prints a warning, and falls back to DOP 1 if the config is
>>> invalid and no manual value is provided.
>>>
>>> Stephan
>>>
>>>
>>> On Wed, Sep 24, 2014 at 5:48 PM, rmetzger0 <[hidden email]> wrote:
>>>
>>>> Hi,
>>>>
>>>> this looks like a minor issue in your flink configuration.
>>>> Can you have a look into your config (in the conf/flink-conf.yaml file)
>>>> and tell me the value of the "parallelization.degree.default" property?
>>>> If it is set to 0 or a negative value, that's the source of the issue.
>>>> Set it to something higher (I recommend numOfNodes*numOfCoresPerNode).
>>>>
>>>> But I have to admit that our error reporting here is not very helpful,
>>>> we'll improve that...
>>>>
>>>>
>>>> Cheers,
>>>> Robert
>>>>
>>>>
>>>> On Wed, Sep 24, 2014 at 4:23 PM, nirvanesque [via Apache Flink
>>>> (Incubator) User Mailing List archive.] <[hidden email]> wrote:
>>>>
>>>>> Hello robert,
>>>>>
>>>>> Apologies for getting back to you on this one :-(
>>>>> I'm really sorry!
>>>>>
>>>>> I've been trying today several times in vain this programme, and
>>>>> following is the error output :
>>>>>
>>>>> [QUOTE]
>>>>> abasu@edel-4:~$ ./flink/bin/flink run -v
>>>>> flink/examples/flink-java-examples-0.6-incubating-WebLogAnalysis.jar
>>>>> file:///home/abasu/examples/Weblogs/documents
>>>>> file:///home/abasu/examples/Weblogs/ranks
>>>>> file:///home/abasu/examples/Weblogs/visits
>>>>> file:///home/abasu/examples/Weblogs/results
>>>>> Error: The main method caused an error.
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error.
>>>>>         at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>>>>>         at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>>>>>         at org.apache.flink.client.program.Client.run(Client.java:244)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:332)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:930)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:954)
>>>>> Caused by: java.lang.IllegalArgumentException
>>>>>         at
>>>>> org.apache.flink.compiler.dag.OptimizerNode.setDegreeOfParallelism(OptimizerNode.java:414)
>>>>>         at
>>>>> org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.preVisit(PactCompiler.java:772)
>>>>>         at
>>>>> org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.preVisit(PactCompiler.java:616)
>>>>>         at
>>>>> org.apache.flink.api.common.operators.base.GenericDataSinkBase.accept(GenericDataSinkBase.java:287)
>>>>>         at org.apache.flink.api.common.Plan.accept(Plan.java:281)
>>>>>         at
>>>>> org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:511)
>>>>>         at
>>>>> org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:460)
>>>>>         at
>>>>> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:196)
>>>>>         at
>>>>> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:209)
>>>>>         at org.apache.flink.client.program.Client.run(Client.java:285)
>>>>>         at
>>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:54)
>>>>>         at
>>>>> org.apache.flink.example.java.relational.WebLogAnalysis.main(WebLogAnalysis.java:148)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>         at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>         at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>>>>>         ... 6 more
>>>>> abasu@edel-4:~$
>>>>>
>>>>> [/QUOTE]
>>>>>
>>>>>
>>>>> I'm using Flink version 0.6 on a cluster of 10 nodes.
>>>>> Job launched from Job manager node.
>>>>>
>>>>> As per your instructions, I've used the set for <<documents>>
>>>>> <<ranks>> <<visits>> <<results>>
>>>>> that I downloaded from your scratch tree branch.
>>>>>
>>>>> It seems that the main function fails for other reasons (even
>>>>> before entering the programme logic).
>>>>> I'm racking my brains over the exact reason ...
>>>>>
>>>>> If you could help me in this direction ... ?
>>>>>
>>>>> Thanks a million in advance,
>>>>> Anirvan
>>>>>
>>>>>
>>>>> On 23/09/2014 17:22, rmetzger0 [via Apache Flink (Incubator) User
>>>>> Mailing List archive.] wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> you have to use the "WebLogDataGenerator" found here:
>>>>> https://github.com/apache/incubator-flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
>>>>>
>>>>> It accepts two arguments, the number of documents and visits.
>>>>> The generated files are located in /tmp/documents, /tmp/ranks, and
>>>>> /tmp/visits.
>>>>> I've generated some sample data for you, located here:
>>>>> https://github.com/rmetzger/scratch/tree/weblogdataexample/weblog
>>>>>
>>>>>
>>>>> Best,
>>>>> Robert
>>>>>
>>>>>
>>>>> On Tue, Sep 23, 2014 at 4:05 PM, nirvanesque [via Apache Flink
>>>>> (Incubator) User Mailing List archive.] <[hidden email]> wrote:
>>>>>
>>>>>> Hello Robert,
>>>>>>
>>>>>> Thanks as usual for all your help with the information.
>>>>>>
>>>>>> I'm trying in vain to create the different input files from the
>>>>>> program source code but running into difficulties.
>>>>>>
>>>>>> Could you (or anyone else) please post here samples of the 4 inputs
>>>>>> that are required to run this program ?
>>>>>>
>>>>>> Thanks in advance,
>>>>>> Anirvan
>>>>>>
>>>>>> On 09/09/2014 23:54, rmetzger0 [via Apache Flink (Incubator) User
>>>>>> Mailing List archive.] wrote:
>>>>>>
>>>>>> Hi Anirvan,
>>>>>>
>>>>>> sorry for the late response. You've posted the question to Nabble,
>>>>>> which is only a mirror of our actual mailing list at [hidden email].
>>>>>> Sadly, the message is not automatically posted to the Apache list,
>>>>>> because the Apache server is rejecting the mails from Nabble.
>>>>>> I've already asked, and there is no way to change this behavior.
>>>>>> So I actually saw the two messages you posted here by accident.
>>>>>>
>>>>>> Regarding your actual question:
>>>>>> - The command line arguments for the WebLogAnalysis example are:
>>>>>>    "WebLogAnalysis <documents path> <ranks path> <visits
>>>>>> path> <result path>"
>>>>>>
>>>>>> - Regarding the "info -d" command: I think it's an artifact from our
>>>>>> old Java API. I've filed an issue in JIRA:
>>>>>> https://issues.apache.org/jira/browse/FLINK-1095 Let's see how we
>>>>>> resolve it. Thanks for reporting this!
>>>>>>
>>>>>> You can find the source code of all of our examples in the source
>>>>>> release of Flink (in the flink-examples/flink-java-examples project).
>>>>>> You can also access the source (and hence the examples) through GitHub:
>>>>>> https://github.com/apache/incubator-flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
>>>>>>
>>>>>>
>>>>>> To build the examples, you can run "mvn clean package -DskipTests"
>>>>>> in the "flink-examples/flink-java-examples" directory. This will
>>>>>> re-build them.
>>>>>>
>>>>>> If you don't want to import the whole Flink project just for playing
>>>>>> around with the examples, you can also create an empty Maven project.
>>>>>> This script:
>>>>>>
>>>>>> curl https://raw.githubusercontent.com/apache/incubator-flink/master/flink-quickstart/quickstart.sh | bash
>>>>>>
>>>>>>
>>>>>> will automatically set everything up for you. Just import the
>>>>>> "quickstart" project into Eclipse or IntelliJ. It will download all
>>>>>> dependencies and package everything correctly. If you want to use an
>>>>>> example there, just copy the Java file into the "quickstart" project.
>>>>>>
>>>>>> The examples are indeed a very good way to learn how to write Flink
>>>>>> jobs.
>>>>>>
>>>>>> Please continue asking if you have further questions!
>>>>>>
>>>>>> Best,
>>>>>> Robert
>>>>>>
>>>>>>
