flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Caizhi Weng <tsreape...@gmail.com>
Subject Re: Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志
Date Sun, 21 Jul 2019 11:16:36 GMT
Hi Henry,

你可能看错了,仔细看你的 run 函数,里面有个 try catch 里有 running =
true...

Henry <henrybao91@163.com> 于2019年7月20日周六 下午9:32写道:

>
>
> 啊!我想起来了,之前忘了因为啥原因了,为了方便调试把  running
= false; 改成了 running = true;
> 感谢感谢!  但是原因是为啥呢?这个 running = true; 是写在 cancel 中的,任务在执行没有取消它,
> 怎么会跳转这里呢?
>
>
>
>
>
> 在 2019-07-20 03:23:28,"Caizhi Weng" <tsreaper96@gmail.com> 写道:
> >Hi Henry,
> >
> >LOG.error(e.getLocalizedMessage());
> >running = true;
> >
> >这里写错了吧,应该是 running = false;
> >
> >Henry <henrybao91@163.com> 于2019年7月19日周五 下午4:04写道:
> >
> >>
> >>
> >> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的
source
> >> 代码,但是里面没有写log里报的哪个错的提示。
> >> package com.JavaCustoms;
> >> import org.apache.activemq.ActiveMQConnectionFactory;
> >> import org.apache.flink.configuration.Configuration;
> >> import
> org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> >> import org.slf4j.Logger;
> >> import org.slf4j.LoggerFactory;
> >>
> >> import javax.jms.*;
> >>
> >> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
> >> private static final long serialVersionUID = 1L;
> >> private static final Logger LOG =
> >> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
> >> private transient volatile boolean running;
> >> private transient MessageConsumer consumer;
> >> private transient Connection connection;
> >>
> >> // topic name
> >> private static final String topicName = "flink_mypay";
> >> // tcp str
> >> private static final String tcpStr = "tcp://server.mn:61616";
> >> // 持久订阅的id标识
> >> private static final String clientId = "flink_hz";
> >> // Subscription name
> >> private static final String subscriptionName = "flink_topic_mypay";
> >>
> >> private void init() throws JMSException {
> >> // Create a ConnectionFactory
> >> ActiveMQConnectionFactory connectionFactory = new
> >> ActiveMQConnectionFactory(tcpStr);
> >>
> >> // Create a Connection
> >> connection = connectionFactory.createConnection();
> >> connection.setClientID(clientId);
> >> //    connection.start();
> >>
> >>       // Create a Session
> >> Session session = connection.createSession(false,
> >> Session.AUTO_ACKNOWLEDGE);
> >>
> >> // Create a MessageConsumer from the Session to the Topic or Queue
> >> Topic topic = session.createTopic(topicName);
> >> consumer = session.createDurableSubscriber(topic, subscriptionName);
> >> connection.start();
> >>    }
> >>
> >> @Override
> >> public void open(Configuration parameters) throws Exception {
> >> super.open(parameters);
> >> running = true;
> >>       init();
> >>    }
> >>
> >> @Override
> >> public void run(SourceContext<String> ctx) {
> >> // this source never completes
> >>
> >> while (running) {
> >> try {
> >>             Message message = consumer.receive();
> >>             BytesMessage bytesMessage = (BytesMessage) message;
> >> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
> >>             bytesMessage.readBytes(bytes);
> >>
> >> String text = new String(bytes);
> >>             ctx.collect(text);
> >>
> >>          } catch (JMSException e) {
> >> LOG.error(e.getLocalizedMessage());
> >> running = true;
> >>          }
> >>       }
> >> try {
> >>          close();
> >>       } catch (Exception e) {
> >> LOG.error(e.getMessage(), e);
> >>       }
> >>    }
> >>
> >> @Override
> >> public void cancel() {
> >> running = false;
> >>    }
> >>
> >> @Override
> >> public void close() throws Exception {
> >> LOG.info("Closing");
> >> try {
> >> connection.close();
> >>       } catch (JMSException e) {
> >> throw new RuntimeException("Error while closing ActiveMQ connection ",
> e);
> >>       }
> >>    }
> >> }
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2019-07-19 14:43:17,"Caizhi Weng" <tsreaper96@gmail.com> 写道:
> >> >Hi Henry
> >> >
> >> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF`
(log4j) 或者 `<root
> >> >level="OFF"> <appender-ref ref="file"/> </root>` (logback)
把 log 关掉,或者把
> >> log
> >> >等级设成更高的 FATAL... 但我感觉问题还是自定义的 source
里写 log 的时候死循环了...
> >> >
> >> >Henry <henrybao91@163.com> 于2019年7月19日周五 下午2:20写道:
> >> >
> >> >>
> >> >>
> >> >>
> >> >>
> >>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <tsreaper96@gmail.com>
写道:
> >> >> >Hi Henry,
> >> >> >
> >> >> >这个 source 看起来不像是 Flink 提供的 source,应该是
source 本身实现的问题。你可能需要修改 source
> >> >> >的源码让它出错后关闭或者进行其它处理...
> >> >> >
> >> >> >Henry <henrybao91@163.com> 于2019年7月19日周五 上午9:31写道:
> >> >> >
> >> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> >> >> 报错图片链接:
> >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >> >> >>
> >> >> >>
> >> >> >>
> >> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >> >> >>
> >> >>
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
> >> >>
> >>
>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message