flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Henry <henryba...@163.com>
Subject Re:Re: Re: Re: Re: Flink 的 log 文件夹下产生了 44G 日志
Date Mon, 22 Jul 2019 00:59:47 GMT



看到啦,谢谢啦。





在 2019-07-21 19:16:36,"Caizhi Weng" <tsreaper96@gmail.com> 写道:
>Hi Henry,
>
>你可能看错了,仔细看你的 run 函数,里面有个 try catch 里有 running
= true...
>
>Henry <henrybao91@163.com> 于2019年7月20日周六 下午9:32写道:
>
>>
>>
>> 啊!我想起来了,之前忘了因为啥原因了,为了方便调试把  running
= false; 改成了 running = true;
>> 感谢感谢!  但是原因是为啥呢?这个 running = true; 是写在 cancel
中的,任务在执行没有取消它,
>> 怎么会跳转这里呢?
>>
>>
>>
>>
>>
>> 在 2019-07-20 03:23:28,"Caizhi Weng" <tsreaper96@gmail.com> 写道:
>> >Hi Henry,
>> >
>> >LOG.error(e.getLocalizedMessage());
>> >running = true;
>> >
>> >这里写错了吧,应该是 running = false;
>> >
>> >Henry <henrybao91@163.com> 于2019年7月19日周五 下午4:04写道:
>> >
>> >>
>> >>
>> >> 谢谢你的帮助哈! 我也是觉得 source 里的问题,但是呢,木有找到错误的地方。下面这个是我那个自定义的
source
>> >> 代码,但是里面没有写log里报的哪个错的提示。
>> >> package com.JavaCustoms;
>> >> import org.apache.activemq.ActiveMQConnectionFactory;
>> >> import org.apache.flink.configuration.Configuration;
>> >> import
>> org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>> >> import org.slf4j.Logger;
>> >> import org.slf4j.LoggerFactory;
>> >>
>> >> import javax.jms.*;
>> >>
>> >> public class FlinkJMSStreamSource extends RichSourceFunction<String>
{
>> >> private static final long serialVersionUID = 1L;
>> >> private static final Logger LOG =
>> >> LoggerFactory.getLogger(FlinkJMSStreamSource.class);
>> >> private transient volatile boolean running;
>> >> private transient MessageConsumer consumer;
>> >> private transient Connection connection;
>> >>
>> >> // topic name
>> >> private static final String topicName = "flink_mypay";
>> >> // tcp str
>> >> private static final String tcpStr = "tcp://server.mn:61616";
>> >> // 持久订阅的id标识
>> >> private static final String clientId = "flink_hz";
>> >> // Subscription name
>> >> private static final String subscriptionName = "flink_topic_mypay";
>> >>
>> >> private void init() throws JMSException {
>> >> // Create a ConnectionFactory
>> >> ActiveMQConnectionFactory connectionFactory = new
>> >> ActiveMQConnectionFactory(tcpStr);
>> >>
>> >> // Create a Connection
>> >> connection = connectionFactory.createConnection();
>> >> connection.setClientID(clientId);
>> >> //    connection.start();
>> >>
>> >>       // Create a Session
>> >> Session session = connection.createSession(false,
>> >> Session.AUTO_ACKNOWLEDGE);
>> >>
>> >> // Create a MessageConsumer from the Session to the Topic or Queue
>> >> Topic topic = session.createTopic(topicName);
>> >> consumer = session.createDurableSubscriber(topic, subscriptionName);
>> >> connection.start();
>> >>    }
>> >>
>> >> @Override
>> >> public void open(Configuration parameters) throws Exception {
>> >> super.open(parameters);
>> >> running = true;
>> >>       init();
>> >>    }
>> >>
>> >> @Override
>> >> public void run(SourceContext<String> ctx) {
>> >> // this source never completes
>> >>
>> >> while (running) {
>> >> try {
>> >>             Message message = consumer.receive();
>> >>             BytesMessage bytesMessage = (BytesMessage) message;
>> >> byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
>> >>             bytesMessage.readBytes(bytes);
>> >>
>> >> String text = new String(bytes);
>> >>             ctx.collect(text);
>> >>
>> >>          } catch (JMSException e) {
>> >> LOG.error(e.getLocalizedMessage());
>> >> running = true;
>> >>          }
>> >>       }
>> >> try {
>> >>          close();
>> >>       } catch (Exception e) {
>> >> LOG.error(e.getMessage(), e);
>> >>       }
>> >>    }
>> >>
>> >> @Override
>> >> public void cancel() {
>> >> running = false;
>> >>    }
>> >>
>> >> @Override
>> >> public void close() throws Exception {
>> >> LOG.info("Closing");
>> >> try {
>> >> connection.close();
>> >>       } catch (JMSException e) {
>> >> throw new RuntimeException("Error while closing ActiveMQ connection ",
>> e);
>> >>       }
>> >>    }
>> >> }
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2019-07-19 14:43:17,"Caizhi Weng" <tsreaper96@gmail.com> 写道:
>> >> >Hi Henry
>> >> >
>> >> >你的意思是不想让 Flink 写 log 吗?那只能通过 `log4j.rootLogger=OFF`
(log4j) 或者 `<root
>> >> >level="OFF"> <appender-ref ref="file"/> </root>` (logback)
把 log 关掉,或者把
>> >> log
>> >> >等级设成更高的 FATAL... 但我感觉问题还是自定义的
source 里写 log 的时候死循环了...
>> >> >
>> >> >Henry <henrybao91@163.com> 于2019年7月19日周五 下午2:20写道:
>> >> >
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2019-07-19 11:11:37,"Caizhi Weng" <tsreaper96@gmail.com>
写道:
>> >> >> >Hi Henry,
>> >> >> >
>> >> >> >这个 source 看起来不像是 Flink 提供的 source,应该是
source 本身实现的问题。你可能需要修改 source
>> >> >> >的源码让它出错后关闭或者进行其它处理...
>> >> >> >
>> >> >> >Henry <henrybao91@163.com> 于2019年7月19日周五
上午9:31写道:
>> >> >> >
>> >> >> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
>> >> >> >> 报错图片链接:
>> >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
>> >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
>> >> >> >>
>> >> >>
>> >>
>> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>> >> >>
>> >>
>>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message