更新windows内置office目录名, 适配jodconverter

This commit is contained in:
陈精华
2022-12-19 14:45:45 +08:00
parent 7d3a4ebc4e
commit d761d0cc88
12504 changed files with 3 additions and 3 deletions

View File

@@ -0,0 +1,62 @@
# Copyright (C) 2001-2007 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""A package for parsing, handling, and generating email messages."""
__all__ = [
'base64mime',
'charset',
'encoders',
'errors',
'feedparser',
'generator',
'header',
'iterators',
'message',
'message_from_file',
'message_from_binary_file',
'message_from_string',
'message_from_bytes',
'mime',
'parser',
'quoprimime',
'utils',
]
# Some convenience routines. Don't import Parser and Message as side-effects
# of importing email since those cascadingly import most of the rest of the
# email package.
def message_from_string(s, *args, **kws):
"""Parse a string into a Message object model.
Optional _class and strict are passed to the Parser constructor.
"""
from email.parser import Parser
return Parser(*args, **kws).parsestr(s)
def message_from_bytes(s, *args, **kws):
"""Parse a bytes string into a Message object model.
Optional _class and strict are passed to the Parser constructor.
"""
from email.parser import BytesParser
return BytesParser(*args, **kws).parsebytes(s)
def message_from_file(fp, *args, **kws):
"""Read a file and parse its contents into a Message object model.
Optional _class and strict are passed to the Parser constructor.
"""
from email.parser import Parser
return Parser(*args, **kws).parse(fp)
def message_from_binary_file(fp, *args, **kws):
"""Read a binary file and parse its contents into a Message object model.
Optional _class and strict are passed to the Parser constructor.
"""
from email.parser import BytesParser
return BytesParser(*args, **kws).parse(fp)

View File

@@ -0,0 +1,233 @@
""" Routines for manipulating RFC2047 encoded words.
This is currently a package-private API, but will be considered for promotion
to a public API if there is demand.
"""
# An ecoded word looks like this:
#
# =?charset[*lang]?cte?encoded_string?=
#
# for more information about charset see the charset module. Here it is one
# of the preferred MIME charset names (hopefully; you never know when parsing).
# cte (Content Transfer Encoding) is either 'q' or 'b' (ignoring case). In
# theory other letters could be used for other encodings, but in practice this
# (almost?) never happens. There could be a public API for adding entries
# to the CTE tables, but YAGNI for now. 'q' is Quoted Printable, 'b' is
# Base64. The meaning of encoded_string should be obvious. 'lang' is optional
# as indicated by the brackets (they are not part of the syntax) but is almost
# never encountered in practice.
#
# The general interface for a CTE decoder is that it takes the encoded_string
# as its argument, and returns a tuple (cte_decoded_string, defects). The
# cte_decoded_string is the original binary that was encoded using the
# specified cte. 'defects' is a list of MessageDefect instances indicating any
# problems encountered during conversion. 'charset' and 'lang' are the
# corresponding strings extracted from the EW, case preserved.
#
# The general interface for a CTE encoder is that it takes a binary sequence
# as input and returns the cte_encoded_string, which is an ascii-only string.
#
# Each decoder must also supply a length function that takes the binary
# sequence as its argument and returns the length of the resulting encoded
# string.
#
# The main API functions for the module are decode, which calls the decoder
# referenced by the cte specifier, and encode, which adds the appropriate
# RFC 2047 "chrome" to the encoded string, and can optionally automatically
# select the shortest possible encoding. See their docstrings below for
# details.
import re
import base64
import binascii
import functools
from string import ascii_letters, digits
from email import errors
__all__ = ['decode_q',
'encode_q',
'decode_b',
'encode_b',
'len_q',
'len_b',
'decode',
'encode',
]
#
# Quoted Printable
#
# regex based decoder.
_q_byte_subber = functools.partial(re.compile(br'=([a-fA-F0-9]{2})').sub,
lambda m: bytes.fromhex(m.group(1).decode()))
def decode_q(encoded):
encoded = encoded.replace(b'_', b' ')
return _q_byte_subber(encoded), []
# dict mapping bytes to their encoded form
class _QByteMap(dict):
safe = b'-!*+/' + ascii_letters.encode('ascii') + digits.encode('ascii')
def __missing__(self, key):
if key in self.safe:
self[key] = chr(key)
else:
self[key] = "={:02X}".format(key)
return self[key]
_q_byte_map = _QByteMap()
# In headers spaces are mapped to '_'.
_q_byte_map[ord(' ')] = '_'
def encode_q(bstring):
return ''.join(_q_byte_map[x] for x in bstring)
def len_q(bstring):
return sum(len(_q_byte_map[x]) for x in bstring)
#
# Base64
#
def decode_b(encoded):
# First try encoding with validate=True, fixing the padding if needed.
# This will succeed only if encoded includes no invalid characters.
pad_err = len(encoded) % 4
missing_padding = b'==='[:4-pad_err] if pad_err else b''
try:
return (
base64.b64decode(encoded + missing_padding, validate=True),
[errors.InvalidBase64PaddingDefect()] if pad_err else [],
)
except binascii.Error:
# Since we had correct padding, this is likely an invalid char error.
#
# The non-alphabet characters are ignored as far as padding
# goes, but we don't know how many there are. So try without adding
# padding to see if it works.
try:
return (
base64.b64decode(encoded, validate=False),
[errors.InvalidBase64CharactersDefect()],
)
except binascii.Error:
# Add as much padding as could possibly be necessary (extra padding
# is ignored).
try:
return (
base64.b64decode(encoded + b'==', validate=False),
[errors.InvalidBase64CharactersDefect(),
errors.InvalidBase64PaddingDefect()],
)
except binascii.Error:
# This only happens when the encoded string's length is 1 more
# than a multiple of 4, which is invalid.
#
# bpo-27397: Just return the encoded string since there's no
# way to decode.
return encoded, [errors.InvalidBase64LengthDefect()]
def encode_b(bstring):
return base64.b64encode(bstring).decode('ascii')
def len_b(bstring):
groups_of_3, leftover = divmod(len(bstring), 3)
# 4 bytes out for each 3 bytes (or nonzero fraction thereof) in.
return groups_of_3 * 4 + (4 if leftover else 0)
_cte_decoders = {
'q': decode_q,
'b': decode_b,
}
def decode(ew):
"""Decode encoded word and return (string, charset, lang, defects) tuple.
An RFC 2047/2243 encoded word has the form:
=?charset*lang?cte?encoded_string?=
where '*lang' may be omitted but the other parts may not be.
This function expects exactly such a string (that is, it does not check the
syntax and may raise errors if the string is not well formed), and returns
the encoded_string decoded first from its Content Transfer Encoding and
then from the resulting bytes into unicode using the specified charset. If
the cte-decoded string does not successfully decode using the specified
character set, a defect is added to the defects list and the unknown octets
are replaced by the unicode 'unknown' character \\uFDFF.
The specified charset and language are returned. The default for language,
which is rarely if ever encountered, is the empty string.
"""
_, charset, cte, cte_string, _ = ew.split('?')
charset, _, lang = charset.partition('*')
cte = cte.lower()
# Recover the original bytes and do CTE decoding.
bstring = cte_string.encode('ascii', 'surrogateescape')
bstring, defects = _cte_decoders[cte](bstring)
# Turn the CTE decoded bytes into unicode.
try:
string = bstring.decode(charset)
except UnicodeError:
defects.append(errors.UndecodableBytesDefect("Encoded word "
"contains bytes not decodable using {} charset".format(charset)))
string = bstring.decode(charset, 'surrogateescape')
except LookupError:
string = bstring.decode('ascii', 'surrogateescape')
if charset.lower() != 'unknown-8bit':
defects.append(errors.CharsetError("Unknown charset {} "
"in encoded word; decoded as unknown bytes".format(charset)))
return string, charset, lang, defects
_cte_encoders = {
'q': encode_q,
'b': encode_b,
}
_cte_encode_length = {
'q': len_q,
'b': len_b,
}
def encode(string, charset='utf-8', encoding=None, lang=''):
"""Encode string using the CTE encoding that produces the shorter result.
Produces an RFC 2047/2243 encoded word of the form:
=?charset*lang?cte?encoded_string?=
where '*lang' is omitted unless the 'lang' parameter is given a value.
Optional argument charset (defaults to utf-8) specifies the charset to use
to encode the string to binary before CTE encoding it. Optional argument
'encoding' is the cte specifier for the encoding that should be used ('q'
or 'b'); if it is None (the default) the encoding which produces the
shortest encoded sequence is used, except that 'q' is preferred if it is up
to five characters longer. Optional argument 'lang' (default '') gives the
RFC 2243 language string to specify in the encoded word.
"""
if charset == 'unknown-8bit':
bstring = string.encode('ascii', 'surrogateescape')
else:
bstring = string.encode(charset)
if encoding is None:
qlen = _cte_encode_length['q'](bstring)
blen = _cte_encode_length['b'](bstring)
# Bias toward q. 5 is arbitrary.
encoding = 'q' if qlen - blen < 5 else 'b'
encoded = _cte_encoders[encoding](bstring)
if lang:
lang = '*' + lang
return "=?{}{}?{}?{}?=".format(charset, lang, encoding, encoded)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,374 @@
"""Policy framework for the email package.
Allows fine grained feature control of how the package parses and emits data.
"""
import abc
from email import header
from email import charset as _charset
from email.utils import _has_surrogates
__all__ = [
'Policy',
'Compat32',
'compat32',
]
class _PolicyBase:
"""Policy Object basic framework.
This class is useless unless subclassed. A subclass should define
class attributes with defaults for any values that are to be
managed by the Policy object. The constructor will then allow
non-default values to be set for these attributes at instance
creation time. The instance will be callable, taking these same
attributes keyword arguments, and returning a new instance
identical to the called instance except for those values changed
by the keyword arguments. Instances may be added, yielding new
instances with any non-default values from the right hand
operand overriding those in the left hand operand. That is,
A + B == A(<non-default values of B>)
The repr of an instance can be used to reconstruct the object
if and only if the repr of the values can be used to reconstruct
those values.
"""
def __init__(self, **kw):
"""Create new Policy, possibly overriding some defaults.
See class docstring for a list of overridable attributes.
"""
for name, value in kw.items():
if hasattr(self, name):
super(_PolicyBase,self).__setattr__(name, value)
else:
raise TypeError(
"{!r} is an invalid keyword argument for {}".format(
name, self.__class__.__name__))
def __repr__(self):
args = [ "{}={!r}".format(name, value)
for name, value in self.__dict__.items() ]
return "{}({})".format(self.__class__.__name__, ', '.join(args))
def clone(self, **kw):
"""Return a new instance with specified attributes changed.
The new instance has the same attribute values as the current object,
except for the changes passed in as keyword arguments.
"""
newpolicy = self.__class__.__new__(self.__class__)
for attr, value in self.__dict__.items():
object.__setattr__(newpolicy, attr, value)
for attr, value in kw.items():
if not hasattr(self, attr):
raise TypeError(
"{!r} is an invalid keyword argument for {}".format(
attr, self.__class__.__name__))
object.__setattr__(newpolicy, attr, value)
return newpolicy
def __setattr__(self, name, value):
if hasattr(self, name):
msg = "{!r} object attribute {!r} is read-only"
else:
msg = "{!r} object has no attribute {!r}"
raise AttributeError(msg.format(self.__class__.__name__, name))
def __add__(self, other):
"""Non-default values from right operand override those from left.
The object returned is a new instance of the subclass.
"""
return self.clone(**other.__dict__)
def _append_doc(doc, added_doc):
doc = doc.rsplit('\n', 1)[0]
added_doc = added_doc.split('\n', 1)[1]
return doc + '\n' + added_doc
def _extend_docstrings(cls):
if cls.__doc__ and cls.__doc__.startswith('+'):
cls.__doc__ = _append_doc(cls.__bases__[0].__doc__, cls.__doc__)
for name, attr in cls.__dict__.items():
if attr.__doc__ and attr.__doc__.startswith('+'):
for c in (c for base in cls.__bases__ for c in base.mro()):
doc = getattr(getattr(c, name), '__doc__')
if doc:
attr.__doc__ = _append_doc(doc, attr.__doc__)
break
return cls
class Policy(_PolicyBase, metaclass=abc.ABCMeta):
r"""Controls for how messages are interpreted and formatted.
Most of the classes and many of the methods in the email package accept
Policy objects as parameters. A Policy object contains a set of values and
functions that control how input is interpreted and how output is rendered.
For example, the parameter 'raise_on_defect' controls whether or not an RFC
violation results in an error being raised or not, while 'max_line_length'
controls the maximum length of output lines when a Message is serialized.
Any valid attribute may be overridden when a Policy is created by passing
it as a keyword argument to the constructor. Policy objects are immutable,
but a new Policy object can be created with only certain values changed by
calling the Policy instance with keyword arguments. Policy objects can
also be added, producing a new Policy object in which the non-default
attributes set in the right hand operand overwrite those specified in the
left operand.
Settable attributes:
raise_on_defect -- If true, then defects should be raised as errors.
Default: False.
linesep -- string containing the value to use as separation
between output lines. Default '\n'.
cte_type -- Type of allowed content transfer encodings
7bit -- ASCII only
8bit -- Content-Transfer-Encoding: 8bit is allowed
Default: 8bit. Also controls the disposition of
(RFC invalid) binary data in headers; see the
documentation of the binary_fold method.
max_line_length -- maximum length of lines, excluding 'linesep',
during serialization. None or 0 means no line
wrapping is done. Default is 78.
mangle_from_ -- a flag that, when True escapes From_ lines in the
body of the message by putting a `>' in front of
them. This is used when the message is being
serialized by a generator. Default: True.
message_factory -- the class to use to create new message objects.
If the value is None, the default is Message.
"""
raise_on_defect = False
linesep = '\n'
cte_type = '8bit'
max_line_length = 78
mangle_from_ = False
message_factory = None
def handle_defect(self, obj, defect):
"""Based on policy, either raise defect or call register_defect.
handle_defect(obj, defect)
defect should be a Defect subclass, but in any case must be an
Exception subclass. obj is the object on which the defect should be
registered if it is not raised. If the raise_on_defect is True, the
defect is raised as an error, otherwise the object and the defect are
passed to register_defect.
This method is intended to be called by parsers that discover defects.
The email package parsers always call it with Defect instances.
"""
if self.raise_on_defect:
raise defect
self.register_defect(obj, defect)
def register_defect(self, obj, defect):
"""Record 'defect' on 'obj'.
Called by handle_defect if raise_on_defect is False. This method is
part of the Policy API so that Policy subclasses can implement custom
defect handling. The default implementation calls the append method of
the defects attribute of obj. The objects used by the email package by
default that get passed to this method will always have a defects
attribute with an append method.
"""
obj.defects.append(defect)
def header_max_count(self, name):
"""Return the maximum allowed number of headers named 'name'.
Called when a header is added to a Message object. If the returned
value is not 0 or None, and there are already a number of headers with
the name 'name' equal to the value returned, a ValueError is raised.
Because the default behavior of Message's __setitem__ is to append the
value to the list of headers, it is easy to create duplicate headers
without realizing it. This method allows certain headers to be limited
in the number of instances of that header that may be added to a
Message programmatically. (The limit is not observed by the parser,
which will faithfully produce as many headers as exist in the message
being parsed.)
The default implementation returns None for all header names.
"""
return None
@abc.abstractmethod
def header_source_parse(self, sourcelines):
"""Given a list of linesep terminated strings constituting the lines of
a single header, return the (name, value) tuple that should be stored
in the model. The input lines should retain their terminating linesep
characters. The lines passed in by the email package may contain
surrogateescaped binary data.
"""
raise NotImplementedError
@abc.abstractmethod
def header_store_parse(self, name, value):
"""Given the header name and the value provided by the application
program, return the (name, value) that should be stored in the model.
"""
raise NotImplementedError
@abc.abstractmethod
def header_fetch_parse(self, name, value):
"""Given the header name and the value from the model, return the value
to be returned to the application program that is requesting that
header. The value passed in by the email package may contain
surrogateescaped binary data if the lines were parsed by a BytesParser.
The returned value should not contain any surrogateescaped data.
"""
raise NotImplementedError
@abc.abstractmethod
def fold(self, name, value):
"""Given the header name and the value from the model, return a string
containing linesep characters that implement the folding of the header
according to the policy controls. The value passed in by the email
package may contain surrogateescaped binary data if the lines were
parsed by a BytesParser. The returned value should not contain any
surrogateescaped data.
"""
raise NotImplementedError
@abc.abstractmethod
def fold_binary(self, name, value):
"""Given the header name and the value from the model, return binary
data containing linesep characters that implement the folding of the
header according to the policy controls. The value passed in by the
email package may contain surrogateescaped binary data.
"""
raise NotImplementedError
@_extend_docstrings
class Compat32(Policy):
"""+
This particular policy is the backward compatibility Policy. It
replicates the behavior of the email package version 5.1.
"""
mangle_from_ = True
def _sanitize_header(self, name, value):
# If the header value contains surrogates, return a Header using
# the unknown-8bit charset to encode the bytes as encoded words.
if not isinstance(value, str):
# Assume it is already a header object
return value
if _has_surrogates(value):
return header.Header(value, charset=_charset.UNKNOWN8BIT,
header_name=name)
else:
return value
def header_source_parse(self, sourcelines):
"""+
The name is parsed as everything up to the ':' and returned unmodified.
The value is determined by stripping leading whitespace off the
remainder of the first line, joining all subsequent lines together, and
stripping any trailing carriage return or linefeed characters.
"""
name, value = sourcelines[0].split(':', 1)
value = value.lstrip(' \t') + ''.join(sourcelines[1:])
return (name, value.rstrip('\r\n'))
def header_store_parse(self, name, value):
"""+
The name and value are returned unmodified.
"""
return (name, value)
def header_fetch_parse(self, name, value):
"""+
If the value contains binary data, it is converted into a Header object
using the unknown-8bit charset. Otherwise it is returned unmodified.
"""
return self._sanitize_header(name, value)
def fold(self, name, value):
"""+
Headers are folded using the Header folding algorithm, which preserves
existing line breaks in the value, and wraps each resulting line to the
max_line_length. Non-ASCII binary data are CTE encoded using the
unknown-8bit charset.
"""
return self._fold(name, value, sanitize=True)
def fold_binary(self, name, value):
"""+
Headers are folded using the Header folding algorithm, which preserves
existing line breaks in the value, and wraps each resulting line to the
max_line_length. If cte_type is 7bit, non-ascii binary data is CTE
encoded using the unknown-8bit charset. Otherwise the original source
header is used, with its existing line breaks and/or binary data.
"""
folded = self._fold(name, value, sanitize=self.cte_type=='7bit')
return folded.encode('ascii', 'surrogateescape')
def _fold(self, name, value, sanitize):
parts = []
parts.append('%s: ' % name)
if isinstance(value, str):
if _has_surrogates(value):
if sanitize:
h = header.Header(value,
charset=_charset.UNKNOWN8BIT,
header_name=name)
else:
# If we have raw 8bit data in a byte string, we have no idea
# what the encoding is. There is no safe way to split this
# string. If it's ascii-subset, then we could do a normal
# ascii split, but if it's multibyte then we could break the
# string. There's no way to know so the least harm seems to
# be to not split the string and risk it being too long.
parts.append(value)
h = None
else:
h = header.Header(value, header_name=name)
else:
# Assume it is a Header-like object.
h = value
if h is not None:
# The Header class interprets a value of None for maxlinelen as the
# default value of 78, as recommended by RFC 2822.
maxlinelen = 0
if self.max_line_length is not None:
maxlinelen = self.max_line_length
parts.append(h.encode(linesep=self.linesep, maxlinelen=maxlinelen))
parts.append(self.linesep)
return ''.join(parts)
compat32 = Compat32()

View File

@@ -0,0 +1,216 @@
:mod:`email` Package Architecture
=================================
Overview
--------
The email package consists of three major components:
Model
An object structure that represents an email message, and provides an
API for creating, querying, and modifying a message.
Parser
Takes a sequence of characters or bytes and produces a model of the
email message represented by those characters or bytes.
Generator
Takes a model and turns it into a sequence of characters or bytes. The
sequence can either be intended for human consumption (a printable
unicode string) or bytes suitable for transmission over the wire. In
the latter case all data is properly encoded using the content transfer
encodings specified by the relevant RFCs.
Conceptually the package is organized around the model. The model provides both
"external" APIs intended for use by application programs using the library,
and "internal" APIs intended for use by the Parser and Generator components.
This division is intentionally a bit fuzzy; the API described by this
documentation is all a public, stable API. This allows for an application
with special needs to implement its own parser and/or generator.
In addition to the three major functional components, there is a third key
component to the architecture:
Policy
An object that specifies various behavioral settings and carries
implementations of various behavior-controlling methods.
The Policy framework provides a simple and convenient way to control the
behavior of the library, making it possible for the library to be used in a
very flexible fashion while leveraging the common code required to parse,
represent, and generate message-like objects. For example, in addition to the
default :rfc:`5322` email message policy, we also have a policy that manages
HTTP headers in a fashion compliant with :rfc:`2616`. Individual policy
controls, such as the maximum line length produced by the generator, can also
be controlled individually to meet specialized application requirements.
The Model
---------
The message model is implemented by the :class:`~email.message.Message` class.
The model divides a message into the two fundamental parts discussed by the
RFC: the header section and the body. The `Message` object acts as a
pseudo-dictionary of named headers. Its dictionary interface provides
convenient access to individual headers by name. However, all headers are kept
internally in an ordered list, so that the information about the order of the
headers in the original message is preserved.
The `Message` object also has a `payload` that holds the body. A `payload` can
be one of two things: data, or a list of `Message` objects. The latter is used
to represent a multipart MIME message. Lists can be nested arbitrarily deeply
in order to represent the message, with all terminal leaves having non-list
data payloads.
Message Lifecycle
-----------------
The general lifecycle of a message is:
Creation
A `Message` object can be created by a Parser, or it can be
instantiated as an empty message by an application.
Manipulation
The application may examine one or more headers, and/or the
payload, and it may modify one or more headers and/or
the payload. This may be done on the top level `Message`
object, or on any sub-object.
Finalization
The Model is converted into a unicode or binary stream,
or the model is discarded.
Header Policy Control During Lifecycle
--------------------------------------
One of the major controls exerted by the Policy is the management of headers
during the `Message` lifecycle. Most applications don't need to be aware of
this.
A header enters the model in one of two ways: via a Parser, or by being set to
a specific value by an application program after the Model already exists.
Similarly, a header exits the model in one of two ways: by being serialized by
a Generator, or by being retrieved from a Model by an application program. The
Policy object provides hooks for all four of these pathways.
The model storage for headers is a list of (name, value) tuples.
The Parser identifies headers during parsing, and passes them to the
:meth:`~email.policy.Policy.header_source_parse` method of the Policy. The
result of that method is the (name, value) tuple to be stored in the model.
When an application program supplies a header value (for example, through the
`Message` object `__setitem__` interface), the name and the value are passed to
the :meth:`~email.policy.Policy.header_store_parse` method of the Policy, which
returns the (name, value) tuple to be stored in the model.
When an application program retrieves a header (through any of the dict or list
interfaces of `Message`), the name and value are passed to the
:meth:`~email.policy.Policy.header_fetch_parse` method of the Policy to
obtain the value returned to the application.
When a Generator requests a header during serialization, the name and value are
passed to the :meth:`~email.policy.Policy.fold` method of the Policy, which
returns a string containing line breaks in the appropriate places. The
:meth:`~email.policy.Policy.cte_type` Policy control determines whether or
not Content Transfer Encoding is performed on the data in the header. There is
also a :meth:`~email.policy.Policy.binary_fold` method for use by generators
that produce binary output, which returns the folded header as binary data,
possibly folded at different places than the corresponding string would be.
Handling Binary Data
--------------------
In an ideal world all message data would conform to the RFCs, meaning that the
parser could decode the message into the idealized unicode message that the
sender originally wrote. In the real world, the email package must also be
able to deal with badly formatted messages, including messages containing
non-ASCII characters that either have no indicated character set or are not
valid characters in the indicated character set.
Since email messages are *primarily* text data, and operations on message data
are primarily text operations (except for binary payloads of course), the model
stores all text data as unicode strings. Un-decodable binary inside text
data is handled by using the `surrogateescape` error handler of the ASCII
codec. As with the binary filenames the error handler was introduced to
handle, this allows the email package to "carry" the binary data received
during parsing along until the output stage, at which time it is regenerated
in its original form.
This carried binary data is almost entirely an implementation detail. The one
place where it is visible in the API is in the "internal" API. A Parser must
do the `surrogateescape` encoding of binary input data, and pass that data to
the appropriate Policy method. The "internal" interface used by the Generator
to access header values preserves the `surrogateescaped` bytes. All other
interfaces convert the binary data either back into bytes or into a safe form
(losing information in some cases).
Backward Compatibility
----------------------
The :class:`~email.policy.Policy.Compat32` Policy provides backward
compatibility with version 5.1 of the email package. It does this via the
following implementation of the four+1 Policy methods described above:
header_source_parse
Splits the first line on the colon to obtain the name, discards any spaces
after the colon, and joins the remainder of the line with all of the
remaining lines, preserving the linesep characters to obtain the value.
Trailing carriage return and/or linefeed characters are stripped from the
resulting value string.
header_store_parse
Returns the name and value exactly as received from the application.
header_fetch_parse
If the value contains any `surrogateescaped` binary data, return the value
as a :class:`~email.header.Header` object, using the character set
`unknown-8bit`. Otherwise just returns the value.
fold
Uses :class:`~email.header.Header`'s folding to fold headers in the
same way the email5.1 generator did.
binary_fold
Same as fold, but encodes to 'ascii'.
New Algorithm
-------------
header_source_parse
Same as legacy behavior.
header_store_parse
Same as legacy behavior.
header_fetch_parse
If the value is already a header object, returns it. Otherwise, parses the
value using the new parser, and returns the resulting object as the value.
`surrogateescaped` bytes get turned into unicode unknown character code
points.
fold
Uses the new header folding algorithm, respecting the policy settings.
surrogateescaped bytes are encoded using the ``unknown-8bit`` charset for
``cte_type=7bit`` or ``8bit``. Returns a string.
At some point there will also be a ``cte_type=unicode``, and for that
policy fold will serialize the idealized unicode message with RFC-like
folding, converting any surrogateescaped bytes into the unicode
unknown character glyph.
binary_fold
Uses the new header folding algorithm, respecting the policy settings.
surrogateescaped bytes are encoded using the `unknown-8bit` charset for
``cte_type=7bit``, and get turned back into bytes for ``cte_type=8bit``.
Returns bytes.
At some point there will also be a ``cte_type=unicode``, and for that
policy binary_fold will serialize the message according to :rfc:``5335``.

View File

@@ -0,0 +1,119 @@
# Copyright (C) 2002-2007 Python Software Foundation
# Author: Ben Gertzfield
# Contact: email-sig@python.org
"""Base64 content transfer encoding per RFCs 2045-2047.
This module handles the content transfer encoding method defined in RFC 2045
to encode arbitrary 8-bit data using the three 8-bit bytes in four 7-bit
characters encoding known as Base64.
It is used in the MIME standards for email to attach images, audio, and text
using some 8-bit character sets to messages.
This module provides an interface to encode and decode both headers and bodies
with Base64 encoding.
RFC 2045 defines a method for including character set information in an
`encoded-word' in a header. This method is commonly used for 8-bit real names
in To:, From:, Cc:, etc. fields, as well as Subject: lines.
This module does not do the line wrapping or end-of-line character conversion
necessary for proper internationalized headers; it only does dumb encoding and
decoding. To deal with the various line wrapping issues, use the email.header
module.
"""
__all__ = [
'body_decode',
'body_encode',
'decode',
'decodestring',
'header_encode',
'header_length',
]
from base64 import b64encode
from binascii import b2a_base64, a2b_base64
CRLF = '\r\n'
NL = '\n'
EMPTYSTRING = ''
# See also Charset.py
MISC_LEN = 7
# Helpers
def header_length(bytearray):
"""Return the length of s when it is encoded with base64."""
groups_of_3, leftover = divmod(len(bytearray), 3)
# 4 bytes out for each 3 bytes (or nonzero fraction thereof) in.
n = groups_of_3 * 4
if leftover:
n += 4
return n
def header_encode(header_bytes, charset='iso-8859-1'):
"""Encode a single header line with Base64 encoding in a given charset.
charset names the character set to use to encode the header. It defaults
to iso-8859-1. Base64 encoding is defined in RFC 2045.
"""
if not header_bytes:
return ""
if isinstance(header_bytes, str):
header_bytes = header_bytes.encode(charset)
encoded = b64encode(header_bytes).decode("ascii")
return '=?%s?b?%s?=' % (charset, encoded)
def body_encode(s, maxlinelen=76, eol=NL):
r"""Encode a string with base64.
Each line will be wrapped at, at most, maxlinelen characters (defaults to
76 characters).
Each line of encoded text will end with eol, which defaults to "\n". Set
this to "\r\n" if you will be using the result of this function directly
in an email.
"""
if not s:
return s
encvec = []
max_unencoded = maxlinelen * 3 // 4
for i in range(0, len(s), max_unencoded):
# BAW: should encode() inherit b2a_base64()'s dubious behavior in
# adding a newline to the encoded string?
enc = b2a_base64(s[i:i + max_unencoded]).decode("ascii")
if enc.endswith(NL) and eol != NL:
enc = enc[:-1] + eol
encvec.append(enc)
return EMPTYSTRING.join(encvec)
def decode(string):
"""Decode a raw base64 string, returning a bytes object.
This function does not parse a full MIME header value encoded with
base64 (like =?iso-8859-1?b?bmloISBuaWgh?=) -- please use the high
level email.header class for that functionality.
"""
if not string:
return bytes()
elif isinstance(string, str):
return a2b_base64(string.encode('raw-unicode-escape'))
else:
return a2b_base64(string)
# For convenience and backwards compatibility w/ standard base64 module
body_decode = decode
decodestring = decode

View File

@@ -0,0 +1,404 @@
# Copyright (C) 2001-2007 Python Software Foundation
# Author: Ben Gertzfield, Barry Warsaw
# Contact: email-sig@python.org
__all__ = [
'Charset',
'add_alias',
'add_charset',
'add_codec',
]
from functools import partial
import email.base64mime
import email.quoprimime
from email import errors
from email.encoders import encode_7or8bit
# Flags for types of header encodings
QP = 1 # Quoted-Printable
BASE64 = 2 # Base64
SHORTEST = 3 # the shorter of QP and base64, but only for headers
# In "=?charset?q?hello_world?=", the =?, ?q?, and ?= add up to 7
RFC2047_CHROME_LEN = 7
DEFAULT_CHARSET = 'us-ascii'
UNKNOWN8BIT = 'unknown-8bit'
EMPTYSTRING = ''
# Defaults
CHARSETS = {
# input header enc body enc output conv
'iso-8859-1': (QP, QP, None),
'iso-8859-2': (QP, QP, None),
'iso-8859-3': (QP, QP, None),
'iso-8859-4': (QP, QP, None),
# iso-8859-5 is Cyrillic, and not especially used
# iso-8859-6 is Arabic, also not particularly used
# iso-8859-7 is Greek, QP will not make it readable
# iso-8859-8 is Hebrew, QP will not make it readable
'iso-8859-9': (QP, QP, None),
'iso-8859-10': (QP, QP, None),
# iso-8859-11 is Thai, QP will not make it readable
'iso-8859-13': (QP, QP, None),
'iso-8859-14': (QP, QP, None),
'iso-8859-15': (QP, QP, None),
'iso-8859-16': (QP, QP, None),
'windows-1252':(QP, QP, None),
'viscii': (QP, QP, None),
'us-ascii': (None, None, None),
'big5': (BASE64, BASE64, None),
'gb2312': (BASE64, BASE64, None),
'euc-jp': (BASE64, None, 'iso-2022-jp'),
'shift_jis': (BASE64, None, 'iso-2022-jp'),
'iso-2022-jp': (BASE64, None, None),
'koi8-r': (BASE64, BASE64, None),
'utf-8': (SHORTEST, BASE64, 'utf-8'),
}
# Aliases for other commonly-used names for character sets. Map
# them to the real ones used in email.
ALIASES = {
'latin_1': 'iso-8859-1',
'latin-1': 'iso-8859-1',
'latin_2': 'iso-8859-2',
'latin-2': 'iso-8859-2',
'latin_3': 'iso-8859-3',
'latin-3': 'iso-8859-3',
'latin_4': 'iso-8859-4',
'latin-4': 'iso-8859-4',
'latin_5': 'iso-8859-9',
'latin-5': 'iso-8859-9',
'latin_6': 'iso-8859-10',
'latin-6': 'iso-8859-10',
'latin_7': 'iso-8859-13',
'latin-7': 'iso-8859-13',
'latin_8': 'iso-8859-14',
'latin-8': 'iso-8859-14',
'latin_9': 'iso-8859-15',
'latin-9': 'iso-8859-15',
'latin_10':'iso-8859-16',
'latin-10':'iso-8859-16',
'cp949': 'ks_c_5601-1987',
'euc_jp': 'euc-jp',
'euc_kr': 'euc-kr',
'ascii': 'us-ascii',
}
# Map charsets to their Unicode codec strings.
CODEC_MAP = {
'gb2312': 'eucgb2312_cn',
'big5': 'big5_tw',
# Hack: We don't want *any* conversion for stuff marked us-ascii, as all
# sorts of garbage might be sent to us in the guise of 7-bit us-ascii.
# Let that stuff pass through without conversion to/from Unicode.
'us-ascii': None,
}
# Convenience functions for extending the above mappings
def add_charset(charset, header_enc=None, body_enc=None, output_charset=None):
"""Add character set properties to the global registry.
charset is the input character set, and must be the canonical name of a
character set.
Optional header_enc and body_enc is either Charset.QP for
quoted-printable, Charset.BASE64 for base64 encoding, Charset.SHORTEST for
the shortest of qp or base64 encoding, or None for no encoding. SHORTEST
is only valid for header_enc. It describes how message headers and
message bodies in the input charset are to be encoded. Default is no
encoding.
Optional output_charset is the character set that the output should be
in. Conversions will proceed from input charset, to Unicode, to the
output charset when the method Charset.convert() is called. The default
is to output in the same character set as the input.
Both input_charset and output_charset must have Unicode codec entries in
the module's charset-to-codec mapping; use add_codec(charset, codecname)
to add codecs the module does not know about. See the codecs module's
documentation for more information.
"""
if body_enc == SHORTEST:
raise ValueError('SHORTEST not allowed for body_enc')
CHARSETS[charset] = (header_enc, body_enc, output_charset)
def add_alias(alias, canonical):
"""Add a character set alias.
alias is the alias name, e.g. latin-1
canonical is the character set's canonical name, e.g. iso-8859-1
"""
ALIASES[alias] = canonical
def add_codec(charset, codecname):
"""Add a codec that map characters in the given charset to/from Unicode.
charset is the canonical name of a character set. codecname is the name
of a Python codec, as appropriate for the second argument to the unicode()
built-in, or to the encode() method of a Unicode string.
"""
CODEC_MAP[charset] = codecname
# Convenience function for encoding strings, taking into account
# that they might be unknown-8bit (ie: have surrogate-escaped bytes)
def _encode(string, codec):
if codec == UNKNOWN8BIT:
return string.encode('ascii', 'surrogateescape')
else:
return string.encode(codec)
class Charset:
"""Map character sets to their email properties.
This class provides information about the requirements imposed on email
for a specific character set. It also provides convenience routines for
converting between character sets, given the availability of the
applicable codecs. Given a character set, it will do its best to provide
information on how to use that character set in an email in an
RFC-compliant way.
Certain character sets must be encoded with quoted-printable or base64
when used in email headers or bodies. Certain character sets must be
converted outright, and are not allowed in email. Instances of this
module expose the following information about a character set:
input_charset: The initial character set specified. Common aliases
are converted to their `official' email names (e.g. latin_1
is converted to iso-8859-1). Defaults to 7-bit us-ascii.
header_encoding: If the character set must be encoded before it can be
used in an email header, this attribute will be set to
Charset.QP (for quoted-printable), Charset.BASE64 (for
base64 encoding), or Charset.SHORTEST for the shortest of
QP or BASE64 encoding. Otherwise, it will be None.
body_encoding: Same as header_encoding, but describes the encoding for the
mail message's body, which indeed may be different than the
header encoding. Charset.SHORTEST is not allowed for
body_encoding.
output_charset: Some character sets must be converted before they can be
used in email headers or bodies. If the input_charset is
one of them, this attribute will contain the name of the
charset output will be converted to. Otherwise, it will
be None.
input_codec: The name of the Python codec used to convert the
input_charset to Unicode. If no conversion codec is
necessary, this attribute will be None.
output_codec: The name of the Python codec used to convert Unicode
to the output_charset. If no conversion codec is necessary,
this attribute will have the same value as the input_codec.
"""
def __init__(self, input_charset=DEFAULT_CHARSET):
# RFC 2046, $4.1.2 says charsets are not case sensitive. We coerce to
# unicode because its .lower() is locale insensitive. If the argument
# is already a unicode, we leave it at that, but ensure that the
# charset is ASCII, as the standard (RFC XXX) requires.
try:
if isinstance(input_charset, str):
input_charset.encode('ascii')
else:
input_charset = str(input_charset, 'ascii')
except UnicodeError:
raise errors.CharsetError(input_charset)
input_charset = input_charset.lower()
# Set the input charset after filtering through the aliases
self.input_charset = ALIASES.get(input_charset, input_charset)
# We can try to guess which encoding and conversion to use by the
# charset_map dictionary. Try that first, but let the user override
# it.
henc, benc, conv = CHARSETS.get(self.input_charset,
(SHORTEST, BASE64, None))
if not conv:
conv = self.input_charset
# Set the attributes, allowing the arguments to override the default.
self.header_encoding = henc
self.body_encoding = benc
self.output_charset = ALIASES.get(conv, conv)
# Now set the codecs. If one isn't defined for input_charset,
# guess and try a Unicode codec with the same name as input_codec.
self.input_codec = CODEC_MAP.get(self.input_charset,
self.input_charset)
self.output_codec = CODEC_MAP.get(self.output_charset,
self.output_charset)
def __repr__(self):
return self.input_charset.lower()
def __eq__(self, other):
return str(self) == str(other).lower()
def get_body_encoding(self):
"""Return the content-transfer-encoding used for body encoding.
This is either the string `quoted-printable' or `base64' depending on
the encoding used, or it is a function in which case you should call
the function with a single argument, the Message object being
encoded. The function should then set the Content-Transfer-Encoding
header itself to whatever is appropriate.
Returns "quoted-printable" if self.body_encoding is QP.
Returns "base64" if self.body_encoding is BASE64.
Returns conversion function otherwise.
"""
assert self.body_encoding != SHORTEST
if self.body_encoding == QP:
return 'quoted-printable'
elif self.body_encoding == BASE64:
return 'base64'
else:
return encode_7or8bit
def get_output_charset(self):
"""Return the output character set.
This is self.output_charset if that is not None, otherwise it is
self.input_charset.
"""
return self.output_charset or self.input_charset
def header_encode(self, string):
"""Header-encode a string by converting it first to bytes.
The type of encoding (base64 or quoted-printable) will be based on
this charset's `header_encoding`.
:param string: A unicode string for the header. It must be possible
to encode this string to bytes using the character set's
output codec.
:return: The encoded string, with RFC 2047 chrome.
"""
codec = self.output_codec or 'us-ascii'
header_bytes = _encode(string, codec)
# 7bit/8bit encodings return the string unchanged (modulo conversions)
encoder_module = self._get_encoder(header_bytes)
if encoder_module is None:
return string
return encoder_module.header_encode(header_bytes, codec)
def header_encode_lines(self, string, maxlengths):
"""Header-encode a string by converting it first to bytes.
This is similar to `header_encode()` except that the string is fit
into maximum line lengths as given by the argument.
:param string: A unicode string for the header. It must be possible
to encode this string to bytes using the character set's
output codec.
:param maxlengths: Maximum line length iterator. Each element
returned from this iterator will provide the next maximum line
length. This parameter is used as an argument to built-in next()
and should never be exhausted. The maximum line lengths should
not count the RFC 2047 chrome. These line lengths are only a
hint; the splitter does the best it can.
:return: Lines of encoded strings, each with RFC 2047 chrome.
"""
# See which encoding we should use.
codec = self.output_codec or 'us-ascii'
header_bytes = _encode(string, codec)
encoder_module = self._get_encoder(header_bytes)
encoder = partial(encoder_module.header_encode, charset=codec)
# Calculate the number of characters that the RFC 2047 chrome will
# contribute to each line.
charset = self.get_output_charset()
extra = len(charset) + RFC2047_CHROME_LEN
# Now comes the hard part. We must encode bytes but we can't split on
# bytes because some character sets are variable length and each
# encoded word must stand on its own. So the problem is you have to
# encode to bytes to figure out this word's length, but you must split
# on characters. This causes two problems: first, we don't know how
# many octets a specific substring of unicode characters will get
# encoded to, and second, we don't know how many ASCII characters
# those octets will get encoded to. Unless we try it. Which seems
# inefficient. In the interest of being correct rather than fast (and
# in the hope that there will be few encoded headers in any such
# message), brute force it. :(
lines = []
current_line = []
maxlen = next(maxlengths) - extra
for character in string:
current_line.append(character)
this_line = EMPTYSTRING.join(current_line)
length = encoder_module.header_length(_encode(this_line, charset))
if length > maxlen:
# This last character doesn't fit so pop it off.
current_line.pop()
# Does nothing fit on the first line?
if not lines and not current_line:
lines.append(None)
else:
separator = (' ' if lines else '')
joined_line = EMPTYSTRING.join(current_line)
header_bytes = _encode(joined_line, codec)
lines.append(encoder(header_bytes))
current_line = [character]
maxlen = next(maxlengths) - extra
joined_line = EMPTYSTRING.join(current_line)
header_bytes = _encode(joined_line, codec)
lines.append(encoder(header_bytes))
return lines
def _get_encoder(self, header_bytes):
if self.header_encoding == BASE64:
return email.base64mime
elif self.header_encoding == QP:
return email.quoprimime
elif self.header_encoding == SHORTEST:
len64 = email.base64mime.header_length(header_bytes)
lenqp = email.quoprimime.header_length(header_bytes)
if len64 < lenqp:
return email.base64mime
else:
return email.quoprimime
else:
return None
def body_encode(self, string):
"""Body-encode a string by converting it first to bytes.
The type of encoding (base64 or quoted-printable) will be based on
self.body_encoding. If body_encoding is None, we assume the
output charset is a 7bit encoding, so re-encoding the decoded
string using the ascii codec produces the correct string version
of the content.
"""
if not string:
return string
if self.body_encoding is BASE64:
if isinstance(string, str):
string = string.encode(self.output_charset)
return email.base64mime.body_encode(string)
elif self.body_encoding is QP:
# quopromime.body_encode takes a string, but operates on it as if
# it were a list of byte codes. For a (minimal) history on why
# this is so, see changeset 0cf700464177. To correctly encode a
# character set, then, we must turn it into pseudo bytes via the
# latin1 charset, which will encode any byte as a single code point
# between 0 and 255, which is what body_encode is expecting.
if isinstance(string, str):
string = string.encode(self.output_charset)
string = string.decode('latin1')
return email.quoprimime.body_encode(string)
else:
if isinstance(string, str):
string = string.encode(self.output_charset).decode('ascii')
return string

View File

@@ -0,0 +1,250 @@
import binascii
import email.charset
import email.message
import email.errors
from email import quoprimime
class ContentManager:
def __init__(self):
self.get_handlers = {}
self.set_handlers = {}
def add_get_handler(self, key, handler):
self.get_handlers[key] = handler
def get_content(self, msg, *args, **kw):
content_type = msg.get_content_type()
if content_type in self.get_handlers:
return self.get_handlers[content_type](msg, *args, **kw)
maintype = msg.get_content_maintype()
if maintype in self.get_handlers:
return self.get_handlers[maintype](msg, *args, **kw)
if '' in self.get_handlers:
return self.get_handlers[''](msg, *args, **kw)
raise KeyError(content_type)
def add_set_handler(self, typekey, handler):
self.set_handlers[typekey] = handler
def set_content(self, msg, obj, *args, **kw):
if msg.get_content_maintype() == 'multipart':
# XXX: is this error a good idea or not? We can remove it later,
# but we can't add it later, so do it for now.
raise TypeError("set_content not valid on multipart")
handler = self._find_set_handler(msg, obj)
msg.clear_content()
handler(msg, obj, *args, **kw)
def _find_set_handler(self, msg, obj):
full_path_for_error = None
for typ in type(obj).__mro__:
if typ in self.set_handlers:
return self.set_handlers[typ]
qname = typ.__qualname__
modname = getattr(typ, '__module__', '')
full_path = '.'.join((modname, qname)) if modname else qname
if full_path_for_error is None:
full_path_for_error = full_path
if full_path in self.set_handlers:
return self.set_handlers[full_path]
if qname in self.set_handlers:
return self.set_handlers[qname]
name = typ.__name__
if name in self.set_handlers:
return self.set_handlers[name]
if None in self.set_handlers:
return self.set_handlers[None]
raise KeyError(full_path_for_error)
raw_data_manager = ContentManager()
def get_text_content(msg, errors='replace'):
content = msg.get_payload(decode=True)
charset = msg.get_param('charset', 'ASCII')
return content.decode(charset, errors=errors)
raw_data_manager.add_get_handler('text', get_text_content)
def get_non_text_content(msg):
return msg.get_payload(decode=True)
for maintype in 'audio image video application'.split():
raw_data_manager.add_get_handler(maintype, get_non_text_content)
def get_message_content(msg):
return msg.get_payload(0)
for subtype in 'rfc822 external-body'.split():
raw_data_manager.add_get_handler('message/'+subtype, get_message_content)
def get_and_fixup_unknown_message_content(msg):
# If we don't understand a message subtype, we are supposed to treat it as
# if it were application/octet-stream, per
# tools.ietf.org/html/rfc2046#section-5.2.4. Feedparser doesn't do that,
# so do our best to fix things up. Note that it is *not* appropriate to
# model message/partial content as Message objects, so they are handled
# here as well. (How to reassemble them is out of scope for this comment :)
return bytes(msg.get_payload(0))
raw_data_manager.add_get_handler('message',
get_and_fixup_unknown_message_content)
def _prepare_set(msg, maintype, subtype, headers):
msg['Content-Type'] = '/'.join((maintype, subtype))
if headers:
if not hasattr(headers[0], 'name'):
mp = msg.policy
headers = [mp.header_factory(*mp.header_source_parse([header]))
for header in headers]
try:
for header in headers:
if header.defects:
raise header.defects[0]
msg[header.name] = header
except email.errors.HeaderDefect as exc:
raise ValueError("Invalid header: {}".format(
header.fold(policy=msg.policy))) from exc
def _finalize_set(msg, disposition, filename, cid, params):
if disposition is None and filename is not None:
disposition = 'attachment'
if disposition is not None:
msg['Content-Disposition'] = disposition
if filename is not None:
msg.set_param('filename',
filename,
header='Content-Disposition',
replace=True)
if cid is not None:
msg['Content-ID'] = cid
if params is not None:
for key, value in params.items():
msg.set_param(key, value)
# XXX: This is a cleaned-up version of base64mime.body_encode (including a bug
# fix in the calculation of unencoded_bytes_per_line). It would be nice to
# drop both this and quoprimime.body_encode in favor of enhanced binascii
# routines that accepted a max_line_length parameter.
def _encode_base64(data, max_line_length):
encoded_lines = []
unencoded_bytes_per_line = max_line_length // 4 * 3
for i in range(0, len(data), unencoded_bytes_per_line):
thisline = data[i:i+unencoded_bytes_per_line]
encoded_lines.append(binascii.b2a_base64(thisline).decode('ascii'))
return ''.join(encoded_lines)
def _encode_text(string, charset, cte, policy):
lines = string.encode(charset).splitlines()
linesep = policy.linesep.encode('ascii')
def embedded_body(lines): return linesep.join(lines) + linesep
def normal_body(lines): return b'\n'.join(lines) + b'\n'
if cte==None:
# Use heuristics to decide on the "best" encoding.
if max((len(x) for x in lines), default=0) <= policy.max_line_length:
try:
return '7bit', normal_body(lines).decode('ascii')
except UnicodeDecodeError:
pass
if policy.cte_type == '8bit':
return '8bit', normal_body(lines).decode('ascii', 'surrogateescape')
sniff = embedded_body(lines[:10])
sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'),
policy.max_line_length)
sniff_base64 = binascii.b2a_base64(sniff)
# This is a little unfair to qp; it includes lineseps, base64 doesn't.
if len(sniff_qp) > len(sniff_base64):
cte = 'base64'
else:
cte = 'quoted-printable'
if len(lines) <= 10:
return cte, sniff_qp
if cte == '7bit':
data = normal_body(lines).decode('ascii')
elif cte == '8bit':
data = normal_body(lines).decode('ascii', 'surrogateescape')
elif cte == 'quoted-printable':
data = quoprimime.body_encode(normal_body(lines).decode('latin-1'),
policy.max_line_length)
elif cte == 'base64':
data = _encode_base64(embedded_body(lines), policy.max_line_length)
else:
raise ValueError("Unknown content transfer encoding {}".format(cte))
return cte, data
def set_text_content(msg, string, subtype="plain", charset='utf-8', cte=None,
disposition=None, filename=None, cid=None,
params=None, headers=None):
_prepare_set(msg, 'text', subtype, headers)
cte, payload = _encode_text(string, charset, cte, msg.policy)
msg.set_payload(payload)
msg.set_param('charset',
email.charset.ALIASES.get(charset, charset),
replace=True)
msg['Content-Transfer-Encoding'] = cte
_finalize_set(msg, disposition, filename, cid, params)
raw_data_manager.add_set_handler(str, set_text_content)
def set_message_content(msg, message, subtype="rfc822", cte=None,
disposition=None, filename=None, cid=None,
params=None, headers=None):
if subtype == 'partial':
raise ValueError("message/partial is not supported for Message objects")
if subtype == 'rfc822':
if cte not in (None, '7bit', '8bit', 'binary'):
# http://tools.ietf.org/html/rfc2046#section-5.2.1 mandate.
raise ValueError(
"message/rfc822 parts do not support cte={}".format(cte))
# 8bit will get coerced on serialization if policy.cte_type='7bit'. We
# may end up claiming 8bit when it isn't needed, but the only negative
# result of that should be a gateway that needs to coerce to 7bit
# having to look through the whole embedded message to discover whether
# or not it actually has to do anything.
cte = '8bit' if cte is None else cte
elif subtype == 'external-body':
if cte not in (None, '7bit'):
# http://tools.ietf.org/html/rfc2046#section-5.2.3 mandate.
raise ValueError(
"message/external-body parts do not support cte={}".format(cte))
cte = '7bit'
elif cte is None:
# http://tools.ietf.org/html/rfc2046#section-5.2.4 says all future
# subtypes should be restricted to 7bit, so assume that.
cte = '7bit'
_prepare_set(msg, 'message', subtype, headers)
msg.set_payload([message])
msg['Content-Transfer-Encoding'] = cte
_finalize_set(msg, disposition, filename, cid, params)
raw_data_manager.add_set_handler(email.message.Message, set_message_content)
def set_bytes_content(msg, data, maintype, subtype, cte='base64',
disposition=None, filename=None, cid=None,
params=None, headers=None):
_prepare_set(msg, maintype, subtype, headers)
if cte == 'base64':
data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
elif cte == 'quoted-printable':
# XXX: quoprimime.body_encode won't encode newline characters in data,
# so we can't use it. This means max_line_length is ignored. Another
# bug to fix later. (Note: encoders.quopri is broken on line ends.)
data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
data = data.decode('ascii')
elif cte == '7bit':
# Make sure it really is only ASCII. The early warning here seems
# worth the overhead...if you care write your own content manager :).
data.encode('ascii')
elif cte in ('8bit', 'binary'):
data = data.decode('ascii', 'surrogateescape')
msg.set_payload(data)
msg['Content-Transfer-Encoding'] = cte
_finalize_set(msg, disposition, filename, cid, params)
for typ in (bytes, bytearray, memoryview):
raw_data_manager.add_set_handler(typ, set_bytes_content)

View File

@@ -0,0 +1,69 @@
# Copyright (C) 2001-2006 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""Encodings and related functions."""
__all__ = [
'encode_7or8bit',
'encode_base64',
'encode_noop',
'encode_quopri',
]
from base64 import encodebytes as _bencode
from quopri import encodestring as _encodestring
def _qencode(s):
enc = _encodestring(s, quotetabs=True)
# Must encode spaces, which quopri.encodestring() doesn't do
return enc.replace(b' ', b'=20')
def encode_base64(msg):
"""Encode the message's payload in Base64.
Also, add an appropriate Content-Transfer-Encoding header.
"""
orig = msg.get_payload(decode=True)
encdata = str(_bencode(orig), 'ascii')
msg.set_payload(encdata)
msg['Content-Transfer-Encoding'] = 'base64'
def encode_quopri(msg):
"""Encode the message's payload in quoted-printable.
Also, add an appropriate Content-Transfer-Encoding header.
"""
orig = msg.get_payload(decode=True)
encdata = _qencode(orig)
msg.set_payload(encdata)
msg['Content-Transfer-Encoding'] = 'quoted-printable'
def encode_7or8bit(msg):
"""Set the Content-Transfer-Encoding header to 7bit or 8bit."""
orig = msg.get_payload(decode=True)
if orig is None:
# There's no payload. For backwards compatibility we use 7bit
msg['Content-Transfer-Encoding'] = '7bit'
return
# We play a trick to make this go fast. If decoding from ASCII succeeds,
# we know the data must be 7bit, otherwise treat it as 8bit.
try:
orig.decode('ascii')
except UnicodeError:
msg['Content-Transfer-Encoding'] = '8bit'
else:
msg['Content-Transfer-Encoding'] = '7bit'
def encode_noop(msg):
"""Do nothing."""

View File

@@ -0,0 +1,110 @@
# Copyright (C) 2001-2006 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""email package exception classes."""
class MessageError(Exception):
"""Base class for errors in the email package."""
class MessageParseError(MessageError):
"""Base class for message parsing errors."""
class HeaderParseError(MessageParseError):
"""Error while parsing headers."""
class BoundaryError(MessageParseError):
"""Couldn't find terminating boundary."""
class MultipartConversionError(MessageError, TypeError):
"""Conversion to a multipart is prohibited."""
class CharsetError(MessageError):
"""An illegal charset was given."""
# These are parsing defects which the parser was able to work around.
class MessageDefect(ValueError):
"""Base class for a message defect."""
def __init__(self, line=None):
if line is not None:
super().__init__(line)
self.line = line
class NoBoundaryInMultipartDefect(MessageDefect):
"""A message claimed to be a multipart but had no boundary parameter."""
class StartBoundaryNotFoundDefect(MessageDefect):
"""The claimed start boundary was never found."""
class CloseBoundaryNotFoundDefect(MessageDefect):
"""A start boundary was found, but not the corresponding close boundary."""
class FirstHeaderLineIsContinuationDefect(MessageDefect):
"""A message had a continuation line as its first header line."""
class MisplacedEnvelopeHeaderDefect(MessageDefect):
"""A 'Unix-from' header was found in the middle of a header block."""
class MissingHeaderBodySeparatorDefect(MessageDefect):
"""Found line with no leading whitespace and no colon before blank line."""
# XXX: backward compatibility, just in case (it was never emitted).
MalformedHeaderDefect = MissingHeaderBodySeparatorDefect
class MultipartInvariantViolationDefect(MessageDefect):
"""A message claimed to be a multipart but no subparts were found."""
class InvalidMultipartContentTransferEncodingDefect(MessageDefect):
"""An invalid content transfer encoding was set on the multipart itself."""
class UndecodableBytesDefect(MessageDefect):
"""Header contained bytes that could not be decoded"""
class InvalidBase64PaddingDefect(MessageDefect):
"""base64 encoded sequence had an incorrect length"""
class InvalidBase64CharactersDefect(MessageDefect):
"""base64 encoded sequence had characters not in base64 alphabet"""
class InvalidBase64LengthDefect(MessageDefect):
"""base64 encoded sequence had invalid length (1 mod 4)"""
# These errors are specific to header parsing.
class HeaderDefect(MessageDefect):
"""Base class for a header defect."""
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
class InvalidHeaderDefect(HeaderDefect):
"""Header is not valid, message gives details."""
class HeaderMissingRequiredValue(HeaderDefect):
"""A header that must have a value had none"""
class NonPrintableDefect(HeaderDefect):
"""ASCII characters outside the ascii-printable range found"""
def __init__(self, non_printables):
super().__init__(non_printables)
self.non_printables = non_printables
def __str__(self):
return ("the following ASCII non-printables found in header: "
"{}".format(self.non_printables))
class ObsoleteHeaderDefect(HeaderDefect):
"""Header uses syntax declared obsolete by RFC 5322"""
class NonASCIILocalPartDefect(HeaderDefect):
"""local_part contains non-ASCII characters"""
# This defect only occurs during unicode parsing, not when
# parsing messages decoded from binary.

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,71 @@
# Copyright (C) 2001-2006 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""Various types of useful iterators and generators."""
__all__ = [
'body_line_iterator',
'typed_subpart_iterator',
'walk',
# Do not include _structure() since it's part of the debugging API.
]
import sys
from io import StringIO
# This function will become a method of the Message class
def walk(self):
"""Walk over the message tree, yielding each subpart.
The walk is performed in depth-first order. This method is a
generator.
"""
yield self
if self.is_multipart():
for subpart in self.get_payload():
yield from subpart.walk()
# These two functions are imported into the Iterators.py interface module.
def body_line_iterator(msg, decode=False):
"""Iterate over the parts, returning string payloads line-by-line.
Optional decode (default False) is passed through to .get_payload().
"""
for subpart in msg.walk():
payload = subpart.get_payload(decode=decode)
if isinstance(payload, str):
yield from StringIO(payload)
def typed_subpart_iterator(msg, maintype='text', subtype=None):
"""Iterate over the subparts with a given MIME type.
Use `maintype' as the main MIME type to match against; this defaults to
"text". Optional `subtype' is the MIME subtype to match against; if
omitted, only the main type is matched.
"""
for subpart in msg.walk():
if subpart.get_content_maintype() == maintype:
if subtype is None or subpart.get_content_subtype() == subtype:
yield subpart
def _structure(msg, fp=None, level=0, include_default=False):
"""A handy debugging aid"""
if fp is None:
fp = sys.stdout
tab = ' ' * (level * 4)
print(tab + msg.get_content_type(), end='', file=fp)
if include_default:
print(' [%s]' % msg.get_default_type(), file=fp)
else:
print(file=fp)
if msg.is_multipart():
for subpart in msg.get_payload():
_structure(subpart, fp, level+1, include_default)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,37 @@
# Copyright (C) 2001-2006 Python Software Foundation
# Author: Keith Dart
# Contact: email-sig@python.org
"""Class representing application/* type MIME documents."""
__all__ = ["MIMEApplication"]
from email import encoders
from email.mime.nonmultipart import MIMENonMultipart
class MIMEApplication(MIMENonMultipart):
"""Class for generating application/* MIME documents."""
def __init__(self, _data, _subtype='octet-stream',
_encoder=encoders.encode_base64, *, policy=None, **_params):
"""Create an application/* type MIME document.
_data is a string containing the raw application data.
_subtype is the MIME content type subtype, defaulting to
'octet-stream'.
_encoder is a function which will perform the actual encoding for
transport of the application data, defaulting to base64 encoding.
Any additional keyword arguments are passed to the base class
constructor, which turns them into parameters on the Content-Type
header.
"""
if _subtype is None:
raise TypeError('Invalid application MIME subtype')
MIMENonMultipart.__init__(self, 'application', _subtype, policy=policy,
**_params)
self.set_payload(_data)
_encoder(self)

View File

@@ -0,0 +1,74 @@
# Copyright (C) 2001-2007 Python Software Foundation
# Author: Anthony Baxter
# Contact: email-sig@python.org
"""Class representing audio/* type MIME documents."""
__all__ = ['MIMEAudio']
import sndhdr
from io import BytesIO
from email import encoders
from email.mime.nonmultipart import MIMENonMultipart
_sndhdr_MIMEmap = {'au' : 'basic',
'wav' :'x-wav',
'aiff':'x-aiff',
'aifc':'x-aiff',
}
# There are others in sndhdr that don't have MIME types. :(
# Additional ones to be added to sndhdr? midi, mp3, realaudio, wma??
def _whatsnd(data):
"""Try to identify a sound file type.
sndhdr.what() has a pretty cruddy interface, unfortunately. This is why
we re-do it here. It would be easier to reverse engineer the Unix 'file'
command and use the standard 'magic' file, as shipped with a modern Unix.
"""
hdr = data[:512]
fakefile = BytesIO(hdr)
for testfn in sndhdr.tests:
res = testfn(hdr, fakefile)
if res is not None:
return _sndhdr_MIMEmap.get(res[0])
return None
class MIMEAudio(MIMENonMultipart):
"""Class for generating audio/* MIME documents."""
def __init__(self, _audiodata, _subtype=None,
_encoder=encoders.encode_base64, *, policy=None, **_params):
"""Create an audio/* type MIME document.
_audiodata is a string containing the raw audio data. If this data
can be decoded by the standard Python `sndhdr' module, then the
subtype will be automatically included in the Content-Type header.
Otherwise, you can specify the specific audio subtype via the
_subtype parameter. If _subtype is not given, and no subtype can be
guessed, a TypeError is raised.
_encoder is a function which will perform the actual encoding for
transport of the image data. It takes one argument, which is this
Image instance. It should use get_payload() and set_payload() to
change the payload to the encoded form. It should also add any
Content-Transfer-Encoding or other headers to the message as
necessary. The default encoding is Base64.
Any additional keyword arguments are passed to the base class
constructor, which turns them into parameters on the Content-Type
header.
"""
if _subtype is None:
_subtype = _whatsnd(_audiodata)
if _subtype is None:
raise TypeError('Could not find audio MIME subtype')
MIMENonMultipart.__init__(self, 'audio', _subtype, policy=policy,
**_params)
self.set_payload(_audiodata)
_encoder(self)

View File

@@ -0,0 +1,30 @@
# Copyright (C) 2001-2006 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""Base class for MIME specializations."""
__all__ = ['MIMEBase']
import email.policy
from email import message
class MIMEBase(message.Message):
"""Base class for MIME specializations."""
def __init__(self, _maintype, _subtype, *, policy=None, **_params):
"""This constructor adds a Content-Type: and a MIME-Version: header.
The Content-Type: header is taken from the _maintype and _subtype
arguments. Additional parameters for this header are taken from the
keyword arguments.
"""
if policy is None:
policy = email.policy.compat32
message.Message.__init__(self, policy=policy)
ctype = '%s/%s' % (_maintype, _subtype)
self.add_header('Content-Type', ctype, **_params)
self['MIME-Version'] = '1.0'

View File

@@ -0,0 +1,47 @@
# Copyright (C) 2001-2006 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""Class representing image/* type MIME documents."""
__all__ = ['MIMEImage']
import imghdr
from email import encoders
from email.mime.nonmultipart import MIMENonMultipart
class MIMEImage(MIMENonMultipart):
"""Class for generating image/* type MIME documents."""
def __init__(self, _imagedata, _subtype=None,
_encoder=encoders.encode_base64, *, policy=None, **_params):
"""Create an image/* type MIME document.
_imagedata is a string containing the raw image data. If this data
can be decoded by the standard Python `imghdr' module, then the
subtype will be automatically included in the Content-Type header.
Otherwise, you can specify the specific image subtype via the _subtype
parameter.
_encoder is a function which will perform the actual encoding for
transport of the image data. It takes one argument, which is this
Image instance. It should use get_payload() and set_payload() to
change the payload to the encoded form. It should also add any
Content-Transfer-Encoding or other headers to the message as
necessary. The default encoding is Base64.
Any additional keyword arguments are passed to the base class
constructor, which turns them into parameters on the Content-Type
header.
"""
if _subtype is None:
_subtype = imghdr.what(None, _imagedata)
if _subtype is None:
raise TypeError('Could not guess image MIME subtype')
MIMENonMultipart.__init__(self, 'image', _subtype, policy=policy,
**_params)
self.set_payload(_imagedata)
_encoder(self)

View File

@@ -0,0 +1,34 @@
# Copyright (C) 2001-2006 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""Class representing message/* MIME documents."""
__all__ = ['MIMEMessage']
from email import message
from email.mime.nonmultipart import MIMENonMultipart
class MIMEMessage(MIMENonMultipart):
"""Class representing message/* MIME documents."""
def __init__(self, _msg, _subtype='rfc822', *, policy=None):
"""Create a message/* type MIME document.
_msg is a message object and must be an instance of Message, or a
derived class of Message, otherwise a TypeError is raised.
Optional _subtype defines the subtype of the contained message. The
default is "rfc822" (this is defined by the MIME standard, even though
the term "rfc822" is technically outdated by RFC 2822).
"""
MIMENonMultipart.__init__(self, 'message', _subtype, policy=policy)
if not isinstance(_msg, message.Message):
raise TypeError('Argument is not an instance of Message')
# It's convenient to use this base class method. We need to do it
# this way or we'll get an exception
message.Message.attach(self, _msg)
# And be sure our default type is set correctly
self.set_default_type('message/rfc822')

View File

@@ -0,0 +1,48 @@
# Copyright (C) 2002-2006 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""Base class for MIME multipart/* type messages."""
__all__ = ['MIMEMultipart']
from email.mime.base import MIMEBase
class MIMEMultipart(MIMEBase):
"""Base class for MIME multipart/* type messages."""
def __init__(self, _subtype='mixed', boundary=None, _subparts=None,
*, policy=None,
**_params):
"""Creates a multipart/* type message.
By default, creates a multipart/mixed message, with proper
Content-Type and MIME-Version headers.
_subtype is the subtype of the multipart content type, defaulting to
`mixed'.
boundary is the multipart boundary string. By default it is
calculated as needed.
_subparts is a sequence of initial subparts for the payload. It
must be an iterable object, such as a list. You can always
attach new subparts to the message by using the attach() method.
Additional parameters for the Content-Type header are taken from the
keyword arguments (or passed into the _params argument).
"""
MIMEBase.__init__(self, 'multipart', _subtype, policy=policy, **_params)
# Initialise _payload to an empty list as the Message superclass's
# implementation of is_multipart assumes that _payload is a list for
# multipart messages.
self._payload = []
if _subparts:
for p in _subparts:
self.attach(p)
if boundary:
self.set_boundary(boundary)

View File

@@ -0,0 +1,22 @@
# Copyright (C) 2002-2006 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""Base class for MIME type messages that are not multipart."""
__all__ = ['MIMENonMultipart']
from email import errors
from email.mime.base import MIMEBase
class MIMENonMultipart(MIMEBase):
"""Base class for MIME non-multipart type messages."""
def attach(self, payload):
# The public API prohibits attaching multiple subparts to MIMEBase
# derived subtypes since none of them are, by definition, of content
# type multipart/*
raise errors.MultipartConversionError(
'Cannot attach additional subparts to non-multipart/*')

View File

@@ -0,0 +1,42 @@
# Copyright (C) 2001-2006 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""Class representing text/* type MIME documents."""
__all__ = ['MIMEText']
from email.charset import Charset
from email.mime.nonmultipart import MIMENonMultipart
class MIMEText(MIMENonMultipart):
"""Class for generating text/* type MIME documents."""
def __init__(self, _text, _subtype='plain', _charset=None, *, policy=None):
"""Create a text/* type MIME document.
_text is the string for this message object.
_subtype is the MIME sub content type, defaulting to "plain".
_charset is the character set parameter added to the Content-Type
header. This defaults to "us-ascii". Note that as a side-effect, the
Content-Transfer-Encoding header will also be set.
"""
# If no _charset was specified, check to see if there are non-ascii
# characters present. If not, use 'us-ascii', otherwise use utf-8.
# XXX: This can be removed once #7304 is fixed.
if _charset is None:
try:
_text.encode('us-ascii')
_charset = 'us-ascii'
except UnicodeEncodeError:
_charset = 'utf-8'
MIMENonMultipart.__init__(self, 'text', _subtype, policy=policy,
**{'charset': str(_charset)})
self.set_payload(_text, _charset)

View File

@@ -0,0 +1,131 @@
# Copyright (C) 2001-2007 Python Software Foundation
# Author: Barry Warsaw, Thomas Wouters, Anthony Baxter
# Contact: email-sig@python.org
"""A parser of RFC 2822 and MIME email messages."""
__all__ = ['Parser', 'HeaderParser', 'BytesParser', 'BytesHeaderParser',
'FeedParser', 'BytesFeedParser']
from io import StringIO, TextIOWrapper
from email.feedparser import FeedParser, BytesFeedParser
from email._policybase import compat32
class Parser:
def __init__(self, _class=None, *, policy=compat32):
"""Parser of RFC 2822 and MIME email messages.
Creates an in-memory object tree representing the email message, which
can then be manipulated and turned over to a Generator to return the
textual representation of the message.
The string must be formatted as a block of RFC 2822 headers and header
continuation lines, optionally preceded by a `Unix-from' header. The
header block is terminated either by the end of the string or by a
blank line.
_class is the class to instantiate for new message objects when they
must be created. This class must have a constructor that can take
zero arguments. Default is Message.Message.
The policy keyword specifies a policy object that controls a number of
aspects of the parser's operation. The default policy maintains
backward compatibility.
"""
self._class = _class
self.policy = policy
def parse(self, fp, headersonly=False):
"""Create a message structure from the data in a file.
Reads all the data from the file and returns the root of the message
structure. Optional headersonly is a flag specifying whether to stop
parsing after reading the headers or not. The default is False,
meaning it parses the entire contents of the file.
"""
feedparser = FeedParser(self._class, policy=self.policy)
if headersonly:
feedparser._set_headersonly()
while True:
data = fp.read(8192)
if not data:
break
feedparser.feed(data)
return feedparser.close()
def parsestr(self, text, headersonly=False):
"""Create a message structure from a string.
Returns the root of the message structure. Optional headersonly is a
flag specifying whether to stop parsing after reading the headers or
not. The default is False, meaning it parses the entire contents of
the file.
"""
return self.parse(StringIO(text), headersonly=headersonly)
class HeaderParser(Parser):
def parse(self, fp, headersonly=True):
return Parser.parse(self, fp, True)
def parsestr(self, text, headersonly=True):
return Parser.parsestr(self, text, True)
class BytesParser:
def __init__(self, *args, **kw):
"""Parser of binary RFC 2822 and MIME email messages.
Creates an in-memory object tree representing the email message, which
can then be manipulated and turned over to a Generator to return the
textual representation of the message.
The input must be formatted as a block of RFC 2822 headers and header
continuation lines, optionally preceded by a `Unix-from' header. The
header block is terminated either by the end of the input or by a
blank line.
_class is the class to instantiate for new message objects when they
must be created. This class must have a constructor that can take
zero arguments. Default is Message.Message.
"""
self.parser = Parser(*args, **kw)
def parse(self, fp, headersonly=False):
"""Create a message structure from the data in a binary file.
Reads all the data from the file and returns the root of the message
structure. Optional headersonly is a flag specifying whether to stop
parsing after reading the headers or not. The default is False,
meaning it parses the entire contents of the file.
"""
fp = TextIOWrapper(fp, encoding='ascii', errors='surrogateescape')
try:
return self.parser.parse(fp, headersonly)
finally:
fp.detach()
def parsebytes(self, text, headersonly=False):
"""Create a message structure from a byte string.
Returns the root of the message structure. Optional headersonly is a
flag specifying whether to stop parsing after reading the headers or
not. The default is False, meaning it parses the entire contents of
the file.
"""
text = text.decode('ASCII', errors='surrogateescape')
return self.parser.parsestr(text, headersonly)
class BytesHeaderParser(BytesParser):
def parse(self, fp, headersonly=True):
return BytesParser.parse(self, fp, headersonly=True)
def parsebytes(self, text, headersonly=True):
return BytesParser.parsebytes(self, text, headersonly=True)

View File

@@ -0,0 +1,224 @@
"""This will be the home for the policy that hooks in the new
code that adds all the email6 features.
"""
import re
import sys
from email._policybase import Policy, Compat32, compat32, _extend_docstrings
from email.utils import _has_surrogates
from email.headerregistry import HeaderRegistry as HeaderRegistry
from email.contentmanager import raw_data_manager
from email.message import EmailMessage
__all__ = [
'Compat32',
'compat32',
'Policy',
'EmailPolicy',
'default',
'strict',
'SMTP',
'HTTP',
]
linesep_splitter = re.compile(r'\n|\r')
@_extend_docstrings
class EmailPolicy(Policy):
"""+
PROVISIONAL
The API extensions enabled by this policy are currently provisional.
Refer to the documentation for details.
This policy adds new header parsing and folding algorithms. Instead of
simple strings, headers are custom objects with custom attributes
depending on the type of the field. The folding algorithm fully
implements RFCs 2047 and 5322.
In addition to the settable attributes listed above that apply to
all Policies, this policy adds the following additional attributes:
utf8 -- if False (the default) message headers will be
serialized as ASCII, using encoded words to encode
any non-ASCII characters in the source strings. If
True, the message headers will be serialized using
utf8 and will not contain encoded words (see RFC
6532 for more on this serialization format).
refold_source -- if the value for a header in the Message object
came from the parsing of some source, this attribute
indicates whether or not a generator should refold
that value when transforming the message back into
stream form. The possible values are:
none -- all source values use original folding
long -- source values that have any line that is
longer than max_line_length will be
refolded
all -- all values are refolded.
The default is 'long'.
header_factory -- a callable that takes two arguments, 'name' and
'value', where 'name' is a header field name and
'value' is an unfolded header field value, and
returns a string-like object that represents that
header. A default header_factory is provided that
understands some of the RFC5322 header field types.
(Currently address fields and date fields have
special treatment, while all other fields are
treated as unstructured. This list will be
completed before the extension is marked stable.)
content_manager -- an object with at least two methods: get_content
and set_content. When the get_content or
set_content method of a Message object is called,
it calls the corresponding method of this object,
passing it the message object as its first argument,
and any arguments or keywords that were passed to
it as additional arguments. The default
content_manager is
:data:`~email.contentmanager.raw_data_manager`.
"""
message_factory = EmailMessage
utf8 = False
refold_source = 'long'
header_factory = HeaderRegistry()
content_manager = raw_data_manager
def __init__(self, **kw):
# Ensure that each new instance gets a unique header factory
# (as opposed to clones, which share the factory).
if 'header_factory' not in kw:
object.__setattr__(self, 'header_factory', HeaderRegistry())
super().__init__(**kw)
def header_max_count(self, name):
"""+
The implementation for this class returns the max_count attribute from
the specialized header class that would be used to construct a header
of type 'name'.
"""
return self.header_factory[name].max_count
# The logic of the next three methods is chosen such that it is possible to
# switch a Message object between a Compat32 policy and a policy derived
# from this class and have the results stay consistent. This allows a
# Message object constructed with this policy to be passed to a library
# that only handles Compat32 objects, or to receive such an object and
# convert it to use the newer style by just changing its policy. It is
# also chosen because it postpones the relatively expensive full rfc5322
# parse until as late as possible when parsing from source, since in many
# applications only a few headers will actually be inspected.
def header_source_parse(self, sourcelines):
"""+
The name is parsed as everything up to the ':' and returned unmodified.
The value is determined by stripping leading whitespace off the
remainder of the first line, joining all subsequent lines together, and
stripping any trailing carriage return or linefeed characters. (This
is the same as Compat32).
"""
name, value = sourcelines[0].split(':', 1)
value = value.lstrip(' \t') + ''.join(sourcelines[1:])
return (name, value.rstrip('\r\n'))
def header_store_parse(self, name, value):
"""+
The name is returned unchanged. If the input value has a 'name'
attribute and it matches the name ignoring case, the value is returned
unchanged. Otherwise the name and value are passed to header_factory
method, and the resulting custom header object is returned as the
value. In this case a ValueError is raised if the input value contains
CR or LF characters.
"""
if hasattr(value, 'name') and value.name.lower() == name.lower():
return (name, value)
if isinstance(value, str) and len(value.splitlines())>1:
# XXX this error message isn't quite right when we use splitlines
# (see issue 22233), but I'm not sure what should happen here.
raise ValueError("Header values may not contain linefeed "
"or carriage return characters")
return (name, self.header_factory(name, value))
def header_fetch_parse(self, name, value):
"""+
If the value has a 'name' attribute, it is returned to unmodified.
Otherwise the name and the value with any linesep characters removed
are passed to the header_factory method, and the resulting custom
header object is returned. Any surrogateescaped bytes get turned
into the unicode unknown-character glyph.
"""
if hasattr(value, 'name'):
return value
# We can't use splitlines here because it splits on more than \r and \n.
value = ''.join(linesep_splitter.split(value))
return self.header_factory(name, value)
def fold(self, name, value):
"""+
Header folding is controlled by the refold_source policy setting. A
value is considered to be a 'source value' if and only if it does not
have a 'name' attribute (having a 'name' attribute means it is a header
object of some sort). If a source value needs to be refolded according
to the policy, it is converted into a custom header object by passing
the name and the value with any linesep characters removed to the
header_factory method. Folding of a custom header object is done by
calling its fold method with the current policy.
Source values are split into lines using splitlines. If the value is
not to be refolded, the lines are rejoined using the linesep from the
policy and returned. The exception is lines containing non-ascii
binary data. In that case the value is refolded regardless of the
refold_source setting, which causes the binary data to be CTE encoded
using the unknown-8bit charset.
"""
return self._fold(name, value, refold_binary=True)
def fold_binary(self, name, value):
"""+
The same as fold if cte_type is 7bit, except that the returned value is
bytes.
If cte_type is 8bit, non-ASCII binary data is converted back into
bytes. Headers with binary data are not refolded, regardless of the
refold_header setting, since there is no way to know whether the binary
data consists of single byte characters or multibyte characters.
If utf8 is true, headers are encoded to utf8, otherwise to ascii with
non-ASCII unicode rendered as encoded words.
"""
folded = self._fold(name, value, refold_binary=self.cte_type=='7bit')
charset = 'utf8' if self.utf8 else 'ascii'
return folded.encode(charset, 'surrogateescape')
def _fold(self, name, value, refold_binary=False):
if hasattr(value, 'name'):
return value.fold(policy=self)
maxlen = self.max_line_length if self.max_line_length else sys.maxsize
lines = value.splitlines()
refold = (self.refold_source == 'all' or
self.refold_source == 'long' and
(lines and len(lines[0])+len(name)+2 > maxlen or
any(len(x) > maxlen for x in lines[1:])))
if refold or refold_binary and _has_surrogates(value):
return self.header_factory(name, ''.join(lines)).fold(policy=self)
return name + ': ' + self.linesep.join(lines) + self.linesep
default = EmailPolicy()
# Make the default policy use the class default header_factory
del default.header_factory
strict = default.clone(raise_on_defect=True)
SMTP = default.clone(linesep='\r\n')
HTTP = default.clone(linesep='\r\n', max_line_length=None)
SMTPUTF8 = SMTP.clone(utf8=True)

View File

@@ -0,0 +1,299 @@
# Copyright (C) 2001-2006 Python Software Foundation
# Author: Ben Gertzfield
# Contact: email-sig@python.org
"""Quoted-printable content transfer encoding per RFCs 2045-2047.
This module handles the content transfer encoding method defined in RFC 2045
to encode US ASCII-like 8-bit data called `quoted-printable'. It is used to
safely encode text that is in a character set similar to the 7-bit US ASCII
character set, but that includes some 8-bit characters that are normally not
allowed in email bodies or headers.
Quoted-printable is very space-inefficient for encoding binary files; use the
email.base64mime module for that instead.
This module provides an interface to encode and decode both headers and bodies
with quoted-printable encoding.
RFC 2045 defines a method for including character set information in an
`encoded-word' in a header. This method is commonly used for 8-bit real names
in To:/From:/Cc: etc. fields, as well as Subject: lines.
This module does not do the line wrapping or end-of-line character
conversion necessary for proper internationalized headers; it only
does dumb encoding and decoding. To deal with the various line
wrapping issues, use the email.header module.
"""
__all__ = [
'body_decode',
'body_encode',
'body_length',
'decode',
'decodestring',
'header_decode',
'header_encode',
'header_length',
'quote',
'unquote',
]
import re
from string import ascii_letters, digits, hexdigits
CRLF = '\r\n'
NL = '\n'
EMPTYSTRING = ''
# Build a mapping of octets to the expansion of that octet. Since we're only
# going to have 256 of these things, this isn't terribly inefficient
# space-wise. Remember that headers and bodies have different sets of safe
# characters. Initialize both maps with the full expansion, and then override
# the safe bytes with the more compact form.
_QUOPRI_MAP = ['=%02X' % c for c in range(256)]
_QUOPRI_HEADER_MAP = _QUOPRI_MAP[:]
_QUOPRI_BODY_MAP = _QUOPRI_MAP[:]
# Safe header bytes which need no encoding.
for c in b'-!*+/' + ascii_letters.encode('ascii') + digits.encode('ascii'):
_QUOPRI_HEADER_MAP[c] = chr(c)
# Headers have one other special encoding; spaces become underscores.
_QUOPRI_HEADER_MAP[ord(' ')] = '_'
# Safe body bytes which need no encoding.
for c in (b' !"#$%&\'()*+,-./0123456789:;<>'
b'?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`'
b'abcdefghijklmnopqrstuvwxyz{|}~\t'):
_QUOPRI_BODY_MAP[c] = chr(c)
# Helpers
def header_check(octet):
"""Return True if the octet should be escaped with header quopri."""
return chr(octet) != _QUOPRI_HEADER_MAP[octet]
def body_check(octet):
"""Return True if the octet should be escaped with body quopri."""
return chr(octet) != _QUOPRI_BODY_MAP[octet]
def header_length(bytearray):
"""Return a header quoted-printable encoding length.
Note that this does not include any RFC 2047 chrome added by
`header_encode()`.
:param bytearray: An array of bytes (a.k.a. octets).
:return: The length in bytes of the byte array when it is encoded with
quoted-printable for headers.
"""
return sum(len(_QUOPRI_HEADER_MAP[octet]) for octet in bytearray)
def body_length(bytearray):
"""Return a body quoted-printable encoding length.
:param bytearray: An array of bytes (a.k.a. octets).
:return: The length in bytes of the byte array when it is encoded with
quoted-printable for bodies.
"""
return sum(len(_QUOPRI_BODY_MAP[octet]) for octet in bytearray)
def _max_append(L, s, maxlen, extra=''):
if not isinstance(s, str):
s = chr(s)
if not L:
L.append(s.lstrip())
elif len(L[-1]) + len(s) <= maxlen:
L[-1] += extra + s
else:
L.append(s.lstrip())
def unquote(s):
"""Turn a string in the form =AB to the ASCII character with value 0xab"""
return chr(int(s[1:3], 16))
def quote(c):
return _QUOPRI_MAP[ord(c)]
def header_encode(header_bytes, charset='iso-8859-1'):
"""Encode a single header line with quoted-printable (like) encoding.
Defined in RFC 2045, this `Q' encoding is similar to quoted-printable, but
used specifically for email header fields to allow charsets with mostly 7
bit characters (and some 8 bit) to remain more or less readable in non-RFC
2045 aware mail clients.
charset names the character set to use in the RFC 2046 header. It
defaults to iso-8859-1.
"""
# Return empty headers as an empty string.
if not header_bytes:
return ''
# Iterate over every byte, encoding if necessary.
encoded = header_bytes.decode('latin1').translate(_QUOPRI_HEADER_MAP)
# Now add the RFC chrome to each encoded chunk and glue the chunks
# together.
return '=?%s?q?%s?=' % (charset, encoded)
_QUOPRI_BODY_ENCODE_MAP = _QUOPRI_BODY_MAP[:]
for c in b'\r\n':
_QUOPRI_BODY_ENCODE_MAP[c] = chr(c)
def body_encode(body, maxlinelen=76, eol=NL):
"""Encode with quoted-printable, wrapping at maxlinelen characters.
Each line of encoded text will end with eol, which defaults to "\\n". Set
this to "\\r\\n" if you will be using the result of this function directly
in an email.
Each line will be wrapped at, at most, maxlinelen characters before the
eol string (maxlinelen defaults to 76 characters, the maximum value
permitted by RFC 2045). Long lines will have the 'soft line break'
quoted-printable character "=" appended to them, so the decoded text will
be identical to the original text.
The minimum maxlinelen is 4 to have room for a quoted character ("=XX")
followed by a soft line break. Smaller values will generate a
ValueError.
"""
if maxlinelen < 4:
raise ValueError("maxlinelen must be at least 4")
if not body:
return body
# quote special characters
body = body.translate(_QUOPRI_BODY_ENCODE_MAP)
soft_break = '=' + eol
# leave space for the '=' at the end of a line
maxlinelen1 = maxlinelen - 1
encoded_body = []
append = encoded_body.append
for line in body.splitlines():
# break up the line into pieces no longer than maxlinelen - 1
start = 0
laststart = len(line) - 1 - maxlinelen
while start <= laststart:
stop = start + maxlinelen1
# make sure we don't break up an escape sequence
if line[stop - 2] == '=':
append(line[start:stop - 1])
start = stop - 2
elif line[stop - 1] == '=':
append(line[start:stop])
start = stop - 1
else:
append(line[start:stop] + '=')
start = stop
# handle rest of line, special case if line ends in whitespace
if line and line[-1] in ' \t':
room = start - laststart
if room >= 3:
# It's a whitespace character at end-of-line, and we have room
# for the three-character quoted encoding.
q = quote(line[-1])
elif room == 2:
# There's room for the whitespace character and a soft break.
q = line[-1] + soft_break
else:
# There's room only for a soft break. The quoted whitespace
# will be the only content on the subsequent line.
q = soft_break + quote(line[-1])
append(line[start:-1] + q)
else:
append(line[start:])
# add back final newline if present
if body[-1] in CRLF:
append('')
return eol.join(encoded_body)
# BAW: I'm not sure if the intent was for the signature of this function to be
# the same as base64MIME.decode() or not...
def decode(encoded, eol=NL):
"""Decode a quoted-printable string.
Lines are separated with eol, which defaults to \\n.
"""
if not encoded:
return encoded
# BAW: see comment in encode() above. Again, we're building up the
# decoded string with string concatenation, which could be done much more
# efficiently.
decoded = ''
for line in encoded.splitlines():
line = line.rstrip()
if not line:
decoded += eol
continue
i = 0
n = len(line)
while i < n:
c = line[i]
if c != '=':
decoded += c
i += 1
# Otherwise, c == "=". Are we at the end of the line? If so, add
# a soft line break.
elif i+1 == n:
i += 1
continue
# Decode if in form =AB
elif i+2 < n and line[i+1] in hexdigits and line[i+2] in hexdigits:
decoded += unquote(line[i:i+3])
i += 3
# Otherwise, not in form =AB, pass literally
else:
decoded += c
i += 1
if i == n:
decoded += eol
# Special case if original string did not end with eol
if encoded[-1] not in '\r\n' and decoded.endswith(eol):
decoded = decoded[:-1]
return decoded
# For convenience and backwards compatibility w/ standard base64 module
body_decode = decode
decodestring = decode
def _unquote_match(match):
"""Turn a match in the form =AB to the ASCII character with value 0xab"""
s = match.group(0)
return unquote(s)
# Header decoding is done a bit differently
def header_decode(s):
"""Decode a string encoded with RFC 2045 MIME header `Q' encoding.
This function does not parse a full MIME header value encoded with
quoted-printable (like =?iso-8859-1?q?Hello_World?=) -- please use
the high level email.header class for that functionality.
"""
s = s.replace('_', ' ')
return re.sub(r'=[a-fA-F0-9]{2}', _unquote_match, s, flags=re.ASCII)

View File

@@ -0,0 +1,376 @@
# Copyright (C) 2001-2010 Python Software Foundation
# Author: Barry Warsaw
# Contact: email-sig@python.org
"""Miscellaneous utilities."""
__all__ = [
'collapse_rfc2231_value',
'decode_params',
'decode_rfc2231',
'encode_rfc2231',
'formataddr',
'formatdate',
'format_datetime',
'getaddresses',
'make_msgid',
'mktime_tz',
'parseaddr',
'parsedate',
'parsedate_tz',
'parsedate_to_datetime',
'unquote',
]
import os
import re
import time
import random
import socket
import datetime
import urllib.parse
from email._parseaddr import quote
from email._parseaddr import AddressList as _AddressList
from email._parseaddr import mktime_tz
from email._parseaddr import parsedate, parsedate_tz, _parsedate_tz
# Intrapackage imports
from email.charset import Charset
COMMASPACE = ', '
EMPTYSTRING = ''
UEMPTYSTRING = ''
CRLF = '\r\n'
TICK = "'"
specialsre = re.compile(r'[][\\()<>@,:;".]')
escapesre = re.compile(r'[\\"]')
def _has_surrogates(s):
"""Return True if s contains surrogate-escaped binary data."""
# This check is based on the fact that unless there are surrogates, utf8
# (Python's default encoding) can encode any string. This is the fastest
# way to check for surrogates, see issue 11454 for timings.
try:
s.encode()
return False
except UnicodeEncodeError:
return True
# How to deal with a string containing bytes before handing it to the
# application through the 'normal' interface.
def _sanitize(string):
# Turn any escaped bytes into unicode 'unknown' char. If the escaped
# bytes happen to be utf-8 they will instead get decoded, even if they
# were invalid in the charset the source was supposed to be in. This
# seems like it is not a bad thing; a defect was still registered.
original_bytes = string.encode('utf-8', 'surrogateescape')
return original_bytes.decode('utf-8', 'replace')
# Helpers
def formataddr(pair, charset='utf-8'):
"""The inverse of parseaddr(), this takes a 2-tuple of the form
(realname, email_address) and returns the string value suitable
for an RFC 2822 From, To or Cc header.
If the first element of pair is false, then the second element is
returned unmodified.
The optional charset is the character set that is used to encode
realname in case realname is not ASCII safe. Can be an instance of str or
a Charset-like object which has a header_encode method. Default is
'utf-8'.
"""
name, address = pair
# The address MUST (per RFC) be ascii, so raise a UnicodeError if it isn't.
address.encode('ascii')
if name:
try:
name.encode('ascii')
except UnicodeEncodeError:
if isinstance(charset, str):
charset = Charset(charset)
encoded_name = charset.header_encode(name)
return "%s <%s>" % (encoded_name, address)
else:
quotes = ''
if specialsre.search(name):
quotes = '"'
name = escapesre.sub(r'\\\g<0>', name)
return '%s%s%s <%s>' % (quotes, name, quotes, address)
return address
def getaddresses(fieldvalues):
"""Return a list of (REALNAME, EMAIL) for each fieldvalue."""
all = COMMASPACE.join(fieldvalues)
a = _AddressList(all)
return a.addresslist
def _format_timetuple_and_zone(timetuple, zone):
return '%s, %02d %s %04d %02d:%02d:%02d %s' % (
['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'][timetuple[6]],
timetuple[2],
['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][timetuple[1] - 1],
timetuple[0], timetuple[3], timetuple[4], timetuple[5],
zone)
def formatdate(timeval=None, localtime=False, usegmt=False):
"""Returns a date string as specified by RFC 2822, e.g.:
Fri, 09 Nov 2001 01:08:47 -0000
Optional timeval if given is a floating point time value as accepted by
gmtime() and localtime(), otherwise the current time is used.
Optional localtime is a flag that when True, interprets timeval, and
returns a date relative to the local timezone instead of UTC, properly
taking daylight savings time into account.
Optional argument usegmt means that the timezone is written out as
an ascii string, not numeric one (so "GMT" instead of "+0000"). This
is needed for HTTP, and is only used when localtime==False.
"""
# Note: we cannot use strftime() because that honors the locale and RFC
# 2822 requires that day and month names be the English abbreviations.
if timeval is None:
timeval = time.time()
if localtime or usegmt:
dt = datetime.datetime.fromtimestamp(timeval, datetime.timezone.utc)
else:
dt = datetime.datetime.utcfromtimestamp(timeval)
if localtime:
dt = dt.astimezone()
usegmt = False
return format_datetime(dt, usegmt)
def format_datetime(dt, usegmt=False):
"""Turn a datetime into a date string as specified in RFC 2822.
If usegmt is True, dt must be an aware datetime with an offset of zero. In
this case 'GMT' will be rendered instead of the normal +0000 required by
RFC2822. This is to support HTTP headers involving date stamps.
"""
now = dt.timetuple()
if usegmt:
if dt.tzinfo is None or dt.tzinfo != datetime.timezone.utc:
raise ValueError("usegmt option requires a UTC datetime")
zone = 'GMT'
elif dt.tzinfo is None:
zone = '-0000'
else:
zone = dt.strftime("%z")
return _format_timetuple_and_zone(now, zone)
def make_msgid(idstring=None, domain=None):
"""Returns a string suitable for RFC 2822 compliant Message-ID, e.g:
<142480216486.20800.16526388040877946887@nightshade.la.mastaler.com>
Optional idstring if given is a string used to strengthen the
uniqueness of the message id. Optional domain if given provides the
portion of the message id after the '@'. It defaults to the locally
defined hostname.
"""
timeval = int(time.time()*100)
pid = os.getpid()
randint = random.getrandbits(64)
if idstring is None:
idstring = ''
else:
idstring = '.' + idstring
if domain is None:
domain = socket.getfqdn()
msgid = '<%d.%d.%d%s@%s>' % (timeval, pid, randint, idstring, domain)
return msgid
def parsedate_to_datetime(data):
*dtuple, tz = _parsedate_tz(data)
if tz is None:
return datetime.datetime(*dtuple[:6])
return datetime.datetime(*dtuple[:6],
tzinfo=datetime.timezone(datetime.timedelta(seconds=tz)))
def parseaddr(addr):
"""
Parse addr into its constituent realname and email address parts.
Return a tuple of realname and email address, unless the parse fails, in
which case return a 2-tuple of ('', '').
"""
addrs = _AddressList(addr).addresslist
if not addrs:
return '', ''
return addrs[0]
# rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3.
def unquote(str):
"""Remove quotes from a string."""
if len(str) > 1:
if str.startswith('"') and str.endswith('"'):
return str[1:-1].replace('\\\\', '\\').replace('\\"', '"')
if str.startswith('<') and str.endswith('>'):
return str[1:-1]
return str
# RFC2231-related functions - parameter encoding and decoding
def decode_rfc2231(s):
"""Decode string according to RFC 2231"""
parts = s.split(TICK, 2)
if len(parts) <= 2:
return None, None, s
return parts
def encode_rfc2231(s, charset=None, language=None):
"""Encode string according to RFC 2231.
If neither charset nor language is given, then s is returned as-is. If
charset is given but not language, the string is encoded using the empty
string for language.
"""
s = urllib.parse.quote(s, safe='', encoding=charset or 'ascii')
if charset is None and language is None:
return s
if language is None:
language = ''
return "%s'%s'%s" % (charset, language, s)
rfc2231_continuation = re.compile(r'^(?P<name>\w+)\*((?P<num>[0-9]+)\*?)?$',
re.ASCII)
def decode_params(params):
"""Decode parameters list according to RFC 2231.
params is a sequence of 2-tuples containing (param name, string value).
"""
# Copy params so we don't mess with the original
params = params[:]
new_params = []
# Map parameter's name to a list of continuations. The values are a
# 3-tuple of the continuation number, the string value, and a flag
# specifying whether a particular segment is %-encoded.
rfc2231_params = {}
name, value = params.pop(0)
new_params.append((name, value))
while params:
name, value = params.pop(0)
if name.endswith('*'):
encoded = True
else:
encoded = False
value = unquote(value)
mo = rfc2231_continuation.match(name)
if mo:
name, num = mo.group('name', 'num')
if num is not None:
num = int(num)
rfc2231_params.setdefault(name, []).append((num, value, encoded))
else:
new_params.append((name, '"%s"' % quote(value)))
if rfc2231_params:
for name, continuations in rfc2231_params.items():
value = []
extended = False
# Sort by number
continuations.sort()
# And now append all values in numerical order, converting
# %-encodings for the encoded segments. If any of the
# continuation names ends in a *, then the entire string, after
# decoding segments and concatenating, must have the charset and
# language specifiers at the beginning of the string.
for num, s, encoded in continuations:
if encoded:
# Decode as "latin-1", so the characters in s directly
# represent the percent-encoded octet values.
# collapse_rfc2231_value treats this as an octet sequence.
s = urllib.parse.unquote(s, encoding="latin-1")
extended = True
value.append(s)
value = quote(EMPTYSTRING.join(value))
if extended:
charset, language, value = decode_rfc2231(value)
new_params.append((name, (charset, language, '"%s"' % value)))
else:
new_params.append((name, '"%s"' % value))
return new_params
def collapse_rfc2231_value(value, errors='replace',
fallback_charset='us-ascii'):
if not isinstance(value, tuple) or len(value) != 3:
return unquote(value)
# While value comes to us as a unicode string, we need it to be a bytes
# object. We do not want bytes() normal utf-8 decoder, we want a straight
# interpretation of the string as character bytes.
charset, language, text = value
if charset is None:
# Issue 17369: if charset/lang is None, decode_rfc2231 couldn't parse
# the value, so use the fallback_charset.
charset = fallback_charset
rawbytes = bytes(text, 'raw-unicode-escape')
try:
return str(rawbytes, charset, errors)
except LookupError:
# charset is not a known codec.
return unquote(text)
#
# datetime doesn't provide a localtime function yet, so provide one. Code
# adapted from the patch in issue 9527. This may not be perfect, but it is
# better than not having it.
#
def localtime(dt=None, isdst=-1):
"""Return local time as an aware datetime object.
If called without arguments, return current time. Otherwise *dt*
argument should be a datetime instance, and it is converted to the
local time zone according to the system time zone database. If *dt* is
naive (that is, dt.tzinfo is None), it is assumed to be in local time.
In this case, a positive or zero value for *isdst* causes localtime to
presume initially that summer time (for example, Daylight Saving Time)
is or is not (respectively) in effect for the specified time. A
negative value for *isdst* causes the localtime() function to attempt
to divine whether summer time is in effect for the specified time.
"""
if dt is None:
return datetime.datetime.now(datetime.timezone.utc).astimezone()
if dt.tzinfo is not None:
return dt.astimezone()
# We have a naive datetime. Convert to a (localtime) timetuple and pass to
# system mktime together with the isdst hint. System mktime will return
# seconds since epoch.
tm = dt.timetuple()[:-1] + (isdst,)
seconds = time.mktime(tm)
localtm = time.localtime(seconds)
try:
delta = datetime.timedelta(seconds=localtm.tm_gmtoff)
tz = datetime.timezone(delta, localtm.tm_zone)
except AttributeError:
# Compute UTC offset and compare with the value implied by tm_isdst.
# If the values match, use the zone name implied by tm_isdst.
delta = dt - datetime.datetime(*time.gmtime(seconds)[:6])
dst = time.daylight and localtm.tm_isdst > 0
gmtoff = -(time.altzone if dst else time.timezone)
if delta == datetime.timedelta(seconds=gmtoff):
tz = datetime.timezone(delta, time.tzname[dst])
else:
tz = datetime.timezone(delta)
return dt.replace(tzinfo=tz)