crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-404: Add Option, Either, Java primitives, and protobuf/avro/thrift record support to PTypeH
Date Thu, 29 May 2014 03:41:16 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 424aa10aa -> 652963cfb


CRUNCH-404: Add Option, Either, Java primitives, and protobuf/avro/thrift record support to PTypeH


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/652963cf
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/652963cf
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/652963cf

Branch: refs/heads/apache-crunch-0.8
Commit: 652963cfb02011bdf7d3f1a7fd8afa942791c8de
Parents: 424aa10
Author: Josh Wills <jwills@apache.org>
Authored: Sun May 25 09:51:57 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Wed May 28 20:39:21 2014 -0700

----------------------------------------------------------------------
 crunch-scrunch/pom.xml                          | 25 +++++++-
 .../apache/crunch/scrunch/ScalaTypesTest.scala  | 61 ++++++++++++++++++++
 .../org/apache/crunch/scrunch/Conversions.scala | 52 +++++++++++++++--
 .../crunch/scrunch/EmbeddedPipeline.scala       |  1 -
 .../org/apache/crunch/scrunch/PTypeFamily.scala | 53 ++++++++++++++---
 .../org/apache/crunch/scrunch/PipelineApp.scala |  4 --
 pom.xml                                         |  6 ++
 7 files changed, 183 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/652963cf/crunch-scrunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml
index dfce881..d3cf882 100644
--- a/crunch-scrunch/pom.xml
+++ b/crunch-scrunch/pom.xml
@@ -51,8 +51,23 @@ under the License.
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
       <scope>provided</scope>
     </dependency>
     <dependency>
@@ -64,6 +79,12 @@ under the License.
       <artifactId>crunch-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- Used by LocalJobRunner in integration tests -->
+    <dependency>
+      <groupId>commons-httpclient</groupId>
+      <artifactId>commons-httpclient</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>

http://git-wip-us.apache.org/repos/asf/crunch/blob/652963cf/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
new file mode 100644
index 0000000..de9a5f9
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch;
+
+import org.junit.Test;
+
+object ScalaTypesTest {
+  def et(s: String): Either[String, Int] = {
+    if (s.startsWith("a")) {
+      Left(s)
+    } else {
+      Right(s.length)
+    }
+  }
+}
+
+class ScalaTypesTest extends CrunchSuite {
+  import ScalaTypesTest._
+
+  lazy val pipeline = Pipeline.mapReduce[ScalaTypesTest](tempDir.getDefaultConfiguration)
+
+  @Test
+  def option {
+    val shakespeare = tempDir.copyResourceFileName("shakes.txt")
+
+    val out = pipeline.read(From.textFile(shakespeare))
+        .map(x => if (x.startsWith("a")) Some(x) else None)
+        .materialize
+        .take(100)
+    pipeline.done
+    assert(out.exists(!_.isEmpty))
+  }
+
+  @Test
+  def either {
+    val shakespeare = tempDir.copyResourceFileName("shakes.txt")
+
+    val out = pipeline.read(From.textFile(shakespeare))
+      .map(et)
+      .materialize
+      .take(100)
+    pipeline.done
+    assert(out.exists(_.isLeft))
+    assert(out.exists(_.isRight))
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/652963cf/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
index 833e6d9..05baebf 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
@@ -19,12 +19,15 @@ package org.apache.crunch.scrunch
 
 import org.apache.crunch.{PCollection => JCollection, PGroupedTable => JGroupedTable, PTable => JTable, DoFn, Emitter}
 import org.apache.crunch.{Pair => CPair}
-import org.apache.crunch.types.PType
+import org.apache.crunch.types.{PTypes, PType}
 import java.nio.ByteBuffer
 import scala.collection.Iterable
 import scala.reflect.ClassTag
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.TaskInputOutputContext
+import com.google.protobuf.Message
+import org.apache.avro.specific.SpecificRecord
+import org.apache.thrift.{TFieldIdEnum, TBase}
 
 trait CanParallelTransform[El, To] {
   def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, El], ptype: PType[El]): To
@@ -83,22 +86,61 @@ trait PTypeH[T] extends Serializable {
   def get(ptf: PTypeFamily): PType[T]
 }
 
-object PTypeH {
+trait LowPriorityPTypeH {
+  implicit def records[T <: AnyRef : ClassTag] = new PTypeH[T] {
+    def get(ptf: PTypeFamily) = ptf.records(implicitly[ClassTag[T]]).asInstanceOf[PType[T]]
+  }
+}
+
+object PTypeH extends LowPriorityPTypeH {
 
   implicit val longs = new PTypeH[Long] { def get(ptf: PTypeFamily) = ptf.longs }
+  implicit val jlongs = new PTypeH[java.lang.Long] { def get(ptf: PTypeFamily) = ptf.jlongs }
   implicit val ints = new PTypeH[Int] { def get(ptf: PTypeFamily) = ptf.ints }
+  implicit val jints = new PTypeH[java.lang.Integer] { def get(ptf: PTypeFamily) = ptf.jints }
+
   implicit val floats = new PTypeH[Float] { def get(ptf: PTypeFamily) = ptf.floats }
+  implicit val jfloats = new PTypeH[java.lang.Float] { def get(ptf: PTypeFamily) = ptf.jfloats }
+
   implicit val doubles = new PTypeH[Double] { def get(ptf: PTypeFamily) = ptf.doubles }
-  implicit val strings = new PTypeH[String] { def get(ptf: PTypeFamily) = ptf.strings }
+  implicit val jdoubles = new PTypeH[java.lang.Double] { def get(ptf: PTypeFamily) = ptf.jdoubles }
+
   implicit val booleans = new PTypeH[Boolean] { def get(ptf: PTypeFamily) = ptf.booleans }
+  implicit val jbooleans = new PTypeH[java.lang.Boolean] { def get(ptf: PTypeFamily) = ptf.jbooleans }
+
+  implicit val strings = new PTypeH[String] { def get(ptf: PTypeFamily) = ptf.strings }
   implicit val bytes = new PTypeH[ByteBuffer] { def get(ptf: PTypeFamily) = ptf.bytes }
 
   implicit def writables[W <: Writable : ClassTag] = new PTypeH[W] {
     def get(ptf: PTypeFamily): PType[W] = ptf.writables(implicitly[ClassTag[W]])
   }
 
-  implicit def records[T <: AnyRef : ClassTag] = new PTypeH[T] {
-    def get(ptf: PTypeFamily) = ptf.records(implicitly[ClassTag[T]]).asInstanceOf[PType[T]]
+  implicit def protos[T <: Message : ClassTag] = new PTypeH[T] {
+    def get(ptf: PTypeFamily) = {
+      PTypes.protos(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]], ptf.ptf)
+    }
+  }
+
+  implicit def thrifts[T <: TBase[_ <: TBase[_, _], _ <: TFieldIdEnum] : ClassTag] = new PTypeH[T] {
+    def get(ptf: PTypeFamily) = {
+      PTypes.thrifts(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]], ptf.ptf)
+    }
+  }
+
+  implicit def specifics[T <: SpecificRecord : ClassTag] = new PTypeH[T] {
+    def get(ptf: PTypeFamily) = Avros.specifics[T]()
+  }
+
+  implicit def options[T: PTypeH] = new PTypeH[Option[T]] {
+    def get(ptf: PTypeFamily) = {
+      ptf.options(implicitly[PTypeH[T]].get(ptf))
+    }
+  }
+
+  implicit def eithers[L: PTypeH, R: PTypeH] = new PTypeH[Either[L, R]] {
+    def get(ptf: PTypeFamily) = {
+      ptf.eithers(implicitly[PTypeH[L]].get(ptf), implicitly[PTypeH[R]].get(ptf))
+    }
   }
 
   implicit def collections[T: PTypeH] = {

http://git-wip-us.apache.org/repos/asf/crunch/blob/652963cf/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala
index e9df263..039ea3e 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala
@@ -18,7 +18,6 @@
 package org.apache.crunch.scrunch
 
 import org.apache.hadoop.conf.Configuration
-import scala.reflect.ClassTag
 
 /**
  * Adds a pipeline to the class it is being mixed in to.

http://git-wip-us.apache.org/repos/asf/crunch/blob/652963cf/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index 9a30d58..394e2ac 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.crunch.scrunch
 
-import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, MapFn}
+import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, Union, MapFn}
 import org.apache.crunch.types.{PType, PTypeFamily => PTF}
 import org.apache.crunch.types.writable.{WritableTypeFamily, Writables => CWritables}
 import org.apache.crunch.types.avro.{AvroType, AvroTypeFamily, Avros => CAvros}
@@ -26,6 +26,7 @@ import java.util.{Collection => JCollection}
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 import org.apache.hadoop.io.Writable
+import org.apache.avro.specific.SpecificRecord
 
 class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boolean = false) extends MapFn[S, T] {
   override def initialize() {
@@ -56,34 +57,66 @@ trait PTypeFamily {
     ptf.derived(cls, new TMapFn[S, T](in, Some(pt)), new TMapFn[T, S](out), pt)
   }
 
+  def derivedImmutable[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = {
+    ptf.derivedImmutable(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt)
+  }
+
+  val jlongs = ptf.longs()
+
   val longs = {
     val in = (x: JLong) => x.longValue()
     val out = (x: Long) => new JLong(x)
-    derived(classOf[Long], in, out, ptf.longs())
+    derivedImmutable(classOf[Long], in, out, ptf.longs())
   }
 
+  val jints = ptf.ints()
+
   val ints = {
     val in = (x: JInt) => x.intValue()
     val out = (x: Int) => new JInt(x)
-    derived(classOf[Int], in, out, ptf.ints())
+    derivedImmutable(classOf[Int], in, out, ptf.ints())
   }
 
+  val jfloats = ptf.floats()
+
   val floats = {
     val in = (x: JFloat) => x.floatValue()
     val out = (x: Float) => new JFloat(x)
-    derived(classOf[Float], in, out, ptf.floats())
+    derivedImmutable(classOf[Float], in, out, ptf.floats())
   }
 
+  val jdoubles = ptf.doubles()
+
   val doubles = {
     val in = (x: JDouble) => x.doubleValue()
     val out = (x: Double) => new JDouble(x)
-    derived(classOf[Double], in, out, ptf.doubles())
+    derivedImmutable(classOf[Double], in, out, ptf.doubles())
   }
 
+  val jbooleans = ptf.booleans()
+
   val booleans = {
     val in = (x: JBoolean) => x.booleanValue()
     val out = (x: Boolean) => new JBoolean(x)
-    derived(classOf[Boolean], in, out, ptf.booleans())
+    derivedImmutable(classOf[Boolean], in, out, ptf.booleans())
+  }
+
+  def options[T](ptype: PType[T]) = {
+    val in: Union => Option[T] = (x: Union) => { if (x.getIndex() == 0) None else Some(x.getValue.asInstanceOf[T]) }
+    val out = (x: Option[T]) => { if (x.isEmpty) new Union(0, null) else new Union(1, x.get) }
+    derived(classOf[Option[T]], in, out, ptf.unionOf(ptf.nulls(), ptype))
+  }
+
+  def eithers[L, R](left: PType[L], right: PType[R]): PType[Either[L, R]] = {
+    val in: Union => Either[L, R] = (x: Union) => {
+      if (x.getIndex() == 0) {
+        Left[L, R](x.getValue.asInstanceOf[L])
+      } else {
+        Right[L, R](x.getValue.asInstanceOf[R])
+      }
+    }
+    val out = (x: Either[L, R]) => { if (x.isLeft) new Union(0, x.left.get) else new Union(1, x.right.get) }
+    derived(classOf[Either[L, R]], in, out, ptf.unionOf(left, right))
   }
 
   def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType)
@@ -141,5 +174,11 @@ object Avros extends PTypeFamily {
   override def writables[T <: Writable : ClassTag] = CAvros.writables(
     implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
 
-  def reflects[T: ClassTag]() = CAvros.reflects(implicitly[ClassTag[T]].runtimeClass).asInstanceOf[AvroType[T]]
+  def specifics[T <: SpecificRecord : ClassTag]() = {
+    CAvros.specifics(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
+  }
+
+  def reflects[T: ClassTag]() = {
+    CAvros.reflects(implicitly[ClassTag[T]].runtimeClass).asInstanceOf[AvroType[T]]
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/652963cf/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala
index 11395d3..362009e 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala
@@ -17,10 +17,6 @@
  */
 package org.apache.crunch.scrunch
 
-import java.io.Serializable
-
-import scala.collection.mutable.ListBuffer
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.util.GenericOptionsParser

http://git-wip-us.apache.org/repos/asf/crunch/blob/652963cf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 224d5a6..4f45532 100644
--- a/pom.xml
+++ b/pom.xml
@@ -310,6 +310,12 @@ under the License.
         <artifactId>commons-logging</artifactId>
         <version>${commons-logging.version}</version>
       </dependency>
+
+      <dependency>
+        <groupId>commons-cli</groupId>
+        <artifactId>commons-cli</artifactId>
+        <version>${commons-cli.version}</version>
+      </dependency>
    
       <dependency>
         <groupId>log4j</groupId>


Mime
View raw message