Skip to content

API Documentation

eml_parser is a Python module for parsing EML files. It returns the information found in the e-mail as well as additional information computed from it.

CustomPolicy

Bases: EmailPolicy

Custom parsing policy based on the default policy but relaxing some checks and early fixing invalid values.

Source code in eml_parser/parser.py
class CustomPolicy(email.policy.EmailPolicy):
    """Lenient e-mail parsing policy.

    Behaves like ``email.policy.EmailPolicy`` but repairs or neutralises a few
    known-bad header values before the regular header parsing takes place.
    """

    def __init__(self) -> None:
        """Create the policy with line-length limiting and source refolding disabled."""
        super().__init__(max_line_length=0, refold_source='none')

    def header_fetch_parse(self, name: str, value: str) -> str:
        """Pre-process problematic header values, then delegate to the parent implementation.

        Args:
            name: Header field name (compared case-insensitively).
            value: Raw header value.

        Returns:
            For ``date`` headers an ISO-8601 string (or ``eml_parser.decode.default_date``
            when the parent parser raises ``TypeError``); for every other header the
            result of the parent ``header_fetch_parse``.
        """
        lowered = name.lower()

        if lowered == 'date':
            try:
                parsed = super().header_fetch_parse(name, value)
            except TypeError:
                logger.warning('Error parsing date.', exc_info=True)
                return eml_parser.decode.default_date

            return eml_parser.decode.robust_string2date(parsed).isoformat()

        if lowered == 'message-id' and '[' in value and not eml_parser.regexes.email_regex.match(value):
            # Malformed message-id: salvage the first address-like token, if any.
            found = eml_parser.regexes.email_regex.search(value)
            if found is None:
                value = ''
                logger.warning('Header field "message-id" is in an invalid format and cannot be fixed, it will be dropped.')
            else:
                value = f'<{found.group(1)}>'

        return super().header_fetch_parse(name, value)

__init__()

Constructor.

Source code in eml_parser/parser.py
def __init__(self) -> None:
    """Constructor.

    Initialises the parent policy with ``max_line_length=0`` and
    ``refold_source='none'``.
    """
    super().__init__(max_line_length=0, refold_source='none')

header_fetch_parse(name, value)

Early fix parsing issues and pass the name/value to the parent header_fetch_parse method for proper parsing.

Source code in eml_parser/parser.py
def header_fetch_parse(self, name: str, value: str) -> str:
    """Pre-process problematic header values, then delegate to the parent implementation.

    Args:
        name: Header field name (compared case-insensitively).
        value: Raw header value.

    Returns:
        For ``date`` headers an ISO-8601 string (or ``eml_parser.decode.default_date``
        when the parent parser raises ``TypeError``); for every other header the
        result of the parent ``header_fetch_parse``.
    """
    lowered = name.lower()

    if lowered == 'date':
        try:
            parsed = super().header_fetch_parse(name, value)
        except TypeError:
            logger.warning('Error parsing date.', exc_info=True)
            return eml_parser.decode.default_date

        return eml_parser.decode.robust_string2date(parsed).isoformat()

    if lowered == 'message-id' and '[' in value and not eml_parser.regexes.email_regex.match(value):
        # Malformed message-id: salvage the first address-like token, if any.
        found = eml_parser.regexes.email_regex.search(value)
        if found is None:
            value = ''
            logger.warning('Header field "message-id" is in an invalid format and cannot be fixed, it will be dropped.')
        else:
            value = f'<{found.group(1)}>'

    return super().header_fetch_parse(name, value)

EmlParser

eml-parser class.

Source code in eml_parser/parser.py
class EmlParser:
    """eml-parser class."""

    MULTIPART_RECURSION_LIMIT = 100

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        include_raw_body: bool = False,
        include_attachment_data: bool = False,
        pconf: dict | None = None,
        policy: email.policy.Policy | None = None,
        ignore_bad_start: bool = False,
        email_force_tld: bool = False,
        domain_force_tld: bool = False,
        ip_force_routable: bool = False,
        parse_attachments: bool = True,
        include_www: bool = True,
        include_href: bool = True,
    ) -> None:
        """Set up a parser instance.

        Args:
            include_raw_body (bool, optional): Include the original body contents in the
                                               returned structure. Default is False.
            include_attachment_data (bool, optional): Include raw attachment data in the
                                                      returned structure. Default is False.
            pconf (dict, optional): Optional configuration parameters, e.g. whitelisted IPs
                                    (``whiteip``) and whitelisted e-mail addresses (``whitefor``).
                                    Missing whitelist keys are added to this dict as empty lists.
            policy (CustomPolicy, optional): Policy to use when parsing e-mails.
                                             Default = CustomPolicy.
            ignore_bad_start (bool, optional): Ignore invalid file start. This has a considerable
                                               performance impact.
            email_force_tld (bool, optional): Only match e-mail addresses with a TLD, i.e. exclude
                                              something like john@doe. If enabled, it uses
                                              domain_force_tld and ip_force_routable settings to
                                              validate the host portion of the address.
                                              By default this is disabled.
            domain_force_tld (bool, optional): For domain validation, requires global IP or a valid TLD.
                                               By default this is disabled.
            ip_force_routable (bool, optional): For IP validation, requires globally routable IP.
                                                By default this is disabled.
            parse_attachments (bool, optional): Set this to false to disable the parsing of attachments.
                                                Please note that HTML attachments as well as other text
                                                data marked to be in-lined, will always be parsed.
            include_www (bool, optional): Include potential URLs starting with www.
            include_href (bool, optional): Include potential URLs in HREFs matching non-simple
                                           regular expressions.

        """
        # Plain option flags
        self.include_raw_body = include_raw_body
        self.include_attachment_data = include_attachment_data

        # Fall back to an empty configuration / the lenient default policy
        self.pconf = pconf or {}
        self.policy = policy or CustomPolicy()

        self.ignore_bad_start = ignore_bad_start
        self.email_force_tld = email_force_tld
        self.domain_force_tld = domain_force_tld
        self.ip_force_routable = ip_force_routable
        self.parse_attachments = parse_attachments
        self.include_www = include_www
        self.include_href = include_href

        # Unknown suffixes are only accepted when a valid TLD is not enforced
        self._psl = publicsuffixlist.PublicSuffixList(accept_unknown=not domain_force_tld)

        if email_force_tld:
            # NOTE(review): this rebinds a module-level attribute, so the stricter
            # regex stays in effect for anything else using eml_parser.regexes.email_regex.
            eml_parser.regexes.email_regex = eml_parser.regexes.email_force_tld_regex

        # If no whitelisting is configured, default to empty lists
        for whitelist_key in ('whiteip', 'whitefor'):
            self.pconf.setdefault(whitelist_key, [])

        self.msg: email.message.Message | None = None

    def decode_email(self, eml_file: os.PathLike, ignore_bad_start: bool = False) -> dict:
        """Function for decoding an EML file into an easily parsable structure.

        Some intelligence is applied while parsing the file in order to work around
        broken files.
        Besides just parsing, this function also computes hashes and extracts meta
        information from the source file.

        Args:
            eml_file: Path to the file to be parsed. os.PathLike objects are supported.
            ignore_bad_start: Ignore invalid file start for this run. This has a considerable performance impact.

        Returns:
            dict: A dictionary with the content of the EML parsed and broken down into
                  key-value pairs.
        """
        eml_file_path = pathlib.Path(eml_file)

        with eml_file_path.open('rb') as fp:
            raw_email = fp.read()

        return self.decode_email_bytes(raw_email, ignore_bad_start=ignore_bad_start)

    def decode_email_bytes(self, eml_file: bytes, ignore_bad_start: bool = False) -> dict:
        """Function for decoding an EML file into an easily parsable structure.

        Some intelligence is applied while parsing the file in order to work around
        broken files.
        Besides just parsing, this function also computes hashes and extracts meta
        information from the source file.

        Args:
            eml_file: Contents of the raw EML file passed to this function as string.
            ignore_bad_start: Ignore invalid file start for this run. This has a considerable performance impact.

        Returns:
            dict: A dictionary with the content of the EML parsed and broken down into
                  key-value pairs.
        """
        if self.ignore_bad_start or ignore_bad_start:
            # Skip invalid start of file
            # Note that this has a considerable performance impact, which is why it is disabled by default.
            _eml_file = b''

            if b':' not in eml_file.split(b'\n', 1):
                start = True
                for line in eml_file.split(b'\n'):
                    if start and b':' not in line:
                        continue

                    start = False

                    _eml_file += line
            else:
                _eml_file = eml_file
        else:
            _eml_file = eml_file

        self.msg = email.message_from_bytes(_eml_file, policy=self.policy)

        return self.parse_email()

    def parse_email(self) -> dict:
        """Parse an e-mail and return a dictionary containing the various parts of the e-mail broken down into key-value pairs.

        Operates on the message previously stored in ``self.msg``.

        Returns:
          dict: A dictionary with the content of the EML parsed and broken down into
                key-value pairs. It contains the keys ``header`` and ``body`` and,
                when attachment parsing is enabled and attachments were found,
                ``attachment``.

        Raises:
          ValueError: If ``self.msg`` has not been set.
        """
        header: dict[str, typing.Any] = {}
        report_struc: dict[str, typing.Any] = {}  # Final structure
        headers_struc: dict[str, typing.Any] = {}  # header_structure
        bodys_struc: dict[str, typing.Any] = {}  # body structure

        if self.msg is None:
            raise ValueError('msg is not set.')

        # parse and decode subject
        subject = self.msg.get('subject', '')
        headers_struc['subject'] = eml_parser.decode.decode_field(subject)

        # If parsing had problems, report it
        if self.msg.defects:
            headers_struc['defect'] = []
            for exception in self.msg.defects:
                headers_struc['defect'].append(str(exception))

        # parse and decode "from"
        # @TODO verify if this hack is necessary for other e-mail fields as well
        try:
            msg_header_field = str(self.msg.get('from', '')).lower()
        except (IndexError, AttributeError):
            # We have hit current open issue #27257
            # https://bugs.python.org/issue27257
            # The field will be set to empty as a workaround.
            #
            logger.exception('We hit bug 27257!')

            _from = eml_parser.decode.workaround_bug_27257(self.msg, 'from')
            del self.msg['from']

            if _from:
                self.msg.add_header('from', _from[0])
                __from = _from[0].lower()
            else:
                self.msg.add_header('from', '')
                __from = ''

            msg_header_field = __from
        except ValueError:
            _field_item = eml_parser.decode.workaround_field_value_parsing_errors(self.msg, 'from')
            msg_header_field = eml_parser.decode.rfc2047_decode(_field_item[0]).lower()

        # 'from' is only added to the result when the header is non-empty
        if msg_header_field != '':
            from_ = email.utils.parseaddr(msg_header_field)

            # parseaddr could not extract an address: fall back to a regex search
            if (from_ and from_ == ('', '')) or not isinstance(from_, collections.abc.Sequence):
                m = eml_parser.regexes.email_regex.search(msg_header_field)
                if m:
                    headers_struc['from'] = m.group(1)
                else:
                    logger.warning('FROM header parsing failed.')
                    headers_struc['from'] = msg_header_field

            else:
                headers_struc['from'] = from_[1]

        # parse and decode "to"
        headers_struc['to'] = self.headeremail2list('to')
        # parse and decode "cc"
        headers_struc['cc'] = self.headeremail2list('cc')
        if not headers_struc['cc']:
            headers_struc.pop('cc')

        # parse and decode delivered-to
        headers_struc['delivered_to'] = self.headeremail2list('delivered-to')
        if not headers_struc['delivered_to']:
            headers_struc.pop('delivered_to')

        # parse and decode Date
        # If date field is present
        if 'date' in self.msg and self.msg.get('date') is not None:
            headers_struc['date'] = datetime.datetime.fromisoformat(typing.cast('str', self.msg.get('date')))
        else:
            # If date field is absent...
            headers_struc['date'] = datetime.datetime.fromisoformat(eml_parser.decode.default_date)

        # mail receiver path / parse any domain, e-mail
        # @TODO parse case where domain is specified but in parentheses only an IP
        headers_struc['received'] = []
        headers_struc['received_email'] = []
        headers_struc['received_domain'] = []
        headers_struc['received_ip'] = []
        try:
            found_smtpin: collections.Counter = collections.Counter()  # Array for storing potential duplicate "HOP"

            for received_line in self.msg.get_all('received', []):
                line = str(received_line).lower()

                # Collapse all whitespace runs (including folded line breaks) into single spaces
                received_line_flat = re.sub(r'(\r|\n|\s|\t)+', ' ', line, flags=re.UNICODE)

                # Parse and split routing headers.
                # Return dict of list
                #   date string
                #   from list
                #   for list
                #   by list
                #   with string
                #   warning list
                parsed_routing = eml_parser.routing.parserouting(received_line_flat)

                # If required collect the IP of the gateway that have injected the mail.
                # Iterate all parsed item and find IP
                # It is parsed from the MOST recent to the OLDEST (from IN > Out)
                # We match external IP from the most "OUT" Found.
                # Warning .. It may be spoofed !!
                # It add a warning if multiple identical items are found.

                if self.pconf.get('byhostentry'):
                    for by_item in parsed_routing.get('by', []):
                        for byhostentry_ in self.pconf['byhostentry']:
                            byhostentry = byhostentry_.lower()

                            if byhostentry in by_item:
                                # Save the last Found.. ( most external )
                                headers_struc['received_src'] = parsed_routing.get('from')

                                # Increment watched by detection counter, and warn if needed
                                found_smtpin[byhostentry] += 1
                                if found_smtpin[byhostentry] > 1:  # Twice found the header...
                                    if parsed_routing.get('warning'):
                                        # NOTE(review): this appends a one-element list, whereas the
                                        # else-branch below stores a plain string - confirm which is intended.
                                        parsed_routing['warning'].append(['Duplicate SMTP by entrypoint'])
                                    else:
                                        parsed_routing['warning'] = ['Duplicate SMTP by entrypoint']

                headers_struc['received'].append(parsed_routing)

                # Parse IPs in "received headers"
                ips_in_received_line = eml_parser.regexes.ipv6_regex.findall(received_line_flat) + eml_parser.regexes.ipv4_regex.findall(received_line_flat)
                for ip in ips_in_received_line:
                    if ip in self.pconf['whiteip']:
                        continue
                    valid_ip = self.get_valid_domain_or_ip(ip)
                    if valid_ip:
                        headers_struc['received_ip'].append(valid_ip)
                    else:
                        logger.debug('Invalid IP in received line - "%s"', ip)

                # search for domain
                for m in eml_parser.regexes.recv_dom_regex.findall(received_line_flat):
                    try:
                        _ = ipaddress.ip_address(m)  # type of findall is list[str], so this is correct
                    except ValueError:
                        # we find IPs using the previous IP crawler, hence we ignore them
                        # here.
                        # If the value is not a valid IP address, we add the entry
                        headers_struc['received_domain'].append(m)

                # search for e-mail addresses
                for mail_candidate in eml_parser.regexes.email_regex.findall(received_line_flat):
                    if self.email_force_tld:
                        # NOTE(review): get_valid_domain_or_ip returns the validated host part
                        # (or None), so the candidate stored below is the host rather than the
                        # full address - confirm this is intended.
                        mail_candidate = self.get_valid_domain_or_ip(mail_candidate)
                    if mail_candidate is not None and mail_candidate not in parsed_routing.get('for', []):
                        headers_struc['received_email'] += [mail_candidate]

        except TypeError:  # Ready to parse email without received headers.
            logger.exception('Exception occurred while parsing received lines.')

        # Concatenate for emails into one array | uniq
        # for rapid "find"
        headers_struc['received_foremail'] = []
        if 'received' in headers_struc:
            for _parsed_routing in headers_struc['received']:
                for itemfor in _parsed_routing.get('for', []):
                    if itemfor not in self.pconf['whitefor']:
                        headers_struc['received_foremail'].append(itemfor)

        # Uniq data found
        headers_struc['received_email'] = list(set(headers_struc['received_email']))
        headers_struc['received_domain'] = list(set(headers_struc['received_domain']))
        headers_struc['received_ip'] = list(set(headers_struc['received_ip']))

        # Clean up if empty
        if not headers_struc['received_email']:
            del headers_struc['received_email']

        if 'received_foremail' in headers_struc:
            if not headers_struc['received_foremail']:
                del headers_struc['received_foremail']
            else:
                headers_struc['received_foremail'] = list(set(headers_struc['received_foremail']))

        if not headers_struc['received_domain']:
            del headers_struc['received_domain']

        if not headers_struc['received_ip']:
            del headers_struc['received_ip']
        ####################

        # Parse text body
        raw_body = self.get_raw_body_text(self.msg)

        if self.include_raw_body:
            bodys_struc['raw_body'] = raw_body

        bodys = {}

        # Is it a multipart email ?
        # (decided by the number of body tuples returned above)
        if len(raw_body) == 1:
            multipart = False
        else:
            multipart = True

        for body_tup in raw_body:
            bodie: dict[str, typing.Any] = {}
            _, body, body_multhead, boundary = body_tup
            # Parse any URLs and mail found in the body
            list_observed_urls: list[str] = []
            list_observed_urls_noscheme: list[str] = []
            # The counters below are only used as insertion-ordered sets (every value is 1)
            list_observed_email: typing.Counter[str] = Counter()
            list_observed_dom: typing.Counter[str] = Counter()
            list_observed_ip: typing.Counter[str] = Counter()

            # If we start directly a findall on 500K+ body we got time and memory issues...
            # if more than 4K.. lets cheat, we will cut around the thing we search "://, @, ."
            # in order to reduce regex complexity.
            for body_slice in self.string_sliding_window_loop(body):
                for url_match in self.get_uri_ondata(body_slice):
                    # A scheme separator within the first 10 characters marks a URL with scheme
                    if ':/' in url_match[:10]:
                        list_observed_urls.append(url_match)
                    else:
                        list_observed_urls_noscheme.append(url_match)

                for match in eml_parser.regexes.email_regex.findall(body_slice):
                    valid_email = self.get_valid_domain_or_ip(match.lower())
                    if valid_email:
                        list_observed_email[match.lower()] = 1

                for match in eml_parser.regexes.dom_regex.findall(body_slice):
                    valid_domain = self.get_valid_domain_or_ip(match.lower())
                    if valid_domain:
                        list_observed_dom[match.lower()] = 1

                for ip_regex in (eml_parser.regexes.ipv4_regex, eml_parser.regexes.ipv6_regex):
                    for match in ip_regex.findall(body_slice):
                        valid_ip = self.get_valid_domain_or_ip(match.lower())
                        if valid_ip in self.pconf['whiteip']:
                            continue
                        if valid_ip:
                            list_observed_ip[valid_ip] = 1

            # Report uri,email and observed domain or hash if no raw body
            if self.include_raw_body:
                if list_observed_urls:
                    bodie['uri'] = list(set(list_observed_urls))

                if list_observed_urls_noscheme:
                    bodie['uri_noscheme'] = list(set(list_observed_urls_noscheme))

                if list_observed_email:
                    bodie['email'] = list(list_observed_email)

                if list_observed_dom:
                    bodie['domain'] = list(list_observed_dom)

                if list_observed_ip:
                    bodie['ip'] = list(list_observed_ip)

            else:
                # Without the raw body only SHA-256 hashes of the observables are reported
                if list_observed_urls:
                    bodie['uri_hash'] = []
                    for element in list_observed_urls:
                        bodie['uri_hash'].append(self.get_hash(element.lower(), 'sha256'))
                if list_observed_email:
                    bodie['email_hash'] = []
                    for element in list_observed_email:
                        # Email already lowered
                        bodie['email_hash'].append(self.get_hash(element, 'sha256'))
                if list_observed_dom:
                    bodie['domain_hash'] = []
                    # for uri in list(set(list_observed_dom)):
                    for element in list_observed_dom:
                        bodie['domain_hash'].append(self.get_hash(element, 'sha256'))
                if list_observed_ip:
                    bodie['ip_hash'] = []
                    for element in list_observed_ip:
                        # IP (v6) already lowered
                        bodie['ip_hash'].append(self.get_hash(element, 'sha256'))

            # For mail without multipart we will only get the "content....something" headers
            # all other headers are in "header"
            # but we need to convert header tuples in dict..
            # "a","toto"           a: [toto,titi]
            # "a","titi"   --->    c: [truc]
            # "c","truc"
            ch: dict[str, list] = {}
            for k, v in body_multhead:
                # make sure we are working with strings only
                v = str(v)

                # We are using replace . to : for avoiding issue in mongo
                k = k.lower().replace('.', ':')  # Lot of lowers, pre-compute :) .

                if multipart:
                    if k in ch:
                        ch[k].append(v)
                    else:
                        ch[k] = [v]
                elif k.startswith('content'):  # otherwise, we got all header headers
                    # if not multipart, store only content-xx related header with part
                    if k in ch:
                        ch[k].append(v)
                    else:
                        ch[k] = [v]

            bodie['content_header'] = ch  # Store content headers dict

            if self.include_raw_body:
                bodie['content'] = body

            # Sometimes bad people play with multiple header instances.
            # We "display" the "LAST" one .. as does thunderbird
            val = ch.get('content-type')
            if val:
                header_val = val[-1]
                # Keep only the media type, dropping any parameters after the first ';'
                bodie['content_type'] = header_val.split(';', 1)[0].strip()

            # Hash the body
            bodie['hash'] = hashlib.sha256(body.encode('utf-8')).hexdigest()

            if boundary is not None:
                # only include boundary key if there is a value set
                bodie['boundary'] = boundary

            # Temporary unique key; only the values are kept in the final report
            uid = str(uuid.uuid1())
            bodys[uid] = bodie

        # Note: this rebinding replaces the dict that may hold 'raw_body' from above
        bodys_struc = bodys

        # Get all other bulk raw headers
        # "a","toto"           a: [toto,titi]
        # "a","titi"   --->    c: [truc]
        # "c","truc"
        #
        # NOTE(review): keys differing only in case end up as the same lowered k; if
        # get_all() matches names case-insensitively their values would be added to
        # header[k] more than once - verify.
        for k in set(self.msg.keys()):
            k = k.lower()  # Lot of lower, pre-compute...
            decoded_values = []

            try:
                for value in self.msg.get_all(k, []):
                    if value:
                        decoded_values.append(value)
            except (IndexError, AttributeError, TypeError):
                # We have hit a field value parsing error.
                # Try to work around this by using a relaxed policy, if possible.
                # Parsing might not give meaningful results in this case!
                logger.error('ERROR: Field value parsing error, trying to work around this!')
                decoded_values = eml_parser.decode.workaround_field_value_parsing_errors(self.msg, k)
            except ValueError:
                # extract values using a relaxed policy
                for _field in eml_parser.decode.workaround_field_value_parsing_errors(self.msg, k):
                    # check if this is a RFC2047 encoded field
                    if eml_parser.regexes.email_regex_rfc2047.search(_field):
                        decoded_values.append(eml_parser.decode.rfc2047_decode(_field))
                    else:
                        logger.error('ERROR: Field value parsing error, trying to work around this! - %s', _field)

            if decoded_values:
                if k in header:
                    header[k] += decoded_values
                else:
                    header[k] = decoded_values

        headers_struc['header'] = header

        # parse attachments
        if self.parse_attachments:
            try:
                report_struc['attachment'] = self.traverse_multipart(self.msg, 0)
            except (binascii.Error, AssertionError):
                # we hit this exception if the payload contains invalid data
                logger.exception('Exception occurred while parsing attachment data. Collected data will not be complete!')
                report_struc['attachment'] = None

            # Dirty hack... transform hash into list.. need to be done in the function.
            # Mandatory to search efficiently in mongodb
            # See Bug 11 of eml_parser
            if not report_struc['attachment']:
                del report_struc['attachment']
            else:
                newattach = []
                for attachment in report_struc['attachment']:
                    newattach.append(report_struc['attachment'][attachment])
                report_struc['attachment'] = newattach

        # Same transformation for the bodies: keep only the values as a list
        newbody = []
        for _, body in bodys_struc.items():
            newbody.append(body)
        report_struc['body'] = newbody
        # End of dirty hack

        # Get all other bulk headers
        report_struc['header'] = headers_struc

        return report_struc

    @staticmethod
    def string_sliding_window_loop(body: str, slice_step: int = 500, max_distance: int = 100) -> typing.Iterator[str]:
        """Yield a more or less constant slice of a large string.

        If we directly do a *regex* findall on 500K+ body we get time and memory issues.
        If more than the configured slice step, lets cheat, we will cut around the thing we search "://, @, ."
        in order to reduce regex complexity.

        In case we find a *://* at the first 8 characters of a sliced body window, we rewind the window by 16 characters.
        If we find the same string at the end of a sliced body window we try to look for invalid URL characters up to *max_distance*
        length, until which we give up and return the sliced body part. This is done in order to return a maximum possible
        correct URLs.

        The choice for 8 character is because *https://* is 8 characters, which is the maximum size we accept for schemes.

        Args:
            body: Body to slice into smaller pieces.
            slice_step: Slice this number or characters.
            max_distance: In case we find a *://* in a string window towards the end, we try our best to enlarge the window
                            as to not cut off URLs. This variable sets the maximum permitted additional window size to consider.

        Returns:
            typing.Iterator[str]: Sliced body string.
        """
        body_length = len(body)

        if body_length <= slice_step:
            yield body

        else:
            ptr_start = 0

            for ptr_end in range(slice_step, body_length + slice_step, slice_step):
                if ' ' in body[ptr_end - 1 : ptr_end]:
                    while not (eml_parser.regexes.window_slice_regex.match(body[ptr_end - 1 : ptr_end]) or ptr_end > body_length):
                        if ptr_end > body_length:
                            ptr_end = body_length
                            break

                        ptr_end += 1

                # Found a :// near the start of the slice, rewind
                if ptr_start > 16 and '://' in body[ptr_start - 8 : ptr_start + 8]:
                    ptr_start -= 16

                # Found a :// near the end of the slice, rewind from that location
                if ptr_end < body_length and '://' in body[ptr_end - 8 : ptr_end + 8]:
                    pos = body.rfind('://', ptr_end - 8, ptr_end + 8)
                    ptr_end = pos - 8

                # Found a :// within the slice; try to expand the slice until we find an invalid
                # URL character in order to avoid cutting off URLs
                if '://' in body[ptr_start:ptr_end] and not body[ptr_end - 1 : ptr_end] == ' ':
                    distance = 1

                    while body[ptr_end - 1 : ptr_end] not in (' ', '>') and distance < max_distance and ptr_end <= body_length:
                        distance += 1
                        ptr_end += 1

                yield body[ptr_start:ptr_end]

                ptr_start = ptr_end

    def get_valid_domain_or_ip(self, data: str) -> str | None:
        """Function to determine if an IP address, Email address, or Domain is valid.

        Args:
            data (str): Text input which should be validated.

        Returns:
            str: Returns a string of the validated IP address or the host.
        """
        host = data.rpartition('@')[-1].strip(' \r\n\t[]')
        try:
            # Zone index support was added to ipaddress in Python 3.9
            addr, _, _ = host.partition('%')
            valid_ip = ipaddress.ip_address(addr)
            if self.ip_force_routable:
                # Not a precise filter for IPv4/IPv6 addresses. Can be enhanced with pconf whiteip ranges
                if valid_ip.is_global and not valid_ip.is_reserved:
                    return str(valid_ip)
            else:
                return str(valid_ip)
        except ValueError:
            # _psl uses self.domain_force_tld
            valid_domain = self._psl.publicsuffix(host)
            if valid_domain:
                return host

        return None

    def clean_found_uri(self, url: str) -> str | None:
        """Function for validating URLs from the input string.

        Args:
            url (str): Text input which should have a single URL validated.

        Returns:
            str: Returns a valid URL, if found in the input string.
        """
        if '.' not in url and '[' not in url:
            # if we found a URL like e.g. http://afafasasfasfas; that makes no
            # sense, thus skip it, but include http://[2001:db8::1]
            return None

        try:
            # Remove leading spaces and quote characters
            url = url.lstrip(' \t\n\r\f\v\'"«»“”‘’').replace('\r', '').replace('\n', '')
            url = urllib.parse.urlparse(url).geturl()
            scheme_url = url
            if ':/' not in scheme_url:
                scheme_url = 'noscheme://' + url

            _hostname = urllib.parse.urlparse(scheme_url).hostname

            if _hostname is None:
                return None

            host = _hostname.rstrip('.')

            if self.get_valid_domain_or_ip(host) is None:
                return None
        except ValueError:
            logger.warning('Unable to parse URL - %s', url)
            return None

        # let's try to be smart by stripping of noisy bogus parts
        url = re.split(r"""[', ")}\\]""", url, maxsplit=1)[0]

        # filter bogus URLs
        if url.endswith('://'):
            return None

        if '&' in url:
            url = unescape(url)

        return url

    def get_uri_ondata(self, body: str) -> list[str]:
        """Extract the distinct URLs found in *body*.

        Args:
            body (str): Text input which should be searched for URLs.

        Returns:
            list: The cleaned URLs, each listed once, in the order they were first seen.
        """
        # Pick the matching/splitting regex pair depending on whether "www." URLs are wanted
        if self.include_www:
            finder = eml_parser.regexes.url_regex_www
            splitter = eml_parser.regexes.url_regex_www_comma
        else:
            finder = eml_parser.regexes.url_regex_simple
            splitter = eml_parser.regexes.url_regex_comma

        candidates = [piece for match in finder.findall(body) for piece in splitter.split(match)]

        # HREF matches are used as they are, without further splitting
        if self.include_href:
            candidates.extend(eml_parser.regexes.url_regex_href.findall(body))

        # A dict serves as an insertion-ordered set
        observed: dict[str, int] = {}
        for candidate in candidates:
            cleaned = self.clean_found_uri(candidate)
            if cleaned is not None:
                observed[cleaned] = 1

        return list(observed)

    def headeremail2list(self, header: str) -> list[str]:
        """Extract the e-mail addresses contained in a header field of the current message.

        Args:
            header (str): The header field to decode.

        Returns:
            list: The lower-cased e-mail addresses found in the header field.

        Raises:
            ValueError: If no message has been set on the parser.
        """
        if self.msg is None:
            raise ValueError('msg is not set.')

        try:
            address_pairs = email.utils.getaddresses(self.msg.get_all(header, []))
        except (IndexError, AttributeError):
            address_pairs = email.utils.getaddresses(eml_parser.decode.workaround_bug_27257(self.msg, header))
        except ValueError:
            # Parse every raw field value on its own with the header registry
            address_pairs = []

            for raw_value in eml_parser.decode.workaround_field_value_parsing_errors(self.msg, header):
                decoded_value = eml_parser.decode.rfc2047_decode(raw_value).replace('\n', '').replace('\r', '')

                parsed: dict[str, typing.Any] = {}
                header_cls = typing.cast('email.headerregistry.AddressHeader', email.headerregistry.HeaderRegistry()[header])
                header_cls.parse(decoded_value, parsed)
                address_pairs.extend(
                    (address.display_name, address.addr_spec) for group in parsed['groups'] for address in group.addresses
                )

        # Skip empty addresses; with email_force_tld only addresses matching the TLD regex are kept
        return [
            addr_spec.lower()
            for _display_name, addr_spec in address_pairs
            if addr_spec != '' and (not self.email_force_tld or eml_parser.regexes.email_force_tld_regex.match(addr_spec))
        ]

    def get_raw_body_text(
        self, msg: email.message.Message, boundary: str | None = None, depth: int = 0
    ) -> list[tuple[typing.Any, typing.Any, typing.Any, str | None]]:
        """Recursively collect all body parts of an e-mail message.

        Args:
            msg (email.message.Message): The actual e-mail message or sub-message.
            boundary: Boundary marker of the enclosing multipart message; it is stored with each
                collected part to make it easier to tell the parts apart.
            depth: Parameter used to track the current recursion level.

        Returns:
            list: A list of tuples in the form of
            "(encoding, raw_body_string, message field headers, possible boundary marker)".

        Raises:
            RecursionError: If recursion limit exceeded.
        """
        if depth > EmlParser.MULTIPART_RECURSION_LIMIT:
            logger.warning('multi-part nesting limit (%d) exceeded, aborting', EmlParser.MULTIPART_RECURSION_LIMIT)
            raise RecursionError('Recursion limit exceeded')

        collected: list[tuple[typing.Any, typing.Any, typing.Any, str | None]] = []

        if msg.is_multipart():
            # Descend into every sub-part, handing down the boundary of this container
            own_boundary = msg.get_boundary(failobj=None)
            for sub_msg in msg.get_payload():
                collected.extend(self.get_raw_body_text(typing.cast('email.message.Message', sub_msg), boundary=own_boundary, depth=depth + 1))
            return collected

        # Treat text document attachments as belonging to the body of the mail.
        # Attachments with a file-extension of .htm/.html are implicitly treated
        # as text as well in order not to escape later checks (e.g. URL scan).
        try:
            filename = msg.get_filename('').lower()
        except (binascii.Error, AssertionError):
            logger.exception('Exception occurred while trying to parse the content-disposition header. Collected data will not be complete.')
            filename = ''

        has_html_name = filename.endswith(('.html', '.htm'))

        # The header accessors are only called when their result is actually needed
        if 'content-disposition' in msg:
            is_body_part = has_html_name or (msg.get_content_disposition() == 'inline' and msg.get_content_maintype() == 'text')
        else:
            is_body_part = msg.get_content_maintype() == 'text' or has_html_name

        if not is_body_part:
            return collected

        encoding = msg.get('content-transfer-encoding', '').lower()

        charset = msg.get_content_charset()
        if charset is not None:
            try:
                body_text = typing.cast('bytes', msg.get_payload(decode=True)).decode(charset, 'ignore')
            except (LookupError, ValueError):
                logger.debug('An exception occurred while decoding the payload!', exc_info=True)
                body_text = typing.cast('bytes', msg.get_payload(decode=True)).decode('ascii', 'ignore')
        else:
            # No charset declared: leave the decoding to the project's decode helper
            payload_bytes = typing.cast('bytes', msg.get_payload(decode=True))
            body_text = eml_parser.decode.decode_string(payload_bytes, None)

        # In case we hit bug 27257 or any other parsing error, try to downgrade the used policy
        try:
            collected.append((encoding, body_text, msg.items(), boundary))
        except (AttributeError, TypeError, ValueError):
            previous_policy: email.policy.Policy = msg.policy
            msg.policy = email.policy.compat32
            collected.append((encoding, body_text, msg.items(), boundary))
            msg.policy = previous_policy

        return collected

    @staticmethod
    def get_file_hash(data: bytes) -> dict[str, str]:
        """Generate hashes of various types (``MD5``, ``SHA-1``, ``SHA-256``, ``SHA-512``) for the provided data.

        Args:
          data (bytes): The data to calculate the hashes on.

        Returns:
          dict: Returns a dict with as key the hash-type and value the calculated hash.
        """
        hashalgo = ['md5', 'sha1', 'sha256', 'sha512']
        return {k: EmlParser.get_hash(data, k) for k in hashalgo}

    @staticmethod
    def get_hash(value: str | bytes, hash_type: str) -> str:
        """Generate a hash of type *hash_type* for a given value.

        Args:
            value: String or bytes object to calculate the hash on.
            hash_type: Hash type to use, can be any of 'md5', 'sha1', 'sha256', 'sha512'.

        Returns:
            str: Returns the calculated hash as a string.
        """
        if hash_type not in ('md5', 'sha1', 'sha256', 'sha512'):
            raise ValueError(f'Invalid hash type requested - "{hash_type}"')

        if isinstance(value, str):
            _value = value.encode('utf-8')
        else:
            _value = value

        hash_algo = getattr(hashlib, hash_type)

        return hash_algo(_value).hexdigest()

    def traverse_multipart(self, msg: email.message.Message, counter: int = 0, depth: int = 0) -> dict[str, typing.Any]:
        """Recursively walk all multi-part elements of a message and collect their attachment information.

        Args:
            msg (email.message.Message): An e-mail message object.
            counter (int, optional): A counter which is used for generating attachments
                file-names in case there are none found in the header. Default = 0.
                The value is handed on unchanged to every sub-part.
            depth: Parameter used to track the current recursion level.

        Returns:
            dict: Returns a dict with all original multi-part headers as well as generated hash check-sums,
                date size, file extension, real mime-type.

        Raises:
            RecursionError: If recursion limit exceeded.
        """
        if depth > EmlParser.MULTIPART_RECURSION_LIMIT:
            logger.warning('multi-part nesting limit (%d) exceeded, aborting', EmlParser.MULTIPART_RECURSION_LIMIT)
            raise RecursionError('Recursion limit exceeded')

        # A leaf part is handled directly
        if not msg.is_multipart():
            return self.prepare_multipart_part_attachment(msg, counter)

        found: dict[str, typing.Any] = {}

        # An attached e-mail message is added to the attachment list apart from being parsed itself
        if 'content-type' in msg and msg.get_content_type() == 'message/rfc822':
            found.update(self.prepare_multipart_part_attachment(msg, counter))

        for sub_msg in msg.get_payload():
            found.update(self.traverse_multipart(typing.cast('email.message.EmailMessage', sub_msg), counter=counter, depth=depth + 1))

        return found

    def prepare_multipart_part_attachment(self, msg: email.message.Message, counter: int = 0) -> dict[str, typing.Any]:
        """Collect meta-information about a single multipart-part which is an attachment.

        Args:
            msg (email.message.Message): An e-mail message object.
            counter (int, optional): A counter which is used for generating attachments
                file-names in case there are none found in the header. Default = 0.

        Returns:
            dict: Empty if the part is not treated as an attachment. Otherwise a dict with a single
                generated id as key, whose value holds the file name, size, extension, hash check-sums,
                mime-type information, optionally the raw data and the original part headers.
        """
        result: dict[str, typing.Any] = {}

        # In case we hit bug 27257, try to downgrade the used policy
        try:
            header_names = [name.lower() for name in msg.keys()]
        except AttributeError:
            saved_policy: email.policy.Policy = msg.policy
            msg.policy = email.policy.compat32
            header_names = [name.lower() for name in msg.keys()]
            msg.policy = saved_policy

        # Text parts which have no content-disposition, or an inline one, are not attachments
        if ('content-disposition' not in header_names or msg.get_content_disposition() == 'inline') and msg.get_content_maintype() == 'text':
            return result

        if msg.get_content_type() == 'message/rfc822':
            payload = msg.get_payload()
            if len(payload) > 1:
                logger.warning('More than one payload for "message/rfc822" part detected. This is not supported, please report!')

            embedded = typing.cast('list[email.message.EmailMessage]', payload)[0]

            # Serialise the attached message; fall back to the compat32 policy on encoding errors
            try:
                data = embedded.as_bytes(policy=email.policy.default.clone(max_line_length=0))
            except UnicodeEncodeError:
                data = embedded.as_bytes(policy=email.policy.compat32.clone(max_line_length=0))
        else:
            data = typing.cast('bytes', msg.get_payload(decode=True))

        file_size = len(data)

        # Use the file name from the headers if there is one, otherwise generate one from the counter
        filename = msg.get_filename('')
        filename = eml_parser.decode.decode_field(filename) if filename != '' else f'part-{counter:03d}'

        file_id = str(uuid.uuid1())
        entry: dict[str, typing.Any] = {}
        result[file_id] = entry
        entry['filename'] = filename
        entry['size'] = file_size

        # pathlib.Path(filename).suffix is an empty string when there is no extension
        suffix = pathlib.Path(filename).suffix
        if suffix:
            # strip leading dot and lower-case
            entry['extension'] = suffix[1:].lower()

        entry['hash'] = self.get_file_hash(data)

        mime_type, mime_type_short = self.get_mime_type(data)

        if mime_type is not None and mime_type_short is not None:
            entry['mime_type'] = mime_type
            entry['mime_type_short'] = mime_type_short
        elif magic is not None:
            # Only warn when magic is available but no type was determined
            logger.warning('Error determining attachment mime-type - "%s"', str(file_id))

        if self.include_attachment_data:
            entry['raw'] = base64.b64encode(data)

        # Group the header values by lower-cased header name, keeping repeated headers
        content_header: dict[str, list[str]] = {}
        for name, value in msg.items():
            content_header.setdefault(name.lower(), []).append(str(value))

        entry['content_header'] = content_header

        return result

    @staticmethod
    def get_mime_type(data: bytes) -> tuple[str, str] | tuple[None, None]:
        """Determine mime-type information for the provided bytes object.

        Args:
            data: Binary data.

        Returns:
            typing.Tuple[str, str]: Identified mime information and mime-type. If **magic** is not available, returns *None, None*.
                                    E.g. *"ELF 64-bit LSB shared object, x86-64, version 1 (SYSV)", "application/x-sharedlib"*
        """
        if magic is not None:
            detection = magic.detect_from_content(data)
            return detection.name, detection.mime_type

        # Without magic no detection is possible
        return None, None

__init__(include_raw_body=False, include_attachment_data=False, pconf=None, policy=None, ignore_bad_start=False, email_force_tld=False, domain_force_tld=False, ip_force_routable=False, parse_attachments=True, include_www=True, include_href=True)

Initialisation.

Parameters:

Name Type Description Default
include_raw_body bool

Boolean parameter which indicates whether to include the original file contents in the returned structure. Default is False.

False
include_attachment_data bool

Boolean parameter which indicates whether to include raw attachment data in the returned structure. Default is False.

False
pconf dict

A dict with various optional configuration parameters, e.g. whitelist IPs, whitelist e-mail addresses, etc.

None
policy CustomPolicy

Policy to use when parsing e-mails. Default = CustomPolicy.

None
ignore_bad_start bool

Ignore invalid file start. This has a considerable performance impact.

False
email_force_tld bool

Only match e-mail addresses with a TLD, i.e. exclude something like john@doe. If enabled, it uses domain_force_tld and ip_force_routable settings to validate the host portion of the address. By default this is disabled.

False
domain_force_tld bool

For domain validation, requires global IP or a valid TLD. By default this is disabled.

False
ip_force_routable bool

For IP validation, requires globally routable IP. By default this is disabled.

False
parse_attachments bool

Set this to false if you want to disable the parsing of attachments. Please note that HTML attachments as well as other text data marked to be in-lined, will always be parsed.

True
include_www bool

Include potential URLs starting with www

True
include_href bool

Include potential URLs in HREFs matching non-simple regular expressions

True
Source code in eml_parser/parser.py
def __init__(
    self,
    include_raw_body: bool = False,
    include_attachment_data: bool = False,
    pconf: dict | None = None,
    policy: email.policy.Policy | None = None,
    ignore_bad_start: bool = False,
    email_force_tld: bool = False,
    domain_force_tld: bool = False,
    ip_force_routable: bool = False,
    parse_attachments: bool = True,
    include_www: bool = True,
    include_href: bool = True,
) -> None:
    """Set up a parser instance.

    Args:
        include_raw_body (bool, optional): Boolean parameter which indicates whether
                                           to include the original file contents in
                                           the returned structure. Default is False.
        include_attachment_data (bool, optional): Boolean parameter which indicates whether
                                                  to include raw attachment data in the
                                                  returned structure. Default is False.
        pconf (dict, optional): A dict with various optional configuration parameters,
                                e.g. whitelist IPs, whitelist e-mail addresses, etc.
                                A non-empty dict is used as is and gets the missing
                                'whiteip' / 'whitefor' keys added.
        policy (CustomPolicy, optional): Policy to use when parsing e-mails.
                                                Default = CustomPolicy.
        ignore_bad_start (bool, optional): Ignore invalid file start. This has a considerable performance impact.
        email_force_tld (bool, optional): Only match e-mail addresses with a TLD, i.e. exclude something like
                                          john@doe. If enabled, it uses domain_force_tld and ip_force_routable settings
                                          to validate the host portion of the address. By default this is disabled.
        domain_force_tld (bool, optional): For domain validation, requires global IP or a valid TLD.
                                           By default this is disabled.
        ip_force_routable (bool, optional): For IP validation, requires globally routable IP.
                                           By default this is disabled.
        parse_attachments (bool, optional): Set this to false if you want to disable the parsing of attachments.
                                            Please note that HTML attachments as well as other text data marked to be
                                            in-lined, will always be parsed.
        include_www (bool, optional): Include potential URLs starting with www
        include_href (bool, optional): Include potential URLs in HREFs matching non-simple regular expressions

    """
    # Output related switches
    self.include_raw_body = include_raw_body
    self.include_attachment_data = include_attachment_data
    self.parse_attachments = parse_attachments
    self.include_www = include_www
    self.include_href = include_href

    # Parsing and validation related switches
    self.ignore_bad_start = ignore_bad_start
    self.email_force_tld = email_force_tld
    self.domain_force_tld = domain_force_tld
    self.ip_force_routable = ip_force_routable

    # If no pconf / policy was specified, fall back to an empty dict / the custom policy
    self.pconf = pconf if pconf else {}
    self.policy = policy if policy else CustomPolicy()

    # Unknown suffixes are only accepted when a valid TLD is not enforced
    self._psl = publicsuffixlist.PublicSuffixList(accept_unknown=not self.domain_force_tld)

    if self.email_force_tld:
        # This replaces the regular expression on the eml_parser.regexes module itself
        eml_parser.regexes.email_regex = eml_parser.regexes.email_force_tld_regex

    # If no whitelisting is required, set to empty list
    for whitelist_key in ('whiteip', 'whitefor'):
        self.pconf.setdefault(whitelist_key, [])

    self.msg: email.message.Message | None = None

clean_found_uri(url)

Function for validating URLs from the input string.

Parameters:

Name Type Description Default
url str

Text input which should have a single URL validated.

required

Returns:

Name Type Description
str str | None

Returns a valid URL, if found in the input string.

Source code in eml_parser/parser.py
def clean_found_uri(self, url: str) -> str | None:
    """Function for validating URLs from the input string.

    Args:
        url (str): Text input which should have a single URL validated.

    Returns:
        str: Returns a valid URL, if found in the input string.
    """
    if '.' not in url and '[' not in url:
        # if we found a URL like e.g. http://afafasasfasfas; that makes no
        # sense, thus skip it, but include http://[2001:db8::1]
        return None

    try:
        # Remove leading spaces and quote characters
        url = url.lstrip(' \t\n\r\f\v\'"«»“”‘’').replace('\r', '').replace('\n', '')
        url = urllib.parse.urlparse(url).geturl()
        scheme_url = url
        if ':/' not in scheme_url:
            scheme_url = 'noscheme://' + url

        _hostname = urllib.parse.urlparse(scheme_url).hostname

        if _hostname is None:
            return None

        host = _hostname.rstrip('.')

        if self.get_valid_domain_or_ip(host) is None:
            return None
    except ValueError:
        logger.warning('Unable to parse URL - %s', url)
        return None

    # let's try to be smart by stripping of noisy bogus parts
    url = re.split(r"""[', ")}\\]""", url, maxsplit=1)[0]

    # filter bogus URLs
    if url.endswith('://'):
        return None

    if '&' in url:
        url = unescape(url)

    return url

decode_email(eml_file, ignore_bad_start=False)

Function for decoding an EML file into an easily parsable structure.

Some intelligence is applied while parsing the file in order to work around broken files. Besides just parsing, this function also computes hashes and extracts meta information from the source file.

Parameters:

Name Type Description Default
eml_file PathLike

Path to the file to be parsed. os.PathLike objects are supported.

required
ignore_bad_start bool

Ignore invalid file start for this run. This has a considerable performance impact.

False

Returns:

Name Type Description
dict dict

A dictionary with the content of the EML parsed and broken down into key-value pairs.

Source code in eml_parser/parser.py
def decode_email(self, eml_file: os.PathLike, ignore_bad_start: bool = False) -> dict:
    """Read an EML file from disk and decode it into an easily parsable structure.

    The file is read in binary mode and its contents are handed over to
    decode_email_bytes, which does the actual work.

    Args:
        eml_file: Path to the file to be parsed. os.PathLike objects are supported.
        ignore_bad_start: Ignore invalid file start for this run. This has a considerable performance impact.

    Returns:
        dict: A dictionary with the content of the EML parsed and broken down into
              key-value pairs.
    """
    raw_email = pathlib.Path(eml_file).read_bytes()

    return self.decode_email_bytes(raw_email, ignore_bad_start=ignore_bad_start)

decode_email_bytes(eml_file, ignore_bad_start=False)

Function for decoding an EML file into an easily parsable structure.

Some intelligence is applied while parsing the file in order to work around broken files. Besides just parsing, this function also computes hashes and extracts meta information from the source file.

Parameters:

Name Type Description Default
eml_file bytes

Contents of the raw EML file passed to this function as bytes.

required
ignore_bad_start bool

Ignore invalid file start for this run. This has a considerable performance impact.

False

Returns:

Name Type Description
dict dict

A dictionary with the content of the EML parsed and broken down into key-value pairs.

Source code in eml_parser/parser.py
def decode_email_bytes(self, eml_file: bytes, ignore_bad_start: bool = False) -> dict:
    """Function for decoding an EML file into an easily parsable structure.

    Some intelligence is applied while parsing the file in order to work around
    broken files.
    Besides just parsing, this function also computes hashes and extracts meta
    information from the source file.

    Args:
        eml_file: Contents of the raw EML file passed to this function as bytes.
        ignore_bad_start: Ignore invalid file start for this run. This has a considerable performance impact.

    Returns:
        dict: A dictionary with the content of the EML parsed and broken down into
              key-value pairs.
    """
    if self.ignore_bad_start or ignore_bad_start:
        # Skip invalid start of file, i.e. all leading lines which do not contain a colon.
        # Note that this has a considerable performance impact, which is why it is disabled by default.
        lines = eml_file.split(b'\n')
        first_header = next((index for index, line in enumerate(lines) if b':' in line), None)

        # Re-join with the separator that was split on, so that the remaining
        # content (including its line endings) is passed on unmodified.
        # If no line contains a colon at all, nothing is left to parse.
        _eml_file = b'' if first_header is None else b'\n'.join(lines[first_header:])
    else:
        _eml_file = eml_file

    self.msg = email.message_from_bytes(_eml_file, policy=self.policy)

    return self.parse_email()

get_file_hash(data) staticmethod

Generate hashes of various types (MD5, SHA-1, SHA-256, SHA-512) for the provided data.

Parameters:

Name Type Description Default
data bytes

The data to calculate the hashes on.

required

Returns:

Name Type Description
dict dict[str, str]

Returns a dict with as key the hash-type and value the calculated hash.

Source code in eml_parser/parser.py
@staticmethod
def get_file_hash(data: bytes) -> dict[str, str]:
    """Calculate the ``MD5``, ``SHA-1``, ``SHA-256`` and ``SHA-512`` hashes of the provided data.

    Args:
      data (bytes): The data to calculate the hashes on.

    Returns:
      dict: Maps each hash-type ('md5', 'sha1', 'sha256', 'sha512') to the calculated hash.
    """
    digests: dict[str, str] = {}
    for algorithm in ('md5', 'sha1', 'sha256', 'sha512'):
        digests[algorithm] = EmlParser.get_hash(data, algorithm)

    return digests

get_hash(value, hash_type) staticmethod

Generate a hash of type hash_type for a given value.

Parameters:

Name Type Description Default
value str | bytes

String or bytes object to calculate the hash on.

required
hash_type str

Hash type to use, can be any of 'md5', 'sha1', 'sha256', 'sha512'.

required

Returns:

Name Type Description
str str

Returns the calculated hash as a string.

Source code in eml_parser/parser.py
@staticmethod
def get_hash(value: str | bytes, hash_type: str) -> str:
    """Generate a hash of type *hash_type* for a given value.

    Args:
        value: String or bytes object to calculate the hash on.
        hash_type: Hash type to use, can be any of 'md5', 'sha1', 'sha256', 'sha512'.

    Returns:
        str: Returns the calculated hash as a string.
    """
    if hash_type not in ('md5', 'sha1', 'sha256', 'sha512'):
        raise ValueError(f'Invalid hash type requested - "{hash_type}"')

    if isinstance(value, str):
        _value = value.encode('utf-8')
    else:
        _value = value

    hash_algo = getattr(hashlib, hash_type)

    return hash_algo(_value).hexdigest()

get_mime_type(data) staticmethod

Get mime-type information based on the provided bytes object.

Parameters:

Name Type Description Default
data bytes

Binary data.

required

Returns:

Type Description
tuple[str, str] | tuple[None, None]

typing.Tuple[str, str]: Identified mime information and mime-type. If magic is not available, returns None, None. E.g. "ELF 64-bit LSB shared object, x86-64, version 1 (SYSV)", "application/x-sharedlib"

Source code in eml_parser/parser.py
@staticmethod
def get_mime_type(data: bytes) -> tuple[str, str] | tuple[None, None]:
    """Determine mime-type information for the provided bytes object.

    Args:
        data: Binary data.

    Returns:
        typing.Tuple[str, str]: Identified mime information and mime-type. If **magic** is not available, returns *None, None*.
                                E.g. *"ELF 64-bit LSB shared object, x86-64, version 1 (SYSV)", "application/x-sharedlib"*
    """
    if magic is not None:
        detection = magic.detect_from_content(data)
        return detection.name, detection.mime_type

    # Without magic no detection is possible
    return None, None

get_raw_body_text(msg, boundary=None, depth=0)

This method recursively retrieves all e-mail body parts and returns them as a list.

Parameters:

Name Type Description Default
msg Message

The actual e-mail message or sub-message.

required
boundary str | None

Used for passing the boundary marker of multipart messages, and used to easier distinguish different parts.

None
depth int

Parameter used to track the current recursion level.

0

Returns:

Name Type Description
list list[tuple[Any, Any, Any, str | None]]

Returns a list of tuples in the form of "(encoding, raw_body_string, message field headers, possible boundary marker)".

Raises:

Type Description
RecursionError

If recursion limit exceeded.

Source code in eml_parser/parser.py
def get_raw_body_text(
    self, msg: email.message.Message, boundary: str | None = None, depth: int = 0
) -> list[tuple[typing.Any, typing.Any, typing.Any, str | None]]:
    """This method recursively retrieves all e-mail body parts and returns them as a list.

    Multipart messages are walked recursively. A non-multipart part is collected when it is
    a text part without a content-disposition header, an inline text part, or a part whose
    file name ends in .htm/.html.

    Args:
        msg (email.message.Message): The actual e-mail message or sub-message.
        boundary: Used for passing the boundary marker of multipart messages, and used to more easily distinguish different parts.
        depth: Parameter used to track the current recursion level.

    Returns:
        list: Returns a list of tuples which are in the form of "(encoding, raw_body_string, message field headers, possible boundary marker)"

    Raises:
        RecursionError: If recursion limit exceeded.
    """
    raw_body: list[tuple[typing.Any, typing.Any, typing.Any, str | None]] = []

    if depth > EmlParser.MULTIPART_RECURSION_LIMIT:
        logger.warning('multi-part nesting limit (%d) exceeded, aborting', EmlParser.MULTIPART_RECURSION_LIMIT)
        raise RecursionError('Recursion limit exceeded')

    if msg.is_multipart():
        # The boundary of this multipart container is handed down to its direct children.
        boundary = msg.get_boundary(failobj=None)
        for part in msg.get_payload():
            raw_body.extend(self.get_raw_body_text(typing.cast('email.message.Message', part), boundary=boundary, depth=depth + 1))
    else:
        # Treat text document attachments as belonging to the body of the mail.
        # Attachments with a file-extension of .htm/.html are implicitly treated
        # as text as well in order not to escape later checks (e.g. URL scan).

        try:
            filename = msg.get_filename('').lower()
        except (binascii.Error, AssertionError):
            logger.exception('Exception occurred while trying to parse the content-disposition header. Collected data will not be complete.')
            filename = ''

        # pylint: disable=too-many-boolean-expressions
        if (
            ('content-disposition' not in msg and msg.get_content_maintype() == 'text')
            or (filename.endswith(('.html', '.htm')))
            or ('content-disposition' in msg and msg.get_content_disposition() == 'inline' and msg.get_content_maintype() == 'text')
        ):
            encoding = msg.get('content-transfer-encoding', '').lower()

            charset = msg.get_content_charset()
            if charset is None:
                # No declared charset: let the project decoder pick one.
                raw_body_b = typing.cast('bytes', msg.get_payload(decode=True))
                raw_body_str = eml_parser.decode.decode_string(raw_body_b, None)
            else:
                try:
                    raw_body_str = typing.cast('bytes', msg.get_payload(decode=True)).decode(charset, 'ignore')
                except (LookupError, ValueError):
                    # Unknown or invalid charset name: fall back to ASCII and drop everything else.
                    logger.debug('An exception occurred while decoding the payload!', exc_info=True)
                    raw_body_str = typing.cast('bytes', msg.get_payload(decode=True)).decode('ascii', 'ignore')

            # In case we hit bug 27257 or any other parsing error, try to downgrade the used policy
            try:
                raw_body.append((encoding, raw_body_str, msg.items(), boundary))
            except (AttributeError, TypeError, ValueError):
                former_policy: email.policy.Policy = msg.policy
                msg.policy = email.policy.compat32
                try:
                    raw_body.append((encoding, raw_body_str, msg.items(), boundary))
                finally:
                    # Always restore the original policy, even if the relaxed policy fails as well.
                    msg.policy = former_policy

    return raw_body

get_uri_ondata(body)

Function for extracting URLs from the input string.

Parameters:

Name Type Description Default
body str

Text input which should be searched for URLs.

required

Returns:

Name Type Description
list list[str]

Returns a list of URLs found in the input string.

Source code in eml_parser/parser.py
def get_uri_ondata(self, body: str) -> list[str]:
    """Extract the URLs contained in the input string.

    Depending on ``include_www`` either the "www"-aware or the simple URL regex (each with its matching
    comma-split regex) is used. If ``include_href`` is set, the href regex is applied in addition.
    Every candidate is passed through ``clean_found_uri``; candidates for which it returns None are dropped.

    Args:
        body (str): Text input which should be searched for URLs.

    Returns:
        list: Returns a list of URLs found in the input string, without duplicates.
    """
    # A dict is used as an insertion-ordered set of cleaned URLs.
    seen: dict[str, int] = {}

    if self.include_www:
        finder = eml_parser.regexes.url_regex_www
        splitter = eml_parser.regexes.url_regex_www_comma
    else:
        finder = eml_parser.regexes.url_regex_simple
        splitter = eml_parser.regexes.url_regex_comma

    for candidate in finder.findall(body):
        # A single match may contain several comma separated URLs.
        for fragment in splitter.split(candidate):
            cleaned = self.clean_found_uri(fragment)
            if cleaned is not None:
                seen[cleaned] = 1

    if self.include_href:
        for candidate in eml_parser.regexes.url_regex_href.findall(body):
            cleaned = self.clean_found_uri(candidate)
            if cleaned is not None:
                seen[cleaned] = 1

    return list(seen)

get_valid_domain_or_ip(data)

Function to determine if an IP address, Email address, or Domain is valid.

Parameters:

Name Type Description Default
data str

Text input which should be validated.

required

Returns:

Name Type Description
str str | None

Returns the validated IP address or host as a string, or None if the input is not valid.

Source code in eml_parser/parser.py
def get_valid_domain_or_ip(self, data: str) -> str | None:
    """Function to determine if an IP address, Email address, or Domain is valid.

    Args:
        data (str): Text input which should be validated.

    Returns:
        str: Returns a string of the validated IP address or the host.
    """
    host = data.rpartition('@')[-1].strip(' \r\n\t[]')
    try:
        # Zone index support was added to ipaddress in Python 3.9
        addr, _, _ = host.partition('%')
        valid_ip = ipaddress.ip_address(addr)
        if self.ip_force_routable:
            # Not a precise filter for IPv4/IPv6 addresses. Can be enhanced with pconf whiteip ranges
            if valid_ip.is_global and not valid_ip.is_reserved:
                return str(valid_ip)
        else:
            return str(valid_ip)
    except ValueError:
        # _psl uses self.domain_force_tld
        valid_domain = self._psl.publicsuffix(host)
        if valid_domain:
            return host

    return None

headeremail2list(header)

Parses a given header field with e-mail addresses to a list of e-mail addresses.

Parameters:

Name Type Description Default
header str

The header field to decode.

required

Returns:

Name Type Description
list list[str]

Returns a list of strings which represent e-mail addresses.

Source code in eml_parser/parser.py
def headeremail2list(self, header: str) -> list[str]:
    """Turn an address header field (e.g. "to" or "cc") into a list of e-mail addresses.

    All occurrences of the header are parsed. If the regular parsing fails, the project
    workarounds for bug 27257 and for field value parsing errors are used instead.

    Args:
        header (str): The header field to decode.

    Returns:
        list: Returns a list of lower-cased strings which represent e-mail addresses. Entries without an
              address are skipped; with ``email_force_tld`` set only addresses matching the TLD regex are kept.

    Raises:
        ValueError: If no message has been set.
    """
    if self.msg is None:
        raise ValueError('msg is not set.')

    try:
        pairs = email.utils.getaddresses(self.msg.get_all(header, []))
    except (IndexError, AttributeError):
        pairs = email.utils.getaddresses(eml_parser.decode.workaround_bug_27257(self.msg, header))
    except ValueError:
        raw_values = eml_parser.decode.workaround_field_value_parsing_errors(self.msg, header)
        pairs = []

        for raw_value in raw_values:
            # Decode RFC2047 encoded words and unfold the value before parsing it again.
            unfolded = eml_parser.decode.rfc2047_decode(raw_value).replace('\n', '').replace('\r', '')

            parsed: dict[str, typing.Any] = {}
            header_cls = typing.cast('email.headerregistry.AddressHeader', email.headerregistry.HeaderRegistry()[header])
            header_cls.parse(unfolded, parsed)
            pairs.extend(
                (address.display_name, address.addr_spec)
                for group in parsed['groups']
                for address in group.addresses
            )

    # Each pair is (display name, address); only the address part is reported.
    return [
        pair[1].lower()
        for pair in pairs
        if pair[1] != '' and (not self.email_force_tld or eml_parser.regexes.email_force_tld_regex.match(pair[1]))
    ]

parse_email()

Parse an e-mail and return a dictionary containing the various parts of the e-mail broken down into key-value pairs.

Returns:

Name Type Description
dict dict

A dictionary with the content of the EML parsed and broken down into key-value pairs.

Source code in eml_parser/parser.py
def parse_email(self) -> dict:
    """Parse an e-mail and return a dictionary containing the various parts of the e-mail broken down into key-value pairs.

    The returned dictionary contains:

    * ``header``: subject, from, to, cc, delivered_to, date, the parsed "received" lines together with
      the e-mail addresses, domains and IPs found in them, and all raw headers under the nested ``header`` key.
    * ``body``: a list with one dictionary per collected body part (content headers, hash, and either the
      observed URIs/e-mails/domains/IPs or their SHA-256 hashes, depending on ``include_raw_body``).
    * ``attachment``: a list of attachment dictionaries, only present if ``parse_attachments`` is set and
      attachments were found.

    Returns:
      dict: A dictionary with the content of the EML parsed and broken down into
            key-value pairs.

    Raises:
        ValueError: If no message has been set.
    """
    header: dict[str, typing.Any] = {}
    report_struc: dict[str, typing.Any] = {}  # Final structure
    headers_struc: dict[str, typing.Any] = {}  # header_structure
    bodys_struc: dict[str, typing.Any] = {}  # body structure

    if self.msg is None:
        raise ValueError('msg is not set.')

    # parse and decode subject
    subject = self.msg.get('subject', '')
    headers_struc['subject'] = eml_parser.decode.decode_field(subject)

    # If parsing had problems, report it
    if self.msg.defects:
        headers_struc['defect'] = [str(exception) for exception in self.msg.defects]

    # parse and decode "from"
    # @TODO verify if this hack is necessary for other e-mail fields as well
    try:
        msg_header_field = str(self.msg.get('from', '')).lower()
    except (IndexError, AttributeError):
        # We have hit current open issue #27257
        # https://bugs.python.org/issue27257
        # The field is replaced by the value obtained through the workaround,
        # or set to empty if the workaround yields nothing.
        logger.exception('We hit bug 27257!')

        _from = eml_parser.decode.workaround_bug_27257(self.msg, 'from')
        del self.msg['from']

        if _from:
            self.msg.add_header('from', _from[0])
            msg_header_field = _from[0].lower()
        else:
            self.msg.add_header('from', '')
            msg_header_field = ''
    except ValueError:
        _field_item = eml_parser.decode.workaround_field_value_parsing_errors(self.msg, 'from')
        msg_header_field = eml_parser.decode.rfc2047_decode(_field_item[0]).lower()

    if msg_header_field != '':
        from_ = email.utils.parseaddr(msg_header_field)

        if (from_ and from_ == ('', '')) or not isinstance(from_, collections.abc.Sequence):
            # parseaddr() could not make sense of the field, fall back to a plain regex search.
            m = eml_parser.regexes.email_regex.search(msg_header_field)
            if m:
                headers_struc['from'] = m.group(1)
            else:
                logger.warning('FROM header parsing failed.')
                headers_struc['from'] = msg_header_field

        else:
            headers_struc['from'] = from_[1]

    # parse and decode "to"
    headers_struc['to'] = self.headeremail2list('to')
    # parse and decode "cc"
    headers_struc['cc'] = self.headeremail2list('cc')
    if not headers_struc['cc']:
        headers_struc.pop('cc')

    # parse and decode delivered-to
    headers_struc['delivered_to'] = self.headeremail2list('delivered-to')
    if not headers_struc['delivered_to']:
        headers_struc.pop('delivered_to')

    # parse and decode Date
    # If date field is present
    if 'date' in self.msg and self.msg.get('date') is not None:
        # The header value is expected to be an ISO formatted string at this point.
        headers_struc['date'] = datetime.datetime.fromisoformat(typing.cast('str', self.msg.get('date')))
    else:
        # If date field is absent...
        headers_struc['date'] = datetime.datetime.fromisoformat(eml_parser.decode.default_date)

    # mail receiver path / parse any domain, e-mail
    # @TODO parse case where domain is specified but in parentheses only an IP
    headers_struc['received'] = []
    headers_struc['received_email'] = []
    headers_struc['received_domain'] = []
    headers_struc['received_ip'] = []
    try:
        found_smtpin: collections.Counter = collections.Counter()  # Array for storing potential duplicate "HOP"

        for received_line in self.msg.get_all('received', []):
            line = str(received_line).lower()

            received_line_flat = re.sub(r'(\r|\n|\s|\t)+', ' ', line, flags=re.UNICODE)

            # Parse and split routing headers.
            # Return dict of list
            #   date string
            #   from list
            #   for list
            #   by list
            #   with string
            #   warning list
            parsed_routing = eml_parser.routing.parserouting(received_line_flat)

            # If required collect the IP of the gateway that have injected the mail.
            # Iterate all parsed item and find IP
            # It is parsed from the MOST recent to the OLDEST (from IN > Out)
            # We match external IP from the most "OUT" Found.
            # Warning .. It may be spoofed !!
            # It add a warning if multiple identical items are found.

            if self.pconf.get('byhostentry'):
                for by_item in parsed_routing.get('by', []):
                    for byhostentry_ in self.pconf['byhostentry']:
                        byhostentry = byhostentry_.lower()

                        if byhostentry in by_item:
                            # Save the last Found.. ( most external )
                            headers_struc['received_src'] = parsed_routing.get('from')

                            # Increment watched by detection counter, and warn if needed
                            found_smtpin[byhostentry] += 1
                            if found_smtpin[byhostentry] > 1:  # Twice found the header...
                                # The warning is stored as a plain string in both branches so
                                # that the warning list never contains nested lists.
                                if parsed_routing.get('warning'):
                                    parsed_routing['warning'].append('Duplicate SMTP by entrypoint')
                                else:
                                    parsed_routing['warning'] = ['Duplicate SMTP by entrypoint']

            headers_struc['received'].append(parsed_routing)

            # Parse IPs in "received headers"
            ips_in_received_line = eml_parser.regexes.ipv6_regex.findall(received_line_flat) + eml_parser.regexes.ipv4_regex.findall(received_line_flat)
            for ip in ips_in_received_line:
                if ip in self.pconf['whiteip']:
                    continue
                valid_ip = self.get_valid_domain_or_ip(ip)
                if valid_ip:
                    headers_struc['received_ip'].append(valid_ip)
                else:
                    logger.debug('Invalid IP in received line - "%s"', ip)

            # search for domain
            for m in eml_parser.regexes.recv_dom_regex.findall(received_line_flat):
                try:
                    _ = ipaddress.ip_address(m)  # type of findall is list[str], so this is correct
                except ValueError:
                    # we find IPs using the previous IP crawler, hence we ignore them
                    # here.
                    # iff the regex fails, we add the entry
                    headers_struc['received_domain'].append(m)

            # search for e-mail addresses
            for mail_candidate in eml_parser.regexes.email_regex.findall(received_line_flat):
                if self.email_force_tld:
                    # NOTE(review): the candidate is replaced by the validated host part here, so
                    # 'received_email' holds hosts rather than full addresses in this mode - confirm intent.
                    mail_candidate = self.get_valid_domain_or_ip(mail_candidate)
                if mail_candidate is not None and mail_candidate not in parsed_routing.get('for', []):
                    headers_struc['received_email'].append(mail_candidate)

    except TypeError:  # Ready to parse email without received headers.
        logger.exception('Exception occurred while parsing received lines.')

    # Concatenate for emails into one array | uniq
    # for rapid "find"
    headers_struc['received_foremail'] = []
    for _parsed_routing in headers_struc['received']:
        for itemfor in _parsed_routing.get('for', []):
            if itemfor not in self.pconf['whitefor']:
                headers_struc['received_foremail'].append(itemfor)

    # Uniq data found
    headers_struc['received_email'] = list(set(headers_struc['received_email']))
    headers_struc['received_domain'] = list(set(headers_struc['received_domain']))
    headers_struc['received_ip'] = list(set(headers_struc['received_ip']))

    # Clean up if empty
    if not headers_struc['received_email']:
        del headers_struc['received_email']

    if not headers_struc['received_foremail']:
        del headers_struc['received_foremail']
    else:
        headers_struc['received_foremail'] = list(set(headers_struc['received_foremail']))

    if not headers_struc['received_domain']:
        del headers_struc['received_domain']

    if not headers_struc['received_ip']:
        del headers_struc['received_ip']
    ####################

    # Parse text body
    raw_body = self.get_raw_body_text(self.msg)

    if self.include_raw_body:
        # NOTE(review): bodys_struc is rebound to the per-part dictionary further down, so this
        # entry does not end up in the report - confirm whether that is intended.
        bodys_struc['raw_body'] = raw_body

    bodys = {}

    # Is it a multipart email ?
    multipart = len(raw_body) != 1

    for body_tup in raw_body:
        bodie: dict[str, typing.Any] = {}
        _, body, body_multhead, boundary = body_tup
        # Parse any URLs and mail found in the body
        list_observed_urls: list[str] = []
        list_observed_urls_noscheme: list[str] = []
        list_observed_email: typing.Counter[str] = Counter()
        list_observed_dom: typing.Counter[str] = Counter()
        list_observed_ip: typing.Counter[str] = Counter()

        # If we start directly a findall on 500K+ body we got time and memory issues...
        # if more than 4K.. lets cheat, we will cut around the thing we search "://, @, ."
        # in order to reduce regex complexity.
        for body_slice in self.string_sliding_window_loop(body):
            for url_match in self.get_uri_ondata(body_slice):
                if ':/' in url_match[:10]:
                    list_observed_urls.append(url_match)
                else:
                    list_observed_urls_noscheme.append(url_match)

            for match in eml_parser.regexes.email_regex.findall(body_slice):
                valid_email = self.get_valid_domain_or_ip(match.lower())
                if valid_email:
                    list_observed_email[match.lower()] = 1

            for match in eml_parser.regexes.dom_regex.findall(body_slice):
                valid_domain = self.get_valid_domain_or_ip(match.lower())
                if valid_domain:
                    list_observed_dom[match.lower()] = 1

            for ip_regex in (eml_parser.regexes.ipv4_regex, eml_parser.regexes.ipv6_regex):
                for match in ip_regex.findall(body_slice):
                    valid_ip = self.get_valid_domain_or_ip(match.lower())
                    if valid_ip in self.pconf['whiteip']:
                        continue
                    if valid_ip:
                        list_observed_ip[valid_ip] = 1

        # Report uri,email and observed domain or hash if no raw body
        if self.include_raw_body:
            if list_observed_urls:
                bodie['uri'] = list(set(list_observed_urls))

            if list_observed_urls_noscheme:
                bodie['uri_noscheme'] = list(set(list_observed_urls_noscheme))

            if list_observed_email:
                bodie['email'] = list(list_observed_email)

            if list_observed_dom:
                bodie['domain'] = list(list_observed_dom)

            if list_observed_ip:
                bodie['ip'] = list(list_observed_ip)

        else:
            if list_observed_urls:
                bodie['uri_hash'] = [self.get_hash(element.lower(), 'sha256') for element in list_observed_urls]
            if list_observed_email:
                # Email already lowered
                bodie['email_hash'] = [self.get_hash(element, 'sha256') for element in list_observed_email]
            if list_observed_dom:
                bodie['domain_hash'] = [self.get_hash(element, 'sha256') for element in list_observed_dom]
            if list_observed_ip:
                # IP (v6) already lowered
                bodie['ip_hash'] = [self.get_hash(element, 'sha256') for element in list_observed_ip]

        # For mail without multipart we will only get the "content....something" headers
        # all other headers are in "header"
        # but we need to convert header tuples in dict..
        # "a","toto"           a: [toto,titi]
        # "a","titi"   --->    c: [truc]
        # "c","truc"
        ch: dict[str, list] = {}
        for k, v in body_multhead:
            # make sure we are working with strings only
            v = str(v)

            # We are using replace . to : for avoiding issue in mongo
            k = k.lower().replace('.', ':')  # Lot of lowers, pre-compute :) .

            # if not multipart, store only content-xx related header with part,
            # otherwise, we got all header headers
            if multipart or k.startswith('content'):
                ch.setdefault(k, []).append(v)

        bodie['content_header'] = ch  # Store content headers dict

        if self.include_raw_body:
            bodie['content'] = body

        # Sometimes bad people play with multiple header instances.
        # We "display" the "LAST" one .. as does thunderbird
        val = ch.get('content-type')
        if val:
            header_val = val[-1]
            bodie['content_type'] = header_val.split(';', 1)[0].strip()

        # Hash the body
        bodie['hash'] = hashlib.sha256(body.encode('utf-8')).hexdigest()

        if boundary is not None:
            # only include boundary key if there is a value set
            bodie['boundary'] = boundary

        uid = str(uuid.uuid1())
        bodys[uid] = bodie

    bodys_struc = bodys

    # Get all other bulk raw headers
    # "a","toto"           a: [toto,titi]
    # "a","titi"   --->    c: [truc]
    # "c","truc"
    #
    # Header names are lower-cased *before* de-duplication: get_all() matches names
    # case-insensitively, so visiting e.g. "X-Foo" and "x-foo" separately would store
    # every value of that header twice.
    for k in {name.lower() for name in self.msg.keys()}:
        decoded_values = []

        try:
            for value in self.msg.get_all(k, []):
                if value:
                    decoded_values.append(value)
        except (IndexError, AttributeError, TypeError):
            # We have hit a field value parsing error.
            # Try to work around this by using a relaxed policy, if possible.
            # Parsing might not give meaningful results in this case!
            logger.error('ERROR: Field value parsing error, trying to work around this!')
            decoded_values = eml_parser.decode.workaround_field_value_parsing_errors(self.msg, k)
        except ValueError:
            # extract values using a relaxed policy
            for _field in eml_parser.decode.workaround_field_value_parsing_errors(self.msg, k):
                # check if this is a RFC2047 encoded field
                if eml_parser.regexes.email_regex_rfc2047.search(_field):
                    decoded_values.append(eml_parser.decode.rfc2047_decode(_field))
                else:
                    logger.error('ERROR: Field value parsing error, trying to work around this! - %s', _field)

        if decoded_values:
            header[k] = decoded_values

    headers_struc['header'] = header

    # parse attachments
    if self.parse_attachments:
        try:
            report_struc['attachment'] = self.traverse_multipart(self.msg, 0)
        except (binascii.Error, AssertionError):
            # we hit this exception if the payload contains invalid data
            logger.exception('Exception occurred while parsing attachment data. Collected data will not be complete!')
            report_struc['attachment'] = None

        # Dirty hack... transform hash into list.. need to be done in the function.
        # Mandatory to search efficiently in mongodb
        # See Bug 11 of eml_parser
        if not report_struc['attachment']:
            del report_struc['attachment']
        else:
            report_struc['attachment'] = list(report_struc['attachment'].values())

    report_struc['body'] = list(bodys_struc.values())
    # End of dirty hack

    # Get all other bulk headers
    report_struc['header'] = headers_struc

    return report_struc

prepare_multipart_part_attachment(msg, counter=0)

Extract meta-information from a multipart-part.

Parameters:

Name Type Description Default
msg Message

An e-mail message object.

required
counter int

A counter which is used for generating attachments file-names in case there are none found in the header. Default = 0.

0

Returns:

Name Type Description
dict dict[str, Any]

Returns a dict with original multi-part headers as well as generated hash check-sums, data size, file extension, real mime-type.

Source code in eml_parser/parser.py
def prepare_multipart_part_attachment(self, msg: email.message.Message, counter: int = 0) -> dict[str, typing.Any]:
    """Collect the meta-information of a single multipart-part if it is an attachment.

    A part is handled as an attachment if it has a content-disposition other than "inline" or
    if its main content type is not "text". Any other part yields an empty dict.

    Args:
        msg (email.message.Message): An e-mail message object.
        counter (int, optional): A counter which is used for generating attachments
            file-names in case there are none found in the header. Default = 0.

    Returns:
        dict: Returns a dict with a single, newly generated id as key. Its value holds the file name, data size,
            file extension (if any), hash check-sums, detected mime-type (if available), the base64 encoded
            data (if ``include_attachment_data`` is set) and the original multi-part headers.
    """
    # In case we hit bug 27257, try to downgrade the used policy
    try:
        header_names = [name.lower() for name in msg.keys()]
    except AttributeError:
        original_policy: email.policy.Policy = msg.policy
        msg.policy = email.policy.compat32
        header_names = [name.lower() for name in msg.keys()]
        msg.policy = original_policy

    has_non_inline_disposition = 'content-disposition' in header_names and msg.get_content_disposition() != 'inline'
    if not (has_non_inline_disposition or msg.get_content_maintype() != 'text'):
        # Inline / plain text parts are not attachments.
        return {}

    # It's an attachment-type: pull out the data, the filename and calculate the size in bytes.
    if msg.get_content_type() == 'message/rfc822':
        payload = msg.get_payload()
        if len(payload) > 1:
            logger.warning('More than one payload for "message/rfc822" part detected. This is not supported, please report!')

        embedded = typing.cast('list[email.message.EmailMessage]', payload)[0]
        try:
            data = embedded.as_bytes(policy=email.policy.default.clone(max_line_length=0))
        except UnicodeEncodeError:
            # Serialising with the default policy failed, retry with the relaxed compat32 policy.
            data = embedded.as_bytes(policy=email.policy.compat32.clone(max_line_length=0))
    else:
        data = typing.cast('bytes', msg.get_payload(decode=True))

    file_size = len(data)

    filename = msg.get_filename('')
    if filename == '':
        # No name in the headers, generate one from the counter.
        filename = f'part-{counter:03d}'
    else:
        filename = eml_parser.decode.decode_field(filename)

    file_id = str(uuid.uuid1())
    entry: dict[str, typing.Any] = {'filename': filename, 'size': file_size}

    # in case there is no extension pathlib.Path(filename).suffix returns an empty string
    suffix = pathlib.Path(filename).suffix
    if suffix:
        # strip leading dot and lower-case
        entry['extension'] = suffix[1:].lower()

    entry['hash'] = self.get_file_hash(data)

    mime_type, mime_type_short = self.get_mime_type(data)

    if mime_type is not None and mime_type_short is not None:
        entry['mime_type'] = mime_type
        entry['mime_type_short'] = mime_type_short
    elif magic is not None:
        logger.warning('Error determining attachment mime-type - "%s"', str(file_id))

    if self.include_attachment_data:
        entry['raw'] = base64.b64encode(data)

    # Group the part headers by lower-cased name, keeping every occurrence.
    content_header: dict[str, list[str]] = {}
    for key, value in msg.items():
        content_header.setdefault(key.lower(), []).append(str(value))

    entry['content_header'] = content_header

    return {file_id: entry}

string_sliding_window_loop(body, slice_step=500, max_distance=100) staticmethod

Yield a more or less constant slice of a large string.

If we directly do a regex findall on 500K+ body we get time and memory issues. If more than the configured slice step, lets cheat, we will cut around the thing we search "://, @, ." in order to reduce regex complexity.

In case we find a :// within the first 8 characters of a sliced body window, we rewind the window by 16 characters. If we find the same string at the end of a sliced body window, we look for invalid URL characters up to max_distance characters further; if none are found within that distance we give up and return the sliced body part. This is done in order to return as many correct URLs as possible.

The choice of 8 characters is because https:// is 8 characters long, which is the maximum size we accept for schemes.

Parameters:

Name Type Description Default
body str

Body to slice into smaller pieces.

required
slice_step int

Slice this number of characters.

500
max_distance int

In case we find a :// in a string window towards the end, we try our best to enlarge the window as to not cut off URLs. This variable sets the maximum permitted additional window size to consider.

100

Returns:

Type Description
Iterator[str]

typing.Iterator[str]: Sliced body string.

Source code in eml_parser/parser.py
@staticmethod
def string_sliding_window_loop(body: str, slice_step: int = 500, max_distance: int = 100) -> typing.Iterator[str]:
    """Yield a more or less constant slice of a large string.

    If we directly do a *regex* findall on 500K+ body we get time and memory issues.
    If more than the configured slice step, lets cheat, we will cut around the thing we search "://, @, ."
    in order to reduce regex complexity.

    In case we find a *://* at the first 8 characters of a sliced body window, we rewind the window by 16 characters.
    If we find the same string at the end of a sliced body window we try to look for invalid URL characters up to *max_distance*
    length, until which we give up and return the sliced body part. This is done in order to return a maximum possible
    correct URLs.

    The choice for 8 character is because *https://* is 8 characters, which is the maximum size we accept for schemes.

    Args:
        body: Body to slice into smaller pieces.
        slice_step: Slice this number or characters.
        max_distance: In case we find a *://* in a string window towards the end, we try our best to enlarge the window
                        as to not cut off URLs. This variable sets the maximum permitted additional window size to consider.

    Returns:
        typing.Iterator[str]: Sliced body string.
    """
    body_length = len(body)

    if body_length <= slice_step:
        yield body

    else:
        ptr_start = 0

        for ptr_end in range(slice_step, body_length + slice_step, slice_step):
            if ' ' in body[ptr_end - 1 : ptr_end]:
                while not (eml_parser.regexes.window_slice_regex.match(body[ptr_end - 1 : ptr_end]) or ptr_end > body_length):
                    if ptr_end > body_length:
                        ptr_end = body_length
                        break

                    ptr_end += 1

            # Found a :// near the start of the slice, rewind
            if ptr_start > 16 and '://' in body[ptr_start - 8 : ptr_start + 8]:
                ptr_start -= 16

            # Found a :// near the end of the slice, rewind from that location
            if ptr_end < body_length and '://' in body[ptr_end - 8 : ptr_end + 8]:
                pos = body.rfind('://', ptr_end - 8, ptr_end + 8)
                ptr_end = pos - 8

            # Found a :// within the slice; try to expand the slice until we find an invalid
            # URL character in order to avoid cutting off URLs
            if '://' in body[ptr_start:ptr_end] and not body[ptr_end - 1 : ptr_end] == ' ':
                distance = 1

                while body[ptr_end - 1 : ptr_end] not in (' ', '>') and distance < max_distance and ptr_end <= body_length:
                    distance += 1
                    ptr_end += 1

            yield body[ptr_start:ptr_end]

            ptr_start = ptr_end

traverse_multipart(msg, counter=0, depth=0)

Recursively traverses all e-mail message multi-part elements and returns in a parsed form as a dict.

Parameters:

Name Type Description Default
msg Message

An e-mail message object.

required
counter int

A counter which is used for generating attachments file-names in case there are none found in the header. Default = 0.

0
depth int

Parameter used to track the current recursion level.

0

Returns:

Name Type Description
dict dict[str, Any]

Returns a dict with all original multi-part headers as well as generated hash check-sums, data size, file extension, real mime-type.

Raises:

Type Description
RecursionError

If recursion limit exceeded.

Source code in eml_parser/parser.py
def traverse_multipart(self, msg: email.message.Message, counter: int = 0, depth: int = 0) -> dict[str, typing.Any]:
    """Walk all multi-part elements of an e-mail message and collect the parsed parts in one dict.

    A message which is not multi-part is passed straight to ``prepare_multipart_part_attachment``
    and that result is returned. A multi-part message is descended into recursively, one
    level per nesting, and the results of all parts are merged; a multi-part message of
    content type ``message/rfc822`` is additionally recorded as an attachment itself.

    Args:
        msg: An e-mail message object.
        counter: A counter which is used for generating attachments
            file-names in case there are none found in the header. Default = 0.
        depth: Parameter used to track the current recursion level.

    Returns:
        dict: The merged dicts produced by ``prepare_multipart_part_attachment`` for the
            visited parts.

    Raises:
        RecursionError: If recursion limit exceeded.
    """
    # Refuse to descend any further once the configured nesting limit is passed.
    if depth > EmlParser.MULTIPART_RECURSION_LIMIT:
        logger.warning('multi-part nesting limit (%d) exceeded, aborting', EmlParser.MULTIPART_RECURSION_LIMIT)
        raise RecursionError('Recursion limit exceeded')

    # Leaf part: nothing to recurse into.
    if not msg.is_multipart():
        return self.prepare_multipart_part_attachment(msg, counter)

    collected: dict[str, typing.Any] = {}

    # An attached e-mail message is kept as an attachment in addition to being parsed.
    if 'content-type' in msg and msg.get_content_type() == 'message/rfc822':
        collected.update(self.prepare_multipart_part_attachment(msg, counter))

    for part in msg.get_payload():
        nested = self.traverse_multipart(typing.cast('email.message.EmailMessage', part), counter=counter, depth=depth + 1)
        collected.update(nested)

    return collected