flink-user mailing list archives

From Arvid Heise <ar...@ververica.com>
Subject Re: Rocksdb implementation
Date Tue, 19 May 2020 18:40:28 GMT
Hi Jaswin,

you cannot run a DataSet program inside a DataStream program. However, you can
perform the same query on a windowed stream. So if you would execute the
batchy part every day, you can just create a tumbling window of 24 hours and
then perform your batchy analysis on that time window.
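
For illustration, a minimal sketch of what that could look like (the stream,
key selector, and function names are placeholders, not from your code):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// collect 24 hours of records per key, then run the "batchy" analysis once per window
DataStream<ResultMessage> daily = records
    .keyBy(record -> record.getKey())
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .process(new DailyAnalysisFunction()); // a ProcessWindowFunction holding the batch logic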

Alternatively, you can dump the data into Kafka or a file system and then
run the batchy part as a separate program.
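
For the file-system variant, a StreamingFileSink is the usual way to dump the
records (the path is a placeholder):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// write one line per record; the separate batch program can then read these files
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("hdfs:///recon/unmatched"), new SimpleStringEncoder<String>("UTF-8"))
    .build();
records.map(record -> record.toString()).addSink(sink);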

On Tue, May 19, 2020 at 1:36 PM Jaswin Shah <jaswin.shah@outlook.com> wrote:

> Thanks Yun and Arvid.
> Just a question: is it possible to have a batch execution inside the same
> streaming job? You meant to say I should collect the missing messages from
> both streams in a side output on timer expiry. So, I will execute a batch job
> on the side output, as the side output will be shared with the same streaming
> job that I have. Basically, I need that missing-message info outside.
> ------------------------------
> *From:* Jaswin Shah <jaswin.shah@outlook.com>
> *Sent:* 19 May 2020 13:29
> *To:* Yun Tang <myasuka@live.com>; Arvid Heise <arvid@ververica.com>;
> isha.singhal@paytm.com <isha.singhal@paytm.com>; ankit.singhal@paytm.com <
> ankit.singhal@paytm.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Rocksdb implementation
>
> ++
> ------------------------------
> *From:* Yun Tang <myasuka@live.com>
> *Sent:* 18 May 2020 23:47
> *To:* Arvid Heise <arvid@ververica.com>; Jaswin Shah <
> jaswin.shah@outlook.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Rocksdb implementation
>
> Hi Jaswin
>
> As Arvid suggested, it's not encouraged to query the internal RocksDB
> directly. Apart from Arvid's solution, I think queryable state [1] might
> also help you. I think you just want to know the entries left in both map
> states after several days, and querying the state should meet that need;
> please refer to the official doc and this example [2] for more details.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
> [2]
> https://github.com/apache/flink/blob/def4cbb762f849ce5b5c9c6bd367fd62bc8f45de/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java#L108-L122
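>
> For illustration, a rough sketch along the lines of [2] (host, port, job id,
> and query name are placeholders):
>
> import java.util.concurrent.CompletableFuture;
> import org.apache.flink.api.common.JobID;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.queryablestate.client.QueryableStateClient;
>
> // in the job: expose the map state under a queryable name
> MapStateDescriptor<String, CartMessage> descriptor = new MapStateDescriptor<>(
>     "cartData", TypeInformation.of(String.class), TypeInformation.of(CartMessage.class));
> descriptor.setQueryable("cart-query");
>
> // in a separate client program: fetch the remaining entries for a given key
> QueryableStateClient client = new QueryableStateClient("proxy-host", 9069);
> CompletableFuture<MapState<String, CartMessage>> future = client.getKvState(
>     JobID.fromHexString("<job-id>"), "cart-query", "orderId#mid",
>     BasicTypeInfo.STRING_TYPE_INFO, descriptor);
> MapState<String, CartMessage> state = future.join();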
>
> Best
> Yun Tang
> ------------------------------
> *From:* Arvid Heise <arvid@ververica.com>
> *Sent:* Monday, May 18, 2020 23:40
> *To:* Jaswin Shah <jaswin.shah@outlook.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Rocksdb implementation
>
> Hi Jaswin,
>
> I'd discourage using RocksDB directly; it's more of an implementation
> detail of Flink. I'd also discourage writing to Kafka directly without
> using our Kafka sink, as you will receive duplicates upon recovery.
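>
> (If the goal is just to have Flink keep the MapStates in RocksDB, you don't
> touch RocksDB yourself; you configure it as the state backend on the
> environment. A minimal sketch, assuming the flink-statebackend-rocksdb
> dependency is on the classpath and with a placeholder checkpoint path:
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true)); // true = incremental checkpoints
>
> The keyed MapStates in your function are then stored in RocksDB transparently.)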
>
> If you run the KeyedCoProcessFunction continuously anyway, I'd add a
> timer (2 days?) [1] for all unmatched records and, when the timer fires,
> output the record through a side output [2], where you do your batch
> logic. Then you don't need a separate batch job to clean that up. If you
> actually want to output to Kafka for some other application, you just need
> to stream the side output to a KafkaProducer.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timers
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
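>
> A minimal sketch of the timer + side output inside your
> KeyedCoProcessFunction (the tag name and the 2-day timeout are just
> examples):
>
> private static final OutputTag<CartMessage> UNMATCHED_CART =
>     new OutputTag<CartMessage>("unmatched-cart") {};
>
> // when storing an unmatched record, also register a timer for it:
> long timeout = context.timerService().currentProcessingTime()
>     + Time.days(2).toMilliseconds();
> context.timerService().registerProcessingTimeTimer(timeout);
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
>     // everything still in the map state for this key never found a match
>     for (CartMessage unmatched : cartState.values()) {
>         ctx.output(UNMATCHED_CART, unmatched); // route to the side output
>     }
>     cartState.clear();
> }
>
> The main program then obtains the unmatched records with
> mainStream.getSideOutput(UNMATCHED_CART).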
>
> On Mon, May 18, 2020 at 10:30 AM Jaswin Shah <jaswin.shah@outlook.com>
> wrote:
>
> /**
>  * Alipay.com Inc.
>  * Copyright (c) 2004-2020 All Rights Reserved.
>  */
> package com.paytm.reconsys.functions.processfunctions;
>
> import com.paytm.reconsys.Constants;
> import com.paytm.reconsys.configs.ConfigurationsManager;
> import com.paytm.reconsys.enums.DescripancyTypeEnum;
> import com.paytm.reconsys.exceptions.MissingConfigurationsException;
> import com.paytm.reconsys.messages.ResultMessage;
> import com.paytm.reconsys.messages.cart.CartMessage;
> import com.paytm.reconsys.messages.cart.Payment;
> import com.paytm.reconsys.messages.pg.PGMessage;
> import org.apache.commons.lang3.StringUtils;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.errors.AuthorizationException;
> import org.apache.kafka.common.errors.OutOfOrderSequenceException;
> import org.apache.kafka.common.errors.ProducerFencedException;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import java.util.Properties;
>
> /**
>  * CoProcessFunction to process cart and pg messages connected using the connect operator.
>  * @author jaswin.shah
>  * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
>  */
> public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String, CartMessage, PGMessage, ResultMessage> {
>
>     /**
>      * Map state for cart messages, orderId+mid is key and cartMessage is value.
>      */
>     private MapState<String, CartMessage> cartState = null;
>
>     /**
>      * Map state for pg messages, orderId+mid is key and pgMessage is value.
>      */
>     private MapState<String, PGMessage> pgState = null;
>
>     /**
>      * Initializations for the cart and pg MapStates
>      *
>      * @param config
>      */
>     @Override
>     public void open(Configuration config) {
>         MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
>             "cartData",
>             TypeInformation.of(String.class),
>             TypeInformation.of(CartMessage.class)
>         );
>         cartState = getRuntimeContext().getMapState(cartStateDescriptor);
>
>         MapStateDescriptor<String, PGMessage> pgStateDescriptor = new MapStateDescriptor<>(
>             "pgData",
>             TypeInformation.of(String.class),
>             TypeInformation.of(PGMessage.class)
>         );
>         pgState = getRuntimeContext().getMapState(pgStateDescriptor);
>     }
>
>     /**
>      * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
>      * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
>      * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
>      * @param cartMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
>         String searchKey = cartMessage.createJoinStringCondition();
>         if (pgState.contains(searchKey)) {
>             // a match was found: emit the result downstream and clean up the pg entry
>             collector.collect(generateResultMessage(cartMessage, pgState.get(searchKey)));
>             pgState.remove(searchKey);
>         } else {
>             cartState.put(searchKey, cartMessage);
>         }
>     }
>
>     /**
>      * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
>      * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
>      * 3. If not present, add orderId+mid as key and the pg object as value in pgMapState.
>      * @param pgMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement2(PGMessage pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
>         String searchKey = pgMessage.createJoinStringCondition();
>         if (cartState.contains(searchKey)) {
>             // a match was found: emit the result downstream and clean up the cart entry
>             collector.collect(generateResultMessage(cartState.get(searchKey), pgMessage));
>             cartState.remove(searchKey);
>         } else {
>             pgState.put(searchKey, pgMessage);
>         }
>     }
>
>
>     /**
>      * Create ResultMessage from cart and pg messages.
>      *
>      * @param cartMessage
>      * @param pgMessage
>      * @return
>      */
>     private ResultMessage generateResultMessage(CartMessage cartMessage, PGMessage pgMessage) {
>         ResultMessage resultMessage = new ResultMessage();
>         Payment payment = null;
>
>         //Logic should be in cart: check
>         for (Payment pay : cartMessage.getPayments()) {
>             if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
>                     && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
>                 payment = pay;
>                 break;
>             }
>         }
>         if (payment == null) {
>             // guard: the getters below would otherwise throw a NullPointerException
>             throw new IllegalStateException("No forward payment found for order " + cartMessage.getId());
>         }
>         resultMessage.setOrderId(cartMessage.getId());
>         resultMessage.setMid(payment.getMid());
>
>         resultMessage.setCartOrderStatus(payment.mapToOrderStatus().getCode());
>         resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
>
>         resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
>         resultMessage.setPgOrderCompletionTime(pgMessage.getOrderCompletionTime());
>
>         resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
>         resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());
>
>         resultMessage.setCartPaymethod(payment.getPayment_method());
>         resultMessage.setPgPaymethod(pgMessage.getPayMethod());
>
>         checkDescripancyAndTriggerAlert(resultMessage);
>
>         return resultMessage;
>     }
>
>     /**
>      * Evaluate whether there is a discrepancy in any field between the messages from the two systems.
>      * Write all the discrepancy logic here.
>      *
>      * @param resultMessage
>      */
>     private void checkDescripancyAndTriggerAlert(ResultMessage resultMessage) {
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
>             //Send message to kafka queue for order status discrepancy.
>             sendMessageToKafkaTopic(resultMessage.toString());
>         }
>
>         if (!StringUtils.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
>             //Send message to kafka queue for pay amount discrepancy.
>             sendMessageToKafkaTopic(resultMessage.toString());
>         }
>
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
>             //Send message to kafka queue for pay method discrepancy.
>             sendMessageToKafkaTopic(resultMessage.toString());
>         }
>     }
>
>     /**
>      * Send a message to a kafka topic.
>      *
>      * @param message
>      */
>     private void sendMessageToKafkaTopic(String message) {
>         Properties kafkaProperties = ConfigurationsManager.getResultSystemKafkaProperties();
>         //kafkaProperties.put("transactional.id","trans123");
>         // Note: creating a new producer per message is expensive; Flink's Kafka sink would also avoid duplicates on recovery.
>         Producer<String, String> producer = new KafkaProducer<>(kafkaProperties, new StringSerializer(), new StringSerializer());
>         //producer.initTransactions();
>         try {
>             //producer.beginTransaction();
>             producer.send(new ProducerRecord<>(ConfigurationsManager.getResultTopicName(), message));
>             //producer.commitTransaction();
>         } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
>             // We can't recover from these exceptions, so our only option is to close the producer and exit.
>             producer.close();
>             return;
>         } catch (KafkaException e) {
>             // abortTransaction() is only valid with transactions enabled, so it stays commented out like the calls above.
>             //producer.abortTransaction();
>         } catch (MissingConfigurationsException e) {
>             e.printStackTrace();
>         }
>         producer.close();
>     }
> }
>
> This is a snapshot of the implementation I have done.
> ------------------------------
> *From:* Jaswin Shah <jaswin.shah@outlook.com>
> *Sent:* 18 May 2020 13:55
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Rocksdb implementation
>
> Hi,
> I have implemented the Flink job with MapStates. The functionality is as
> follows:
>
>    1. I have two datastreams which I connect with the connect operator and
>    then call a CoProcessFunction for every pair of objects.
>    2. For an element of the first datastream, the processElement1 method is
>    called, and for an element of the second datastream, the processElement2
>    method is called.
>    3. I have two MapStates in the CoProcessFunction, one for each stream.
>    4. When processElement1 is called, it checks in MapState2 whether a
>    corresponding element with the given id is present; if present, I match
>    and delete. If not present, I add the object to MapState1.
>    5. When processElement2 is called, it checks in MapState1 whether a
>    corresponding element with the given id is present; if present, I match
>    and delete. If not present, I add the object to MapState2.
>    6. Now, I want all the state data to be stored in RocksDB.
>    7. After a few days, I want to run a batch job on the RocksDB data to
>    check whether there are any objects for which no match was found, and
>    create a report of those.
>
> I need help with how I can store this state data in RocksDB, and with the
> setup, configuration and code for that, which I am not understanding.
> Also, is it possible to run a batch job on the RocksDB data?
>
> Help will be highly appreciated.
>
> Thanks,
> Jaswin
>
>
>
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
