avro-dev mailing list archives

From "Scott Carey (JIRA)" <j...@apache.org>
Subject [jira] Issue Comment Edited: (AVRO-782) issue of cache coherence or reuse for avro map reduce
Date Thu, 17 Mar 2011 04:15:29 GMT

    [ https://issues.apache.org/jira/browse/AVRO-782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13007801#comment-13007801 ]

Scott Carey edited comment on AVRO-782 at 3/17/11 4:13 AM:
-----------------------------------------------------------

Something is odd with how the objects passed into reduce() are re-used by the framework.

So, if you change

{code}
value.put("rowKey", key);
{code}
to
{code}
value.put("rowKey", new Utf8(Arrays.copyOf(key.getBytes(),key.getByteLength())));
{code}
or
{code}
value.put("rowKey", key.toString();
{code}

it works.

Since Utf8 is mutable, the framework modifies the same instance in place for the next reduce() call rather than creating a new one.
The same thing can happen with any re-used object.
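
To make the re-use hazard concrete, here is a minimal, self-contained sketch (not from the issue): Utf8.getBytes() exposes the internal buffer, so when the framework refills that buffer for the next record, every stored reference to the Utf8 sees the new contents.
{code}
import org.apache.avro.util.Utf8;

public class Utf8ReuseDemo {
  public static void main(String[] args) {
    Utf8 key = new Utf8("first");
    Utf8 stored = key;                 // no copy made: both names alias one object
    // Simulate the framework re-filling the re-used buffer for the next record:
    byte[] internal = key.getBytes();  // getBytes() returns the internal buffer
    System.arraycopy(new byte[] {'o', 't', 'h', 'e', 'r'}, 0, internal, 0, 5);
    System.out.println(stored);        // prints "other", not "first"
  }
}
{code}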

For example, try this version of the mapper:
{code}
public static class MapImpl extends AvroMapper<GenericRecord, Pair<Utf8, GenericRecord>> {
  static GenericRecord foo = null;

  @Override
  public void map(GenericRecord input, AvroCollector<Pair<Utf8, GenericRecord>> collector,
      Reporter reporter) throws IOException {
    if (null == foo) foo = input;  // hold a reference to the first record seen
    collector.collect(new Pair<Utf8, GenericRecord>((Utf8) input.get("rowKey"), input));

    System.out.println("working on " + input.toString());
    System.out.println("the first was " + foo.toString());
  }
}
{code}

It will print out:
{noformat}
working on 0000000000000000000000000000000000000
the first was 0000000000000000000000000000000000000
working on 0000000100000000000000000000000000001
the first was 0000000100000000000000000000000000001
working on 0000000200000000000000000000000000002
the first was 0000000200000000000000000000000000002
working on 0000000300000000000000000000000000003
the first was 0000000300000000000000000000000000003
working on 0000000400000000000000000000000000004
the first was 0000000400000000000000000000000000004
{noformat}

The call to collect() works without copying in the mapper because it immediately serializes
the value. The problem on the reducer side is during reading, not writing. It reads a value
twice in a row for some reason; in fact, it reads both the '1' and '2' values and keys during
the '1' loop.

I think what is happening is that 'key', passed into reduce(), is re-used by the internal Hadoop
value iterator. Once we read a second value into the generic record, we overwrite the iterator's
state and break it, causing it to skip an iteration and become offset such that the current
loop matches the next key.


The short-term workaround is to copy the key before setting it on a mutable object where it
could later be changed.
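
Here is a sketch of that workaround in a reducer (the class name and loop are illustrative, following the AvroReducer API; "rowKey" and the new Utf8(key.toString()) copy are taken from the report):
{code}
import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapred.Reporter;

public class ReduceImpl extends AvroReducer<Utf8, GenericRecord, GenericRecord> {
  @Override
  public void reduce(Utf8 key, Iterable<GenericRecord> values,
      AvroCollector<GenericRecord> collector, Reporter reporter) throws IOException {
    // Copy the key up front so we never alias the instance the iterator re-uses:
    Utf8 safeKey = new Utf8(key.toString());
    for (GenericRecord value : values) {
      value.put("rowKey", safeKey);  // safe: advancing the iterator cannot clobber the copy
      collector.collect(value);
    }
  }
}
{code}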

We probably want a copy-constructor for Utf8 to make copies easier too.
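
For example, something along these lines (hypothetical; no such constructor exists today):
{code}
// Hypothetical copy constructor for org.apache.avro.util.Utf8:
public Utf8(Utf8 other) {
  this(java.util.Arrays.copyOf(other.getBytes(), other.getByteLength()));
}
{code}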

> issue of cache coherence or reuse for avro map reduce
> -----------------------------------------------------
>
>                 Key: AVRO-782
>                 URL: https://issues.apache.org/jira/browse/AVRO-782
>             Project: Avro
>          Issue Type: Bug
>          Components: java
>    Affects Versions: 1.5.0
>         Environment: Mac with VMWare running Linux training-vm 2.6.28-19-server #61-Ubuntu
>            Reporter: ey-chih chow
>         Attachments: AVRO-782.patch
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Our map reduce jobs are using Avro map/reduce API.  For one of the jobs, we got the following
trace for the reducer:
> ====================================================================================================
> attempt_20110310145147365_0002_r_000000_0/syslog:2011-03-10 14:52:31,226 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000000000000000000000000000000000 whose rowKey is 0000000000000000000000000000000000000
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,010 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000000000000000000000000000000000 whose rowKey is 0000000000000000000000000000000000000
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,016 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000100000000000000000000000000001 whose rowKey is 0000000200000000000000000000000000002
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,017 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000200000000000000000000000000002 whose rowKey is 0000000300000000000000000000000000003
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,021 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000300000000000000000000000000003 whose rowKey is 0000000400000000000000000000000000004
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,023 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000400000000000000000000000000004 whose rowKey is 0000000500000000000000000000000000005
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,024 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000500000000000000000000000000005 whose rowKey is 0000000500000000000000000000000000005
> ====================================================================================================
> If we add the following two lines to the reducer code:
> ====================================================================================================
> boolean workAround = getConf().getBoolean(NgActivityGatheringJob.NG_AVRO_BUG_WORKAROUND,
true);
> Utf8 dupKey = (workAround) ? new Utf8(key.toString()) : key; // use dupKey instead of
key passed to reducer
> ====================================================================================================
> We got the following trace, which we consider as the right behavior:
> ====================================================================================================
> 2011-03-10 15:04:33,431 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000000000000000000000000000000000 whose rowKey is 0000000000000000000000000000000000000
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,374 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000000000000000000000000000000000 whose rowKey is 0000000000000000000000000000000000000
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,381 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000100000000000000000000000000001 whose rowKey is 0000000100000000000000000000000000001
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,383 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000200000000000000000000000000002 whose rowKey is 0000000200000000000000000000000000002
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,389 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000300000000000000000000000000003 whose rowKey is 0000000300000000000000000000000000003
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,391 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000400000000000000000000000000004 whose rowKey is 0000000400000000000000000000000000004
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,393 INFO com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer:
working on 0000000500000000000000000000000000005 whose rowKey is 0000000500000000000000000000000000005
> ====================================================================================================
> According to Scott Carey, this might relate to object re-use. We have created a unit test
case that reproduces the problem. The test case will be attached as a patch. Note that we run
this test case under our Ngmoco dev environment, so it might need some adjustment to run in
other environments.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira
