Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DF14B200A5B for ; Wed, 25 May 2016 09:39:14 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DDB29160A18; Wed, 25 May 2016 07:39:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 08EC5160A17 for ; Wed, 25 May 2016 09:39:13 +0200 (CEST) Received: (qmail 80887 invoked by uid 500); 25 May 2016 07:39:13 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 80877 invoked by uid 99); 25 May 2016 07:39:13 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 May 2016 07:39:13 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 070882C1F56 for ; Wed, 25 May 2016 07:39:13 +0000 (UTC) Date: Wed, 25 May 2016 07:39:13 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-3967) Provide RethinkDB Sink for Flink MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Wed, 25 May 2016 07:39:15 -0000 [ https://issues.apache.org/jira/browse/FLINK-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15299618#comment-15299618 ] ASF GitHub Bot commented on FLINK-3967: --------------------------------------- Github user zentol commented on a diff in the pull request: 
https://github.com/apache/flink/pull/2031#discussion_r64527750 --- Diff: flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/FlinkRethinkDbSink.java --- @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.rethinkdb; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rethinkdb.RethinkDB; +import com.rethinkdb.gen.ast.Insert; +import com.rethinkdb.gen.ast.Table; +import com.rethinkdb.net.Connection; + +/** + * This class is the Flink sink for RethinkDB which is a tcp/JSON protocol based document + * oriented NoSQL database. + * + *

+ * This sink provides two constructors: + *

+ * {@link #FlinkRethinkDbSink(String hostname, int hostport, String database, String table, JSONSerializationSchema schema)}, and + *

+ * {@link #FlinkRethinkDbSink(String hostname, int hostport, String database, String table, JSONSerializationSchema schema, ConflictStrategy conflictStrategy)} + *

+ * + * The parameters for the constructors are as follows: + *

+ *

    + *
  • hostname - the rethinkdb hostname
  • + *
  • hostport - the rethinkdb port the driver connects to
  • + *
  • database - the rethinkdb database name to which the table belongs
  • + *
  • table - the rethinkdb table name where documents are inserted
  • + *
  • schema - the schema transformer that converts input to a JSONObject or JSONArray
  • + *
  • conflictStrategy - the conflict resolution strategy used when an inserted document has an id that already exists in the db
  • + *
+ *

+ * + * The user can also set: + *

+ *

    + *
  • username - default is admin
  • + *
  • password - default is blank
  • + *
+ *

with the {@link #setUsernameAndPassword(String, String)} method. + *

+ * NOTE: If multiple documents are getting inserted (eg: using JSONArray), the sink + * checks if there is an error entry in the result HashMap and throws a runtime exception if errors + * counts is not zero. The exception message contains the results HashMap. + * In case of multiple errors only the first error is noted in the result HashMap. + * + * + * @see {@link ConflictStrategy} for conflict resolution strategies + * + * @param a value that can be transformed into a {@link org.json.simple.JSONArray;} or {@link org.json.simple.JSONObject} + */ +public class FlinkRethinkDbSink extends RichSinkFunction implements Serializable{ + + /** + * Serial version for the class + */ + private static final long serialVersionUID = -2135499016796158755L; + + /** + * Logger for the class + */ + private static final Logger LOG = LoggerFactory.getLogger(FlinkRethinkDbSink.class); + + /** + * Conflict resolution option key in case document ids are same + */ + public static final String CONFLICT_OPT = "conflict"; + + /** + * Result key indicating number of errors + */ + public static final String RESULT_ERROR_KEY = "errors"; + + /** + * Serialization schema for the sink + */ + private JSONSerializationSchema serializationSchema; + + /** + * RethinkDB connection object + */ + private transient Connection rethinkDbConnection; + + /** + * RethinkDB hostname + */ + private String hostname; + + /** + * RethinkDB port + */ + private int hostport; + + /** + * RethinkDB tablename where documents are inserted + */ + private String tableName; + + /** + * RethinkDB database where document are inserted + */ + private String databaseName; + + /** + * Conflict resolution strategy + */ + private ConflictStrategy conflict; + + /** + * Default user name + */ + public static final String DEFAULT_USER_NAME = "admin"; + + /** + * User name + */ + private String username = DEFAULT_USER_NAME; + + /** + * Default user name + */ + public static final String DEFAULT_PASSWORD = ""; + + /** + * password + 
*/ + private String password = DEFAULT_PASSWORD; + + + /** + * Constructor for RethinkDB sink + * @param hostname + * @param hostport + * @param database + * @param table + * @param schema + */ + public FlinkRethinkDbSink(String hostname, int hostport, String database, String table, + JSONSerializationSchema schema) { + this(hostname, hostport, database, table, schema, ConflictStrategy.update); + } + + /** + * Constructor for sink + * @param hostname + * @param hostport + * @param database name + * @param table name + * @param schema serialization converter + * @param conflict resolution strategy for document id conflict + */ + public FlinkRethinkDbSink(String hostname, int hostport, String database, String table, + JSONSerializationSchema schema, + ConflictStrategy conflict) { + this.hostname = Objects.requireNonNull(hostname); + this.hostport = hostport; + this.databaseName = Objects.requireNonNull(database); + this.tableName = Objects.requireNonNull(table); + this.serializationSchema = Objects.requireNonNull(schema); + this.conflict = conflict; + } + + /** + * Open the sink + */ + @Override + public void open(Configuration parameters) throws Exception { + LOG.info("Received parameters : {}", parameters); + + super.open(parameters); + + rethinkDbConnection = getRethinkDB().connection().hostname(hostname) + .port(hostport).user(username, password).connect(); + + LOG.info("RethinkDb connection created for host {} port {} and db {}", + hostname, hostport,databaseName); + } + + /** + * Helper method to help testability + * @return RethinkDB instance + */ + protected RethinkDB getRethinkDB() { + return RethinkDB.r; + } + + /** + * Set username and password. If username and password are not provided, + * then default username (admin) and blank password are used. 
+ * + * @param username + * @param password + * + * @throws IllegalArgumentException if arguments is null or empty + */ + public void setUsernameAndPassword(String username, String password) { + + if ( StringUtils.isBlank(username) ) { + throw new IllegalArgumentException("username " + username + " cannot be null or empty" ); + } else { + this.username = username; + } + + if ( StringUtils.isBlank(password) ) { + throw new IllegalArgumentException("password " + password + " cannot be null or empty" ); + } else { + this.password = password; + } + } + + /** + * Invoke the sink with the input + * + * @param the value to be inserted + * + * @throws RuntimeException if there are errors while inserting row into rethinkdb + */ + @Override + public void invoke(OUT value) throws Exception { + LOG.debug("Received value {}", value); + + Object json = serializationSchema.toJSON(value); + LOG.debug("Object/Json: {}/{}", value, json); + Insert insert = getRdbTable().insert(json).optArg(CONFLICT_OPT, conflict.toString()); + HashMap result = runInsert(insert); + + LOG.debug("Object/Json/Result: {}/{}/{}", value, json, result); + + if ( (Long)result.get(RESULT_ERROR_KEY) != 0 ) { --- End diff -- this is a synchronous operation, correct? If so I'd be curious about a benchmark for this sink. > Provide RethinkDB Sink for Flink > -------------------------------- > > Key: FLINK-3967 > URL: https://issues.apache.org/jira/browse/FLINK-3967 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors > Affects Versions: 1.0.3 > Environment: All > Reporter: Mans Singh > Assignee: Mans Singh > Priority: Minor > Labels: features > Fix For: 1.1.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Provide Sink to stream data from flink to rethink db. -- This message was sent by Atlassian JIRA (v6.3.4#6332)