From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Wed, 11 Jan 2017 01:00:53 -0000
Message-Id: <7060ab87d44947bfbd2948cf35a0ab33@git.apache.org>
Subject: [01/50] incubator-quickstep git commit: QUICKSTEP-46 fixed [Forced Update!]
archived-at: Wed, 11 Jan 2017 01:02:14 -0000

Repository: incubator-quickstep
Updated Branches:
  refs/heads/quickstep_partition_parser_support fb919fef9 -> 4890bbe83 (forced update)

QUICKSTEP-46 fixed

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7f0067b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7f0067b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7f0067b7

Branch: refs/heads/quickstep_partition_parser_support
Commit: 7f0067b7913d8f2b68e8ac771fb9a87090d773e9
Parents: 1effc79
Author: tarun
Authored: Mon Oct 10 20:29:42 2016 -0500
Committer: tarunbansal
Committed: Fri Oct 28 10:02:34 2016 -0500

----------------------------------------------------------------------
 relational_operators/CMakeLists.txt          | 14 ++++
 relational_operators/TextScanOperator.cpp    | 73 ++++++++++++++++----
 relational_operators/TextScanOperator.hpp    | 19 ++++-
 .../tests/text_scan_faulty_golden_output.txt |  5 ++
 .../tests/text_scan_faulty_input.txt         |  4 ++
 5 files changed, 99 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 8dd65d0..0735bce 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -784,3 +784,17 @@ add_test(TextScanOperator_unittest
          ${TEXT_SCAN_INPUT_FILE}
          ${TEXT_SCAN_GOLDEN_OUTPUT_FILE}
          ${TEXT_SCAN_FAILURE_OUTPUT_FILE})
+file(TO_NATIVE_PATH
+     "${CMAKE_CURRENT_SOURCE_DIR}/tests/text_scan_faulty_input.txt"
+     TEXT_SCAN_FAULTY_INPUT_FILE)
+file(TO_NATIVE_PATH
+     "${CMAKE_CURRENT_SOURCE_DIR}/tests/text_scan_faulty_golden_output.txt"
+     TEXT_SCAN_FAULTY_GOLDEN_OUTPUT_FILE)
+file(TO_NATIVE_PATH
+     "${CMAKE_CURRENT_BINARY_DIR}/text_scan_faulty_failure_output.txt"
+     TEXT_SCAN_FAULTY_FAILURE_OUTPUT_FILE)
+add_test(TextScanOperator_faulty_unittest
+         TextScanOperator_unittest
+         ${TEXT_SCAN_FAULTY_INPUT_FILE}
+         ${TEXT_SCAN_FAULTY_GOLDEN_OUTPUT_FILE}
+         ${TEXT_SCAN_FAULTY_FAILURE_OUTPUT_FILE})
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 4151bac..aa734d3 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -196,7 +196,9 @@ serialization::WorkOrder* TextScanOperator::createWorkOrderProto(const string &f
 void TextScanWorkOrder::execute() {
   const CatalogRelationSchema &relation = output_destination_->getRelation();
   std::vector<Tuple> tuples;
+  bool is_faulty;
+  std::vector<TypedValue> vector_tuple_returned;

   constexpr std::size_t kSmallBufferSize = 0x4000;
   char *buffer = reinterpret_cast<char*>(malloc(std::max(text_segment_size_, kSmallBufferSize)));

@@ -218,7 +220,6 @@ void TextScanWorkOrder::execute() {
     } else {
       --row_ptr;
     }
-
     if (row_ptr >= buffer_end) {
       // This block does not even contain a newline character.
       return;
@@ -238,16 +239,23 @@ void TextScanWorkOrder::execute() {
   // RIGHT AFTER the LAST newline character in this text segment.

   // Process the tuples which are between the first newline character and the
-  // last newline character.
+  // last newline character. SKIP any row which is corrupt instead of ABORTING the
+  // whole COPY operation.
   while (row_ptr < end_ptr) {
     if (*row_ptr == '\r' || *row_ptr == '\n') {
       // Skip empty lines.
       ++row_ptr;
     } else {
-      tuples.emplace_back(parseRow(&row_ptr, relation));
+      vector_tuple_returned = parseRow(&row_ptr, relation, &is_faulty);
+      if (is_faulty) {
+        // Skip faulty rows
+        LOG(INFO) << "Faulty row found. Hence switching to next row.";
+      } else {
+        // Convert vector returned to tuple only when a valid row is encountered.
+        tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
+      }
     }
   }
-
   // Process the tuple that is right after the last newline character.
   // NOTE(jianqiao): dynamic_read_size is trying to balance between the cases
   // that the last tuple is very small / very large.
@@ -279,7 +287,15 @@ void TextScanWorkOrder::execute() {
       row_string.push_back('\n');
     }
     row_ptr = row_string.c_str();
-    tuples.emplace_back(parseRow(&row_ptr, relation));
+
+    vector_tuple_returned = parseRow(&row_ptr, relation, &is_faulty);
+    if (is_faulty) {
+      // Skip the faulty row.
+      LOG(INFO) << "Faulty row found. Hence switching to next row.";
+    } else {
+      // Convert vector returned to tuple only when a valid row is encountered.
+      tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
+    }
   }

   std::fclose(file);
@@ -312,19 +328,26 @@ void TextScanWorkOrder::execute() {
   output_destination_->bulkInsertTuples(&column_vectors);
 }

-Tuple TextScanWorkOrder::parseRow(const char **row_ptr,
-                                  const CatalogRelationSchema &relation) const {
+std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
+                                  const CatalogRelationSchema &relation, bool *is_faulty) const {
   std::vector<TypedValue> attribute_values;
+  // Always assume current row is not faulty initially.
+  *is_faulty = false;

   bool is_null_literal;
   bool has_reached_end_of_line = false;
   std::string value_str;
   for (const auto &attr : relation) {
     if (has_reached_end_of_line) {
-      throw TextScanFormatError("Row has too few fields");
+      // Do not abort if one of the row is faulty.
+      // Set is_faulty to true and SKIP the current row.
+      *is_faulty = true;
+      LOG(INFO) << "Row has too few fields.";
+      return attribute_values;
     }

     value_str.clear();
+
     extractFieldString(row_ptr,
                        &is_null_literal,
                        &has_reached_end_of_line,
@@ -333,24 +356,46 @@ Tuple TextScanWorkOrder::parseRow(const char **row_ptr,
     if (is_null_literal) {
       // NULL literal.
       if (!attr.getType().isNullable()) {
-        throw TextScanFormatError(
-            "NULL literal '\\N' was specified for a column with a "
-            "non-nullable Type");
+        *is_faulty = true;
+        LOG(INFO) << "NULL literal '\\N' was specified for a column with a "
+                     "non-nullable Type.";
+        skipFaultyRow(row_ptr);
+        return attribute_values;
       }
       attribute_values.emplace_back(attr.getType().makeNullValue());
     } else {
       attribute_values.emplace_back();
       if (!attr.getType().parseValueFromString(value_str, &(attribute_values.back()))) {
-        throw TextScanFormatError("Failed to parse value");
+        // Do not abort if one of the row is faulty.
+        *is_faulty = true;
+        LOG(INFO) << "Failed to parse value.";
+        skipFaultyRow(row_ptr);
+        return attribute_values;
       }
     }
   }

   if (!has_reached_end_of_line) {
-    throw TextScanFormatError("Row has too many fields");
+    // Do not abort if one of the row is faulty.
+    // Set is_faulty to true and SKIP the current row.
+    *is_faulty = true;
+    LOG(INFO) << "Row has too many fields.";
+    skipFaultyRow(row_ptr);
   }

-  return Tuple(std::move(attribute_values));
+  return attribute_values;
+}
+
+void TextScanWorkOrder::skipFaultyRow(const char **field_ptr) const {
+  const char *cur_ptr = *field_ptr;
+  // Move row pointer to the end of faulty row.
+  for (;; ++cur_ptr) {
+    const char c = *cur_ptr;
+    if (c == '\n') {
+      break;
+    }
+  }
+  *field_ptr = cur_ptr + 1;
 }

 void TextScanWorkOrder::extractFieldString(const char **field_ptr,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index 24af844..65863b3 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -24,6 +24,7 @@
 #include 
 #include 
 #include 
+#include <vector>

 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
@@ -241,6 +242,18 @@ class TextScanWorkOrder : public WorkOrder {
                           std::string *field_string) const;

   /**
+   * @brief This method helps incorporate fault tolerance while ingesting data.
+   *        It is called whenever a faulty row is encountered and it is
+   *        required to move \p *field_ptr to the next row.
+   *
+   * @param field_ptr \p *field_ptr points to the current position of the input
+   *        char stream while parsing a faulty row. After the call, \p *field_ptr
+   *        will be modified to the start position of the NEXT record in the
+   *        stream.
+   */
+  void skipFaultyRow(const char **field_ptr) const;
+
+  /**
    * @brief Make a tuple by parsing all of the individual fields from a char stream.
    *
    * @param \p *row_ptr points to the current position of the input char stream
@@ -248,10 +261,12 @@ class TextScanWorkOrder : public WorkOrder {
    *        After the call, \p *row_ptr will be modified to the start position of
    *        the NEXT text row.
    * @param relation The relation schema for the tuple.
+   * @param is_faulty OUTPUT parameter. Set to true if the row is faulty,
    * @return The tuple parsed from the char stream.
    */
-  Tuple parseRow(const char **row_ptr,
-                 const CatalogRelationSchema &relation) const;
+std::vector<TypedValue> parseRow(const char **row_ptr,
+                                 const CatalogRelationSchema &relation,
+                                 bool *is_faulty) const;

   /**
    * @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/tests/text_scan_faulty_golden_output.txt
----------------------------------------------------------------------
diff --git a/relational_operators/tests/text_scan_faulty_golden_output.txt b/relational_operators/tests/text_scan_faulty_golden_output.txt
new file mode 100644
index 0000000..e07bedf
--- /dev/null
+++ b/relational_operators/tests/text_scan_faulty_golden_output.txt
@@ -0,0 +1,5 @@
++--------------------+------------------------+--------------------+-----------------------------------------+----------------------------------------+--------------------+
+|long_attr |double_attr |char_attr |datetime_attr |interval_attr |varchar_attr |
++--------------------+------------------------+--------------------+-----------------------------------------+----------------------------------------+--------------------+
+| 1234| 12.34| foo| 1994-04-27T08:20:50| 12 days 00:00:00| right_row|
++--------------------+------------------------+--------------------+-----------------------------------------+----------------------------------------+--------------------+

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/tests/text_scan_faulty_input.txt
----------------------------------------------------------------------
diff --git a/relational_operators/tests/text_scan_faulty_input.txt b/relational_operators/tests/text_scan_faulty_input.txt
new file mode 100644
index 0000000..aa00d39
--- /dev/null
+++ b/relational_operators/tests/text_scan_faulty_input.txt
@@ -0,0 +1,4 @@
+1234 12.34 foo 1994-04-27T08:20:50 12 days right_row
+1234 abcd foo 1994-04-27T08:20:50 12 days row_with_wrong_datatype_value
+1234 foo 1994-04-27T08:20:50 12 days row_with_less_values
+1234 abcd foo 1994-04-27T08:20:50 12 days bar row_with_more_values
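The change to `TextScanWorkOrder::execute()` replaces "throw on the first bad row" with "report it through `is_faulty`, log it, and move on". Below is a minimal, self-contained sketch of that skip-instead-of-abort pattern. It is not Quickstep code. `ParseRowSketch`, `ScanSkippingFaultyRows`, `FieldMatchesKind`, and the one-letter column kinds (`L` integer, `D` floating point, `S` unchecked string) are hypothetical stand-ins for `parseRow()`, the scan loop, and `CatalogRelationSchema`. Values are kept as `std::string` instead of `TypedValue`. The sketch assumes tab-separated fields, which would explain how values such as `12 days` in the test input keep their inner space.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <string>
#include <utility>
#include <vector>

// Hypothetical column kinds standing in for a relation schema:
// 'L' = integer, 'D' = floating point, 'S' = any string (datetime and
// interval columns are NOT validated in this sketch).
bool FieldMatchesKind(char kind, const std::string &field) {
  if (kind == 'S') {
    return true;
  }
  if (field.empty()) {
    return false;
  }
  char *end = nullptr;
  if (kind == 'L') {
    (void)std::strtoll(field.c_str(), &end, 10);
  } else {
    (void)std::strtod(field.c_str(), &end);
  }
  // A value with unparsed trailing characters (e.g. "abcd" for 'D') is faulty.
  return *end == '\0';
}

// Sketch of a parseRow()-style function: reads one row starting at *row_ptr
// (fields separated by '\t', row ended by '\r', '\n' or end_ptr), moves
// *row_ptr past it, and reports a bad row through *is_faulty instead of
// throwing.
std::vector<std::string> ParseRowSketch(const char **row_ptr,
                                        const char *end_ptr,
                                        const std::string &schema,
                                        bool *is_faulty) {
  assert(row_ptr != nullptr && *row_ptr < end_ptr && is_faulty != nullptr);
  std::vector<std::string> fields(1);
  const char *p = *row_ptr;
  for (; p < end_ptr && *p != '\n' && *p != '\r'; ++p) {
    if (*p == '\t') {
      fields.emplace_back();  // Field terminator: start the next field.
    } else {
      fields.back().push_back(*p);
    }
  }
  // The whole row is consumed before it is validated, so a faulty row never
  // makes the caller lose its position. A '\r' is left for the caller's
  // empty-line skipping.
  *row_ptr = (p < end_ptr && *p == '\n') ? p + 1 : p;

  *is_faulty = (fields.size() != schema.size());  // Too few or too many fields.
  for (std::size_t i = 0; !*is_faulty && i < fields.size(); ++i) {
    *is_faulty = !FieldMatchesKind(schema[i], fields[i]);
  }
  return fields;
}

// Sketch of the scan loop: keep valid rows and count faulty ones instead of
// aborting the whole load; empty lines are skipped as in the original loop.
std::vector<std::vector<std::string>> ScanSkippingFaultyRows(
    const std::string &text, const std::string &schema,
    std::size_t *num_skipped) {
  std::vector<std::vector<std::string>> tuples;
  *num_skipped = 0;
  const char *row_ptr = text.data();
  const char *const end_ptr = row_ptr + text.size();
  while (row_ptr < end_ptr) {
    if (*row_ptr == '\r' || *row_ptr == '\n') {
      ++row_ptr;  // Skip empty lines.
      continue;
    }
    bool is_faulty = false;
    std::vector<std::string> values =
        ParseRowSketch(&row_ptr, end_ptr, schema, &is_faulty);
    if (is_faulty) {
      ++*num_skipped;
    } else {
      tuples.push_back(std::move(values));
    }
  }
  return tuples;
}
```

Suppose the four rows of `text_scan_faulty_input.txt` are fed in as tab-separated text with the hypothetical schema string `"LDSSSS"`. `ScanSkippingFaultyRows` keeps only `right_row` and reports three skipped rows, which matches the single data row in `text_scan_faulty_golden_output.txt`. The sketch departs from the commit on purpose: it reads each row to its end before validating it, instead of going field by field. That way the pointer never needs re-synchronizing after an error, and the sketch counts skipped rows rather than logging each one.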
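`skipFaultyRow()` as committed scans forward for `'\n'` with no upper bound. `parseRow()` also calls it when the field that failed (a bad value or a `\N` in a non-nullable column) was the last one on the line. Suppose `extractFieldString()` has already stepped past the newline in that case. Setting `has_reached_end_of_line` suggests it has, although this diff does not show its body. The extra skip would then also discard the following row. The new test input would not catch this, because its type errors are in the second column. Below is one hypothetical way to guard both edges. `SkipFaultyRowBounded` and its `end_ptr` and `already_at_next_row` parameters are illustrative and are not part of Quickstep.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical bounded variant of skipFaultyRow(). Moves *field_ptr to the
// first character after the next '\n', or to end_ptr if no newline remains,
// so it never reads past the buffer. If the field parser already consumed
// the row's newline (the faulty field was the last one), *field_ptr already
// points at the next row and is left unchanged so that row is not dropped.
void SkipFaultyRowBounded(const char **field_ptr, const char *end_ptr,
                          bool already_at_next_row) {
  assert(field_ptr != nullptr && *field_ptr != nullptr && *field_ptr <= end_ptr);
  if (already_at_next_row) {
    return;
  }
  const std::size_t remaining = static_cast<std::size_t>(end_ptr - *field_ptr);
  const void *newline = std::memchr(*field_ptr, '\n', remaining);
  *field_ptr = (newline != nullptr) ? static_cast<const char *>(newline) + 1
                                    : end_ptr;
}
```

Inside `parseRow()`, `already_at_next_row` would naturally be the `has_reached_end_of_line` flag reported by `extractFieldString()`. The too-many-fields path would always pass `false`, because there the field loop stops partway through the row.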