hive-issues mailing list archives

From "Elliot West (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (HIVE-10165) Improve hive-hcatalog-streaming extensibility and support updates and deletes.
Date Mon, 25 May 2015 20:38:18 GMT

     [ https://issues.apache.org/jira/browse/HIVE-10165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Elliot West updated HIVE-10165:
-------------------------------
    Attachment: HIVE-10165.4.patch

I see now that I've not been following the patch submission process (my apologies). I've renamed
and uploaded the patch with a suitably versioned name ([^HIVE-10165.4.patch]). I've also taken
the opportunity to provide package level documentation and 'improve' the bucketing of new
records.

> Improve hive-hcatalog-streaming extensibility and support updates and deletes.
> ------------------------------------------------------------------------------
>
>                 Key: HIVE-10165
>                 URL: https://issues.apache.org/jira/browse/HIVE-10165
>             Project: Hive
>          Issue Type: Improvement
>          Components: HCatalog
>            Reporter: Elliot West
>            Assignee: Elliot West
>              Labels: streaming_api
>         Attachments: HIVE-10165.0.patch, HIVE-10165.4.patch
>
>
> h3. Overview
> I'd like to extend the [hive-hcatalog-streaming|https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest]
API so that it also supports the writing of record updates and deletes in addition to the
already supported inserts.
> h3. Motivation
> We have many Hadoop processes outside of Hive that merge changed facts into existing
datasets. Traditionally we achieve this by reading in a ground-truth dataset and a modified
dataset, grouping by a key, sorting by a sequence, and then applying a function to determine
inserted, updated, and deleted rows. However, in our current scheme we must rewrite all partitions
that may potentially contain changes. In practice the number of mutated records is very small
when compared with the records contained in a partition. This approach results in a number
of operational issues:
> * Excessive amount of write activity required for small data changes.
> * Downstream applications cannot robustly read these datasets while they are being updated.
> * Due to the scale of the updates (hundreds of partitions) the scope for contention is high.
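The merge step described above (group by key, order by sequence, classify each key as an insert, update, or delete) can be sketched roughly as follows. This is a minimal, hypothetical illustration assuming Java 16+ and string keys and values; the types {{Change}}, {{Existing}} and {{Mutation}} are invented for the example and are not part of Hive or of the patch. Keeping the highest-sequence change per key stands in for an explicit sort, which yields the same final state. A {{null}} value meaning "deleted" is likewise an assumption. Existing rows retain their {{ROW_ID}} (modelled here as a plain {{long}}) so that updates and deletes can reference them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a key-based merge that emits only the changed rows. */
public class MergeSketch {

  enum Op { INSERT, UPDATE, DELETE }

  // A change event for a key; a null value means the key was deleted (assumption).
  record Change(String key, long sequence, String value) {}

  // A ground-truth row that remembers its ROW_ID (modelled as a long, assumption).
  record Existing(String key, long rowId, String value) {}

  // The resulting mutation; rowId is null for inserts, which have no ROW_ID yet.
  record Mutation(Op op, String key, Long rowId, String value) {}

  static List<Mutation> merge(List<Existing> groundTruth, List<Change> changes) {
    Map<String, Existing> byKey = new HashMap<>();
    for (Existing e : groundTruth) {
      byKey.put(e.key(), e);
    }

    // Group changes by key and keep only the one with the highest sequence.
    Map<String, Change> latest = new TreeMap<>();
    for (Change c : changes) {
      latest.merge(c.key(), c, (a, b) -> a.sequence() >= b.sequence() ? a : b);
    }

    List<Mutation> out = new ArrayList<>();
    for (Change c : latest.values()) {
      Existing e = byKey.get(c.key());
      if (e == null && c.value() != null) {
        out.add(new Mutation(Op.INSERT, c.key(), null, c.value()));
      } else if (e != null && c.value() == null) {
        out.add(new Mutation(Op.DELETE, c.key(), e.rowId(), null));
      } else if (e != null && !e.value().equals(c.value())) {
        out.add(new Mutation(Op.UPDATE, c.key(), e.rowId(), c.value()));
      }
      // Otherwise the row is unchanged (or a never-seen key was deleted): write nothing.
    }
    return out;
  }

  public static void main(String[] args) {
    List<Existing> truth = List.of(new Existing("a", 0L, "1"), new Existing("b", 1L, "2"));
    List<Change> changes = List.of(
        new Change("a", 1, "10"), new Change("a", 2, "11"),
        new Change("b", 1, null), new Change("c", 1, "3"));
    merge(truth, changes).forEach(System.out::println);
  }
}
```

Only the three mutated keys are emitted, rather than every row in the affected partitions, which is the reduction in write activity the proposal is after.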

> I believe we can address this problem by instead writing only the changed records to
a Hive transactional table. This should drastically reduce the amount of data that we need
to write and also provide a means for managing concurrent access to the data. Our existing
merge processes can read and retain each record's {{ROW_ID}}/{{RecordIdentifier}} and pass
this through to an updated form of the hive-hcatalog-streaming API which will then have the
required data to perform an update or insert in a transactional manner. 
> h3. Benefits
> * Enables the creation of large-scale dataset merge processes  
> * Opens up Hive transactional functionality in an accessible manner to processes that
operate outside of Hive.
> h3. Implementation
> Our changes do not break the existing API contracts. Instead our approach has been to
consider the functionality offered by the existing API and our proposed API as fulfilling
separate and distinct use-cases. The existing API is primarily focused on the task of continuously
writing large volumes of new data into a Hive table for near-immediate analysis. Our use case,
however, is concerned more with the frequent but not continuous ingestion of mutations to
a Hive table from some ETL merge process. Consequently we feel it is justifiable to add our
new functionality via an alternative set of public interfaces and leave the existing API as
is. This keeps both APIs clean and focused at the expense of presenting additional options
to potential users. Wherever possible, shared implementation concerns have been factored out
into abstract base classes that are open to third-party extension. A detailed breakdown of
the changes is as follows:
> * We've introduced a public {{RecordMutator}} interface whose purpose is to expose insert/update/delete
operations to the user. This is a counterpart to the write-only {{RecordWriter}}. We've also
factored out life-cycle methods common to these two interfaces into a super {{RecordOperationWriter}}
interface. Note that the row representation has been changed from {{byte[]}} to {{Object}}.
Within our data processing jobs our records are often available in a strongly typed and decoded
form such as a POJO or a Tuple object. Therefore it seems to make sense that we are able to
pass this through to the {{OrcRecordUpdater}} without having to go through a {{byte[]}} encoding
step. This of course still allows users to use {{byte[]}} if they wish.
> * The introduction of {{RecordMutator}} requires that insert/update/delete operations
are then also exposed on a {{TransactionBatch}} type. We've done this with the introduction
of a public {{MutatorTransactionBatch}} interface which is a counterpart to the write-only
{{TransactionBatch}}. We've also factored out life-cycle methods common to these two interfaces
into a super {{BaseTransactionBatch}} interface. 
> * Functionality that would be shared by implementations of both {{RecordWriters}} and
{{RecordMutators}} has been factored out of {{AbstractRecordWriter}} into a new abstract base
class {{AbstractOperationRecordWriter}}. The visibility is such that it is open to extension
by third parties. The {{AbstractOperationRecordWriter}} also permits the setting of the {{AcidOutputFormat.Options#recordIdColumn()}}
(defaulted to {{-1}}) which is a requirement for enabling updates and deletes. Additionally,
these options are now fed an {{ObjectInspector}} via an abstract method so that a {{SerDe}}
is not mandated (it was not required for our use-case). The {{AbstractRecordWriter}} is now
much leaner, handling only the extraction of the {{ObjectInspector}} from the {{SerDe}}.
> * A new abstract class, {{AbstractRecordMutator}}, has been introduced to act as the base
of concrete {{RecordMutator}} implementations. The key functionality added by this class is
a validation step on {{update}} and {{delete}} to ensure that the record specified contains
a {{RecordIdentifier}}. This was added as it is not explicitly checked for elsewhere and would
otherwise generate an NPE deep down in {{OrcRecordUpdater}}.
> * There are now two private transaction batch implementations: {{HiveEndPoint.TransactionBatchImpl}}
and its insert/update/delete counterpart: {{HiveEndPoint.MutationTransactionBatchImpl}}. As
you might expect, {{TransactionBatchImpl}} must delegate to a {{RecordWriter}} implementation
whereas {{MutationTransactionBatchImpl}} must delegate to a {{RecordMutator}} implementation.
Shared transaction batch functionality has been factored out into an {{AbstractTransactionBatch}}
class. In the case of {{MutationTransactionBatchImpl}} we've added a check to ensure that
an error occurs should a user submit multiple types of operation to the same batch as we've
found that this can lead to inconsistent data being returned from the underlying table when
read from Hive.
> * To enable the usage of the different transaction batch variants we've added an additional
transaction batch factory method to {{StreamingConnection}} and provided a suitable implementation
in {{HiveEndPoint}}. It's worth noting that {{StreamingConnection}} is the only public facing
component of the API contract that contains references to both the existing writer scheme
and our mutator scheme.
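A rough sketch of the writer/mutator split described in the list above, assuming Java 16+. All names and signatures here ({{RecordId}}, {{HasRecordId}}, the method parameters, {{LoggingMutator}}, {{Row}}) are simplified, hypothetical stand-ins chosen for illustration; they are not the signatures in the attached patch or in Hive. {{RecordId}} only loosely mirrors the transaction id, bucket id and row id carried by a {{RecordIdentifier}}. The sketch shows the shape of the design: shared life-cycle methods on a super interface, {{Object}} rows, and an abstract mutator that rejects update/delete rows lacking a record identifier before they reach any underlying writer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified mirror of the RecordWriter/RecordMutator split. */
public class MutatorSketch {

  // Stand-in for a record identifier: (transaction id, bucket id, row id).
  record RecordId(long transactionId, int bucketId, long rowId) {}

  // Assumed contract: rows that can be updated or deleted expose their RecordId.
  interface HasRecordId { RecordId recordId(); }

  // Life-cycle methods shared by writers and mutators (names are assumptions).
  interface RecordOperationWriter extends AutoCloseable {
    void flush();
    @Override void close();
  }

  // Write-only counterpart; rows are Objects rather than byte[].
  interface RecordWriter extends RecordOperationWriter {
    void write(long transactionId, Object row);
  }

  interface RecordMutator extends RecordOperationWriter {
    void insert(long transactionId, Object row);
    void update(long transactionId, Object row);
    void delete(long transactionId, Object row);
  }

  /** Validates update/delete rows up front instead of failing later with an NPE. */
  abstract static class AbstractRecordMutator implements RecordMutator {
    @Override public final void update(long txnId, Object row) {
      doUpdate(txnId, requireRecordId(row, "update"));
    }

    @Override public final void delete(long txnId, Object row) {
      doDelete(txnId, requireRecordId(row, "delete"));
    }

    private static Object requireRecordId(Object row, String op) {
      if (!(row instanceof HasRecordId r) || r.recordId() == null) {
        throw new IllegalArgumentException(
            "Cannot " + op + " a row without a record identifier: " + row);
      }
      return row;
    }

    protected abstract void doUpdate(long txnId, Object row);
    protected abstract void doDelete(long txnId, Object row);
  }

  // Example row type: a decoded record that carries its identifier (null if new).
  record Row(String value, RecordId recordId) implements HasRecordId {}

  // Toy mutator that records operations instead of writing ORC files.
  static final class LoggingMutator extends AbstractRecordMutator {
    final List<String> log = new ArrayList<>();
    @Override public void insert(long txnId, Object row) { log.add("insert " + row); }
    @Override protected void doUpdate(long txnId, Object row) { log.add("update " + row); }
    @Override protected void doDelete(long txnId, Object row) { log.add("delete " + row); }
    @Override public void flush() {}
    @Override public void close() {}
  }

  public static void main(String[] args) {
    LoggingMutator m = new LoggingMutator();
    m.insert(1L, new Row("new", null));
    m.update(1L, new Row("changed", new RecordId(0L, 0, 7L)));
    try {
      m.delete(1L, new Row("orphan", null));
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    System.out.println(m.log);
  }
}
```

Making {{update}} and {{delete}} final in the abstract base means every concrete mutator inherits the identifier check, so a missing {{RecordIdentifier}} fails immediately with a descriptive message.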
> Please find these changes in the attached patch: [^HIVE-10165.0.patch].
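The rule that a {{MutationTransactionBatchImpl}} rejects multiple operation types in the same batch could look something like the following. It is a hypothetical, self-contained sketch ({{MutationBatch}} and its methods are invented for illustration and are not the patch's code): the first operation submitted fixes the batch's operation type, and any different operation type is rejected with an exception.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a mutation batch that accepts only one operation type. */
public class SingleOpBatchSketch {

  enum Op { INSERT, UPDATE, DELETE }

  static final class MutationBatch {
    private final List<String> applied = new ArrayList<>();
    private Op batchOp; // first operation type submitted to this batch; null until then

    void insert(Object row) { submit(Op.INSERT, row); }
    void update(Object row) { submit(Op.UPDATE, row); }
    void delete(Object row) { submit(Op.DELETE, row); }

    private void submit(Op op, Object row) {
      if (batchOp == null) {
        batchOp = op;
      } else if (batchOp != op) {
        // Mixing operation types is refused rather than risking inconsistent reads.
        throw new IllegalStateException(
            "Batch already used for " + batchOp + "; cannot also " + op);
      }
      applied.add(op + " " + row);
    }

    List<String> applied() { return List.copyOf(applied); }
  }

  public static void main(String[] args) {
    MutationBatch batch = new MutationBatch();
    batch.update("row-1");
    batch.update("row-2");
    try {
      batch.delete("row-3");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
    System.out.println(batch.applied());
  }
}
```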



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
