flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Caizhi Weng <tsreape...@gmail.com>
Subject Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志
Date Fri, 19 Jul 2019 08:16:27 GMT
Hi Henry,

LOG.error(e.getLocalizedMessage());
running = true;

这里写错了吧,应该是 running = false;

Henry <henrybao91@163.com> 于2019年7月19日周五 下午4:04写道:

>
>
> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的
source
> 代码,但是里面没有写log里报的哪个错的提示。
> package com.JavaCustoms;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import javax.jms.*;
>
> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
> private static final long serialVersionUID = 1L;
> private static final Logger LOG =
> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
> private transient volatile boolean running;
> private transient MessageConsumer consumer;
> private transient Connection connection;
>
> // topic name
> private static final String topicName = "flink_mypay";
> // tcp str
> private static final String tcpStr = "tcp://server.mn:61616";
> // 持久订阅的id标识
> private static final String clientId = "flink_hz";
> // Subscription name
> private static final String subscriptionName = "flink_topic_mypay";
>
> private void init() throws JMSException {
> // Create a ConnectionFactory
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(tcpStr);
>
> // Create a Connection
> connection = connectionFactory.createConnection();
> connection.setClientID(clientId);
> //    connection.start();
>
>       // Create a Session
> Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
> // Create a MessageConsumer from the Session to the Topic or Queue
> Topic topic = session.createTopic(topicName);
> consumer = session.createDurableSubscriber(topic, subscriptionName);
> connection.start();
>    }
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> running = true;
>       init();
>    }
>
> @Override
> public void run(SourceContext<String> ctx) {
> // this source never completes
>
> while (running) {
> try {
>             Message message = consumer.receive();
>             BytesMessage bytesMessage = (BytesMessage) message;
> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
>             bytesMessage.readBytes(bytes);
>
> String text = new String(bytes);
>             ctx.collect(text);
>
>          } catch (JMSException e) {
> LOG.error(e.getLocalizedMessage());
> running = true;
>          }
>       }
> try {
>          close();
>       } catch (Exception e) {
> LOG.error(e.getMessage(), e);
>       }
>    }
>
> @Override
> public void cancel() {
> running = false;
>    }
>
> @Override
> public void close() throws Exception {
> LOG.info("Closing");
> try {
> connection.close();
>       } catch (JMSException e) {
> throw new RuntimeException("Error while closing ActiveMQ connection ", e);
>       }
>    }
> }
>
>
>
>
>
>
>
>
>
>
> 在 2019-07-19 14:43:17,"Caizhi Weng" <tsreaper96@gmail.com> 写道:
> >Hi Henry
> >
> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF`
(log4j) 或者 `<root
> >level="OFF"> <appender-ref ref="file"/> </root>` (logback) 把 log
关掉,或者把
> log
> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source 里写
log 的时候死循环了...
> >
> >Henry <henrybao91@163.com> 于2019年7月19日周五 下午2:20写道:
> >
> >>
> >>
> >>
> >>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
> >>
> >>
> >>
> >>
> >>
> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <tsreaper96@gmail.com> 写道:
> >> >Hi Henry,
> >> >
> >> >这个 source 看起来不像是 Flink 提供的 source,应该是 source
本身实现的问题。你可能需要修改 source
> >> >的源码让它出错后关闭或者进行其它处理...
> >> >
> >> >Henry <henrybao91@163.com> 于2019年7月19日周五 上午9:31写道:
> >> >
> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> >> 报错图片链接:
> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >> >>
> >> >>
> >> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >> >>
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
> >>
>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message