hive-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Peyman Mohajerian <mohaj...@gmail.com>
Subject Re: Writing ORC Files
Date Tue, 07 Apr 2015 16:20:37 GMT
I think you have to call 'addRow' on the writer:

https://hive.apache.org/javadocs/r0.12.0/api/org/apache/hadoop/hive/ql/io/orc/Writer.html

That's just based on the Javadoc; I don't have any experience doing this.

On Tue, Apr 7, 2015 at 8:43 AM, Grant Overby (groverby) <groverby@cisco.com>
wrote:

> I have a Storm Trident bolt for writing ORC files. The files are
> created; however, they are always zero length. This code eventually causes
> an OOME (OutOfMemoryError). I suspect I am missing some sort of flushing
> action, but I don’t see anything like that in the API.
>
> My bolt follows. Any thoughts on what I’m doing wrong, or links to
> reference uses of org.apache.hadoop.hive.ql.io.orc.Writer?
>
>  package com.cisco.tinderbox.burner.trident.functions;
>
> import storm.trident.operation.BaseFunction;
> import storm.trident.operation.TridentCollector;
> import storm.trident.tuple.TridentTuple;
>
> import com.cisco.tinderbox.burner.io.system.CurrentUnixTime;
> import com.cisco.tinderbox.burner.trident.Topology;
> import com.cisco.tinderbox.model.ConnectionEvent;
> import com.google.common.base.Throwables;
>
> import java.io.IOException;
> import java.util.List;
> import java.util.UUID;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.fs.RawLocalFileSystem;
> import org.apache.hadoop.hive.ql.io.orc.OrcFile;
> import org.apache.hadoop.hive.ql.io.orc.Writer;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
> import org.apache.hive.hcatalog.streaming.FlatTableColumn;
> import org.apache.hive.hcatalog.streaming.FlatTableObjectInspector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import static org.apache.hadoop.hive.ql.io.orc.CompressionKind.*;
>
> /**
>  * Trident function that appends each tuple's ConnectionEvent to an ORC file on the
>  * local disk, rolling to a new file whenever currentUnixTime.getQuarterHour() changes.
>  * Files are written under /data/diska/orc/<dbName>/<tableName>/<partition>/.
>  *
>  * NOTE(review): the writer is closed only in cleanup() or when the quarter-hour value
>  * changes (see refreshWriterIfNeeded). Nothing else in this class flushes or closes it.
>  * If the ORC Writer keeps rows in memory until a stripe fills or close() is called
>  * (confirm against the Writer Javadoc), that would explain the zero-length files and,
>  * with the 250 MB stripe size used below, the eventual OutOfMemoryError reported in
>  * this message.
>  */
> public class OrcSink extends BaseFunction {
>     private static final Logger logger = LoggerFactory.getLogger(OrcSink.class);
>     private static final CurrentUnixTime currentUnixTime = CurrentUnixTime.getInstance();
>     private static final long serialVersionUID = 7435558912956446385L;
>     private final String dbName;
>     private final String tableName;
>     private final List<FlatTableColumn<?>> fields;
>     // NOTE(review): only read in createWriter() to build fsConf, which is never used.
>     private final String hdfsUrl;
>     // Runtime state (transient, so not serialized with the function): the current
>     // partition key, the open writer (null when none is open), and its file path.
>     // All three are assigned lazily by refreshWriter()/createWriter().
>     private transient volatile int partition;
>     private transient volatile Writer writer;
>     private transient volatile Path path;
>
>     public OrcSink(String hdfsUrl, String dbName, String tableName, List<FlatTableColumn<?>>
fields) {
>         this.hdfsUrl = hdfsUrl;
>         this.dbName = dbName;
>         this.tableName = tableName;
>         this.fields = fields;
>     }
>
>     // Closes the open writer, if any.
>     // NOTE(review): not synchronized, unlike execute(); confirm Trident never calls
>     // cleanup() concurrently with execute().
>     @Override
>     public void cleanup() {
>         closeWriter();
>     }
>
>     // Ensures a writer exists for the current quarter-hour, then appends the tuple's
>     // ConnectionEvent as one row. An IOException from addRow is logged and the row is
>     // dropped. Failures inside createWriter()/closeWriter() escape as unchecked
>     // exceptions. The collector is never used, so nothing is emitted downstream.
>     // NOTE(review): assumes FlatTableObjectInspector can read a ConnectionEvent
>     // directly — confirm.
>     @Override
>     public synchronized void execute(TridentTuple tuple, TridentCollector collector)
{
>         try {
>             refreshWriterIfNeeded();
>             ConnectionEvent connectionEvent = (ConnectionEvent) tuple.getValueByField(Topology.FIELD_CORRELATED);
>             writer.addRow(connectionEvent);
>         } catch (IOException e) {
>             logger.error("could not write to orc", e);
>         }
>     }
>
>     // Closes and clears the current writer, if any. writer is set to null in the
>     // finally block, so a failed close() is not retried.
>     // NOTE(review): unlike createWriter(), the result of Throwables.propagate(e) is not
>     // thrown here. Guava's propagate() throws by itself, so the effect is the same, but
>     // "throw Throwables.propagate(e);" would make that explicit.
>     private void closeWriter() {
>         if (writer != null) {
>             try {
>                 writer.close();
>             } catch (IOException e) {
>                 Throwables.propagate(e);
>             } finally {
>                 writer = null;
>             }
>         }
>     }
>
>     // Opens a new SNAPPY-compressed ORC writer at
>     // /data/diska/orc/<dbName>/<tableName>/<partition>/<millis>-<uuid>.orc and stores it
>     // in writer/path. An IOException is rethrown unchecked via Throwables.propagate.
>     private void createWriter() {
>         try {
>             // NOTE(review): fsConf (and therefore hdfsUrl) is built but never used. The
>             // FileSystem.get(fsConf) call is commented out, and a RawLocalFileSystem is
>             // used instead, so files go to the local disk rather than HDFS. That
>             // RawLocalFileSystem is also never initialize()d with a Configuration;
>             // confirm this is safe for the Hadoop version in use.
>             Configuration fsConf = new Configuration();
>             fsConf.set("fs.defaultFS", hdfsUrl);
>             FileSystem fs = new RawLocalFileSystem(); //FileSystem.get(fsConf);
>             String fileName = System.currentTimeMillis() + "-" + UUID.randomUUID().toString()
+ ".orc";
>             path = new Path("/data/diska/orc/" + dbName + "/" + tableName + "/" + partition
+ "/" + fileName);
>             Configuration writerConf = new Configuration();
>             ObjectInspector oi = new FlatTableObjectInspector(dbName + "." + tableName,
fields);
>             // 250 MB stripe. NOTE(review): if a whole stripe is buffered in memory before
>             // being written (confirm), each open writer may hold this much data before
>             // any bytes reach the file.
>             int stripeSize = 250 * 1024 * 1024;
>             int compressBufferSize = 256 * 1024;
>             int rowIndexStride = 10000;
>             writer = OrcFile.createWriter(fs, path, writerConf, oi, stripeSize, SNAPPY,
compressBufferSize, rowIndexStride);
>         } catch (IOException e) {
>             throw Throwables.propagate(e);
>         }
>     }
>
>     // Records the current quarter-hour as the partition, then replaces the writer.
>     // partition is updated before closeWriter(). If close throws, writer is already
>     // null, so the next execute() call retries createWriter().
>     private void refreshWriter() {
>         partition = currentUnixTime.getQuarterHour();
>         closeWriter();
>         createWriter();
>     }
>
>     // Rolls to a new writer when none is open or the quarter-hour value has changed.
>     private void refreshWriterIfNeeded() {
>         if (writer == null || partition != currentUnixTime.getQuarterHour()) {
>             refreshWriter();
>         }
>     }
> }
>
>
>
>         *Grant Overby*
> Software Engineer
> Cisco.com <http://www.cisco.com/>
> groverby@cisco.com
> Mobile: *865 724 4910 <865%20724%204910>*
>
>
>
>        Think before you print.
>
> This email may contain confidential and privileged material for the sole
> use of the intended recipient. Any review, use, distribution or disclosure
> by others is strictly prohibited. If you are not the intended recipient (or
> authorized to receive for the recipient), please contact the sender by
> reply email and delete all copies of this message.
>
> Please click here
> <http://www.cisco.com/web/about/doing_business/legal/cri/index.html> for
> Company Registration Information.
>
>
>
>

Mime
View raw message