ponymail-commits mailing list archives

From "Francesco Chicchiriccò"<ilgro...@apache.org>
Subject Re: incubator-ponymail git commit: import-mbox.py messages need the thread number
Date Mon, 21 Nov 2016 11:40:17 GMT
Hi all,
I am not sure, but it seems that the commit below broke my scheduled import from mbox:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "import-mbox.py", line 297, in run
    'source': message.as_string()
  File "/usr/lib/python3.5/email/message.py", line 159, in as_string
    g.flatten(self, unixfrom=unixfrom)
  File "/usr/lib/python3.5/email/generator.py", line 115, in flatten
    self._write(msg)
  File "/usr/lib/python3.5/email/generator.py", line 181, in _write
    self._dispatch(msg)
  File "/usr/lib/python3.5/email/generator.py", line 214, in _dispatch
    meth(msg)
  File "/usr/lib/python3.5/email/generator.py", line 243, in _handle_text
    msg.set_payload(payload, charset)
  File "/usr/lib/python3.5/email/message.py", line 316, in set_payload
    payload = payload.encode(charset.output_charset)
UnicodeEncodeError: 'ascii' codec can't encode character '\ufffd' in position 3657: ordinal not in range(128)
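
If it helps, the failure seems to boil down to email.message.Message.set_payload() re-encoding a text payload that contains U+FFFD into a charset whose output charset is ASCII. A minimal snippet along these lines (my own reduction, not code from the importer) seems to raise the same error on Python 3.5:

    from email.message import Message

    msg = Message()
    # Passing a charset makes set_payload() re-encode the str payload to the
    # charset's output charset; us-ascii cannot represent U+FFFD, hence the
    # UnicodeEncodeError from message.py line 316 in the traceback above.
    msg.set_payload("caf\ufffd", "us-ascii")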

Any hint / workaround?
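
For now I am considering a fallback along these lines in import-mbox.py (just an untested sketch, and the safe_source() name is mine); if I read generator.py correctly, as_bytes() goes through BytesGenerator and skips the problematic re-encode, so a lenient decode of those bytes could stand in when as_string() fails:

    def safe_source(message):
        # Try the normal str serialization first; if the payload cannot be
        # re-encoded (as in the traceback above), fall back to the bytes
        # serialization and decode it leniently.
        try:
            return message.as_string()
        except UnicodeEncodeError:
            return message.as_bytes().decode('utf-8', errors='replace')

i.e. 'source': safe_source(message) instead of 'source': message.as_string() around line 297.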

On 2016-11-21 00:20 (+0100), sebb@apache.org wrote: 
> Repository: incubator-ponymail
> Updated Branches:
>   refs/heads/master 1a3bff403 -> af1544e7b
> 
> 
> import-mbox.py messages need the thread number
> 
> This fixes #248
> 
> Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7
> 
> Branch: refs/heads/master
> Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e
> Parents: 1a3bff4
> Author: Sebb <sebb@apache.org>
> Authored: Sun Nov 20 23:19:55 2016 +0000
> Committer: Sebb <sebb@apache.org>
> Committed: Sun Nov 20 23:19:55 2016 +0000
> 
> ----------------------------------------------------------------------
>  tools/import-mbox.py | 59 +++++++++++++++++++++++++++--------------------
>  1 file changed, 34 insertions(+), 25 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py
> ----------------------------------------------------------------------
> diff --git a/tools/import-mbox.py b/tools/import-mbox.py
> index 15f09ad..12bc0d1 100755
> --- a/tools/import-mbox.py
> +++ b/tools/import-mbox.py
> @@ -107,7 +107,9 @@ es = Elasticsearch([
>  rootURL = ""
>  
>  class BulkThread(Thread):
> -    def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'):
> +
> +    def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'):
> +        self.id = id
>          self.json = json
>          self.xes = xes
>          self.dtype = dtype
> @@ -133,17 +135,24 @@ class BulkThread(Thread):
>          try:
>              helpers.bulk(self.xes, js_arr)
>          except Exception as err:
> -            print("Warning: Could not bulk insert: %s" % err)
> -        #print("Inserted %u entries" % len(js_arr))
> +            print("%d: Warning: Could not bulk insert: %s into %s" % (self.id,err,self.dtype))
> +#         print("%d: Inserted %u entries into %s" % (self.id, len(js_arr),self.dtype))
>  
>  
>  class SlurpThread(Thread):
>  
> +    def __init__(self, index):
> +        self.id = index
> +        super(SlurpThread, self).__init__()
> +
> +    def printid(self,message):
> +        print("%d: %s" % (self.id, message))
> +
>      def run(self):
>          global block, y, es, lists, baddies, config, resendTo, timeout, dedupped, dedup
>          ja = []
>          jas = []
> -        print("Thread started")
> +        self.printid("Thread started")
>          mla = None
>          ml = ""
>          mboxfile = ""
> @@ -152,16 +161,16 @@ class SlurpThread(Thread):
>          archie = archiver.Archiver(parseHTML = parseHTML)
>      
>          while len(lists) > 0:
> -            print("%u elements left to slurp" % len(lists))
> +            self.printid("%u elements left to slurp" % len(lists))
>  
>              block.acquire()
>              try:
>                  mla = lists.pop(0)
>                  if not mla:
> -                    print("Nothing more to do here")
> +                    self.printid("Nothing more to do here")
>                      return
>              except Exception as err:
> -                print("Could not pop list: %s" % err)
> +                self.printid("Could not pop list: %s" % err)
>                  return
>              finally:
>                  block.release()
> @@ -184,7 +193,7 @@ class SlurpThread(Thread):
>                  tmpname = mla[0]
>                  filename = mla[0]
>                  if filename.find(".gz") != -1:
> -                    print("Decompressing %s..." % filename)
> +                    self.printid("Decompressing %s..." % filename)
>                      try:
>                          with open(filename, "rb") as bf:
>                              bmd = bf.read()
> @@ -197,16 +206,16 @@ class SlurpThread(Thread):
>                              tmpname = tmpfile.name
>                              filename = tmpname
>                              dFile = True # Slated for deletion upon having been read
> -                            print("%s -> %u bytes" % (tmpname, len(bmd)))
> +                            self.printid("%s -> %u bytes" % (tmpname, len(bmd)))
>                      except Exception as err:
> -                        print("This wasn't a gzip file: %s" % err )
> -                print("Slurping %s" % filename)
> +                        self.printid("This wasn't a gzip file: %s" % err )
> +                self.printid("Slurping %s" % filename)
>                  messages = mailbox.mbox(tmpname)
>  
>              else:
>                  ml = mla[0]
>                  mboxfile = mla[1]
> -                print("Slurping %s/%s" % (ml, mboxfile))
> +                self.printid("Slurping %s/%s" % (ml, mboxfile))
>                  m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile)
>                  EY = 1997
>                  EM = 1
> @@ -232,7 +241,7 @@ class SlurpThread(Thread):
>                  if fromFilter and 'from' in message and message['from'].find(fromFilter) == -1:
>                      continue
>                  if resendTo:
> -                    print("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
> +                    self.printid("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
>                      s = SMTP('localhost')
>                      try:
>                          if list_override:
> @@ -245,7 +254,7 @@ class SlurpThread(Thread):
>                      s.send_message(message, from_addr=None, to_addrs=(resendTo))
>                      continue
>                  if (time.time() - stime > timeout): # break out after N seconds, it shouldn't take this long..!
> -                    print("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
> +                    self.printid("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
>                      break
>  
>                  json, contents = archie.compute_updates(list_override, private, message)
> @@ -271,7 +280,7 @@ class SlurpThread(Thread):
>                          }
>                      )
>                      if res and len(res['hits']['hits']) > 0:
> -                        print("Dedupping %s" % json['message-id'])
> +                        self.printid("Dedupping %s" % json['message-id'])
>                          dedupped += 1
>                          continue
>  
> @@ -305,43 +314,43 @@ class SlurpThread(Thread):
>                      if len(ja) >= 40:
>                          if not args.dry:
>                              bulk = BulkThread()
> -                            bulk.assign(ja, es, 'mbox')
> +                            bulk.assign(self.id, ja, es, 'mbox')
>                              bulk.insert()
>                          ja = []
>                          
>                          if not args.dry:
>                              bulks = BulkThread()
> -                            bulks.assign(jas, es, 'mbox_source')
> +                            bulks.assign(self.id, jas, es, 'mbox_source')
>                              bulks.insert()
>                          jas = []
>                  else:
> -                    print("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
> +                    self.printid("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
>                      bad += 1
>  
>              if filebased:
> -                print("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
> +                self.printid("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
>                  if dFile:
>                      os.unlink(tmpname)
>              elif imap:
> -                print("Parsed %u records (failed: %u) from imap" % (count, bad))
> +                self.printid("Parsed %u records (failed: %u) from imap" % (count, bad))
>              else:
> -                print("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
> +                self.printid("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
>                  os.unlink(tmpname)
>                  
>              y += count
>              baddies += bad
>              if not args.dry:
>                  bulk = BulkThread()
> -                bulk.assign(ja, es, 'mbox')
> +                bulk.assign(self.id, ja, es, 'mbox')
>                  bulk.insert()
>              ja = []
>              
>              if not args.dry:
>                  bulks = BulkThread()
> -                bulks.assign(jas, es, 'mbox_source')
> +                bulks.assign(self.id, jas, es, 'mbox_source')
>                  bulks.insert()
>              jas = []
> -        print("Done, %u elements left to slurp" % len(lists))
> +        self.printid("Done, %u elements left to slurp" % len(lists))
>          
>  parser = argparse.ArgumentParser(description='Command line options.')
>  parser.add_argument('--source', dest='source', type=str, nargs=1,
> @@ -637,7 +646,7 @@ threads = []
>  cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1)
>  print("Starting up to %u threads to fetch the %u %s lists" % (cc, len(lists), project))
>  for i in range(1,cc+1):
> -    t = SlurpThread()
> +    t = SlurpThread(i)
>      threads.append(t)
>      t.start()
>      print("Started no. %u" % i)
> 
> 
