Date: Tue, 3 Mar 2015 18:57:04 +0000 (UTC)
From: "Jarek Jarcec Cecho (JIRA)"
To: dev@sqoop.apache.org
Subject: [jira] [Commented] (SQOOP-1803) JobManager and Execution Engine changes: Support for a injecting and pulling out configs and job output in connectors

    [ https://issues.apache.org/jira/browse/SQOOP-1803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14345504#comment-14345504 ]

Jarek Jarcec Cecho commented on SQOOP-1803:
-------------------------------------------

I was thinking about this one a bit myself. I have a couple of thoughts on getting data back from the execution engine and I'm wondering what others think. Please don't hesitate to chime in if I missed any approach.

1) DistributedCache

In addition to [~gwenshap]'s comments about the supportability (and/or debuggability) of {{DistributedCache}}, to the best of my knowledge it can only be used to distribute data from the launcher (Sqoop Server) to the child mapreduce tasks. I do not believe that it can be used the other way around to get files or data from individual tasks back to the launcher. Looking at the [latest javadocs|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html] this still seems to be the case, as the documentation contains a note about the immutability of the cache once the job is submitted:

{quote}
DistributedCache tracks modification timestamps of the cache files. Clearly the cache files should not be modified by the application or externally while the job is executing.
{quote}

*Summary:* I believe that this solution is disqualified for retrieving data back from the execution engine.

2) Counters

[Counters|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/Counters.html] are a nice technique for getting insight into what the mapreduce job is doing. Multiple mappers can update the counters in parallel and it's mapreduce's responsibility to ensure that the counters from all child tasks are summed up correctly. The limitation of this solution is that counters can only be {{long}} based (e.g. no {{String}}, {{Date}}, ...). Also, the counters are cumulative in nature, so it might be a bit difficult to retrieve discrete values - we would either need to ensure that only certain mappers/reducers update a given counter while others don't, or we would need to figure out a way to decode a single value when multiple mappers/reducers update one counter at the same time.

*Summary:* While it's not impossible to use counters to retrieve data from the execution engine, it seems that this solution would impose limitations and would be "difficult" to implement and maintain.
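Just to make the mechanics of 2) concrete, below is a minimal sketch of the counter approach (the {{SqoopStateCounter}} enum, {{ExtractMapper}} and the job wiring are made-up illustrations, not anything from our code base): a task publishes a {{long}} through a counter and the launcher reads the framework-aggregated total once the job finishes, which is exactly where the "long only, summed across tasks" limitation shows up.

{code:java}
// Minimal sketch of the counter-based exchange from 2), using made-up names
// (SqoopStateCounter, ExtractMapper): tasks publish long values through counters
// and the launcher reads the framework-aggregated totals after the job finishes.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterExchangeSketch {

  // Hypothetical counter group; counters can carry nothing but long values.
  public enum SqoopStateCounter { RECORDS_EXTRACTED }

  public static class ExtractMapper extends Mapper<Object, Object, Object, Object> {
    @Override
    protected void map(Object key, Object value, Context context) {
      // Each task increments independently; the framework sums the values from
      // all tasks, which is why discrete values (e.g. a per-job "max") are hard
      // to carry this way without extra encoding.
      context.getCounter(SqoopStateCounter.RECORDS_EXTRACTED).increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "counter-exchange-sketch");
    job.setJarByClass(CounterExchangeSketch.class);
    job.setMapperClass(ExtractMapper.class);
    // ... input/output format setup omitted ...
    job.waitForCompletion(true);

    // Launcher (Sqoop Server) side: only the aggregated long comes back.
    long extracted = job.getCounters()
        .findCounter(SqoopStateCounter.RECORDS_EXTRACTED).getValue();
    System.out.println("Records extracted across all tasks: " + extracted);
  }
}
{code}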
3) HDFS Files

Looking into how others are solving this problem, [Oozie|http://oozie.apache.org] launcher tasks (= one-map mapreduce jobs) generate files on HDFS in a predefined directory from where the Oozie server will pick them up to read any arbitrary values. This is a neat solution as it allows us to retrieve any value of any type from any part of the workflow (all processes can create their own files if needed). The downside is that we would need to agree on a certain location where the Server and the mapreduce job will be exchanging files - this directory must exist and must be accessible by both Sqoop (running under a system user) and the mapreduce job itself (most likely running as the end user). I believe that HDFS ACLs can easily be used to accomplish this.

We would need to be careful here with edge conditions - we would need to make sure that we're cleaning up old and unused files (job failures, ...) and that we are not leaking any sensitive information to HDFS.

*Summary:* A possible solution that will support all our use cases, but will be a bit harder to implement.
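For illustration, here is a minimal sketch of the file exchange described in 3), under assumed conventions (the {{/tmp/sqoop-exchange/job-1}} location and the one-file-per-task naming are purely hypothetical, not a proposed layout): each task writes its own small file into the shared directory and the Sqoop Server reads and then removes the files once the job has finished.

{code:java}
// Minimal sketch of the HDFS exchange from 3): tasks write small files under an
// agreed-upon directory, the Sqoop Server collects them after the job completes
// and cleans the directory up. The paths and file names here are illustrative.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsExchangeSketch {

  private static final Path EXCHANGE_DIR = new Path("/tmp/sqoop-exchange/job-1");

  // Task side: publish a small piece of state, one file per task to avoid
  // concurrent writers touching the same file.
  public static void writeState(Configuration conf, String taskId, String state)
      throws Exception {
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(EXCHANGE_DIR, taskId + ".state");
    try (Writer out = new OutputStreamWriter(fs.create(file, true), StandardCharsets.UTF_8)) {
      out.write(state);
    }
  }

  // Server side: read the per-task files once the job finished, then delete the
  // directory so failed or stale runs do not leave files (and secrets) behind.
  public static void readAndCleanup(Configuration conf) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    if (!fs.exists(EXCHANGE_DIR)) {
      return;
    }
    for (FileStatus status : fs.listStatus(EXCHANGE_DIR)) {
      try (BufferedReader in = new BufferedReader(
          new InputStreamReader(fs.open(status.getPath()), StandardCharsets.UTF_8))) {
        System.out.println(status.getPath().getName() + " -> " + in.readLine());
      }
    }
    fs.delete(EXCHANGE_DIR, true); // recursive cleanup of the exchange directory
  }
}
{code}

A real implementation would of course also need the ACL setup and the failure-path cleanup mentioned above.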
4) Server side exchange only

I was also looking into how things currently work in the server and I've realized something that made me think of this proposal. Back when we were defining the workflow, the intention was that only the {{Initializer}} is allowed to generate state, whereas all other parts of the workflow ({{Partitioner}}, {{Extractor}}, {{Loader}} and {{Destroyer}}) should not generate any state and should only reuse the one that was pre-prepared in the initializer. The reason for that is that the {{Initializer}} is run only once, whereas all other parts of the workflow run in parallel and/or do not run on the Sqoop server itself; hence by allowing state to be generated only in the {{Initializer}} we don't have to deal with synchronizing the parallel pieces or with limitations in various execution engines. This intention is captured in the API: the {{Initializer}} is given a {{MutableContext}} where the connector developer can set any properties that will be shared with the rest of the workflow (~ the state), while all other parts are given only an {{ImmutableContext}} that doesn't allow any changes to the shared properties. I have to say that we have a small exception in the code base, because the {{Partitioner}} class generates {{Partition}} objects that can carry some context as well. However, as the {{Partition}} objects are not available in the {{Destroyer}}, the connector developer still needs to persist any state that is required through the entire workflow inside the {{Initializer}}.

Having said that, another option seems to be to simply not retrieve anything from the execution engine and to let the connector update the configuration objects based on info that the connector generated in the {{Initializer}} - assuming that the job finished correctly. Looking at the current connectors this should work well, as we need to update and persist state that is 'locked in' at the {{Initializer}} stage. For database-based connectors the "max value" should be determined in the initializer (it's currently not, though), and the same goes for Kafka and other connectors. The beauty of this approach is that it's simple to implement and can actually be easily extended in the future to include data coming from the execution engine should there be a need for it (for approach 3, for example).

> JobManager and Execution Engine changes: Support for a injecting and pulling out configs and job output in connectors
> ----------------------------------------------------------------------------------------------------------------------
>
>         Key: SQOOP-1803
>         URL: https://issues.apache.org/jira/browse/SQOOP-1803
>     Project: Sqoop
>  Issue Type: Sub-task
>    Reporter: Veena Basavaraj
>    Assignee: Veena Basavaraj
>     Fix For: 1.99.6
>
>
> The details are in the design wiki; as the implementation happens, more discussions can happen here.
> https://cwiki.apache.org/confluence/display/SQOOP/Delta+Fetch+And+Merge+Design#DeltaFetchAndMergeDesign-Howtogetoutputfromconnectortosqoop?
> The goal is to dynamically inject an IncrementalConfig instance into the FromJobConfiguration. The current MFromConfig and MToConfig can already hold a list of configs, and a strong sentiment was expressed to keep it as a list - why not for the first time actually make use of it and group the incremental related configs in one config object.
> This task will prepare the FromJobConfiguration from the job config data, and the ExtractorContext with the relevant values from the prev job run.
> This task will prepare the ToJobConfiguration from the job config data, and the LoaderContext with the relevant values from the prev job run if any.
> We will use DistributedCache to get State information from the Extractor and Loader out and finally persist it into the sqoop repository (depending on SQOOP-1804) once the outputcommitter commit is called.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)