cassandra-commits mailing list archives

From brandonwilli...@apache.org
Subject [2/3] git commit: Add hadoop progressable compatibility. Patch by Ben Coverston, reviewed by brandonwilliams for CASSANDRA-5201
Date Wed, 05 Mar 2014 18:59:07 GMT
Add hadoop progressable compatibility.
Patch by Ben Coverston, reviewed by brandonwilliams for CASSANDRA-5201


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4cf8a8a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4cf8a8a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4cf8a8a6

Branch: refs/heads/trunk
Commit: 4cf8a8a6c356889609f9ffb74d548a68e52ec506
Parents: 7f7a9cc
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Wed Mar 5 12:54:42 2014 -0600
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Wed Mar 5 12:54:42 2014 -0600

----------------------------------------------------------------------
 build.xml                                       |   3 -
 .../hadoop/AbstractColumnFamilyInputFormat.java |   1 -
 .../AbstractColumnFamilyOutputFormat.java       |   1 -
 .../AbstractColumnFamilyRecordWriter.java       |   2 +
 .../cassandra/hadoop/BulkOutputFormat.java      |   3 +-
 .../cassandra/hadoop/BulkRecordWriter.java      |  16 +-
 .../hadoop/ColumnFamilyInputFormat.java         |   1 -
 .../hadoop/ColumnFamilyOutputFormat.java        |   2 +-
 .../hadoop/ColumnFamilyRecordReader.java        |   1 -
 .../hadoop/ColumnFamilyRecordWriter.java        |  15 +-
 .../apache/cassandra/hadoop/HadoopCompat.java   | 309 +++++++++++++++++++
 .../apache/cassandra/hadoop/Progressable.java   |  50 ---
 .../cassandra/hadoop/cql3/CqlOutputFormat.java  |   3 +-
 .../hadoop/cql3/CqlPagingInputFormat.java       |   2 +-
 .../hadoop/cql3/CqlPagingRecordReader.java      |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  |  12 +-
 .../cassandra/hadoop/pig/CassandraStorage.java  |   2 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   1 -
 18 files changed, 345 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
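For orientation, here is a minimal sketch (not part of this patch; the class and field names other than HadoopCompat.progress are illustrative) of the progress-reporting pattern the changes below adopt: each record writer now holds both the old-API org.apache.hadoop.util.Progressable and the new-API TaskAttemptContext, and reports through whichever one the framework supplied.

import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;

abstract class ProgressReportingWriter
{
    protected Progressable progress;      // set via the old mapred entry point
    protected TaskAttemptContext context; // set via the new mapreduce entry point

    protected void reportProgress()
    {
        if (progress != null)
            progress.progress();
        if (context != null)
            HadoopCompat.progress(context); // reflection shim introduced below
    }
}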


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index bb8673e..304b5fe 100644
--- a/build.xml
+++ b/build.xml
@@ -374,7 +374,6 @@
           	<exclusion groupId="org.mortbay.jetty" artifactId="servlet-api"/>
           </dependency>
           <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" version="1.0.3"/>
-          <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat"
version="4.3"/>
           <dependency groupId="org.apache.pig" artifactId="pig" version="0.11.1"/>
           <dependency groupId="net.java.dev.jna" artifactId="jna" version="4.0.0"/>
 
@@ -418,7 +417,6 @@
         <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
       	<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
-        <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat"/>
         <dependency groupId="org.apache.pig" artifactId="pig"/>
       	<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
       </artifact:pom>
@@ -485,7 +483,6 @@
         <!-- don't need hadoop classes to run, but if you use the hadoop stuff -->
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
-        <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat"
optional="true"/>
         <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
 
         <!-- don't need jna to run, but nice to have -->

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 760193f..cb106e9 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
index a3c4234..3041829 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
index 1956262..501ca65 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.transport.TTransport;
+import org.apache.hadoop.util.Progressable;
 
 
 /**
@@ -67,6 +68,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite
 
     protected final ConsistencyLevel consistencyLevel;
     protected Progressable progressable;
+    protected TaskAttemptContext context;
 
     protected AbstractColumnFamilyRecordWriter(Configuration conf)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index 566d5ee..c3d8e05 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.thrift.Mutation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
@@ -61,7 +60,7 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
     @Deprecated
     public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
     {
-        return new BulkRecordWriter(job, new Progressable(progress));
+        return new BulkRecordWriter(job, progress);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index a8e2e13..d6136a2 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -28,7 +28,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +52,7 @@ import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.apache.hadoop.util.Progressable;
 
 final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
 implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
@@ -67,6 +67,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     private SSTableLoader loader;
     private File outputdir;
     private Progressable progress;
+    private TaskAttemptContext context;
     private int maxFailures;
 
     private enum CFType
@@ -87,10 +88,9 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     BulkRecordWriter(TaskAttemptContext context)
     {
         this(HadoopCompat.getConfiguration(context));
-        this.progress = new Progressable(context);
+        this.context = context;
     }
 
-
     BulkRecordWriter(Configuration conf, Progressable progress)
     {
         this(conf);
@@ -205,7 +205,10 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                         writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
                 }
             }
-            progress.progress();
+            if (null != progress)
+                progress.progress();
+            if (null != context)
+                HadoopCompat.progress(context);
         }
     }
     @Override
@@ -236,7 +239,10 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                 }
                 catch (ExecutionException | TimeoutException te)
                 {
-                    progress.progress();
+                    if (null != progress)
+                        progress.progress();
+                    if (null != context)
+                        HadoopCompat.progress(context);
                 }
                 catch (InterruptedException e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 362cd70..a2c7a36 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index 724ba7d..49aaf99 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -51,7 +51,7 @@ public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<B
     @Deprecated
     public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress)
     {
-        return new ColumnFamilyRecordWriter(job, new Progressable(progress));
+        return new ColumnFamilyRecordWriter(job, progress);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index ef883fd..f6d2b7e 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.*;
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.db.Cell;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 0ae2a67..d6a873b 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -23,7 +23,6 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.*;
@@ -31,6 +30,7 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
+import org.apache.hadoop.util.Progressable;
 
 
 /**
@@ -62,9 +62,9 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
     ColumnFamilyRecordWriter(TaskAttemptContext context)
     {
         this(HadoopCompat.getConfiguration(context));
-        this.progressable = new Progressable(context);
-    }
+        this.context = context;
 
+    }
     ColumnFamilyRecordWriter(Configuration conf, Progressable progressable)
     {
         this(conf);
@@ -128,7 +128,10 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
 
         for (Mutation amut : value)
             client.put(Pair.create(keybuff, amut));
+        if (progressable != null)
             progressable.progress();
+        if (context != null)
+            HadoopCompat.progress(context);
     }
 
     /**
@@ -140,9 +143,9 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
         public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
         
         /**
-         * Constructs an {@link RangeClient} for the given endpoints.
-         * @param endpoints the possible endpoints to execute the mutations on
-         */
+        * Constructs an {@link RangeClient} for the given endpoints.
+        * @param endpoints the possible endpoints to execute the mutations on
+        */
         public RangeClient(List<InetAddress> endpoints)
         {
             super(endpoints);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/HadoopCompat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/HadoopCompat.java b/src/java/org/apache/cassandra/hadoop/HadoopCompat.java
new file mode 100644
index 0000000..f2f7033
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/HadoopCompat.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hadoop;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/*
+ * This is based on ContextFactory.java from hadoop-2.0.x sources.
+ */
+
+/**
+ * Utility methods to allow applications to deal with inconsistencies between
+ * MapReduce Context Objects API between Hadoop 1.x and 2.x.
+ */
+public class HadoopCompat {
+
+    private static final boolean useV21;
+
+    private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
+    private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
+    private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
+    private static final Constructor<?> GENERIC_COUNTER_CONSTRUCTOR;
+
+    private static final Field READER_FIELD;
+    private static final Field WRITER_FIELD;
+
+    private static final Method GET_CONFIGURATION_METHOD;
+    private static final Method SET_STATUS_METHOD;
+    private static final Method GET_COUNTER_METHOD;
+    private static final Method INCREMENT_COUNTER_METHOD;
+    private static final Method GET_TASK_ATTEMPT_ID;
+    private static final Method PROGRESS_METHOD;
+
+    static {
+        boolean v21 = true;
+        final String PACKAGE = "org.apache.hadoop.mapreduce";
+        try {
+            Class.forName(PACKAGE + ".task.JobContextImpl");
+        } catch (ClassNotFoundException cnfe) {
+            v21 = false;
+        }
+        useV21 = v21;
+        Class<?> jobContextCls;
+        Class<?> taskContextCls;
+        Class<?> taskIOContextCls;
+        Class<?> mapContextCls;
+        Class<?> genericCounterCls;
+        try {
+            if (v21) {
+                jobContextCls =
+                        Class.forName(PACKAGE+".task.JobContextImpl");
+                taskContextCls =
+                        Class.forName(PACKAGE+".task.TaskAttemptContextImpl");
+                taskIOContextCls =
+                        Class.forName(PACKAGE+".task.TaskInputOutputContextImpl");
+                mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
+                genericCounterCls = Class.forName(PACKAGE+".counters.GenericCounter");
+            } else {
+                jobContextCls =
+                        Class.forName(PACKAGE+".JobContext");
+                taskContextCls =
+                        Class.forName(PACKAGE+".TaskAttemptContext");
+                taskIOContextCls =
+                        Class.forName(PACKAGE+".TaskInputOutputContext");
+                mapContextCls = Class.forName(PACKAGE + ".MapContext");
+                genericCounterCls =
+                        Class.forName("org.apache.hadoop.mapred.Counters$Counter");
+
+            }
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Can't find class", e);
+        }
+        try {
+            JOB_CONTEXT_CONSTRUCTOR =
+                    jobContextCls.getConstructor(Configuration.class, JobID.class);
+            JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
+            TASK_CONTEXT_CONSTRUCTOR =
+                    taskContextCls.getConstructor(Configuration.class,
+                            TaskAttemptID.class);
+            TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
+            GENERIC_COUNTER_CONSTRUCTOR =
+                    genericCounterCls.getDeclaredConstructor(String.class,
+                            String.class,
+                            Long.TYPE);
+            GENERIC_COUNTER_CONSTRUCTOR.setAccessible(true);
+
+            if (useV21) {
+                MAP_CONTEXT_CONSTRUCTOR =
+                        mapContextCls.getDeclaredConstructor(Configuration.class,
+                                TaskAttemptID.class,
+                                RecordReader.class,
+                                RecordWriter.class,
+                                OutputCommitter.class,
+                                StatusReporter.class,
+                                InputSplit.class);
+                Method get_counter;
+                try {
+                    get_counter = Class.forName(PACKAGE + ".TaskAttemptContext").getMethod("getCounter", String.class,
+                            String.class);
+                } catch (Exception e) {
+                    get_counter = Class.forName(PACKAGE + ".TaskInputOutputContext").getMethod("getCounter",
+                            String.class, String.class);
+                }
+                GET_COUNTER_METHOD = get_counter;
+            } else {
+                MAP_CONTEXT_CONSTRUCTOR =
+                        mapContextCls.getConstructor(Configuration.class,
+                                TaskAttemptID.class,
+                                RecordReader.class,
+                                RecordWriter.class,
+                                OutputCommitter.class,
+                                StatusReporter.class,
+                                InputSplit.class);
+                GET_COUNTER_METHOD = Class.forName(PACKAGE+".TaskInputOutputContext")
+                        .getMethod("getCounter", String.class, String.class);
+            }
+            MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
+            READER_FIELD = mapContextCls.getDeclaredField("reader");
+            READER_FIELD.setAccessible(true);
+            WRITER_FIELD = taskIOContextCls.getDeclaredField("output");
+            WRITER_FIELD.setAccessible(true);
+            GET_CONFIGURATION_METHOD = Class.forName(PACKAGE+".JobContext")
+                    .getMethod("getConfiguration");
+            SET_STATUS_METHOD = Class.forName(PACKAGE+".TaskAttemptContext")
+                    .getMethod("setStatus", String.class);
+            GET_TASK_ATTEMPT_ID = Class.forName(PACKAGE+".TaskAttemptContext")
+                    .getMethod("getTaskAttemptID");
+            INCREMENT_COUNTER_METHOD = Class.forName(PACKAGE+".Counter")
+                    .getMethod("increment", Long.TYPE);
+            PROGRESS_METHOD = Class.forName(PACKAGE+".TaskAttemptContext")
+                    .getMethod("progress");
+
+        } catch (SecurityException e) {
+            throw new IllegalArgumentException("Can't run constructor ", e);
+        } catch (NoSuchMethodException e) {
+            throw new IllegalArgumentException("Can't find constructor ", e);
+        } catch (NoSuchFieldException e) {
+            throw new IllegalArgumentException("Can't find field ", e);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Can't find class", e);
+        }
+    }
+
+    /**
+     * True if runtime Hadoop version is 2.x, false otherwise.
+     */
+    public static boolean isVersion2x() {
+        return useV21;
+    }
+
+    private static Object newInstance(Constructor<?> constructor, Object...args) {
+        try {
+            return constructor.newInstance(args);
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("Can't instantiate " + constructor, e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("Can't instantiate " + constructor, e);
+        } catch (InvocationTargetException e) {
+            throw new IllegalArgumentException("Can't instantiate " + constructor, e);
+        }
+    }
+
+    /**
+     * Creates JobContext from a Configuration and jobId using the correct
+     * constructor for the Hadoop version in use. <code>jobId</code> may be null.
+     */
+    public static JobContext newJobContext(Configuration conf, JobID jobId) {
+        return (JobContext) newInstance(JOB_CONTEXT_CONSTRUCTOR, conf, jobId);
+    }
+
+    /**
+     * Creates TaskAttemptContext from a Configuration and taskAttemptId using
+     * the correct constructor for the Hadoop version in use.
+     */
+    public static TaskAttemptContext newTaskAttemptContext(
+            Configuration conf, TaskAttemptID taskAttemptId) {
+        return (TaskAttemptContext)
+                newInstance(TASK_CONTEXT_CONSTRUCTOR, conf, taskAttemptId);
+    }
+
+    /**
+     * Instantiates MapContext under Hadoop 1 and MapContextImpl under Hadoop 2.
+     */
+    public static MapContext newMapContext(Configuration conf,
+                                           TaskAttemptID taskAttemptID,
+                                           RecordReader recordReader,
+                                           RecordWriter recordWriter,
+                                           OutputCommitter outputCommitter,
+                                           StatusReporter statusReporter,
+                                           InputSplit inputSplit) {
+        return (MapContext) newInstance(MAP_CONTEXT_CONSTRUCTOR,
+                conf, taskAttemptID, recordReader, recordWriter, outputCommitter,
+                statusReporter, inputSplit);
+    }
+
+    /**
+     * @return with Hadoop 2 : <code>new GenericCounter(args)</code>,<br>
+     *         with Hadoop 1 : <code>new Counter(args)</code>
+     */
+    public static Counter newGenericCounter(String name, String displayName, long value) {
+        try {
+            return (Counter)
+                    GENERIC_COUNTER_CONSTRUCTOR.newInstance(name, displayName, value);
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("Can't instantiate Counter", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("Can't instantiate Counter", e);
+        } catch (InvocationTargetException e) {
+            throw new IllegalArgumentException("Can't instantiate Counter", e);
+        }
+    }
+
+    /**
+     * Invokes a method and rethrows any exception as a runtime exception.
+     */
+    private static Object invoke(Method method, Object obj, Object... args) {
+        try {
+            return method.invoke(obj, args);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("Can't invoke method " + method.getName(),
e);
+        } catch (InvocationTargetException e) {
+            throw new IllegalArgumentException("Can't invoke method " + method.getName(),
e);
+        }
+    }
+
+    /**
+     * Invoke getConfiguration() on JobContext. Works with both
+     * Hadoop 1 and 2.
+     */
+    public static Configuration getConfiguration(JobContext context) {
+        return (Configuration) invoke(GET_CONFIGURATION_METHOD, context);
+    }
+
+    /**
+     * Invoke setStatus() on TaskAttemptContext. Works with both
+     * Hadoop 1 and 2.
+     */
+    public static void setStatus(TaskAttemptContext context, String status) {
+        invoke(SET_STATUS_METHOD, context, status);
+    }
+
+    /**
+     * Returns TaskAttemptContext.getTaskAttemptID(). Works with both
+     * Hadoop 1 and 2.
+     */
+    public static TaskAttemptID getTaskAttemptID(TaskAttemptContext taskContext) {
+        return (TaskAttemptID) invoke(GET_TASK_ATTEMPT_ID, taskContext);
+    }
+
+    /**
+     * Invoke getCounter() on TaskInputOutputContext. Works with both
+     * Hadoop 1 and 2.
+     */
+    public static Counter getCounter(TaskInputOutputContext context,
+                                     String groupName, String counterName) {
+        return (Counter) invoke(GET_COUNTER_METHOD, context, groupName, counterName);
+    }
+
+    /**
+     * Invoke TaskAttemptContext.progress(). Works with both
+     * Hadoop 1 and 2.
+     */
+    public static void progress(TaskAttemptContext context) {
+        invoke(PROGRESS_METHOD, context);
+    }
+
+    /**
+     * Increments the counter. Works with both Hadoop 1 and 2.
+     */
+    public static void incrementCounter(Counter counter, long increment) {
+        // Incrementing a counter may be called often and is affected by the
+        // cost of invoke(); it might be a good candidate to handle in a shim.
+        // (TODO Raghu) figure out how to achieve such a build with maven
+        invoke(INCREMENT_COUNTER_METHOD, counter, increment);
+    }
+}
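A hedged usage sketch for the shim above (the demo class and the job id are hypothetical; the HadoopCompat methods are the ones added in this file):

import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;

public class HadoopCompatDemo
{
    public static void main(String[] args)
    {
        Configuration conf = new Configuration();
        // Reflectively picks JobContextImpl on Hadoop 2.x, JobContext on 1.x.
        JobContext job = HadoopCompat.newJobContext(conf, new JobID("demo", 1));
        System.out.println("Hadoop 2.x runtime: " + HadoopCompat.isVersion2x());
        System.out.println("Same conf instance: " + (HadoopCompat.getConfiguration(job) == conf));
    }
}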

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/Progressable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/Progressable.java b/src/java/org/apache/cassandra/hadoop/Progressable.java
deleted file mode 100644
index ac253ef..0000000
--- a/src/java/org/apache/cassandra/hadoop/Progressable.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.apache.cassandra.hadoop;
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-
-public class Progressable
-{
-    private TaskAttemptContext context;
-    private org.apache.hadoop.util.Progressable progressable;
-
-    public Progressable(TaskAttemptContext context)
-    {
-        this.context = context;
-    }
-
-    public Progressable(org.apache.hadoop.util.Progressable progressable)
-    {
-        this.progressable = progressable;
-    }
-
-    public void progress()
-    {
-        if (context != null)
-            context.progress();
-        else
-            progressable.progress();
-    }
-
-}
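Why the deleted wrapper could not simply keep calling context.progress() across both Hadoop lines: TaskAttemptContext is a class in Hadoop 1.x but an interface in 2.x, so bytecode compiled against one version fails on the other with IncompatibleClassChangeError. Below is a sketch of the reflective dispatch that avoids the compile-time binding (illustrative only, assuming that API change; the real logic lives in HadoopCompat.progress above):

import java.lang.reflect.Method;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

final class ProgressReflection
{
    static void progress(TaskAttemptContext context) throws Exception
    {
        // Resolved at runtime, so there is no invokevirtual vs. invokeinterface
        // mismatch between Hadoop 1.x and 2.x bytecode.
        Method m = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext")
                        .getMethod("progress");
        m.invoke(context);
    }
}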

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index 3cc0cd1..7c89bef 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -25,7 +25,6 @@ import java.util.Map;
 
 import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.Progressable;
 import org.apache.hadoop.mapreduce.*;
 
 /**
@@ -59,7 +58,7 @@ public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String
     @Deprecated
     public CqlRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
     {
-        return new CqlRecordWriter(job, new Progressable(progress));
+        return new CqlRecordWriter(job, progress);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
index 6f4478e..96f2f94 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import com.twitter.elephantbird.util.HadoopCompat;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
 import org.apache.cassandra.hadoop.ReporterWrapper;
 import org.apache.hadoop.mapred.InputSplit;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index f712584..9d60485 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -26,7 +26,7 @@ import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterables;
-import com.twitter.elephantbird.util.HadoopCompat;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 826fc0d..a030462 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -23,7 +23,8 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
-import com.twitter.elephantbird.util.HadoopCompat;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +38,6 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
 import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.Progressable;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -86,7 +86,7 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
     CqlRecordWriter(TaskAttemptContext context) throws IOException
     {
         this(HadoopCompat.getConfiguration(context));
-        this.progressable = new Progressable(context);
+        this.context = context;
     }
 
     CqlRecordWriter(Configuration conf, Progressable progressable) throws IOException
@@ -181,7 +181,11 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
             allValues.add(keyColumns.get(column));
 
         client.put(allValues);
-        progressable.progress();
+
+        if (progressable != null)
+            progressable.progress();
+        if (context != null)
+            HadoopCompat.progress(context);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 56f66bb..14d30d5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
-import com.twitter.elephantbird.util.HadoopCompat;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.db.Cell;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cf8a8a6/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index b349cf7..efa6d34 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.composites.CellNames;

