ignite-user mailing list archives

From Michael Cherkasov <michael.cherka...@gmail.com>
Subject Re: Performance WordCount to Hadoop
Date Wed, 28 Jun 2017 15:59:41 GMT
Hi Mimmo,

You should deploy the transformer:
stmr.deployClass(StreamTransformerExample.class)
or have the same code available on every node of the cluster.
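If you want the cluster to fetch such classes automatically instead, peer class loading can be enabled. A minimal sketch of the relevant property, added to the `IgniteConfiguration` bean that the quoted example-cache1.xml already defines (assuming no other deployment settings are in play):

```xml
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
    <!-- Let nodes load missing classes (e.g. a stream receiver) from the node that submitted them. -->
    <property name="peerClassLoadingEnabled" value="true"/>
    <!-- the existing discoverySpi property and other settings stay unchanged -->
</bean>
```

As far as I remember, the setting must be consistent across all nodes, servers and clients alike, or nodes will refuse to join each other.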

Thanks,
Mikhail.


2017-06-27 20:55 GMT+03:00 Mimmo Celano <mimmo.celano1993@gmail.com>:

> Hi, thanks to all.
> Sorry for the mistake: in this WordCount the input was split into lines, so
> every line calls the map function. That was the problem, so I changed the code.
> When I run WordCount without a stream receiver updating words in the cache,
> everything works fine, but when I add the stream receiver I get this error.
>
> public class WordCountExample {
>
>   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
>     // Making objects is expensive. Instantiate outside the loop and re-use.
>     private final static IntWritable one = new IntWritable(1);
>     private Text word = new Text();
>     Ignite ignite;
>     IgniteCache<String, Long> cache;
>     IgniteDataStreamer<String, Long> stmr;
>
>     @Override protected void setup(Context context) throws IOException, InterruptedException {
>       super.setup(context);
>       Ignition.setClientMode(true);
>       ignite = Ignition.start("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml");
>       CacheConfiguration<String, Long> cfg2 = Ignition.loadSpringBean(
>           "/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml", "cacheconf");
>       cache = ignite.getOrCreateCache(cfg2);
>       try {
>         stmr = ignite.dataStreamer("default");
>       } catch (Exception e) {
>         System.out.println("Error DataStream");
>       }
>       stmr.allowOverwrite(true);
>       stmr.receiver(StreamTransformer.from((e, arg) -> { // Probably the error
>         // Get current count.
>         Long val = e.getValue();
>
>         // Increment current count by 1.
>         e.setValue(val == null ? 1L : val + 1);
>
>         return null;
>       }));
>     }
>
>     public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
>       StringTokenizer itr = new StringTokenizer(value.toString());
>
>       // While iterating over the token iterator
>       while (itr.hasMoreTokens()) {
>         stmr.addData(itr.nextToken(), 1L);
>       }
>     }
>
>     @Override protected void cleanup(Context context) throws IOException, InterruptedException {
>       super.cleanup(context);
>       ignite.close();
>     }
>   }
>
>
>
>   public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
>     private IntWritable result = new IntWritable();
>
>     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
>       int sum = 0;
>
>       for (IntWritable val : values) {
>         sum += val.get();
>       }
>
>       result.set(sum);
>       context.write(key, result);
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     Configuration conf = new Configuration();
>     Job job = Job.getInstance(conf, "word count");
>
>     // Make this class the main class in the JAR file
>     job.setJarByClass(WordCountExample.class);
>
>     // Set our Mapper class, conforming to the API
>     job.setMapperClass(TokenizerMapper.class);
>
>     // Set our Combiner & Reducer classes, conforming to the (same) API
>     job.setCombinerClass(IntSumReducer.class);
>     job.setReducerClass(IntSumReducer.class);
>
>     // Set the output Key type
>     job.setOutputKeyClass(Text.class);
>
>     // Set the output Value type
>     job.setOutputValueClass(IntWritable.class);
>
>     // Set the number of reducers
>     job.setNumReduceTasks(10);
>
>     // Get the input and output paths from the job arguments
>     FileInputFormat.addInputPath(job, new Path(args[0]));
>     FileOutputFormat.setOutputPath(job, new Path(args[1]));
>
>     System.exit(job.waitForCompletion(true) ? 0 : 1);
>   }
> }
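The receiver logic above amounts to a per-word increment: store 1 on the first occurrence of a key, otherwise add 1 to the stored value. A self-contained plain-JDK sketch of the same semantics (no Ignite involved; the class and method names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class CountSketch {
    // Apply the same "val == null ? 1L : val + 1" update the receiver performs.
    static Map<String, Long> count(String[] tokens) {
        Map<String, Long> counts = new HashMap<>();
        for (String token : tokens) {
            // merge() stores 1 for a new key, otherwise adds 1 to the old value.
            counts.merge(token, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = count(new String[] {"to", "be", "or", "not", "to", "be"});
        System.out.println(counts.get("to")); // prints 2
    }
}
```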
>
>
> 17/06/27 17:52:17 ERROR datastreamer.DataStreamProcessor: Failed to unmarshal message [nodeId=491340cd-aeb0-49ba-affc-36f188f3a8e5, req=DataStreamerRequest [reqId=36, cacheName=default, ignoreDepOwnership=true, skipStore=false, keepBinary=false, depMode=null, sampleClsName=null, userVer=null, ldrParticipants=null, clsLdrId=null, forceLocDep=true, topVer=AffinityTopologyVersion [topVer=9, minorTopVer=1]]]
> class org.apache.ignite.IgniteCheckedException: Failed to unmarshal object with optimized marshaller
> at org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9893)
> at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:288)
> at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:58)
> at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:88)
> at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1257)
> at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:885)
> at org.apache.ignite.internal.managers.communication.GridIoManager.access$2100(GridIoManager.java:114)
> at org.apache.ignite.internal.managers.communication.GridIoManager$7.run(GridIoManager.java:802)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: class org.apache.ignite.binary.BinaryObjectException: Failed to unmarshal object with optimized marshaller
> at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1715)
> at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1944)
> at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1704)
> at org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:304)
> at org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal0(BinaryMarshaller.java:99)
> at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:82)
> at org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9887)
> ... 10 more
> Caused by: class org.apache.ignite.IgniteCheckedException: Failed to find class with given class loader for unmarshalling (make sure same versions of all classes are available on all nodes or enable peer-class-loading) [clsLdr=sun.misc.Launcher$AppClassLoader@3941a79c, cls=ignite.WordCountExample$TokenizerMapper]
> at org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller.unmarshal0(OptimizedMarshaller.java:230)
> at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:94)
> at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1712)
> ... 16 more
> Caused by: java.lang.ClassNotFoundException: ignite.WordCountExample$TokenizerMapper
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8478)
> at org.apache.ignite.internal.MarshallerContextImpl.getClass(MarshallerContextImpl.java:340)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:268)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readClass(OptimizedObjectInputStream.java:349)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:301)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:367)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:927)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:324)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:367)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:927)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:324)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:367)
> at org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller.unmarshal0(OptimizedMarshaller.java:227)
> ... 18 more
>
>
> Should I add something to the server?
>
> public class StartServer2 {
>
>   public static void main(String args[]) throws Exception {
>     Ignition.setClientMode(false);
>     try (Ignite ignite = Ignition.start("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml")) {
>       CacheConfiguration<String, Long> cfg2 = Ignition.loadSpringBean(
>           "/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml", "cacheconf");
>       IgniteCache<String, Long> cache = ignite.getOrCreateCache(cfg2);
>       while (true) {
>       }
>     }
>   }
> }
>
>
> Thanks for support
> Mimmo
>
> 2017-06-27 12:23 GMT+02:00 Mimmo Celano <mimmo.celano1993@gmail.com>:
>
>> Hi,
>> Thanks for your reply.
>> We are testing Ignite's performance on Hadoop without the hadoop-accelerator,
>> to possibly use it in our project.
>> We have a 700 MB file which is split into 6 map tasks. I don't think the setup
>> time is expensive; it's 6-7 seconds. The cache.put you mentioned is not
>> executed because it is commented out.
>> Hadoop computation time is 1:50 min with 3 parallel tasks; 1:57 min with the
>> Ignite setup but without any caching operation; 24:25 min with the Ignite
>> setup and one cache.put("interno") at the end of every map task; and 33:35 min
>> with the Ignite setup and a data streamer for word caching. We didn't expect
>> these cache.put or data streamer operations to be so expensive; it is slower
>> than we thought.
>> The nodes are connected to each other over a 10 Gbps LAN; could communication
>> be a bottleneck?
>> Could we improve the computation time with a server on each node, using
>> affinity collocation to write every word to local memory without a TCP
>> connection? And could we then use cache.get over TCP to retrieve the
>> information for an inserted word?
>>
>> Thanks
>>
>> 2017-06-27 10:43 GMT+02:00 Michael Cherkasov <michael.cherkasov@gmail.com>:
>>
>>> Hi Mimmo,
>>>
>>> How many map tasks do you have? If you have a lot of map tasks, each with a
>>> small amount of work, you will spend almost all CPU time in the setup method.
>>>
>>> Also, if you have a small amount of data, one network operation
>>> ( cache.put("interno", 666); ) can be very expensive relative to the whole
>>> map task.
>>>
>>> I don't understand what you are trying to achieve; I think you have some
>>> misunderstanding of how to use Ignite with Hadoop.
>>>
>>> Please read this doc: https://apacheignite-fs.readme.io/docs/hadoop-accelerator
>>> It explains how to integrate Ignite and Hadoop together.
>>>
>>> You can use Ignite's IGFS with Hadoop as the secondary file system, and
>>> Ignite's implementation of the job tracker, which should improve performance
>>> in your case.
>>>
>>> Thanks,
>>> Mikhail.
>>>
>>> 2017-06-26 21:00 GMT+03:00 mimmo_c <mimmo.celano1993@gmail.com>:
>>>
>>>> Hi,
>>>> Thanks to your suggestions I managed to configure Ignite properly.
>>>> Everything works, but I found another issue... The computation is 20 or 30
>>>> times slower than the same computation without putting words in the cache.
>>>> If I put just 1 word in the cache at the beginning of the map function, the
>>>> computation time is the same. What can this depend on?
>>>> This is the Server Code
>>>>
>>>>         public static void main(String args[]) throws Exception {
>>>>                 Ignition.setClientMode(false);
>>>>
>>>>                 try (Ignite ignite = Ignition.start("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml")) {
>>>>                         //CacheConfiguration<String, Integer> cfg2 = new CacheConfiguration<>();
>>>>                         CacheConfiguration<String, Integer> cfg2 = Ignition.loadSpringBean(
>>>>                                 "/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml", "cacheconf");
>>>>
>>>>                         IgniteCache<String, Integer> cache = ignite.getOrCreateCache(cfg2);
>>>>                         //cache.put("uno", 1);
>>>>                         //cache.put("due", 2);
>>>>
>>>>                         //System.out.println(cache.get("uno"));
>>>>                         //System.out.println(cache.get("due"));
>>>>
>>>>                         while (true) {
>>>>                         }
>>>>                 }
>>>>         }
>>>>
>>>> This is the WordCount code
>>>>
>>>> public class WordCountIgnite extends Configured implements Tool {
>>>>
>>>>         public static void main(String args[]) throws Exception {
>>>>                 int res = ToolRunner.run(new WordCountIgnite(), args);
>>>>                 System.exit(res);
>>>>         }
>>>>
>>>>         public int run(String[] args) throws Exception {
>>>>                 Path inputPath = new Path(args[0]);
>>>>                 Path outputPath = new Path(args[1]);
>>>>
>>>>                 Configuration conf = getConf();
>>>>                 Job job = Job.getInstance(conf, "word count");
>>>>
>>>>                 FileInputFormat.setInputPaths(job, inputPath);
>>>>                 FileOutputFormat.setOutputPath(job, outputPath);
>>>>
>>>>                 job.setJarByClass(this.getClass());
>>>>                 job.setInputFormatClass(TextInputFormat.class);
>>>>                 job.setOutputFormatClass(TextOutputFormat.class);
>>>>                 job.setMapOutputKeyClass(Text.class);
>>>>                 job.setMapOutputValueClass(IntWritable.class);
>>>>                 job.setOutputKeyClass(Text.class);
>>>>                 job.setOutputValueClass(IntWritable.class);
>>>>
>>>>                 job.setMapperClass(Map.class);
>>>>                 job.setCombinerClass(Reduce.class);
>>>>                 job.setReducerClass(Reduce.class);
>>>>
>>>>                 return job.waitForCompletion(true) ? 0 : 1;
>>>>         }
>>>>
>>>>         public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
>>>>
>>>>                 private final static IntWritable one = new IntWritable(1);
>>>>                 private Text word = new Text();
>>>>                 Ignite ignite;
>>>>                 IgniteCache<String, Integer> cache;
>>>>
>>>>                 @Override protected void setup(Context context) throws IOException, InterruptedException {
>>>>                         super.setup(context);
>>>>
>>>>                         ignite = Ignition.start("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache2.xml");
>>>>
>>>>                         //CacheConfiguration<String, Integer> cfg2 = new CacheConfiguration<>();
>>>>                         CacheConfiguration<String, Integer> cfg2 = Ignition.loadSpringBean(
>>>>                                 "/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache1.xml", "cacheconf");
>>>>
>>>>                         cache = ignite.getOrCreateCache(cfg2);
>>>>                         //cache.put("test", 1993);
>>>>                 }
>>>>
>>>>                 @Override
>>>>                 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
>>>> //                      String line = value.toString();
>>>> //                      StringTokenizer tokenizer = new StringTokenizer(line);
>>>> //                      while (tokenizer.hasMoreTokens()) {
>>>> //                              word.set(tokenizer.nextToken());
>>>> //                              context.write(word, one);
>>>> //                      }
>>>>
>>>>                         String[] lines = tokenize(value.toString());
>>>>
>>>>                         try (IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("Cache")) {
>>>>                                 // Stream entries.
>>>>                                 for (String token : lines) {
>>>>                                         //word.set(token);
>>>>                                         //context.write(word, one);
>>>>                                         stmr.addData(token, 1);
>>>>                                 }
>>>>                         }
>>>>
>>>>                         //cache.put("interno", 666);
>>>>                 }
>>>>
>>>>                 @Override protected void cleanup(Context context) throws IOException, InterruptedException {
>>>>                         super.cleanup(context);
>>>>
>>>>                         ignite.close();
>>>>                 }
>>>>
>>>>                 private String[] tokenize(String text) {
>>>>                         text = text.toLowerCase();
>>>>                         text = text.replace("'", "");
>>>>                         text = text.replaceAll("[\\s\\W]+", " ").trim();
>>>>                         return text.split(" ");
>>>>                 }
>>>>         }
>>>>
>>>>         public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
>>>>
>>>>                 @Override
>>>>                 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
>>>>                         int sum = 0;
>>>>                         for (IntWritable value : values) {
>>>>                                 sum += value.get();
>>>>                         }
>>>>
>>>>                         context.write(key, new IntWritable(sum));
>>>>                 }
>>>>         }
>>>> }
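One cost that stands out in the map() above: a new IgniteDataStreamer is opened and closed on every map() call, and closing a streamer blocks until all buffered entries are flushed to the cluster, which defeats the streamer's batching. A sketch of the amortized pattern, opening the streamer once per mapper task (the field names follow the code above; the streamer name "default" is an assumption matching the quoted cache config, and this fragment will not compile without the Ignite jars):

```java
// Sketch only: open the streamer once per mapper task, not once per map() call.
@Override protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    ignite = Ignition.start("/home/hduser/apache-ignite-2.0.0-src/examples/config/example-cache2.xml");
    cache = ignite.getOrCreateCache(cfg2);   // cfg2 loaded via loadSpringBean() as in the original setup()
    stmr = ignite.dataStreamer("default");   // new field; the name must match the cache name
}

@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    for (String token : tokenize(value.toString()))
        stmr.addData(token, 1);              // buffered and flushed in batches by the streamer
}

@Override protected void cleanup(Context context) throws IOException, InterruptedException {
    super.cleanup(context);
    stmr.close();                            // flushes any remaining buffered entries
    ignite.close();
}
```

This moves the flush cost from once per input line to once per mapper task.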
>>>>
>>>> This is the Ignite Configuration
>>>>
>>>> <beans xmlns="http://www.springframework.org/schema/beans"
>>>>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>        xsi:schemaLocation="
>>>>         http://www.springframework.org/schema/beans
>>>>         http://www.springframework.org/schema/beans/spring-beans.xsd">
>>>>     <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
>>>>
>>>>         <property name="discoverySpi">
>>>>             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>>>>                 <property name="ipFinder">
>>>>                     <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>>>>                         <property name="addresses">
>>>>                             <list>
>>>>                                 <value>127.0.0.1:47500..47509</value>
>>>>                                 <value>192.168.30.5:47500..47509</value>
>>>>                                 <value>192.168.30.22:47500..47509</value>
>>>>                                 <value>192.168.30.99:47500..47509</value>
>>>>                             </list>
>>>>                         </property>
>>>>                     </bean>
>>>>                 </property>
>>>>             </bean>
>>>>         </property>
>>>>     </bean>
>>>>
>>>>     <bean id="cacheconf" class="org.apache.ignite.configuration.CacheConfiguration">
>>>>         <property name="name" value="default"/>
>>>>         <property name="atomicityMode" value="ATOMIC"/>
>>>>         <property name="backups" value="0"/>
>>>>         <property name="cacheMode" value="PARTITIONED"/>
>>>>     </bean>
>>>> </beans>
>>>>
>>>>
>>>> According to the Ignite documentation
>>>> https://apacheignite.readme.io/docs/performance-tips I think it may be a
>>>> problem with the cache's initial size, but I can't set it. I'm using Ignite
>>>> 2.0. When I add <property name="startSize" value="10"/> to the configuration
>>>> it throws an exception:
>>>>
>>>> Caused by: org.springframework.beans.NotWritablePropertyException: Invalid
>>>> property 'startSize' of bean class
>>>> [org.apache.ignite.configuration.CacheConfiguration]: Bean property
>>>> 'startSize' is not writable or has an invalid setter method. Does the
>>>> parameter type of the setter match the return type of the getter?
>>>>
>>>> I don't know why... I tried with the cache.put() method but it is slower.
>>>> Are these computation times normal?
>>>>
>>>> I think the times should be similar to those of a normal Hadoop
>>>> computation. The tests use 2 slaves, which are also Ignite server nodes;
>>>> the Hadoop computation creates 2 client nodes on those same slaves, which
>>>> put the data into the cache.
>>>>
>>>> Thanks
>>>> Mimmo
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Performance-WordCount-to-Hadoop-tp14084.html
>>>> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>>>>
>>>
>>>
>>
>
