From: Josh Myers
To: user@flume.apache.org
Subject: Flume events rolling file too regularly
Date: Mon, 17 Jun 2013 14:46:47 +0100
Message-ID: <51BF1347.1040200@mydrivesolutions.com>

Hi guys,

We are sending JSON events from our pipeline into a Flume HTTP source. We have written a custom multiplexer and sink serializer. The events are being routed into the correct channels and consumed OK by the sinks. The custom serializer takes a JSON event and outputs a csv.

Files are being written to S3 (using s3n as HDFS), but rather than appending to the written csv file, each event seems to be generating its own csv. The output is what I would expect using rollCount 1, however we do occasionally get several events (maybe 4) written per csv. Please see below for config.

Ideally we want to use a rollInterval of 24 hours, to generate a new .csv file every 24 hours, but have events pretty quickly flushed to the csv file after being sent. So one csv per day that is consistently appended with whatever events we throw in. We found however that with a rollInterval of 24 hours the events weren't being flushed often enough...

Any help would be hugely appreciated!

Thanks.
Josh

## Sources ###################################################
agent.sources = http
agent.sources.http.type = http
agent.sources.http.bind = 0.0.0.0
agent.sources.http.port = 4444
agent.sources.http.channels = cappucino_s3_aggregate_profile_channel default_s3_channel cappucino_s3_trip_summary_channel

## Interceptors #################################################
agent.sources.http.interceptors = itime ihost
agent.sources.http.interceptors.itime.type = timestamp
agent.sources.http.interceptors.ihost.type = host
agent.sources.http.interceptors.ihost.useIP = false
agent.sources.http.interceptors.ihost.preserveExisting = false
agent.sources.http.interceptors.ihost.hostHeader = hostname

## Multiplex Channels Mapping ######################################
agent.sources.http.selector.type = com.mydrivesolutions.flume.serialization.PipelineEventsChannelSelector
agent.sources.http.selector.default = default_s3_channel

## Channels ########################################################
agent.channels = cappucino_s3_aggregate_profile_channel cappucino_s3_trip_summary_channel default_s3_channel

agent.channels.cappucino_s3_aggregate_profile_channel.type = file
agent.channels.cappucino_s3_aggregate_profile_channel.capacity = 10000000
agent.channels.cappucino_s3_aggregate_profile_channel.checkpointDir = /mnt/flume/cappucino_s3_aggregate_profile/checkpoint
agent.channels.cappucino_s3_aggregate_profile_channel.dataDirs = /mnt/flume/cappucino_s3_aggregate_profile/data

agent.channels.cappucino_s3_trip_summary_channel.type = file
agent.channels.cappucino_s3_trip_summary_channel.capacity = 10000000
agent.channels.cappucino_s3_trip_summary_channel.checkpointDir = /mnt/flume/cappucino_s3_trip_summary/checkpoint
agent.channels.cappucino_s3_trip_summary_channel.dataDirs = /mnt/flume/cappucino_s3_trip_summary/data

## Sinks ###########################################################
agent.sinks = cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2 cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2

## Serialize json events from the pipeline and write csv to HDFS (We are using s3 native FS as HDFS)
###############################################################################

## Capuccino_s3_aggregate_profile Sinks #################################################
agent.sinks.cappucino_s3_aggregate_profile_sink1.type = hdfs
agent.sinks.cappucino_s3_aggregate_profile_sink1.channel = cappucino_s3_aggregate_profile_channel
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.path = s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.columns = log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.format = DriverProfile
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.delimiter = ,
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.appendNewline = false
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.distanceMeasure = MILES
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.timeZone = UTC

agent.sinks.cappucino_s3_aggregate_profile_sink2.type = hdfs
agent.sinks.cappucino_s3_aggregate_profile_sink2.channel = cappucino_s3_aggregate_profile_channel
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.path = s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.columns = log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.format = DriverProfile
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.delimiter = ,
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.appendNewline = false
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.distanceMeasure = MILES
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.timeZone = UTC

## Confused_s3_trip_summary Sinks #################################################
agent.sinks.cappucino_s3_trip_summary_sink1.type = hdfs
agent.sinks.cappucino_s3_trip_summary_sink1.channel = cappucino_s3_trip_summary_channel
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.path = s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_trip_summary_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.columns = log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.format = BodyCSV
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.delimiter = ,
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.appendNewline = false
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.distanceMeasure = MILES
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.timeZone = UTC

agent.sinks.cappucino_s3_trip_summary_sink2.type = hdfs
agent.sinks.cappucino_s3_trip_summary_sink2.channel = cappucino_s3_trip_summary_channel
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.path = s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_trip_summary_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.columns = log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.format = BodyCSV
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.delimiter = ,
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.appendNewline = false
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.distanceMeasure = MILES
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.timeZone = UTC

## SinkGroups ###########################################################
agent.sinkgroups = cappucino_s3_aggregate_profile_sinkgroup cappucino_s3_trip_summary_sinkgroup

## Confused_s3_aggregate_profile Failover SinkGroup ##########################################
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.sinks = cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.type = failover
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink1 = 10
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink2 = 5
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.maxpenalty = 30000

## Confused_s3_trip_summary Failover SinkGroup ##########################################
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.sinks = cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.type = failover
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink1 = 10
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink2 = 5
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.maxpenalty = 30000

-- 
www.mydrivesolutions.com
This email and any attachments is private and confidential.
If you have received this message in error please remove it from your systems and notify the author. MyDrive Solutions Limited is registered in England and Wales, No 07330334. Registered office: Surrey Technology Centre, 40 Occam Road, Guildford GU2 7YG, UK
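
One possible reading of the configuration in this message, offered as a sketch rather than a confirmed diagnosis: only hdfs.rollInterval is set on each sink, so the HDFS sink's other roll triggers stay at their documented defaults (hdfs.rollSize = 1024 bytes, hdfs.rollCount = 10 events), and either of them can close a file long before the interval elapses. The %H.%M escapes in hdfs.filePrefix are also resolved against each event's timestamp header, so events arriving in different minutes map to different file names. In addition, 20400 seconds is 5 hours 40 minutes, while 24 hours is 86400 seconds. The fragment below shows what daily rolling could look like for one sink. The batchSize value is an illustrative assumption, not something taken from the message, and it is worth verifying whether s3n exposes any data before a file is closed, since the S3 native filesystem is not expected to support appends.

```properties
# Sketch only: roll once per day and disable the size and count triggers (0 = never roll on this trigger).
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollInterval = 86400
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollSize = 0
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollCount = 0
# Assumed value: events written to the file before it is flushed (documented default is 100).
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.batchSize = 10
# Drop %H.%M so that every event of the day resolves to the same file name prefix.
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.filePrefix = DriverProfile.%y-%m-%d
```

The same five properties would need to be repeated for the other three sinks. Because hdfs.path already contains %Y-%m-%d, a new directory (and therefore a new file) would still start at each UTC day boundary regardless of the roll settings.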