Subject: svn commit: r1178650 - in /hadoop/common/trunk/hadoop-mapreduce-project: CHANGES.txt hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java
Date: Tue, 04 Oct 2011 00:42:37 -0000
From: acmurthy@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org

Author: acmurthy
Date: Tue Oct  4 00:42:37 2011
New Revision: 1178650

URL: http://svn.apache.org/viewvc?rev=1178650&view=rev
Log:
MAPREDUCE-3138. Add a utility to help applications bridge changes in Context
Objects APIs due to MAPREDUCE-954. Contributed by omalley.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1178650&r1=1178649&r2=1178650&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Oct  4 00:42:37 2011
@@ -335,6 +335,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3134. Added documentation the CapacityScheduler. (acmurthy)
 
+    MAPREDUCE-3138. Add a utility to help applications bridge changes in
+    Context Objects APIs due to MAPREDUCE-954. (omalley via acmurthy)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java?rev=1178650&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java Tue Oct  4 00:42:37 2011
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A factory to allow applications to deal with inconsistencies in the
+ * MapReduce Context Objects API between hadoop-0.20 and later versions.
+ */
+public class ContextFactory {
+
+  private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
+  private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
+  private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
+  private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
+  private static final boolean useV21;
+
+  private static final Field REPORTER_FIELD;
+  private static final Field READER_FIELD;
+  private static final Field WRITER_FIELD;
+  private static final Field OUTER_MAP_FIELD;
+  private static final Field WRAPPED_CONTEXT_FIELD;
+
+  static {
+    boolean v21 = true;
+    final String PACKAGE = "org.apache.hadoop.mapreduce";
+    try {
+      Class.forName(PACKAGE + ".task.JobContextImpl");
+    } catch (ClassNotFoundException cnfe) {
+      v21 = false;
+    }
+    useV21 = v21;
+    Class<?> jobContextCls;
+    Class<?> taskContextCls;
+    Class<?> taskIOContextCls;
+    Class<?> mapCls;
+    Class<?> mapContextCls;
+    Class<?> innerMapContextCls;
+    try {
+      if (v21) {
+        jobContextCls =
+          Class.forName(PACKAGE + ".task.JobContextImpl");
+        taskContextCls =
+          Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
+        taskIOContextCls =
+          Class.forName(PACKAGE + ".task.TaskInputOutputContextImpl");
+        mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
+        mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
+        innerMapContextCls =
+          Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context");
+      } else {
+        jobContextCls =
+          Class.forName(PACKAGE + ".JobContext");
+        taskContextCls =
+          Class.forName(PACKAGE + ".TaskAttemptContext");
+        taskIOContextCls =
+          Class.forName(PACKAGE + ".TaskInputOutputContext");
+        mapContextCls = Class.forName(PACKAGE + ".MapContext");
+        mapCls = Class.forName(PACKAGE + ".Mapper");
+        innerMapContextCls =
+          Class.forName(PACKAGE + ".Mapper$Context");
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Can't find class", e);
+    }
+    try {
+      JOB_CONTEXT_CONSTRUCTOR =
+        jobContextCls.getConstructor(Configuration.class, JobID.class);
+      JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      TASK_CONTEXT_CONSTRUCTOR =
+        taskContextCls.getConstructor(Configuration.class,
+                                      TaskAttemptID.class);
+      TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      if (useV21) {
+        MAP_CONTEXT_CONSTRUCTOR =
+          innerMapContextCls.getConstructor(mapCls,
+                                            MapContext.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR =
+          mapContextCls.getDeclaredConstructor(Configuration.class,
+                                               TaskAttemptID.class,
+                                               RecordReader.class,
+                                               RecordWriter.class,
+                                               OutputCommitter.class,
+                                               StatusReporter.class,
+                                               InputSplit.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
+        WRAPPED_CONTEXT_FIELD =
+          innerMapContextCls.getDeclaredField("mapContext");
+        WRAPPED_CONTEXT_FIELD.setAccessible(true);
+      } else {
+        MAP_CONTEXT_CONSTRUCTOR =
+          innerMapContextCls.getConstructor(mapCls,
+                                            Configuration.class,
+                                            TaskAttemptID.class,
+                                            RecordReader.class,
+                                            RecordWriter.class,
+                                            OutputCommitter.class,
+                                            StatusReporter.class,
+                                            InputSplit.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
+        WRAPPED_CONTEXT_FIELD = null;
+      }
+      MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      REPORTER_FIELD = taskIOContextCls.getDeclaredField("reporter");
+      REPORTER_FIELD.setAccessible(true);
+      READER_FIELD = mapContextCls.getDeclaredField("reader");
+      READER_FIELD.setAccessible(true);
+      WRITER_FIELD = taskIOContextCls.getDeclaredField("output");
+      WRITER_FIELD.setAccessible(true);
+      OUTER_MAP_FIELD = innerMapContextCls.getDeclaredField("this$0");
+      OUTER_MAP_FIELD.setAccessible(true);
+    } catch (SecurityException e) {
+      throw new IllegalArgumentException("Can't run constructor ", e);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("Can't find constructor ", e);
+    } catch (NoSuchFieldException e) {
+      throw new IllegalArgumentException("Can't find field ", e);
+    }
+  }
+
+  /**
+   * Clone a job or task attempt context with a new configuration.
+   * @param original the original context
+   * @param conf the new configuration
+   * @return a new context object
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static JobContext cloneContext(JobContext original,
+                                        Configuration conf
+                                        ) throws IOException,
+                                                 InterruptedException {
+    try {
+      if (original instanceof MapContext) {
+        return cloneMapContext((Mapper.Context) original, conf, null, null);
+      } else if (original instanceof ReduceContext) {
+        throw new IllegalArgumentException("can't clone ReduceContext");
+      } else if (original instanceof TaskAttemptContext) {
+        TaskAttemptContext spec = (TaskAttemptContext) original;
+        return (JobContext)
+          TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, spec.getTaskAttemptID());
+      } else {
+        return (JobContext)
+          JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, original.getJobID());
+      }
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("Can't clone object", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't clone object", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't clone object", e);
+    }
+  }
+
+  /**
+   * Copy a mapper context, optionally replacing the input and output.
+   * @param <K1> input key type
+   * @param <V1> input value type
+   * @param <K2> output key type
+   * @param <V2> output value type
+   * @param context the context to clone
+   * @param conf a new configuration
+   * @param reader Reader to read from. Null means to clone from context.
+   * @param writer Writer to write to. Null means to clone from context.
+   * @return a new context. it will not be the same class as the original.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @SuppressWarnings("unchecked")
+  public static <K1, V1, K2, V2> Mapper<K1, V1, K2, V2>.Context
+      cloneMapContext(MapContext<K1, V1, K2, V2> context,
+                      Configuration conf,
+                      RecordReader<K1, V1> reader,
+                      RecordWriter<K2, V2> writer
+                      ) throws IOException, InterruptedException {
+    try {
+      // get the outer object pointer
+      Object outer = OUTER_MAP_FIELD.get(context);
+      // if it is a wrapped 21 context, unwrap it
+      if ("org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context".equals
+            (context.getClass().getName())) {
+        context = (MapContext<K1, V1, K2, V2>)
+          WRAPPED_CONTEXT_FIELD.get(context);
+      }
+      // if the reader or writer aren't given, use the same ones
+      if (reader == null) {
+        reader = (RecordReader<K1, V1>) READER_FIELD.get(context);
+      }
+      if (writer == null) {
+        writer = (RecordWriter<K2, V2>) WRITER_FIELD.get(context);
+      }
+      if (useV21) {
+        Object basis =
+          MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(conf,
+                                                   context.getTaskAttemptID(),
+                                                   reader, writer,
+                                                   context.getOutputCommitter(),
+                                                   REPORTER_FIELD.get(context),
+                                                   context.getInputSplit());
+        return (Mapper<K1, V1, K2, V2>.Context)
+          MAP_CONTEXT_CONSTRUCTOR.newInstance(outer, basis);
+      } else {
+        return (Mapper<K1, V1, K2, V2>.Context)
+          MAP_CONTEXT_CONSTRUCTOR.newInstance(outer,
+                                              conf, context.getTaskAttemptID(),
+                                              reader, writer,
+                                              context.getOutputCommitter(),
+                                              REPORTER_FIELD.get(context),
+                                              context.getInputSplit());
+      }
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't access field", e);
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't invoke constructor", e);
+    }
+  }
+}
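
A minimal usage sketch to accompany the new class (not part of the committed
patch): it shows how an application that must run against both the hadoop-0.20
and 0.21+ Context Objects APIs might call ContextFactory. The class name
ContextFactoryExample, the mapper/key/value types, and the configuration key
"example.library.verbose" are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.ContextFactory;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;

public class ContextFactoryExample {

  // Clone a job context so library code can read settings under a tweaked
  // configuration without mutating the framework-owned context.
  public static JobContext withExtraSetting(JobContext original)
      throws IOException, InterruptedException {
    Configuration conf = new Configuration(original.getConfiguration());
    conf.setBoolean("example.library.verbose", true);  // hypothetical key
    return ContextFactory.cloneContext(original, conf);
  }

  // Run a mapper with a modified configuration; passing null for the
  // reader and writer reuses the ones from the original context.
  public static void runWithConf(
      Mapper<LongWritable, Text, Text, LongWritable> mapper,
      MapContext<LongWritable, Text, Text, LongWritable> context,
      Configuration conf) throws IOException, InterruptedException {
    Mapper<LongWritable, Text, Text, LongWritable>.Context cloned =
        ContextFactory.cloneMapContext(context, conf, null, null);
    mapper.run(cloned);
  }
}

The intent is that the same caller code works whether the cluster provides the
0.20-style or the 0.21+ context classes, since ContextFactory selects the
matching constructors reflectively at class-load time.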