hive-user mailing list archives

From "Owen O'Malley" <omal...@apache.org>
Subject Re: OrcFile writing failing with multiple threads
Date Tue, 11 Jun 2013 17:00:23 GMT
Sorry for dropping this thread for so long. Are you using the orc.Writer
interface directly or going through the OrcOutputFormat? I forgot that I
had updated the Writer to synchronize things. It looks like all of the
methods in Writer are synchronized. I haven't had a chance to investigate
this further yet.

-- Owen
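
For a writer that does not lock itself, the external locking suggested in the quoted May 24 reply below ("synchronize on the writer before adding a row") could look roughly like this. It is only a sketch: `UnsafeRowWriter` and `writeConcurrently` are hypothetical names, and the class is a stand-in so the example runs without Hive on the classpath. It is not the ORC `Writer` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedWriterSketch {

    /** Hypothetical stand-in for a writer that does no locking of its own. */
    static class UnsafeRowWriter {
        private final List<String> rows = new ArrayList<>();

        void addRow(String row) {
            rows.add(row); // ArrayList is not thread-safe, like the writer state in the trace
        }

        int rowCount() {
            return rows.size();
        }
    }

    /** Writes rows from several threads, taking the writer's monitor around each addRow. */
    static int writeConcurrently(int threads, int rowsPerThread) throws InterruptedException {
        final UnsafeRowWriter writer = new UnsafeRowWriter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int i = 0; i < rowsPerThread; i++) {
                    // Only one thread at a time may touch the writer.
                    synchronized (writer) {
                        writer.addRow("row-" + id + "-" + i);
                    }
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        synchronized (writer) {
            return writer.rowCount();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(writeConcurrently(10, 1000));
    }
}
```

In the test app quoted below, the same idea would mean wrapping the `writer.addRow(bigRow)` call in `synchronized (writer) { ... }`. If the writer already synchronizes internally, as the WriterImpl comment suggests, the extra lock is redundant but harmless.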


On Fri, May 24, 2013 at 1:28 PM, Andrew Psaltis <
Andrew.Psaltis@webtrends.com> wrote:

>  Here is a snippet from the file header comment of the WriterImpl for ORC:
>
>  /**
>  …………
>  * This class is synchronized so that multi-threaded access is ok. In
>  * particular, because the MemoryManager is shared between writers, this
>  * class assumes that checkMemory may be called from a separate thread.
>  */
>
>  And then the addRow looks like this:
>
>   public void addRow(Object row) throws IOException {
>     synchronized (this) {
>       treeWriter.write(row);
>       rowsInStripe += 1;
>       if (buildIndex) {
>         rowsInIndex += 1;
>
>          if (rowsInIndex >= rowIndexStride) {
>           createRowIndexEntry();
>         }
>       }
>     }
>     memoryManager.addedRow();
>   }
>
>  Am I missing something here about the synchronized(this)? Perhaps I am
> looking in the wrong place.
>
>  Thanks,
> agp
>
>
>   From: Owen O'Malley <omalley@apache.org>
> Reply-To: "user@hive.apache.org" <user@hive.apache.org>
> Date: Friday, May 24, 2013 2:15 PM
> To: "user@hive.apache.org" <user@hive.apache.org>
> Subject: Re: OrcFile writing failing with multiple threads
>
>   Currently, ORC writers, like the Java collections API, don't lock
> themselves. You should synchronize on the writer before adding a row. I'm
> open to making the writers synchronized.
>
>  -- Owen
>
>
> On Fri, May 24, 2013 at 11:39 AM, Andrew Psaltis <
> Andrew.Psaltis@webtrends.com> wrote:
>
>>  All,
>> I have a test application that is attempting to add rows to an OrcFile
>> from multiple threads. However, every time I do, I get exceptions with
>> stack traces like the following:
>>
>>  java.lang.IndexOutOfBoundsException: Index 4 is outside of 0..5
>> at org.apache.hadoop.hive.ql.io.orc.DynamicIntArray.get(DynamicIntArray.java:73)
>> at org.apache.hadoop.hive.ql.io.orc.StringRedBlackTree.compareValue(StringRedBlackTree.java:55)
>> at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:192)
>> at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:199)
>> at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:300)
>> at org.apache.hadoop.hive.ql.io.orc.StringRedBlackTree.add(StringRedBlackTree.java:45)
>> at org.apache.hadoop.hive.ql.io.orc.WriterImpl$StringTreeWriter.write(WriterImpl.java:723)
>> at org.apache.hadoop.hive.ql.io.orc.WriterImpl$MapTreeWriter.write(WriterImpl.java:1093)
>> at org.apache.hadoop.hive.ql.io.orc.WriterImpl$StructTreeWriter.write(WriterImpl.java:996)
>> at org.apache.hadoop.hive.ql.io.orc.WriterImpl.addRow(WriterImpl.java:1450)
>> at OrcFileTester$BigRowWriter.run(OrcFileTester.java:129)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:722)
>>
>>
>>  Below is the source code for my sample app that is heavily based on the
>> TestOrcFile test case using BigRow. Is there something I am doing wrong
>> here, or is this a legitimate bug in the Orc writing?
>>
>>  Thanks in advance,
>> Andrew
>>
>>
>>  ------------------------- Java app code follows
>> ---------------------------------
>>  import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
>> import org.apache.hadoop.hive.ql.io.orc.OrcFile;
>> import org.apache.hadoop.hive.ql.io.orc.Writer;
>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
>> import
>> org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
>> import org.apache.hadoop.io.BytesWritable;
>> import org.apache.hadoop.io.Text;
>>
>>  import java.io.File;
>> import java.io.IOException;
>> import java.util.HashMap;
>> import java.util.Map;
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.LinkedBlockingQueue;
>>
>>  public class OrcFileTester {
>>
>>      private Writer writer;
>>     private LinkedBlockingQueue<BigRow> bigRowQueue = new
>> LinkedBlockingQueue<BigRow>();
>>     public OrcFileTester(){
>>
>>        try{
>>         Path workDir = new Path(System.getProperty("test.tmp.dir",
>>                 "target" + File.separator + "test" + File.separator +
>> "tmp"));
>>
>>          Configuration conf;
>>         FileSystem fs;
>>         Path testFilePath;
>>
>>          conf = new Configuration();
>>         fs = FileSystem.getLocal(conf);
>>         testFilePath = new Path(workDir, "TestOrcFile.OrcFileTester.orc");
>>         fs.delete(testFilePath, false);
>>
>>
>>          ObjectInspector inspector =
>> ObjectInspectorFactory.getReflectionObjectInspector
>>                 (BigRow.class,
>> ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
>>         writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
>>                 100000, CompressionKind.ZLIB, 10000, 10000);
>>
>>          final ExecutorService bigRowWorkerPool =
>> Executors.newFixedThreadPool(10);
>>
>>          // Changing this to more than 1 causes exceptions when writing rows.
>>         for (int i = 0; i < 1; i++) {
>>             bigRowWorkerPool.submit(new BigRowWriter());
>>         }
>>           for(int i =0; i < 100; i++){
>>               if(0 == i % 2){
>>                  bigRowQueue.put(new BigRow(false, (byte) 1, (short)
>> 1024, 65536,
>>                          Long.MAX_VALUE, (float) 1.0, -15.0,
>> bytes(0,1,2,3,4), "hi",map("hey","orc")));
>>               } else{
>>                    bigRowQueue.put(new BigRow(false, null, (short) 1024,
>> 65536,
>>                            Long.MAX_VALUE, (float) 1.0, -15.0,
>> bytes(0,1,2,3,4), "hi",map("hey","orc")));
>>               }
>>           }
>>
>>            while (!bigRowQueue.isEmpty()) {
>>               Thread.sleep(2000);
>>           }
>>           bigRowWorkerPool.shutdownNow();
>>       }catch(Exception ex){
>>           ex.printStackTrace();
>>       }
>>     }
>>     public void WriteBigRow(){
>>
>>      }
>>
>>      private static Map<Text, Text> map(String... items)  {
>>         Map<Text, Text> result = new HashMap<Text, Text>();
>>         for(String i: items) {
>>             result.put(new Text(i), new Text(i));
>>         }
>>         return result;
>>     }
>>     private static BytesWritable bytes(int... items) {
>>         BytesWritable result = new BytesWritable();
>>         result.setSize(items.length);
>>         for(int i=0; i < items.length; ++i) {
>>             result.getBytes()[i] = (byte) items[i];
>>         }
>>         return result;
>>     }
>>
>>      public static class BigRow {
>>         Boolean boolean1;
>>         Byte byte1;
>>         Short short1;
>>         Integer int1;
>>         Long long1;
>>         Float float1;
>>         Double double1;
>>         BytesWritable bytes1;
>>         Text string1;
>>         Map<Text, Text> map = new HashMap<Text, Text>();
>>
>>          BigRow(Boolean b1, Byte b2, Short s1, Integer i1, Long l1,
>> Float f1,
>>                Double d1,
>>                BytesWritable b3, String s2, Map<Text, Text> m2) {
>>             this.boolean1 = b1;
>>             this.byte1 = b2;
>>             this.short1 = s1;
>>             this.int1 = i1;
>>             this.long1 = l1;
>>             this.float1 = f1;
>>             this.double1 = d1;
>>             this.bytes1 = b3;
>>             if (s2 == null) {
>>                 this.string1 = null;
>>             } else {
>>                 this.string1 = new Text(s2);
>>             }
>>             this.map = m2;
>>         }
>>     }
>>
>>
>>
>>      class BigRowWriter implements Runnable{
>>
>>          @Override
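>>         public void run() {
>>             // Keep draining the queue until shutdownNow() interrupts this worker;
>>             // a single take() per worker would leave rows queued forever.
>>             while (!Thread.currentThread().isInterrupted()) {
>>                 try {
>>                     BigRow bigRow = bigRowQueue.take();
>>                     writer.addRow(bigRow);
>>                 } catch (InterruptedException e) {
>>                     // Restore the flag so the loop condition ends the worker.
>>                     Thread.currentThread().interrupt();
>>                 } catch (Exception e) {
>>                     e.printStackTrace();
>>                 }
>>             }
>>          }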
>>     }
>>
>>      public static void main(String[] args) throws IOException {
>>         OrcFileTester  orcFileTester = new OrcFileTester();
>>         orcFileTester.WriteBigRow();
>>     }
>>
>>
>>
>>  }
>>
>>  -----------------------------end of Java source
>> ------------------------------
>>
>>  ----------------------------- pom file start
>> ----------------------------------
>>  <?xml version="1.0" encoding="UTF-8"?>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>     <modelVersion>4.0.0</modelVersion>
>>
>>      <groupId>ORCTester</groupId>
>>     <artifactId>ORCTester</artifactId>
>>     <version>1.0-SNAPSHOT</version>
>>   <dependencies>
>>       <dependency>
>>           <groupId>org.apache.hive</groupId>
>>           <artifactId>hive-exec</artifactId>
>>           <version>0.11.0</version>
>>       </dependency>
>>       <dependency>
>>           <groupId>org.apache.hadoop</groupId>
>>           <artifactId>hadoop-core</artifactId>
>>           <version>0.20.2</version>
>>       </dependency>
>>   </dependencies>
>>     <build>
>>         <plugins>
>>             <plugin>
>>                 <groupId>org.codehaus.mojo</groupId>
>>                 <artifactId>exec-maven-plugin</artifactId>
>>                 <version>1.1.1</version>
>>                 <executions>
>>                     <execution>
>>                         <phase>test</phase>
>>                         <goals>
>>                             <goal>java</goal>
>>                         </goals>
>>                         <configuration>
>>                             <mainClass>OrcFileTester</mainClass>
>>                             <arguments/>
>>                         </configuration>
>>                     </execution>
>>                 </executions>
>>             </plugin>
>>         </plugins>
>>     </build>
>> </project>
>>    ----------------------------- pom file end
>> ----------------------------------
>>
>
>
