#!/usr/bin/python
#
# Being a modest reimplementation of mhn -show in Python.
# We are invoked as 'mhnshow <msgs> -show', more or less; the messages
# are always in the current folder. The usual case is a single message.
#
# This is Python 2 code (it needs the 'commands' module and the old
# email.* module names); the syntax used requires Python 2.6 or later.
#
# BUGS: boy do we hardcode things.
#
import sys, os, commands, re
import getopt
import email, email.Message, email.Errors, email.Iterators, email.Parser


class mhnError(Exception):
    """Raised for any failure to load or display a message."""
    pass


progname = "mhnshow"
# specifying the absolute path makes cks happy in obscure situations,
# and since this all about cks he's happy with the potential lossage.
nukehdrsProg = "/u/cks/bin/mh/nukehdrs"
# Option flags; parseargs() flips these from the command line.
prefText = 1        # prefer text/plain over text/html in alternatives
demoronise = 1      # replace C1 "smart" punctuation with ASCII
doHtmlRender = 1    # pipe HTML through lynx instead of dumping it raw
wrapLongText = 0    # wrap over-long lines in all text parts


class MyMsg(email.Message.Message):
    """A Message that also remembers its raw, unparsed header text."""

    def __init__(self):
        self.hdrs = None
        email.Message.Message.__init__(self)

    def sethdrs(self, hdrs):
        """Record the raw header block (a single string) for showmsg()."""
        self.hdrs = hdrs


# Given a list of message numbers, return full paths.
# commands.getoutput helpfully kills the final newline for us, so we only
# care about interior ones.
# NOTE(review): the message specs are interpolated into a shell command
# unquoted; they come from our own command line.
def mhpath(msglst):
    """Run mhpath(1) on `msglst` and return its output lines as a list."""
    return commands.getoutput("mhpath %s" % " ".join(msglst)).split('\n')


# This function loads an email message. Since mhn itself pukes on invalid
# mail messages, we are no worse off than we used to be if this explodes
# in our face.
def loadmsg(fn):
    """Read the file `fn` and return it parsed as a MyMsg.

    Envelope lines that precede the real headers are dropped before
    parsing, and the surviving header text is stored on the result via
    sethdrs(). Raises mhnError if the file cannot be read or parsed.
    """
    hdrs = []
    try:
        # 'with' guarantees the file is closed even when parsing fails.
        with open(fn, "r") as fp:
            while 1:
                l = fp.readline()
                # EOF or the blank line that ends the header block.
                if not l or l == '\n':
                    break
                # special bonus ZMailer hack; skip envelope headers
                # entirely.
                sl = l.split()
                if not sl or \
                   sl[0] in ("from", "to", "external", "rcvdfrom", "with"):
                    continue
                # special bonus smtpsink hack: skip all envelope
                # information very simply. (An unindented line whose
                # first word is lowercase and does not end in ':'.)
                if l == l.lstrip() and \
                   sl[0].islower() and sl[0][-1] != ':':
                    continue
                hdrs.append(l)
            # The REAL headers are now, unaltered, in hdrs. Glue the
            # rest in.
            body = fp.read()
        hdrs = "".join(hdrs)
        prs = email.Parser.Parser(MyMsg)
        m = prs.parsestr("%s\n%s" % (hdrs, body))
        m.sethdrs(hdrs)
        return m
    except EnvironmentError as e:
        raise mhnError("cannot read in %s: %s" % (fn, str(e)))
    except email.Errors.MessageParseError as e:
        # The original was missing the '%' here, so a parse failure
        # raised TypeError instead of mhnError.
        raise mhnError("cannot parse %s: %s" % (fn, str(e)))


# Get the decoded contents of a given part.
# We transform the contents to utf-8 if possible.
def getcharset(p):
    """Return the charset name of part `p`, or None if it has none.

    Uses the part's Charset object when one is set, otherwise the
    'charset' Content-Type parameter.
    """
    cset = p.get_charset()
    if cset is not None:
        return cset.input_codec
    return p.get_param("charset", None)


def getpartcont(p):
    """Return the transfer-decoded payload of `p`, as utf-8 if possible.

    Never raises: a decoding problem is reported as a '++' line at the
    start of the returned text rather than as an exception.
    """
    t = None
    msg = ""
    try:
        t = p.get_payload(None, True)
        cs = getcharset(p)
        if cs and cs != "utf-8":
            try:
                t = t.decode(cs, "replace").encode("utf-8")
            except (UnicodeDecodeError, LookupError) as e:
                t = "++ decode error from '%s': %s\n%s" % (cs, e, t)
    except Exception as e:
        # Deliberately broad: we would rather show something than die.
        msg = "++ exception during getpartcont: %s\n" % e
    # Fall back to the raw part text, then to nothing at all.
    if t is None:
        t = p.as_string()
    if t is None:
        t = ""
    return msg + t


def getpartencode(p):
    """Return the lowercased Content-Transfer-Encoding of `p` ('' if none)."""
    enc = p.get("Content-Transfer-Encoding", "")
    return enc.lower()


# This goes into the output stream:
def complain(msg):
    """Write a '!!' problem report about the message to stdout."""
    sys.stdout.write("!! %s: %s\n" % (progname, msg))


def note(msg):
    """Write a '++' informational note to stdout."""
    sys.stdout.write("++ %s: %s\n" % (progname, msg))


# Demoronize content if desired.
charPairs = ((u'\x82', ','), (u'\x84', ',,'), (u'\x85', '...'),
             (u'\x88', '^'), (u'\x8b', '<'), (u'\x8c', 'Oe'),
             (u'\x91', "`"), (u'\x92', "'"), (u'\x93', '"'),
             (u'\x94', '"'), (u'\x95', '*'), (u'\x96', '-'),
             (u'\x97', '--'), (u'\x9b', '>'), (u'\x9c', 'oe'),
             # These are overridden by HTML specific translations later.
             # (or will be when we have that)
             (u'\x83', 'f'), (u'\x98', '~'), (u'\x99', '(tm)'),
             )


def demoroniser(buf):
    """Return `buf` with the charPairs code points replaced by ASCII.

    `buf` is returned untouched when the demoronise flag is off.
    """
    if not demoronise:
        return buf
    # at this point, input is supposed to be in utf-8. We must
    # decode to unicode, do the codepoint replacement, and then
    # re-encode to utf-8. doing otherwise smashes characters.
    buf = buf.decode("utf-8", "replace")
    for o, n in charPairs:
        buf = buf.replace(o, n)
    return buf.encode("utf-8")


# Handle displaying various sorts of content types.
# We make gratuitously ugly assumptions, like 'the current terminal
# can display all of these character sets', which are perhaps a bit
# laughable.
def showhtml(p):
    """Display a text/html part, rendered through lynx unless disabled.

    Raises mhnError if writing to lynx fails.
    """
    c = demoroniser(getpartcont(p))
    if not doHtmlRender:
        sys.stdout.write(c)
        return
    try:
        # Flush first so our own output stays ahead of lynx's.
        sys.stdout.flush()
        fp = os.popen("lynx -dump -force_html -stdin", "w")
        fp.write(c)
        fp.close()
    except EnvironmentError as e:
        raise mhnError("error dumping HTML contents: %s" % str(e))


# Optionally, we wrap overly long lines in text blocks. We do this in the
# same way 'fmt' does, which is to say that we never reflow blocks.
LONGLINE = 100
LLRATIO = 0.3


def countbiglines(c):
    """Split `c` into lines and measure how much of it is long lines.

    Returns (total chars, chars in lines of LONGLINE or more, lines);
    newlines are not counted.
    """
    cl = c.split("\n")
    ll = 0
    tl = 0
    for l in cl:
        if len(l) >= LONGLINE:
            ll += len(l)
        tl += len(l)
    return (tl, ll, cl)


# initial whitespace, non-whitespace, and trailing whitespace, all
# elements optional.
wsre = re.compile(r"^([ \t]*)([^ \t].*)\s*$")


def fmtline(ln, wlen):
    """Wrap the single line `ln` to about `wlen` columns.

    Returns a list of lines, each carrying the original's leading
    whitespace. A line with no non-whitespace yields [''].
    """
    # The 'display length' of a string of characters, counting tabs.
    def displen(s):
        pos = 0
        for c in s:
            if c == '\t':
                # Advance to the next multiple-of-8 tab stop.
                pos = (pos // 8) * 8 + 8
            else:
                pos += 1
        return pos

    # Find the span of initial whitespace, if any.
    mr = wsre.search(ln)
    # failure to match means that we have no non-whitespace, because
    # it's the only required element. In that case we return a blank
    # line.
    if not mr:
        return ['']
    ws = mr.group(1)
    nws = mr.group(2)
    # If the length of the initial whitespace is guaranteed to wrap
    # no matter what, we punt the line (minus trailing space) back
    # as is. The greedy '.*' in wsre leaves trailing whitespace in
    # nws, so it has to be stripped here explicitly.
    dl = displen(ws)
    if dl >= wlen:
        return [ws + nws.rstrip()]

    # Otherwise, we split to words and then wrap the words.
    wds = nws.split()
    lines = []
    curline = []
    curlen = dl
    for wd in wds:
        # Start a new line when this word would overflow, but always
        # put at least one word on every line.
        if curline and (curlen + len(wd) + 1 > wlen):
            lines.append(ws + " ".join(curline))
            curline = []
            curlen = dl
        curline.append(wd)
        curlen += len(wd) + 1
    if curline:
        lines.append(ws + " ".join(curline))
    return lines


def fmtwrap(larray, wlen):
    """Wrap every line of `larray` that is `wlen` chars or longer.

    Returns the result as one newline-joined string.
    """
    na = []
    for l in larray:
        if len(l) < wlen:
            na.append(l)
        else:
            na.extend(fmtline(l, wlen))
    return "\n".join(na)


def showplain(p):
    """Display a plain-text part, wrapping long lines where called for."""
    c = demoroniser(getpartcont(p))
    enc = getpartencode(p)
    # we always linewrap quoted-printable encoded messages because
    # they often have long run-on paragraphs with no hard returns.
    if wrapLongText or enc == "quoted-printable":
        (tot, lng, splt) = countbiglines(c)
        # We base on a) ratio or b) absolute characters in long
        # lines, because I have a limited tolerance for the latter.
        if tot and (lng > (2 * 1024) or (float(lng) / tot) > LLRATIO) \
           and lng > 150:
            c = fmtwrap(splt, 75)
        elif enc == "quoted-printable":
            c = fmtwrap(splt, 75)
    # trim surplus trailing newlines: we want exactly one blank line
    # at the end.
    if c.endswith("\n\n"):
        c = c.rstrip("\n") + "\n"
    sys.stdout.write(c)
    sys.stdout.write("\n")


# this is nominally plaintext, except that it pisses us off in a clever
# way from email.*, because email.* manages to bodge it up and we have
# to correct their bodge. (Perhaps this correction should take place
# elsewhere. Perhaps.)
def showdelstat(p):
    """Display a message/delivery-status part.

    A leading 'content-type: message/delivery-status' line plus the
    blank line after it are dropped when present.
    """
    c = getpartcont(p)
    l = c.split("\n", 2)
    if len(l) == 3 and l[1] == "" and \
       l[0].lower() == "content-type: message/delivery-status":
        c = l[2]
    sys.stdout.write(c)
    sys.stdout.write("\n")


# Handle multipart/alternative in the approved manner: we pick the best
# (ie last) content that we can handle.
def showalternative(p):
    """Display the most preferred alternative of `p` that we can handle."""
    m = p.get_payload()
    # A broken message can hand us a string instead of a list of parts.
    # This must be checked before anything indexes m as parts.
    if isinstance(m, str):
        complain("broken multipart/alternative")
        # This is potentially dubious, but if we have content...
        if m and m != '\n':
            sys.stdout.write("%s\n" % m)
        return
    # If we prefer text/plain to text/html and this is a text/plain +
    # text/html message, we pick the text/plain version.
    if prefText and len(m) == 2 \
       and m[0].get_content_type() == "text/plain" \
       and m[1].get_content_type() == "text/html":
        noteshowpart(m[0])
        return
    # Walk a reversed copy so the message's own part list is untouched.
    alts = list(reversed(m))
    for alt in alts:
        if alt.get_content_type() in knownParts:
            noteshowpart(alt)
            return
    # Nothing we know how to handle.
    complain("cannot display multipart/alternative composed of: %s" %
             (", ".join([x.get_content_type() for x in alts])))


def showmulti(p):
    """Display each sub-part of a generic multipart container in order."""
    # It can happen that this returns a string, not payload.
    # This seems to happen for broken MIME messages; the case I have
    # seen is a mail system that replaced the body of a multipart/mixed
    # message in a bounce with '(Body supressed)'.
    t = p.get_payload()
    if isinstance(t, str):
        sys.stdout.write(t)
        return
    for sp in t:
        noteshowpart(sp)


# Our over-all decision maker about what to do about a particular part.
def prepost(str):
    """Write a preamble/epilogue unless it is blank or MIME boilerplate."""
    # (The parameter name shadows the builtin; kept for compatibility.)
    l = str.strip()
    if not l:
        return
    if l == 'This is a multi-part message in MIME format.' or \
       l == 'This is a MIME-encapsulated message':
        return
    sys.stdout.write(str)


def showpart(p):
    """Display part `p` with the knownParts handler for its type.

    The full content type is tried first, then the main type; parts with
    no handler produce only a 'skipping' note.
    """
    ct = p.get_content_type()
    cm = p.get_content_maintype()
    if p.preamble:
        prepost(p.preamble)
    if ct in knownParts:
        knownParts[ct](p)
    elif cm in knownParts:
        knownParts[cm](p)
    else:
        if ct[0] in ('a', 'e', 'i', 'o', 'u'):
            ana = "an"
        else:
            ana = "a"
        note("skipping %s %s part" % (ana, ct))
    if p.epilogue:
        prepost(p.epilogue)


# We don't show notes about parts that showpart will only produce a
# complaint about.
def noteshowpart(p):
    """Display `p`, preceded by a '++ part' banner when it has content."""
    ct = p.get_content_type()
    cm = p.get_content_maintype()
    if ct in knownParts or cm in knownParts or p.preamble or p.epilogue:
        sys.stdout.write("++ part %s:\n" % ct)
    showpart(p)


# Show an RFC822 message. We must deal specially with the headers, and
# then we dump the body.
def showmsg(p):
    """Display message `p`: headers through nukehdrsProg, then the body.

    Raises mhnError if the headers cannot be written to nukehdrsProg.
    """
    if hasattr(p, "hdrs") and p.hdrs:
        htxt = p.hdrs
    else:
        # Unnatural intimacy, but we do what we can.
        # (we would actually like to do our own header parsing
        # instead of email.Parser.Parser()'s, so we can save the
        # raw information, but that's too big a project for version
        # 0.1)
        htxt = "\n".join(["%s:\t%s" % (x[0], x[1]) for x in p._headers])
        htxt += "\n"
    try:
        # Flush first so our own output stays ahead of the child's.
        sys.stdout.flush()
        fp = os.popen(nukehdrsProg, "w")
        fp.write(htxt)
        fp.write("\n")
        fp.close()
    except EnvironmentError as e:
        raise mhnError("error writing message headers: %s" % str(e))

    if p.is_multipart() and len(p.get_payload()) > 1:
        noteshowpart(p)
    else:
        # repair missing character set value from the content-type
        # parameter. WAT? the parser should do this for us, damnit.
        cs = p.get_param("charset")
        if not p.get_charset() and cs:
            p.set_charset(cs)
        showpart(p)


# A message/rfc822 object is merely a thin container; the actual *message*
# is inside it as its sole content. At least we hope it's the sole content!
def showrfc822(p):
    """Display every message contained in a message/rfc822 part."""
    pl = p.get_payload()
    if len(pl) != 1:
        complain("message/rfc822 with multiple contents!")
        complain("types: %s" % ", ".join([x.get_content_type() for x in pl]))
    for pe in pl:
        showmsg(pe)
    # and we're done.


# Dispatch table: content type (or bare main type) -> display function.
# application/pgp is merely the signature portion, so we don't need to
# mention it.
knownParts = {
    'text/plain': showplain,
    'text/html': showhtml,
    # this notice is actually plaintext.
    'message/delivery-status': showdelstat,
    # we show these plain instead of running them through nukehdrs
    # because they're usually in bounces of spam and we want to see the
    # full headers to see, eg, Received:.
    'text/rfc822-headers': showplain,

    # the parcels:
    'multipart/alternative': showalternative,
    'message/rfc822': showrfc822,
    # We think of these as generic things in which we spew out anything
    # we can recognize.
    'message/digest': showmulti,
    'multipart': showmulti,
}


# Actually do stuff.
def warn(msg):
    """Write `msg` to stderr, prefixed with the program name."""
    sys.stderr.write("%s: %s\n" % (progname, msg))


def die(msg):
    """Report `msg` with warn() and exit with status 1."""
    warn(msg)
    sys.exit(1)


# process handles one file (not message; we have turned message numbers
# into files before now).
def process(fn):
    """Load the message file `fn` and display it; die() on any failure."""
    try:
        showmsg(loadmsg(fn))
    except (EnvironmentError, mhnError) as e:
        die("showing %s: %s" % (fn, str(e)))


# Handle the arguments. If our argument list ends in -show, it is a list
# of MH message numbers, and we mhpath it; if it doesn't, it is a list of
# filenames and we just use them directly.
def usage():
    """Print the usage message to stderr and exit with status 2."""
    # The original never interpolated progname, printing a literal '%s'.
    sys.stderr.write("usage: %s [-N prog] [-HDsw] arg [... arg] [-show]\n"
                     % progname)
    sys.exit(2)


def parseargs(sargs):
    """Apply the option flags in `sargs` and return the remaining args.

    Options set the module-level configuration globals:
      -N prog  use `prog` as the header filter (nukehdrsProg)
      -H       do not prefer text/plain over text/html
      -D       do not demoronise
      -s       do not render HTML
      -w       wrap long lines in text parts
    A bad option prints a warning and the usage message, then exits.
    """
    global nukehdrsProg
    global prefText
    global demoronise
    global doHtmlRender
    global wrapLongText
    try:
        opts, args = getopt.getopt(sargs, "N:HDsw", [])
    except getopt.error as cause:
        warn(cause)
        usage()
    for o, a in opts:
        if o == '-N':
            nukehdrsProg = a
        elif o == '-H':
            prefText = 0
        elif o == '-D':
            demoronise = 0
        elif o == '-s':
            doHtmlRender = 0
        elif o == '-w':
            wrapLongText = 1
        else:
            die("Chris failed to handle option '%s'" % o)
    return args


# We accept options from either $MHNSHOWOPTS or from the command line,
# because when we're in the MH context we can't pass command line arguments
# through to mhnshow any other way. Because -show shows up in a funky place
# we remove it from the arglist before we run the thing past getopt().
def main(args):
    """Run the program on `args` (the command line without argv[0])."""
    mhnmode = 0
    # Guard against an empty argument list before peeking at the end;
    # the original raised IndexError instead of printing usage.
    if args and args[-1] == "-show":
        mhnmode = 1
        args = args[:-1]

    # Take options from the environment first. Only the flags matter
    # here; any non-option words in $MHNSHOWOPTS are ignored.
    envopts = os.getenv("MHNSHOWOPTS")
    if envopts:
        parseargs(envopts.split())
    # Then take command-line arguments.
    files = parseargs(args)
    if not files:
        warn("No arguments supplied.")
        usage()

    # Now process. In MH mode the arguments are message numbers that
    # must be turned into file names first.
    if mhnmode:
        files = mhpath(files)
    for fn in files:
        process(fn)


if __name__ == "__main__":
    main(sys.argv[1:])