spark-issues mailing list archives

From "Hyukjin Kwon (Jira)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-29797) Read key-value metadata in Parquet files written by Apache Arrow
Date Mon, 11 Nov 2019 17:12:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-29797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16971737#comment-16971737 ]

Hyukjin Kwon commented on SPARK-29797:
--------------------------------------

ping [~isaacm], do you have any suggestions about how we should store the metadata? Otherwise,
let's leave this closed.
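
For reference, here is a quick way to compare what actually lands in the Parquet footer with what Spark exposes. This is a sketch only, assuming pyarrow and PySpark are installed and that test.parquet is the file produced by the attached minimal_working_example.cpp:

{code:python}
# Sketch only: assumes pyarrow and pyspark are available and that
# "test.parquet" was written by the attached C++ example.
import pyarrow.parquet as pq
from pyspark.sql import SparkSession

# The Parquet footer does carry key-value metadata (this is where the
# Arrow schema-level entries end up), and pyarrow can read it back:
footer_md = pq.read_metadata("test.parquet").metadata
print(footer_md)  # dict of bytes -> bytes, e.g. b'classification': b'Type 0'

# Spark, by contrast, only surfaces per-field metadata dictionaries on the
# inferred schema; the file-level pairs do not appear to be reachable here.
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("test.parquet")
for field in df.schema.fields:
    print(field.name, field.metadata)
{code}

That gap is what the report below describes.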

> Read key-value metadata in Parquet files written by Apache Arrow
> ----------------------------------------------------------------
>
>                 Key: SPARK-29797
>                 URL: https://issues.apache.org/jira/browse/SPARK-29797
>             Project: Spark
>          Issue Type: New Feature
>          Components: Java API, PySpark
>    Affects Versions: 2.4.4
>         Environment: Apache Arrow 0.14.1 built on Windows x86. 
>  
>            Reporter: Isaac Myers
>            Priority: Major
>              Labels: features
>         Attachments: minimal_working_example.cpp
>
>
> Key-value (user) metadata written to a Parquet file from Apache Arrow C++ is not readable
> in Spark (PySpark or the Java API). I can only find field-level metadata dictionaries in the
> schema, and no other functions in the API indicate the presence of file-level key-value
> metadata. The attached code demonstrates creation and retrieval of file-level metadata using
> the Apache Arrow API.
> {code:cpp}
> #include <cstdint>
> #include <cstdio>
> #include <memory>
> #include <string>
> #include <unordered_map>
> #include <vector>
> 
> #include <arrow/api.h>
> #include <arrow/io/api.h>
> #include <parquet/arrow/reader.h>
> #include <parquet/arrow/writer.h>
> #include <parquet/arrow/schema.h>
> 
> int main(int argc, char* argv[])
> {
>   /********************************* Create Parquet File **********************************/
>   arrow::Status st;
>   arrow::MemoryPool* pool = arrow::default_memory_pool();
> 
>   // Create Schema and fields with metadata.
>   std::vector<std::shared_ptr<arrow::Field>> fields;
> 
>   std::unordered_map<std::string, std::string> a_keyval;
>   a_keyval["unit"] = "sec";
>   a_keyval["note"] = "not the standard millisecond unit";
>   arrow::KeyValueMetadata a_md(a_keyval);
>   std::shared_ptr<arrow::Field> a_field = arrow::field("a", arrow::int16(), false, a_md.Copy());
>   fields.push_back(a_field);
> 
>   std::unordered_map<std::string, std::string> b_keyval;
>   b_keyval["unit"] = "ft";
>   arrow::KeyValueMetadata b_md(b_keyval);
>   std::shared_ptr<arrow::Field> b_field = arrow::field("b", arrow::int16(), false, b_md.Copy());
>   fields.push_back(b_field);
> 
>   std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
> 
>   // Add metadata to schema.
>   std::unordered_map<std::string, std::string> schema_keyval;
>   schema_keyval["classification"] = "Type 0";
>   arrow::KeyValueMetadata schema_md(schema_keyval);
>   schema = schema->AddMetadata(schema_md.Copy());
> 
>   // Build arrays of data and add to Table.
>   const int64_t rowgroup_size = 100;
>   std::vector<int16_t> a_data(rowgroup_size, 0);
>   std::vector<int16_t> b_data(rowgroup_size, 0);
>   for (int16_t i = 0; i < rowgroup_size; i++)
>   {
>     a_data[i] = i;
>     b_data[i] = rowgroup_size - i;
>   }
> 
>   arrow::Int16Builder a_bldr(pool);
>   arrow::Int16Builder b_bldr(pool);
>   st = a_bldr.Resize(rowgroup_size); if (!st.ok()) return 1;
>   st = b_bldr.Resize(rowgroup_size); if (!st.ok()) return 1;
>   st = a_bldr.AppendValues(a_data); if (!st.ok()) return 1;
>   st = b_bldr.AppendValues(b_data); if (!st.ok()) return 1;
> 
>   std::shared_ptr<arrow::Array> a_arr_ptr;
>   std::shared_ptr<arrow::Array> b_arr_ptr;
>   arrow::ArrayVector arr_vec;
>   st = a_bldr.Finish(&a_arr_ptr); if (!st.ok()) return 1;
>   arr_vec.push_back(a_arr_ptr);
>   st = b_bldr.Finish(&b_arr_ptr); if (!st.ok()) return 1;
>   arr_vec.push_back(b_arr_ptr);
> 
>   std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, arr_vec);
> 
>   // Test metadata.
>   printf("\nMetadata from original schema:\n");
>   printf("%s\n", schema->metadata()->ToString().c_str());
>   printf("%s\n", schema->field(0)->metadata()->ToString().c_str());
>   printf("%s\n", schema->field(1)->metadata()->ToString().c_str());
> 
>   std::shared_ptr<arrow::Schema> table_schema = table->schema();
>   printf("\nMetadata from schema retrieved from table (should be the same):\n");
>   printf("%s\n", table_schema->metadata()->ToString().c_str());
>   printf("%s\n", table_schema->field(0)->metadata()->ToString().c_str());
>   printf("%s\n", table_schema->field(1)->metadata()->ToString().c_str());
> 
>   // Open file and write table.
>   std::string file_name = "test.parquet";
>   std::shared_ptr<arrow::io::FileOutputStream> ostream;
>   st = arrow::io::FileOutputStream::Open(file_name, &ostream); if (!st.ok()) return 1;
> 
>   std::unique_ptr<parquet::arrow::FileWriter> writer;
>   std::shared_ptr<parquet::WriterProperties> props = parquet::default_writer_properties();
>   st = parquet::arrow::FileWriter::Open(*schema, pool, ostream, props, &writer); if (!st.ok()) return 1;
>   st = writer->WriteTable(*table, rowgroup_size); if (!st.ok()) return 1;
> 
>   // Close file and stream.
>   st = writer->Close(); if (!st.ok()) return 1;
>   st = ostream->Close(); if (!st.ok()) return 1;
> 
>   /********************************* Read Parquet File **********************************/
>   // Create new memory pool. Not sure if this is necessary.
>   //arrow::MemoryPool* pool2 = arrow::default_memory_pool();
> 
>   // Open file reader.
>   std::shared_ptr<arrow::io::ReadableFile> input_file;
>   st = arrow::io::ReadableFile::Open(file_name, pool, &input_file); if (!st.ok()) return 1;
>   std::unique_ptr<parquet::arrow::FileReader> reader;
>   st = parquet::arrow::OpenFile(input_file, pool, &reader); if (!st.ok()) return 1;
> 
>   // Get schema and read metadata.
>   std::shared_ptr<arrow::Schema> new_schema;
>   st = reader->GetSchema(&new_schema); if (!st.ok()) return 1;
>   printf("\nMetadata from schema read from file:\n");
>   printf("%s\n", new_schema->metadata()->ToString().c_str());
> 
>   // Crashes because there is no field-level metadata after the roundtrip.
>   /*printf("%s\n", new_schema->field(0)->metadata()->ToString().c_str());
>   printf("%s\n", new_schema->field(1)->metadata()->ToString().c_str());*/
> 
>   printf("field name %s metadata exists: %d\n", new_schema->field(0)->name().c_str(), new_schema->field(0)->HasMetadata());
>   printf("field name %s metadata exists: %d\n", new_schema->field(1)->name().c_str(), new_schema->field(1)->HasMetadata());
> 
>   // What if I read the whole table and get the schema from it?
>   std::shared_ptr<arrow::Table> new_table;
>   st = reader->ReadTable(&new_table); if (!st.ok()) return 1;
>   std::shared_ptr<arrow::Schema> schema_from_table = new_table->schema();
>   printf("\nMetadata from schema that is retrieved through table that is read from file:\n");
>   printf("%s\n", schema_from_table->metadata()->ToString().c_str());
> 
>   // Crashes because there is no field-level metadata. Known bug: https://issues.apache.org/jira/browse/ARROW-4359
>   /*printf("%s\n", schema_from_table->field(0)->metadata()->ToString().c_str());
>   printf("%s\n", schema_from_table->field(1)->metadata()->ToString().c_str());*/
> 
>   printf("field name %s metadata exists: %d\n", schema_from_table->field(0)->name().c_str(), schema_from_table->field(0)->HasMetadata());
>   printf("field name %s metadata exists: %d\n", schema_from_table->field(1)->name().c_str(), schema_from_table->field(1)->HasMetadata());
> 
>   st = input_file->Close(); if (!st.ok()) return 1;
>   return 0;
> }
> {code}
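
For completeness, the roundtrip in the attached example can also be reproduced without the C++ toolchain. This is a sketch only, assuming pyarrow 0.14.x (matching the environment above), where ARROW-4359 still drops field-level metadata on the Parquet roundtrip:

{code:python}
# Sketch only: a pyarrow analogue of the attached C++ example,
# assuming pyarrow 0.14.x as in the reported environment.
import pyarrow as pa
import pyarrow.parquet as pq

field_a = pa.field("a", pa.int16(), metadata={"unit": "sec"})
field_b = pa.field("b", pa.int16(), metadata={"unit": "ft"})
schema = pa.schema([field_a, field_b], metadata={"classification": "Type 0"})

table = pa.Table.from_arrays(
    [pa.array(range(100), type=pa.int16()),
     pa.array(range(100, 0, -1), type=pa.int16())],
    schema=schema)
pq.write_table(table, "test_py.parquet")

rt = pq.read_table("test_py.parquet")
print(rt.schema.metadata)     # schema-level pairs survive the roundtrip
print(rt.schema[0].metadata)  # field-level pairs are dropped (ARROW-4359)
print(rt.schema[1].metadata)
{code}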



