ponymail-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject incubator-ponymail git commit: import-mbox.py messages need the thread number
Date Sun, 20 Nov 2016 23:20:19 GMT
Repository: incubator-ponymail
Updated Branches:
  refs/heads/master 1a3bff403 -> af1544e7b


import-mbox.py messages need the thread number

This fixes #248

Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7

Branch: refs/heads/master
Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e
Parents: 1a3bff4
Author: Sebb <sebb@apache.org>
Authored: Sun Nov 20 23:19:55 2016 +0000
Committer: Sebb <sebb@apache.org>
Committed: Sun Nov 20 23:19:55 2016 +0000

----------------------------------------------------------------------
 tools/import-mbox.py | 59 +++++++++++++++++++++++++++--------------------
 1 file changed, 34 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py
----------------------------------------------------------------------
diff --git a/tools/import-mbox.py b/tools/import-mbox.py
index 15f09ad..12bc0d1 100755
--- a/tools/import-mbox.py
+++ b/tools/import-mbox.py
@@ -107,7 +107,9 @@ es = Elasticsearch([
 rootURL = ""
 
 class BulkThread(Thread):
-    def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'):
+
+    def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'):
+        self.id = id
         self.json = json
         self.xes = xes
         self.dtype = dtype
@@ -133,17 +135,24 @@ class BulkThread(Thread):
         try:
             helpers.bulk(self.xes, js_arr)
         except Exception as err:
-            print("Warning: Could not bulk insert: %s" % err)
-        #print("Inserted %u entries" % len(js_arr))
+            print("%d: Warning: Could not bulk insert: %s into %s" % (self.id,err,self.dtype))
+#         print("%d: Inserted %u entries into %s" % (self.id, len(js_arr),self.dtype))
 
 
 class SlurpThread(Thread):
 
+    def __init__(self, index):
+        self.id = index
+        super(SlurpThread, self).__init__()
+
+    def printid(self,message):
+        print("%d: %s" % (self.id, message))
+
     def run(self):
         global block, y, es, lists, baddies, config, resendTo, timeout, dedupped, dedup
         ja = []
         jas = []
-        print("Thread started")
+        self.printid("Thread started")
         mla = None
         ml = ""
         mboxfile = ""
@@ -152,16 +161,16 @@ class SlurpThread(Thread):
         archie = archiver.Archiver(parseHTML = parseHTML)
     
         while len(lists) > 0:
-            print("%u elements left to slurp" % len(lists))
+            self.printid("%u elements left to slurp" % len(lists))
 
             block.acquire()
             try:
                 mla = lists.pop(0)
                 if not mla:
-                    print("Nothing more to do here")
+                    self.printid("Nothing more to do here")
                     return
             except Exception as err:
-                print("Could not pop list: %s" % err)
+                self.printid("Could not pop list: %s" % err)
                 return
             finally:
                 block.release()
@@ -184,7 +193,7 @@ class SlurpThread(Thread):
                 tmpname = mla[0]
                 filename = mla[0]
                 if filename.find(".gz") != -1:
-                    print("Decompressing %s..." % filename)
+                    self.printid("Decompressing %s..." % filename)
                     try:
                         with open(filename, "rb") as bf:
                             bmd = bf.read()
@@ -197,16 +206,16 @@ class SlurpThread(Thread):
                             tmpname = tmpfile.name
                             filename = tmpname
                             dFile = True # Slated for deletion upon having been read
-                            print("%s -> %u bytes" % (tmpname, len(bmd)))
+                            self.printid("%s -> %u bytes" % (tmpname, len(bmd)))
                     except Exception as err:
-                        print("This wasn't a gzip file: %s" % err )
-                print("Slurping %s" % filename)
+                        self.printid("This wasn't a gzip file: %s" % err )
+                self.printid("Slurping %s" % filename)
                 messages = mailbox.mbox(tmpname)
 
             else:
                 ml = mla[0]
                 mboxfile = mla[1]
-                print("Slurping %s/%s" % (ml, mboxfile))
+                self.printid("Slurping %s/%s" % (ml, mboxfile))
                 m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile)
                 EY = 1997
                 EM = 1
@@ -232,7 +241,7 @@ class SlurpThread(Thread):
                 if fromFilter and 'from' in message and message['from'].find(fromFilter) == -1:
                     continue
                 if resendTo:
-                    print("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
+                    self.printid("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
                     s = SMTP('localhost')
                     try:
                         if list_override:
@@ -245,7 +254,7 @@ class SlurpThread(Thread):
                     s.send_message(message, from_addr=None, to_addrs=(resendTo))
                     continue
                 if (time.time() - stime > timeout): # break out after N seconds, it shouldn't take this long..!
-                    print("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
+                    self.printid("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
                     break
 
                 json, contents = archie.compute_updates(list_override, private, message)
@@ -271,7 +280,7 @@ class SlurpThread(Thread):
                         }
                     )
                     if res and len(res['hits']['hits']) > 0:
-                        print("Dedupping %s" % json['message-id'])
+                        self.printid("Dedupping %s" % json['message-id'])
                         dedupped += 1
                         continue
 
@@ -305,43 +314,43 @@ class SlurpThread(Thread):
                     if len(ja) >= 40:
                         if not args.dry:
                             bulk = BulkThread()
-                            bulk.assign(ja, es, 'mbox')
+                            bulk.assign(self.id, ja, es, 'mbox')
                             bulk.insert()
                         ja = []
                         
                         if not args.dry:
                             bulks = BulkThread()
-                            bulks.assign(jas, es, 'mbox_source')
+                            bulks.assign(self.id, jas, es, 'mbox_source')
                             bulks.insert()
                         jas = []
                 else:
-                    print("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
+                    self.printid("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
                     bad += 1
 
             if filebased:
-                print("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
+                self.printid("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
                 if dFile:
                     os.unlink(tmpname)
             elif imap:
-                print("Parsed %u records (failed: %u) from imap" % (count, bad))
+                self.printid("Parsed %u records (failed: %u) from imap" % (count, bad))
             else:
-                print("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
+                self.printid("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
                 os.unlink(tmpname)
                 
             y += count
             baddies += bad
             if not args.dry:
                 bulk = BulkThread()
-                bulk.assign(ja, es, 'mbox')
+                bulk.assign(self.id, ja, es, 'mbox')
                 bulk.insert()
             ja = []
             
             if not args.dry:
                 bulks = BulkThread()
-                bulks.assign(jas, es, 'mbox_source')
+                bulks.assign(self.id, jas, es, 'mbox_source')
                 bulks.insert()
             jas = []
-        print("Done, %u elements left to slurp" % len(lists))
+        self.printid("Done, %u elements left to slurp" % len(lists))
         
 parser = argparse.ArgumentParser(description='Command line options.')
 parser.add_argument('--source', dest='source', type=str, nargs=1,
@@ -637,7 +646,7 @@ threads = []
 cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1)
 print("Starting up to %u threads to fetch the %u %s lists" % (cc, len(lists), project))
 for i in range(1,cc+1):
-    t = SlurpThread()
+    t = SlurpThread(i)
     threads.append(t)
     t.start()
     print("Started no. %u" % i)


Mime
View raw message