From: Arvid Heise <arvid@ververica.com>
Date: Tue, 19 May 2020 20:40:28 +0200
Subject: Re: Rocksdb implementation
To: Jaswin Shah <jaswin.shah@outlook.com>
Cc: Yun Tang <myasuka@live.com>, isha.singhal@paytm.com,
 ankit.singhal@paytm.com, user@flink.apache.org

Hi Jaswin,

you cannot run a DataSet program inside a DataStream program. However, you
can perform the same query on a windowed stream: if you would execute the
batch part once per day, you can just create a tumbling window of 24 hours
and then perform your batch analysis on that time window (a sketch follows
below).

Alternatively, you can dump the data into Kafka or a file system and then
run the batch part as a separate program.
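A minimal, self-contained sketch of the 24-hour tumbling-window idea, using
processing time; the socket source, the comma-separated records, and all
class names are placeholders, not from this thread:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class DailyWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source; in the real job this would be the cart/pg streams.
        DataStream<String> events = env.socketTextStream("localhost", 9999);

        events
            // e.g. orderId+mid as the key, here faked as the first CSV field
            .keyBy(line -> line.split(",")[0])
            .window(TumblingProcessingTimeWindows.of(Time.hours(24)))
            .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void process(String key, Context ctx, Iterable<String> elements,
                                    Collector<String> out) {
                    // All records of this key from the last 24 hours are available
                    // here; run the "batch" matching/report logic over them.
                    out.collect(key + " -> " + ctx.window());
                }
            })
            .print();

        env.execute("daily-window-sketch");
    }
}

For event time, swap in TumblingEventTimeWindows and assign timestamps and
watermarks on the inputs first.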
On Tue, May 19, 2020 at 1:36 PM Jaswin Shah <jaswin.shah@outlook.com> wrote:

> Thanks Yun and Arvid.
> Just a question: is it possible to have a batch execution inside the same
> streaming job? You meant to say I should collect the missing messages from
> both streams in a side output on timer expiry. So I would execute a batch
> job on the side output, as the side output will be shared with the same
> streaming job that I have. Basically, I need that missing-message info
> outside.
> ------------------------------
> *From:* Jaswin Shah <jaswin.shah@outlook.com>
> *Sent:* 19 May 2020 13:29
> *To:* Yun Tang <myasuka@live.com>; Arvid Heise <arvid@ververica.com>;
> isha.singhal@paytm.com; ankit.singhal@paytm.com
> *Cc:* user@flink.apache.org
> *Subject:* Re: Rocksdb implementation
>
> ++
> ------------------------------
> *From:* Yun Tang <myasuka@live.com>
> *Sent:* 18 May 2020 23:47
> *To:* Arvid Heise <arvid@ververica.com>; Jaswin Shah <jaswin.shah@outlook.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Rocksdb implementation
>
> Hi Jaswin,
>
> As Arvid suggested, querying the internal RocksDB directly is discouraged.
> Apart from Arvid's solution, queryable state [1] might also help you. I
> think you just want to know which entries are left in both map states
> after several days, and querying the state should meet that need; please
> refer to the official doc and this example [2] for more details.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
> [2]
> https://github.com/apache/flink/blob/def4cbb762f849ce5b5c9c6bd367fd62bc8f45de/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java#L108-L122
>
> Best,
> Yun Tang
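Sketching Yun's queryable-state suggestion: the job would call
setQueryable("cartData") on the MapStateDescriptor, and a separate client
process could then look up a key roughly as below. The host, the job id,
and the String/String types are placeholders; 9069 is the default proxy
port, and the client needs the flink-queryable-state-client-java
dependency:

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import java.util.Map;

public class QsClientSketch {
    public static void main(String[] args) throws Exception {
        // Host and port of a TaskManager's queryable state proxy.
        QueryableStateClient client = new QueryableStateClient("localhost", 9069);

        // Must match the descriptor that was made queryable in the job.
        MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
                "cartData",
                TypeInformation.of(String.class),
                TypeInformation.of(String.class));

        MapState<String, String> state = client.getKvState(
                JobID.fromHexString("<job-id>"),  // id of the running job
                "cartData",                       // name passed to setQueryable(...)
                "someOrderId+mid",                // the key to look up
                BasicTypeInfo.STRING_TYPE_INFO,
                descriptor).get();

        for (Map.Entry<String, String> e : state.entries()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }

        client.shutdownAndWait();
    }
}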
> ------------------------------
> *From:* Arvid Heise <arvid@ververica.com>
> *Sent:* Monday, May 18, 2020 23:40
> *To:* Jaswin Shah <jaswin.shah@outlook.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Rocksdb implementation
>
> Hi Jaswin,
>
> I'd discourage using RocksDB directly; it's more of an implementation
> detail of Flink. I'd also discourage writing to Kafka directly without
> using our Kafka sink, as you will receive duplicates upon recovery.
>
> If you run the KeyedCoProcessFunction continuously anyway, I'd add a
> timer (2 days?) [1] for all unmatched records and, on triggering of the
> timer, output the record through a side output [2], where you do your
> batch logic. Then you don't need a separate batch job to clean that up.
> If you actually want to output to Kafka for some other application, you
> just need to stream the side output to a KafkaProducer.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timers
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
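A minimal sketch of that timer-plus-side-output pattern, with String
payloads and a hard-coded two-day processing-time timer as placeholders;
the real function would keep the cart/pg types and map states from the
code below:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TimerSideOutputSketch extends KeyedCoProcessFunction<String, String, String, String> {

    // Unmatched records are routed here; read them with getSideOutput(UNMATCHED).
    public static final OutputTag<String> UNMATCHED = new OutputTag<String>("unmatched") {};

    private static final long TWO_DAYS_MS = 2L * 24 * 60 * 60 * 1000;

    private transient ValueState<String> pending;

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending", String.class));
    }

    @Override
    public void processElement1(String left, Context ctx, Collector<String> out) throws Exception {
        String other = pending.value();
        if (other != null) {
            out.collect(left + " matched " + other);  // matched: emit and clear
            pending.clear();
        } else {
            pending.update(left);
            // Fire if no match arrives for this key within two days.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TWO_DAYS_MS);
        }
    }

    @Override
    public void processElement2(String right, Context ctx, Collector<String> out) throws Exception {
        processElement1(right, ctx, out);  // symmetric handling in this sketch
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        String unmatched = pending.value();
        if (unmatched != null) {
            ctx.output(UNMATCHED, unmatched);  // emit to the side output
            pending.clear();
        }
    }
}

On the resulting operator, getSideOutput(TimerSideOutputSketch.UNMATCHED)
yields the unmatched stream, which can then be handed to Flink's Kafka sink
via addSink instead of a hand-rolled KafkaProducer.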
> On Mon, May 18, 2020 at 10:30 AM Jaswin Shah <jaswin.shah@outlook.com> wrote:
>
> /**
>  * Alipay.com Inc.
>  * Copyright (c) 2004-2020 All Rights Reserved.
>  */
> package com.paytm.reconsys.functions.processfunctions;
>
> import com.paytm.reconsys.Constants;
> import com.paytm.reconsys.configs.ConfigurationsManager;
> import com.paytm.reconsys.enums.DescripancyTypeEnum;
> import com.paytm.reconsys.exceptions.MissingConfigurationsException;
> import com.paytm.reconsys.messages.ResultMessage;
> import com.paytm.reconsys.messages.cart.CartMessage;
> import com.paytm.reconsys.messages.cart.Payment;
> import com.paytm.reconsys.messages.pg.PGMessage;
> import org.apache.commons.lang3.StringUtils;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.errors.AuthorizationException;
> import org.apache.kafka.common.errors.OutOfOrderSequenceException;
> import org.apache.kafka.common.errors.ProducerFencedException;
> import org.apache.kafka.common.serialization.StringSerializer;
> import java.util.Properties;
>
> /**
>  * CoProcessFunction to process cart and pg messages connected using the connect operator.
>  * @author jaswin.shah
>  * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
>  */
> public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String, CartMessage, PGMessage, ResultMessage> {
>
>     /**
>      * Map state for cart messages; orderId+mid is the key and cartMessage the value.
>      */
>     private MapState<String, CartMessage> cartState = null;
>
>     /**
>      * Map state for pg messages; orderId+mid is the key and pgMessage the value.
>      */
>     private MapState<String, PGMessage> pgState = null;
>
>     /**
>      * Initializations for the cart and pg map states.
>      *
>      * @param config
>      */
>     @Override
>     public void open(Configuration config) {
>         MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
>             "cartData",
>             TypeInformation.of(String.class),
>             TypeInformation.of(CartMessage.class)
>         );
>         cartState = getRuntimeContext().getMapState(cartStateDescriptor);
>
>         MapStateDescriptor<String, PGMessage> pgStateDescriptor = new MapStateDescriptor<>(
>             "pgData",
>             TypeInformation.of(String.class),
>             TypeInformation.of(PGMessage.class)
>         );
>         pgState = getRuntimeContext().getMapState(pgStateDescriptor);
>     }
>
>     /**
>      * 1. Get orderId+mid from cartMessage and check in pgState whether an entry is present.
>      * 2. If present, match, check for discrepancies, process, and delete the entry from pgState.
>      * 3. If not present, add orderId+mid as key and the cart object as value in cartState.
>      * @param cartMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
>         String searchKey = cartMessage.createJoinStringCondition();
>         if (pgState.contains(searchKey)) {
>             // The result is sent to Kafka inside generateResultMessage; the collector is unused.
>             generateResultMessage(cartMessage, pgState.get(searchKey));
>             pgState.remove(searchKey);
>         } else {
>             cartState.put(searchKey, cartMessage);
>         }
>     }
>
>     /**
>      * 1. Get orderId+mid from pgMessage and check in cartState whether an entry is present.
>      * 2. If present, match, check for discrepancies, process, and delete the entry from cartState.
>      * 3. If not present, add orderId+mid as key and the pg object as value in pgState.
>      * @param pgMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement2(PGMessage pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
>         String searchKey = pgMessage.createJoinStringCondition();
>         if (cartState.contains(searchKey)) {
>             generateResultMessage(cartState.get(searchKey), pgMessage);
>             cartState.remove(searchKey);
>         } else {
>             pgState.put(searchKey, pgMessage);
>         }
>     }
>
>     /**
>      * Create a ResultMessage from the cart and pg messages.
>      *
>      * @param cartMessage
>      * @param pgMessage
>      * @return
>      */
>     private ResultMessage generateResultMessage(CartMessage cartMessage, PGMessage pgMessage) {
>         ResultMessage resultMessage = new ResultMessage();
>         Payment payment = null;
>
>         //Logic should be in cart: check
>         for (Payment pay : cartMessage.getPayments()) {
>             if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
>                 payment = pay;
>                 break;
>             }
>         }
>         // Note: payment stays null (and the calls below throw an NPE) if no payment matches.
>         resultMessage.setOrderId(cartMessage.getId());
>         resultMessage.setMid(payment.getMid());
>
>         resultMessage.setCartOrderStatus(payment.mapToOrderStatus().getCode());
>         resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
>
>         resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
>         resultMessage.setPgOrderCompletionTime(pgMessage.getOrderCompletionTime());
>
>         resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
>         resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());
>
>         resultMessage.setCartPaymethod(payment.getPayment_method());
>         resultMessage.setPgPaymethod(pgMessage.getPayMethod());
>
>         checkDescripancyAndTriggerAlert(resultMessage);
>
>         return resultMessage;
>     }
>
>     /**
>      * Evaluate whether any fields differ between the messages from the two systems.
>      * All the discrepancy logic goes here.
>      *
>      * @param resultMessage
>      */
>     private void checkDescripancyAndTriggerAlert(ResultMessage resultMessage) {
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
>             //Send message to kafka queue for order status discrepancy.
>             sendMessageToKafkaTopic(resultMessage.toString());
>         }
>
>         if (!StringUtils.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
>             //Send message to kafka queue for order amount discrepancy.
>             sendMessageToKafkaTopic(resultMessage.toString());
>         }
>
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
>             //Send message to kafka queue for pay method discrepancy.
>             sendMessageToKafkaTopic(resultMessage.toString());
>         }
>     }
>
>     /**
>      * Send a message to the Kafka topic.
>      *
>      * @param message
>      */
>     private void sendMessageToKafkaTopic(String message) {
>         Properties kafkaProperties = ConfigurationsManager.getResultSystemKafkaProperties();
>         //kafkaProperties.put("transactional.id", "trans123");
>         Producer<String, String> producer = new KafkaProducer<>(kafkaProperties, new StringSerializer(), new StringSerializer());
>         //producer.initTransactions();
>         try {
>             //producer.beginTransaction();
>             producer.send(new ProducerRecord<>(ConfigurationsManager.getResultTopicName(), message));
>             //producer.commitTransaction();
>         } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
>             // We can't recover from these exceptions, so our only option is to close the producer and exit.
>             producer.close();
>         } catch (KafkaException e) {
>             // abortTransaction() would throw while transactions are disabled, so it stays commented out.
>             //producer.abortTransaction();
>         } catch (MissingConfigurationsException e) {
>             e.printStackTrace();
>         }
>         producer.close();
>     }
> }
>
> This is a snapshot of the implementation I have done.
> ------------------------------
> *From:* Jaswin Shah <jaswin.shah@outlook.com>
> *Sent:* 18 May 2020 13:55
> *To:* user@flink.apache.org
> *Subject:* Rocksdb implementation
>
> Hi,
> I have implemented the Flink job with MapStates. The functionality is as
> follows:
>
>    1. I have two datastreams which I connect with the connect operator and
>    then call a CoProcessFunction on every pair of objects.
>    2. For an element of the first datastream, processElement1 is called,
>    and for an element of the second datastream, processElement2 is called.
>    3. I have two MapStates in the CoProcessFunction, one for each stream.
>    4. When processElement1 is called, it checks in MapState2 whether a
>    corresponding element with the given id is present; if present, I match
>    and delete. If not present, I add the object to MapState1.
>    5. When processElement2 is called, it checks in MapState1 whether a
>    corresponding element with the given id is present; if present, I match
>    and delete. If not present, I add the object to MapState2.
>    6. Now, I want all the state data to be stored in RocksDB.
>    7. After a few days, I want to run a batch job on the RocksDB data to
>    check whether there are any objects for which no match was found, and
>    create a report of those.
>
> I need help with how to store this state data in RocksDB and how to do the
> setup, configuration, and code for that, which I am not understanding.
> Also, is it possible to run a batch job on the RocksDB data?
>
> Help will be highly appreciated.
>
> Thanks,
> Jaswin
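For the RocksDB question above: the state backend is a configuration
choice, so the MapState code itself does not change. A minimal sketch,
assuming the flink-statebackend-rocksdb dependency and a placeholder
checkpoint URI (the same can be set cluster-wide in flink-conf.yaml via
state.backend: rocksdb):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint periodically so the RocksDB state is persisted.
        env.enableCheckpointing(60_000);

        // Keep keyed state (the MapStates above) in RocksDB; checkpoints go to
        // the given URI. The boolean enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // ... connect the cart/pg streams, keyBy, CartPGCoprocessFunction, etc. ...

        env.execute("recon-job");
    }
}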
--

Arvid Heise | Senior Java Developer

Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng