flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
Date Tue, 16 Aug 2016 07:23:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422341#comment-15422341 ]

ASF GitHub Bot commented on FLINK-2055:
---------------------------------------

Github user delding commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2332#discussion_r74886606
  
    --- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java ---
    @@ -0,0 +1,371 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.hbase;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.hadoop.hbase.client.Append;
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Durability;
    +import org.apache.hadoop.hbase.client.Increment;
    +import org.apache.hadoop.hbase.client.Mutation;
    +import org.apache.hadoop.hbase.client.Put;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + *  This class represents a list of {@link MutationAction}s you will take when writing
    + *  an input value of {@link HBaseSink} to a row in a HBase table.
    + *  Each {@link MutationAction} can create an HBase {@link Mutation} operation type
    + *  including {@link Put}, {@link Increment}, {@link Append} and {@link Delete}.
    + */
    +public class MutationActionList {
    +	private final List<MutationAction> actions;
    +
    +	public MutationActionList() {
    +		this.actions = new ArrayList<>();
    +	}
    +
    +	public List<MutationAction> getActions() {
    +		return this.actions;
    +	}
    +
    +	/**
    +	 * Create a new list of HBase {@link Mutation}s.
    +	 *
    +	 * @param rowKey row that the created {@link Mutation} list is applied to
    +	 * @param writeToWAL enable WAL
    +	 * @return a list of HBase {@link Mutation}s
    +	 */
    +	public List<Mutation> newMutationList(byte[] rowKey, boolean writeToWAL) {
    +		List<Mutation> mutations = new ArrayList<>();
    +		Put put = null;
    +		Increment increment = null;
    +		Append append = null;
    +		Delete delete = null;
    +		boolean rowIsDeleted = false;
    +		for (MutationAction action : actions) {
    +			switch (action.getType()) {
    +				case PUT:
    +					if (put == null) {
    +						put = new Put(rowKey);
    +						mutations.add(put);
    +					}
    +					if (action.getTs() == -1) {
    +						put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
    +					} else {
    +						put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(), action.getValue());
    +					}
    +					break;
    +
    +				case INCREMENT:
    +					if (increment == null) {
    +						increment = new Increment(rowKey);
    +						mutations.add(increment);
    +					}
    +					increment.addColumn(action.getFamily(), action.getQualifier(), action.getIncrement());
    +					break;
    +
    +				case APPEND:
    +					if (append == null) {
    +						append = new Append(rowKey);
    +						mutations.add(append);
    +					}
    +					append.add(action.getFamily(), action.getQualifier(), action.getValue());
    +					break;
    +
    +				// If there are multiple DELETE_ROW actions, only the first one is served
    +				case DELETE_ROW:
    +					if (!rowIsDeleted) {
    +						for (int i = 0; i < mutations.size(); ) {
    +							if (mutations.get(i) instanceof Delete) {
    +								mutations.remove(i);
    +							} else {
    +								i++;
    +							}
    +						}
    +						delete = new Delete(rowKey, action.getTs());
    +						mutations.add(delete);
    +						rowIsDeleted = true;
    +					}
    +					break;
    +
    +				case DELETE_FAMILY:
    +					if (!rowIsDeleted) {
    +						if (delete == null) {
    +							delete = new Delete(rowKey);
    +							mutations.add(delete);
    +						}
    +						delete.addFamily(action.getFamily(), action.getTs());
    +					}
    +					break;
    +
    +				case DELETE_COLUMNS:
    +					if (!rowIsDeleted) {
    +						if (delete == null) {
    +							delete = new Delete(rowKey);
    +							mutations.add(delete);
    +						}
    +						delete.addColumns(action.getFamily(), action.getQualifier(), action.getTs());
    +					}
    +					break;
    +
    +				case DELETE_COLUMN:
    +					if (!rowIsDeleted) {
    --- End diff --
    
    I would be more than happy not to handle combinations of Deletes :-). Will update.


> Implement Streaming HBaseSink
> -----------------------------
>
>                 Key: FLINK-2055
>                 URL: https://issues.apache.org/jira/browse/FLINK-2055
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming, Streaming Connectors
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Hilmi Yildirim
>
> As per : http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message