airflow-dev mailing list archives

From Boris Tyukin <bo...@boristyukin.com>
Subject Re: parsing task instance log files
Date Fri, 10 Feb 2017 18:03:33 GMT
thanks for responding, Laura. I am using XComs, but my problem is that the
Hive query would not return the value I need (a MapReduce counter), so I
thought I would parse the Hive output, extract the MapReduce job ID, and
then use the mapred CLI to get that counter for that job ID.
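
A rough sketch of that approach, assuming the Hive CLI output is available
as a string; the function and variable names are mine, and the counter
group/name are placeholders that would need to match the actual job:

    import re
    import subprocess

    def counter_from_hive_output(hive_stdout):
        # Hive CLI echoes lines like "Starting Job = job_1486571234567_0042, ..."
        job_ids = re.findall(r'job_\d+_\d+', hive_stdout)
        if not job_ids:
            raise ValueError('no MapReduce job id found in Hive output')
        # Take the last job id: a multi-stage query launches several MR jobs,
        # and the final stage is usually the one that writes the table.
        job_id = job_ids[-1]
        # mapred job -counter <job-id> <group-name> <counter-name>
        out = subprocess.check_output([
            'mapred', 'job', '-counter', job_id,
            'org.apache.hadoop.mapreduce.TaskCounter',  # placeholder group
            'REDUCE_OUTPUT_RECORDS'])                   # placeholder counter
        return int(out.strip())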

Another option I found is to gather statistics on my refreshed table and
then use DESCRIBE EXTENDED to pull the number of rows. Because of the
statistics-gathering step, this will take more time than just grabbing the
MapReduce counter from Hadoop.
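
A minimal sketch of that statistics route with HiveCliHook, assuming the
default hive_cli_default connection and an unpartitioned table (a
partitioned table needs a partition spec in the ANALYZE statement):

    import re
    from airflow.hooks.hive_hooks import HiveCliHook

    def row_count_from_stats(table):
        hook = HiveCliHook(hive_cli_conn_id='hive_cli_default')
        # The slow first step: compute table-level statistics.
        hook.run_cli('ANALYZE TABLE {t} COMPUTE STATISTICS;'.format(t=table))
        # Then read numRows back out of the table parameters.
        out = hook.run_cli('DESCRIBE EXTENDED {t};'.format(t=table))
        match = re.search(r'numRows[=:]\s*(\d+)', out)
        return int(match.group(1)) if match else None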

Does that make sense?


On Fri, Feb 10, 2017 at 10:43 AM, Laura Lorenz <llorenz@industrydive.com>
wrote:

> I don't use the HiveCliHook so I'm not sure how it works, but are the
> log files the only place you can retrieve these counts? If you have them
> at the time of the query in your python callable, you could push them
> anywhere you like inline at the conclusion of the task. Or, you may prefer
> to have your PythonOperator `return` some data structure with those
> counts, which will be stored by default in the airflow metadata database
> per the XCom system
> <https://airflow.incubator.apache.org/concepts.html#xcoms>. Then,
> depending on what you want to do with that, you could query those out of
> the metadata database with the ad-hoc querying or charting UIs right
> within Airflow, or in a later task altogether.
>
> On Fri, Feb 10, 2017 at 8:58 AM, Boris Tyukin <boris@boristyukin.com>
> wrote:
>
> > please...?
> >
> > On Thu, Feb 9, 2017 at 8:35 AM, Boris Tyukin <boris@boristyukin.com>
> > wrote:
> >
> > > Hello,
> > >
> > > I am using a HiveCliHook, called from a PythonOperator, to run a
> > > series of queries, and I want to capture record counts for auditing
> > > and validation purposes.
> > >
> > > I am thinking of using on_success_callback to invoke a python
> > > function that will read the log file produced by airflow and then
> > > parse it with a regex.
> > >
> > > I am going to use this method from models to get to the log file:
> > >
> > >     def log_filepath(self):
> > >         iso = self.execution_date.isoformat()
> > >         log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
> > >         return ("{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals()))
> > >
> > > Is this a good strategy, or is there an easier way? I am wondering if
> > > someone has done something similar.
> > >
> > > Another challenge is that the same log file contains multiple
> > > attempts and reruns of the same task, so I guess I need to parse the
> > > file backwards.
> > >
> > > thanks,
> > > Boris
> > >
> >
>
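
For reference, a minimal sketch of the XCom pattern Laura describes above:
whatever the python callable returns is stored as an XCom, and a later task
pulls it back by task id. The task ids, the dag object, and the zero-row
check are all hypothetical:

    from airflow.operators.python_operator import PythonOperator

    def load_table(**context):
        # ... run the Hive queries and derive row_count here ...
        row_count = 12345  # placeholder for the real count
        return row_count   # return values are stored as XComs automatically

    def validate_counts(**context):
        count = context['ti'].xcom_pull(task_ids='load_table')
        if not count:
            raise ValueError('load_table reported zero rows')

    load = PythonOperator(task_id='load_table', python_callable=load_table,
                          provide_context=True, dag=dag)
    check = PythonOperator(task_id='validate_counts',
                           python_callable=validate_counts,
                           provide_context=True, dag=dag)
    load.set_downstream(check)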
