flume-commits mailing list archives

From hshreedha...@apache.org
Subject [1/4] flume git commit: FLUME-1734. Add a Hive Sink based on Hive Streaming support.
Date Tue, 10 Feb 2015 07:16:17 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 990776427 -> a7f9255a8
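
For context, the new sink writes through the Hive Streaming API shipped in the hive-hcatalog-streaming artifact that this commit adds to the root pom. Below is a minimal, hedged sketch of that write path (connect to an endpoint, fetch a transaction batch, write, commit); it is not the sink's actual code, and the metastore URI, database/table names, and column layout are illustrative assumptions rather than values taken from this commit.

    import java.util.Arrays;
    import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.hive.hcatalog.streaming.StreamingConnection;
    import org.apache.hive.hcatalog.streaming.TransactionBatch;

    public class HiveStreamingSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical endpoint: metastore URI, database, table, and partition values.
        HiveEndPoint endPoint = new HiveEndPoint("thrift://localhost:9083", "test_db",
            "weblogs", Arrays.asList("Asia", "India"));
        StreamingConnection connection = endPoint.newConnection(true);
        // Writer that parses delimited records into the table's columns.
        DelimitedInputWriter writer =
            new DelimitedInputWriter(new String[] {"id", "msg"}, ",", endPoint);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        try {
          txnBatch.beginNextTransaction();
          txnBatch.write("1,hello streaming".getBytes());
          txnBatch.commit();
        } finally {
          txnBatch.close();
          connection.close();
        }
      }
    }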


http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
new file mode 100644
index 0000000..1fd60bc
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.flume.sink.hive;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.util.Shell;
+import org.apache.hive.hcatalog.streaming.QueryFailedException;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestUtil {
+
+  private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+
+  /**
+   * Set up the configuration so it will use the DbTxnManager, enable concurrency support,
+   * and register the raw:// RawFileSystem implementation used by these tests.
+   * @param conf HiveConf to add these values to.
+   */
+  public static void setConfValues(HiveConf conf) {
+    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, txnMgr);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    conf.set("fs.raw.impl", RawFileSystem.class.getName());
+  }
+
+  public static void createDbAndTable(Driver driver, String databaseName,
+                                      String tableName, List<String> partVals,
+                                      String[] colNames, String[] colTypes,
+                                      String[] partNames, String dbLocation)
+          throws Exception {
+    String dbUri = "raw://" + dbLocation;
+    String tableLoc = dbUri + Path.SEPARATOR + tableName;
+
+    runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri
+ "'");
+    runDDL(driver, "use " + databaseName);
+    String crtTbl = "create table " + tableName +
+            " ( " +  getTableColumnsStr(colNames,colTypes) + " )" +
+            getPartitionStmtStr(partNames) +
+            " clustered by ( " + colNames[0] + " )" +
+            " into 10 buckets " +
+            " stored as orc " +
+            " location '" + tableLoc +  "'";
+    runDDL(driver, crtTbl);
+    System.out.println("crtTbl = " + crtTbl);
+    if (partNames!=null && partNames.length!=0) {
+      String addPart = "alter table " + tableName + " add partition ( " +
+              getTablePartsStr2(partNames, partVals) + " )";
+      runDDL(driver, addPart);
+    }
+  }
+
+  private static String getPartitionStmtStr(String[] partNames) {
+    if ( partNames == null || partNames.length == 0) {
+      return "";
+    }
+    return " partitioned by (" + getTablePartsStr(partNames) + " )";
+  }
+
+  // delete db and all tables in it
+  public static void dropDB(HiveConf conf, String databaseName) throws HiveException, MetaException {
+    IMetaStoreClient client = new HiveMetaStoreClient(conf);
+    try {
+      for (String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) {
+        client.dropTable(databaseName, table, true, true);
+      }
+      client.dropDatabase(databaseName);
+    } catch (TException e) {
+      // best-effort cleanup for tests; ignore failures while dropping the database
+    } finally {
+      client.close();
+    }
+  }
+
+  private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
+    StringBuffer sb = new StringBuffer();
+    for (int i=0; i < colNames.length; ++i) {
+      sb.append(colNames[i] + " " + colTypes[i]);
+      if (i<colNames.length-1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+
+  // converts partNames into "partName1 string, partName2 string"
+  private static String getTablePartsStr(String[] partNames) {
+    if (partNames==null || partNames.length==0) {
+      return "";
+    }
+    StringBuffer sb = new StringBuffer();
+    for (int i=0; i < partNames.length; ++i) {
+      sb.append(partNames[i] + " string");
+      if (i < partNames.length-1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+
+  // converts partNames,partVals into "partName1=val1, partName2=val2"
+  private static String getTablePartsStr2(String[] partNames, List<String> partVals) {
+    StringBuffer sb = new StringBuffer();
+    for (int i=0; i < partVals.size(); ++i) {
+      sb.append(partNames[i] + " = '" + partVals.get(i) + "'");
+      if (i < partVals.size()-1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+
+  public static ArrayList<String> listRecordsInTable(Driver driver, String dbName, String tblName)
+          throws CommandNeedRetryException, IOException {
+    driver.run("select * from " + dbName + "." + tblName);
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    return res;
+  }
+
+  public static ArrayList<String> listRecordsInPartition(Driver driver, String dbName,
+                               String tblName, String continent, String country)
+          throws CommandNeedRetryException, IOException {
+    driver.run("select * from " + dbName + "." + tblName + " where continent='"
+            + continent + "' and country='" + country + "'");
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    return res;
+  }
+
+
+  public static class RawFileSystem extends RawLocalFileSystem {
+    private static final URI NAME;
+    static {
+      try {
+        NAME = new URI("raw:///");
+      } catch (URISyntaxException se) {
+        throw new IllegalArgumentException("bad uri", se);
+      }
+    }
+
+    @Override
+    public URI getUri() {
+      return NAME;
+    }
+
+    static String execCommand(File f, String... cmd) throws IOException {
+      String[] args = new String[cmd.length + 1];
+      System.arraycopy(cmd, 0, args, 0, cmd.length);
+      args[cmd.length] = f.getCanonicalPath();
+      String output = Shell.execCommand(args);
+      return output;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+      File file = pathToFile(path);
+      if (!file.exists()) {
+        throw new FileNotFoundException("Can't find " + path);
+      }
+      // approximate POSIX permission bits from what java.io.File exposes
+      short mod = 0;
+      if (file.canRead()) {
+        mod |= 0444;
+      }
+      if (file.canWrite()) {
+        mod |= 0200;
+      }
+      if (file.canExecute()) {
+        mod |= 0111;
+      }
+      ShimLoader.getHadoopShims();
+      return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+              file.lastModified(), file.lastModified(),
+              FsPermission.createImmutable(mod), "owen", "users", path);
+    }
+  }
+  private static boolean runDDL(Driver driver, String sql) throws QueryFailedException {
+    int retryCount = 1; // # of times to retry if first attempt fails
+    for (int attempt=0; attempt <= retryCount; ++attempt) {
+      try {
+        driver.run(sql);
+        return true;
+      } catch (CommandNeedRetryException e) {
+        if (attempt == retryCount) {
+          throw new QueryFailedException(sql, e);
+        }
+        continue;
+      }
+    } // for
+    return false;
+  }
+
+}
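
The helpers above are meant to be driven from a test class. The following is a hedged sketch of how they might be wired together (set up the transactional HiveConf, create the test database and table, query it, drop it); the database/table names, column layout, and warehouse path are illustrative assumptions, and the commit's own tests may differ.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class TestUtilUsageSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Use the DbTxnManager, enable concurrency, and register the raw:// test FS.
        TestUtil.setConfValues(conf);
        SessionState.start(new SessionState(conf));
        Driver driver = new Driver(conf);

        // Hypothetical table: two columns, partitioned by continent/country.
        List<String> partVals = Arrays.asList("Asia", "India");
        TestUtil.createDbAndTable(driver, "test_db", "weblogs", partVals,
            new String[] {"id", "msg"}, new String[] {"int", "string"},
            new String[] {"continent", "country"}, "/tmp/hive-test-warehouse");

        // ... exercise the sink, then verify what landed in the table ...
        List<String> rows = TestUtil.listRecordsInTable(driver, "test_db", "weblogs");
        System.out.println("rows = " + rows);

        TestUtil.dropDB(conf, "test_db");
      }
    }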

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/resources/log4j.properties b/flume-ng-sinks/flume-hive-sink/src/test/resources/log4j.properties
new file mode 100644
index 0000000..252b5ea
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.flume = DEBUG
+log4j.logger.org.apache.hadoop = WARN
+log4j.logger.org.mortbay = WARN

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index 4bac019..de12891 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -72,6 +72,7 @@ limitations under the License.
            -->
       <modules>
         <module>flume-dataset-sink</module>
+        <module>flume-hive-sink</module>
       </modules>
     </profile>
 
@@ -87,6 +88,7 @@ limitations under the License.
            -->
       <modules>
         <module>flume-dataset-sink</module>
+        <module>flume-hive-sink</module>
       </modules>
     </profile>
 

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
index eba8d2e..3b5b3c7 100644
--- a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
+++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
@@ -168,15 +168,20 @@ public class Scribe {
         super("Log");
       }
 
-      protected Log_args getEmptyArgsInstance() {
+      public Log_args getEmptyArgsInstance() {
         return new Log_args();
       }
 
-      protected Log_result getResult(I iface, Log_args args) throws org.apache.thrift.TException {
+      public Log_result getResult(I iface, Log_args args) throws org.apache
+          .thrift.TException {
         Log_result result = new Log_result();
         result.success = iface.Log(args.messages);
         return result;
       }
+
+      public boolean isOneway() {
+        return false;
+      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1350fa4..ea7ffe3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,11 +48,12 @@ limitations under the License.
 
     <avro.version>1.7.4</avro.version>
     <elasticsearch.version>0.90.1</elasticsearch.version>
-
     <hadoop2.version>2.4.0</hadoop2.version>
     <thrift.version>0.7.0</thrift.version>
     <kite.version>0.17.1</kite.version>
-    <hive.version>0.10.0</hive.version>
+    <hive.version>0.13.1</hive.version>
+    <xalan.version>2.7.1</xalan.version>
+    <xerces.version>2.9.1</xerces.version>
   </properties>
 
   <modules>
@@ -81,7 +82,7 @@ limitations under the License.
         </property>
       </activation>
       <properties>
-        <hadoop.version>1.0.1</hadoop.version>
+        <hadoop.version>1.2.1</hadoop.version>
         <hbase.version>0.92.1</hbase.version>
         <hadoop.common.artifact.id>hadoop-core</hadoop.common.artifact.id>
         <thrift.version>0.7.0</thrift.version>
@@ -133,7 +134,7 @@ limitations under the License.
         <hadoop.version>${hadoop2.version}</hadoop.version>
         <hbase.version>0.94.2</hbase.version>
         <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
-        <thrift.version>0.8.0</thrift.version>
+        <thrift.version>0.9.0</thrift.version>
       </properties>
       <dependencyManagement>
         <dependencies>
@@ -149,6 +150,11 @@ limitations under the License.
           </dependency>
           <dependency>
             <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-minicluster</artifactId>
             <version>${hadoop.version}</version>
           </dependency>
@@ -211,7 +217,7 @@ limitations under the License.
         <hadoop.version>${hadoop2.version}</hadoop.version>
         <hbase.version>0.98.2-hadoop2</hbase.version>
         <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
-        <thrift.version>0.8.0</thrift.version>
+        <thrift.version>0.9.0</thrift.version>
       </properties>
       <dependencyManagement>
         <dependencies>
@@ -736,6 +742,8 @@ limitations under the License.
                   <exclude>**/.classpath</exclude>
                   <exclude>**/.project</exclude>
                   <exclude>**/target/**</exclude>
+                  <exclude>**/derby.log</exclude>
+                  <exclude>**/metastore_db/</exclude>
                 </excludes>
               </configuration>
             </execution>
@@ -971,6 +979,7 @@ limitations under the License.
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>${hadoop.version}</version>
+        <optional>true</optional>
       </dependency>
 
       <dependency>
@@ -1026,6 +1035,15 @@ limitations under the License.
         <artifactId>joda-time</artifactId>
         <version>2.1</version>
       </dependency>
+      <!-- Pin zookeeper explicitly as a managed dependency, because asynchbase
+           was otherwise pulling in a different version of zookeeper as a
+           transitive dependency. -->
+
+      <dependency>
+        <groupId>org.apache.zookeeper</groupId>
+        <artifactId>zookeeper</artifactId>
+        <version>${zookeeper.version}</version>
+      </dependency>
 
       <dependency>
         <groupId>org.apache.hadoop</groupId>
@@ -1123,6 +1141,12 @@ limitations under the License.
 
       <dependency>
         <groupId>org.apache.flume.flume-ng-sinks</groupId>
+        <artifactId>flume-hive-sink</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.flume.flume-ng-sinks</groupId>
         <artifactId>flume-irc-sink</artifactId>
         <version>1.6.0-SNAPSHOT</version>
       </dependency>
@@ -1331,6 +1355,18 @@ limitations under the License.
         <version>1.1.0</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.hive.hcatalog</groupId>
+        <artifactId>hive-hcatalog-streaming</artifactId>
+        <version>${hive.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hive</groupId>
+        <artifactId>hive-cli</artifactId>
+        <version>${hive.version}</version>
+      </dependency>
+
       <!-- Dependency for Zk provider -->
       <dependency>
         <groupId>org.apache.curator</groupId>

