spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Isaac Myers (Jira)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-29797) Read key-value metadata in Parquet files written by Apache Arrow
Date Tue, 12 Nov 2019 19:52:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-29797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972725#comment-16972725
] 

Isaac Myers commented on SPARK-29797:
-------------------------------------

To be clear, Arrow does write the metadata, since it can be retrieved with the attached
example code. The issue lies with Spark's ability to read it. The metadata does not need
to be associated with the df, though 'df.file_metadata()' would be fine. Otherwise, maybe
'spark.read.parquet_metadata()'.

> Read key-value metadata in Parquet files written by Apache Arrow
> ----------------------------------------------------------------
>
>                 Key: SPARK-29797
>                 URL: https://issues.apache.org/jira/browse/SPARK-29797
>             Project: Spark
>          Issue Type: New Feature
>          Components: Java API, PySpark
>    Affects Versions: 2.4.4
>         Environment: Apache Arrow 0.14.1 built on Windows x86. 
>  
>            Reporter: Isaac Myers
>            Priority: Major
>              Labels: features
>         Attachments: minimal_working_example.cpp
>
>
> Key-value (user) metadata written to a Parquet file from Apache Arrow C++ is not readable
in Spark (PySpark or Java API). I can only find field-level metadata dictionaries in the schema
and no other functions in the API that indicate the presence of file-level key-value metadata.
The attached code demonstrates the creation and retrieval of file-level metadata using the Apache
Arrow API.
> {code:java}
> #include <vector>#include <vector>#include <cstdint>#include <map>#include
<arrow/api.h>#include <arrow/io/api.h>#include <parquet/arrow/reader.h>#include
<parquet/arrow/writer.h>#include <parquet/arrow/schema.h>//#include <arrow/>
> int main(int argc, char* argv[]){ /********************************* Create Parquet File
**********************************/ arrow::Status st; arrow::MemoryPool* pool = arrow::default_memory_pool();
>  // Create Schema and fields with metadata std::vector<std::shared_ptr<arrow::Field>>
fields;
>  std::unordered_map<std::string, std::string> a_keyval; a_keyval["unit"] = "sec";
a_keyval["note"] = "not the standard millisecond unit"; arrow::KeyValueMetadata a_md(a_keyval);
std::shared_ptr<arrow::Field> a_field = arrow::field("a", arrow::int16(), false, a_md.Copy());
fields.push_back(a_field);
>  std::unordered_map<std::string, std::string> b_keyval; b_keyval["unit"] = "ft";
arrow::KeyValueMetadata b_md(b_keyval); std::shared_ptr<arrow::Field> b_field = arrow::field("b",
arrow::int16(), false, b_md.Copy()); fields.push_back(b_field);
>  std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
>  // Add metadata to schema. std::unordered_map<std::string, std::string> schema_keyval;
schema_keyval["classification"] = "Type 0"; arrow::KeyValueMetadata schema_md(schema_keyval);
schema = schema->AddMetadata(schema_md.Copy());
>  // Build arrays of data and add to Table. const int64_t rowgroup_size = 100; std::vector<int16_t>
a_data(rowgroup_size, 0); std::vector<int16_t> b_data(rowgroup_size, 0);
>  for (int16_t i = 0; i < rowgroup_size; i++) { a_data[i] = i; b_data[i] = rowgroup_size
- i; }  arrow::Int16Builder a_bldr(pool); arrow::Int16Builder b_bldr(pool); st = a_bldr.Resize(rowgroup_size);
if (!st.ok()) return 1; st = b_bldr.Resize(rowgroup_size); if (!st.ok()) return 1;
>  st = a_bldr.AppendValues(a_data); if (!st.ok()) return 1;
>  st = b_bldr.AppendValues(b_data); if (!st.ok()) return 1;
>  std::shared_ptr<arrow::Array> a_arr_ptr; std::shared_ptr<arrow::Array> b_arr_ptr;
>  arrow::ArrayVector arr_vec; st = a_bldr.Finish(&a_arr_ptr); if (!st.ok()) return
1; arr_vec.push_back(a_arr_ptr); st = b_bldr.Finish(&b_arr_ptr); if (!st.ok()) return
1; arr_vec.push_back(b_arr_ptr);
>  std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, arr_vec);
>  // Test metadata printf("\nMetadata from original schema:\n"); printf("%s\n", schema->metadata()->ToString().c_str());
printf("%s\n", schema->field(0)->metadata()->ToString().c_str()); printf("%s\n",
schema->field(1)->metadata()->ToString().c_str());
>  std::shared_ptr<arrow::Schema> table_schema = table->schema(); printf("\nMetadata
from schema retrieved from table (should be the same):\n"); printf("%s\n", table_schema->metadata()->ToString().c_str());
printf("%s\n", table_schema->field(0)->metadata()->ToString().c_str()); printf("%s\n",
table_schema->field(1)->metadata()->ToString().c_str());
>  // Open file and write table. std::string file_name = "test.parquet"; std::shared_ptr<arrow::io::FileOutputStream>
ostream; st = arrow::io::FileOutputStream::Open(file_name, &ostream); if (!st.ok()) return
1;
>  std::unique_ptr<parquet::arrow::FileWriter> writer; std::shared_ptr<parquet::WriterProperties>
props = parquet::default_writer_properties(); st = parquet::arrow::FileWriter::Open(*schema,
pool, ostream, props, &writer); if (!st.ok()) return 1; st = writer->WriteTable(*table,
rowgroup_size); if (!st.ok()) return 1;
>  // Close file and stream. st = writer->Close(); if (!st.ok()) return 1; st = ostream->Close();
if (!st.ok()) return 1;
>  /********************************* Read Parquet File **********************************/
>  // Create new memory pool. Not sure if this is necessary. //arrow::MemoryPool* pool2
= arrow::default_memory_pool();
>  // Open file reader. std::shared_ptr<arrow::io::ReadableFile> input_file; st =
arrow::io::ReadableFile::Open(file_name, pool, &input_file); if (!st.ok()) return 1; std::unique_ptr<parquet::arrow::FileReader>
reader; st = parquet::arrow::OpenFile(input_file, pool, &reader); if (!st.ok()) return
1;
>  // Get schema and read metadata. std::shared_ptr<arrow::Schema> new_schema; st
= reader->GetSchema(&new_schema); if (!st.ok()) return 1; printf("\nMetadata from schema
read from file:\n"); printf("%s\n", new_schema->metadata()->ToString().c_str());
>  // Crashes because there are no metadata. /*printf("%s\n", new_schema->field(0)->metadata()->ToString().c_str());
printf("%s\n", new_schema->field(1)->metadata()->ToString().c_str());*/
>  printf("field name %s metadata exists: %d\n", new_schema->field(0)->name().c_str(),
new_schema->field(0)->HasMetadata()); printf("field name %s metadata exists: %d\n",
new_schema->field(1)->name().c_str(), new_schema->field(1)->HasMetadata());
>  // What if I read the whole table and get the schema from it. std::shared_ptr<arrow::Table>
new_table; st = reader->ReadTable(&new_table); if (!st.ok()) return 1; std::shared_ptr<arrow::Schema>
schema_from_table = new_table->schema(); printf("\nMetadata from schema that is retrieved
through table that is read from file:\n"); printf("%s\n", schema_from_table->metadata()->ToString().c_str());
>  // Crashes because there are no metadata. Known bug:  https://issues.apache.org/jira/browse/ARROW-4359
/*printf("%s\n", schema_from_table->field(0)->metadata()->ToString().c_str()); printf("%s\n",
schema_from_table->field(1)->metadata()->ToString().c_str());*/
>  printf("field name %s metadata exists: %d\n", schema_from_table->field(0)->name().c_str(),
schema_from_table->field(0)->HasMetadata()); printf("field name %s metadata exists:
%d\n", schema_from_table->field(1)->name().c_str(), schema_from_table->field(1)->HasMetadata());
st = input_file->Close(); if (!st.ok()) return 1;
>  return 0;} {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message