arrow-user mailing list archives

From Priyanshu LNU <t-p...@microsoft.com>
Subject [java][apache-arrow] Problem appending record batches to the same Arrow file
Date Wed, 10 Feb 2021 08:14:29 GMT
Hi,

I am using Apache Arrow in Java and want to append a stream of record batches to an
already existing file. Concretely: I have a FileOutputStream, and with an
ArrowStreamWriter I write some batches to it. I then close the FileOutputStream and
later reopen it in append mode to write more batches.

The issue is that the metadata (the stream's schema header) gets written again, so the
stream reader cannot read all the batches. How can I append a stream of record batches
to an existing file correctly, without the metadata being repeated in between?
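
For reference, this is roughly how I read the file back (a minimal sketch; the
allocator setup is simplified, and arrowFile is the same file written below):

import java.io.FileInputStream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

// This reads only the first stream in the file: ArrowStreamReader stops at the
// end-of-stream marker that end() wrote, so the appended batches are never seen.
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
     FileInputStream fileInputStream = new FileInputStream(arrowFile);
     ArrowStreamReader reader = new ArrowStreamReader(fileInputStream, allocator)) {
    VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
    while (reader.loadNextBatch()) {
        System.out.println("Read batch with " + readRoot.getRowCount() + " rows");
    }
}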

Below is the part of the Java code I am currently using:

public void setupWrite(String filename, boolean useCustom) throws Exception {
        File arrowFile = validateFile(filename, false);
        this.fileOutputStream = new FileOutputStream(arrowFile);
        Schema schema = makeSchema();
        this.root = VectorSchemaRoot.create(schema, this.ra);
        DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
        this.arrowStreamWriter = new ArrowStreamWriter(root, provider, Channels.newChannel(this.fileOutputStream));

        // Just to show some stuff about the schema and layout
        System.out.println("Schema/Layout: ");
        for (Field field : root.getSchema().getFields()) {
            FieldVector vector = root.getVector(field.getName());
            showFieldLayout(field, vector);
        }
        System.out.println("Generated " + this.entries + " data entries , batch size " + batchSize
+ " usingCustomWriter: " + useCustom + " useNullValues " + this.useNullValues);

      // Writing logic starts here. start() writes the stream header (the schema
      // metadata) to the channel before any batches.
//      arrowFileWriter.start();
      arrowStreamWriter.start();

      for(int i = 0; i < this.entries;) {
          int toProcessItems = Math.min(this.batchSize, this.entries - i);
          // set the batch row count
          root.setRowCount(toProcessItems);
          for (Field field : root.getSchema().getFields()) {
              FieldVector vector = root.getVector(field.getName());
             // System.out.println(vector.getMinorType());
              switch (vector.getMinorType()) {
                  case INT:
                      writeFieldInt(vector, i, toProcessItems);
                      break;
                  case BIGINT:
                      writeFieldLong(vector, i, toProcessItems);
                      break;
                  case FLOAT4:
                      writeFieldFloat4(vector, i, toProcessItems);
                      break;
                  case VARCHAR:
                      writeFieldVarchar(vector, i, toProcessItems);
                      break;
                  case DATEDAY:
                      writeFieldDate(vector, i, toProcessItems);
                      break;
                // case VARBINARY:
                //     writeFieldVarBinary(vector, i, toProcessItems);
                //     break;
                  default:
                      throw new Exception("Type not supported yet: " + vector.getMinorType());
              }
          }
//          arrowFileWriter.writeBatch();
          arrowStreamWriter.writeBatch();
          i += toProcessItems;
      }
      arrowStreamWriter.end();
      arrowStreamWriter.close();
      fileOutputStream.flush();
      fileOutputStream.close();
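
      // Reopen the same file in append mode and create a second ArrowStreamWriter.
      // Its start() writes the schema header again, so the file now contains two
      // concatenated IPC streams; a single ArrowStreamReader stops at the first
      // end-of-stream marker and never reaches the appended batches.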
      this.fileOutputStream = new FileOutputStream(arrowFile, true);
      this.arrowStreamWriter = new ArrowStreamWriter(root, provider, this.fileOutputStream.getChannel());
      arrowStreamWriter.start();
      for(int i = 0; i < this.entries;) {
          int toProcessItems = Math.min(this.batchSize, this.entries - i);
          // set the batch row count
          root.setRowCount(toProcessItems);
          for (Field field : root.getSchema().getFields()) {
              FieldVector vector = root.getVector(field.getName());
             // System.out.println(vector.getMinorType());
              switch (vector.getMinorType()) {
                  case INT:
                      writeFieldInt(vector, i, toProcessItems);
                      break;
                  case BIGINT:
                      writeFieldLong(vector, i, toProcessItems);
                      break;
                  case FLOAT4:
                      writeFieldFloat4(vector, i, toProcessItems);
                      break;
                  case VARCHAR:
                      writeFieldVarchar(vector, i, toProcessItems);
                      break;
                  case DATEDAY:
                      writeFieldDate(vector, i, toProcessItems);
                      break;
                // case VARBINARY:
                //     writeFieldVarBinary(vector, i, toProcessItems);
                //     break;
                  default:
                      throw new Exception("Type not supported yet: " + vector.getMinorType());
              }
          }
          arrowStreamWriter.writeBatch();
          i += toProcessItems;
      }
      arrowStreamWriter.end();
      arrowStreamWriter.close();
      fileOutputStream.flush();
      fileOutputStream.close();
 }
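
Would something like the following be a valid way to read such a file back: one
ArrowStreamReader per concatenated stream? This is only a sketch (imports as in the
reader snippet above); it assumes the reader consumes exactly up to the end-of-stream
marker and leaves the InputStream positioned at the next stream's schema, which I have
not verified. Resource cleanup is also elided, since ArrowStreamReader.close() would
close the shared InputStream.

try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
     FileInputStream in = new FileInputStream(arrowFile)) {
    // Each writer session wrote its own [schema][batches...][EOS] stream,
    // so keep opening readers until the file is exhausted. available() is
    // used here only as a rough end-of-file check for a plain file.
    while (in.available() > 0) {
        ArrowStreamReader reader = new ArrowStreamReader(in, allocator);
        VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
        while (reader.loadNextBatch()) {
            System.out.println("Read batch with " + readRoot.getRowCount() + " rows");
        }
    }
}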


Thanks,
Priyanshu

