Python email.message 模块,replace() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 email.message.replace()。

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_message(self, key):
        """Build and return the BabylMessage stored under *key*.

        The record boundaries come from ``self._lookup(key)``; a KeyError for
        an unknown key would have to be raised there (not visible here).
        The text before the '*** EOOH ***' marker plus the body becomes the
        message itself, while the section between the marker and the next
        blank line is attached through ``set_visible``.
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading '1,' line that specifies labels.

        # First section: copy lines until the end-of-original-headers marker.
        eooh_marker = '*** EOOH ***' + os.linesep
        header_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == eooh_marker:
                break
            header_buf.write(raw.replace(os.linesep, '\n'))

        # Second section: copy lines until the first empty line.
        visible_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == os.linesep:
                break
            visible_buf.write(raw.replace(os.linesep, '\n'))

        # Whatever remains before the record's end offset is the body.
        remaining = end - source.tell()
        payload = source.read(remaining).replace(os.linesep, '\n')

        message = BabylMessage(header_buf.getvalue() + payload)
        message.set_visible(visible_buf.getvalue())
        if key in self._labels:
            message.set_labels(self._labels[key])
        return message
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def get_message(self, key):
        """Build and return the BabylMessage stored under *key*.

        The record boundaries come from ``self._lookup(key)``; a KeyError for
        an unknown key would have to be raised there (not visible here).
        The text before the '*** EOOH ***' marker plus the body becomes the
        message itself, while the section between the marker and the next
        blank line is attached through ``set_visible``.
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading '1,' line that specifies labels.

        # First section: copy lines until the end-of-original-headers marker.
        eooh_marker = '*** EOOH ***' + os.linesep
        header_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == eooh_marker:
                break
            header_buf.write(raw.replace(os.linesep, '\n'))

        # Second section: copy lines until the first empty line.
        visible_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == os.linesep:
                break
            visible_buf.write(raw.replace(os.linesep, '\n'))

        # Whatever remains before the record's end offset is the body.
        remaining = end - source.tell()
        payload = source.read(remaining).replace(os.linesep, '\n')

        message = BabylMessage(header_buf.getvalue() + payload)
        message.set_visible(visible_buf.getvalue())
        if key in self._labels:
            message.set_labels(self._labels[key])
        return message
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def get_bytes(self, key):
        """Return the stored record for *key* as bytes.

        The result is the section before the b'*** EOOH ***' marker followed
        by the body; the section between the marker and the next blank line
        is read but left out. Native line separators are turned into
        newline bytes. A KeyError for an unknown key would have to come from
        ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading b'1,' line that specifies labels.

        # Collect lines up to the end-of-original-headers marker (or EOF).
        eooh_marker = b'*** EOOH ***' + linesep
        header_buf = io.BytesIO()
        while True:
            raw = source.readline()
            if not raw or raw == eooh_marker:
                break
            header_buf.write(raw.replace(linesep, b'\n'))

        # Consume, without keeping, everything up to the first blank line.
        while True:
            raw = source.readline()
            if not raw or raw == linesep:
                break

        remaining = end - source.tell()
        assert remaining >= 0
        payload = source.read(remaining).replace(linesep, b'\n')
        return header_buf.getvalue() + payload
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def get_message(self, key):
        """Build and return the BabylMessage stored under *key*.

        The record boundaries come from ``self._lookup(key)``; a KeyError for
        an unknown key would have to be raised there (not visible here).
        The text before the '*** EOOH ***' marker plus the body becomes the
        message itself, while the section between the marker and the next
        blank line is attached through ``set_visible``.
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading '1,' line that specifies labels.

        # First section: copy lines until the end-of-original-headers marker.
        eooh_marker = '*** EOOH ***' + os.linesep
        header_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == eooh_marker:
                break
            header_buf.write(raw.replace(os.linesep, '\n'))

        # Second section: copy lines until the first empty line.
        visible_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == os.linesep:
                break
            visible_buf.write(raw.replace(os.linesep, '\n'))

        # Whatever remains before the record's end offset is the body.
        remaining = end - source.tell()
        payload = source.read(remaining).replace(os.linesep, '\n')

        message = BabylMessage(header_buf.getvalue() + payload)
        message.set_visible(visible_buf.getvalue())
        if key in self._labels:
            message.set_labels(self._labels[key])
        return message
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def get_message(self, key):
        """Build and return the BabylMessage stored under *key*.

        The record boundaries come from ``self._lookup(key)``; a KeyError for
        an unknown key would have to be raised there (not visible here).
        The text before the '*** EOOH ***' marker plus the body becomes the
        message itself, while the section between the marker and the next
        blank line is attached through ``set_visible``.
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading '1,' line that specifies labels.

        # First section: copy lines until the end-of-original-headers marker.
        eooh_marker = '*** EOOH ***' + os.linesep
        header_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == eooh_marker:
                break
            header_buf.write(raw.replace(os.linesep, '\n'))

        # Second section: copy lines until the first empty line.
        visible_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == os.linesep:
                break
            visible_buf.write(raw.replace(os.linesep, '\n'))

        # Whatever remains before the record's end offset is the body.
        remaining = end - source.tell()
        payload = source.read(remaining).replace(os.linesep, '\n')

        message = BabylMessage(header_buf.getvalue() + payload)
        message.set_visible(visible_buf.getvalue())
        if key in self._labels:
            message.set_labels(self._labels[key])
        return message
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def get_message(self, key):
        """Build and return the BabylMessage stored under *key*.

        The record boundaries come from ``self._lookup(key)``; a KeyError for
        an unknown key would have to be raised there (not visible here).
        The text before the '*** EOOH ***' marker plus the body becomes the
        message itself, while the section between the marker and the next
        blank line is attached through ``set_visible``.
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading '1,' line that specifies labels.

        # First section: copy lines until the end-of-original-headers marker.
        eooh_marker = '*** EOOH ***' + os.linesep
        header_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == eooh_marker:
                break
            header_buf.write(raw.replace(os.linesep, '\n'))

        # Second section: copy lines until the first empty line.
        visible_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == os.linesep:
                break
            visible_buf.write(raw.replace(os.linesep, '\n'))

        # Whatever remains before the record's end offset is the body.
        remaining = end - source.tell()
        payload = source.read(remaining).replace(os.linesep, '\n')

        message = BabylMessage(header_buf.getvalue() + payload)
        message.set_visible(visible_buf.getvalue())
        if key in self._labels:
            message.set_labels(self._labels[key])
        return message
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def get_message(self, key):
        """Build and return the BabylMessage stored under *key*.

        The record boundaries come from ``self._lookup(key)``; a KeyError for
        an unknown key would have to be raised there (not visible here).
        The text before the '*** EOOH ***' marker plus the body becomes the
        message itself, while the section between the marker and the next
        blank line is attached through ``set_visible``.
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading '1,' line that specifies labels.

        # First section: copy lines until the end-of-original-headers marker.
        eooh_marker = '*** EOOH ***' + os.linesep
        header_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == eooh_marker:
                break
            header_buf.write(raw.replace(os.linesep, '\n'))

        # Second section: copy lines until the first empty line.
        visible_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == os.linesep:
                break
            visible_buf.write(raw.replace(os.linesep, '\n'))

        # Whatever remains before the record's end offset is the body.
        remaining = end - source.tell()
        payload = source.read(remaining).replace(os.linesep, '\n')

        message = BabylMessage(header_buf.getvalue() + payload)
        message.set_visible(visible_buf.getvalue())
        if key in self._labels:
            message.set_labels(self._labels[key])
        return message
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _create_tmp(self):
        """Create a uniquely named file under the 'tmp' subdirectory and return it.

        The name combines the current time, the process id, the class-wide
        ``Maildir._count`` counter and the host name. The file is produced by
        ``_create_carefully``; if something already exists at the chosen
        path, ExternalClashError is raised instead.
        """
        timestamp = time.time()
        # '/' and ':' in the host name are swapped for their octal escapes
        # before the host name is embedded in the file name.
        host = socket.gethostname().replace('/', r'\057').replace(':', r'\072')
        seconds = int(timestamp)
        micros = int(timestamp % 1 * 1e6)
        unique_name = "%s.M%sP%sQ%s.%s" % (seconds, micros, os.getpid(),
                                           Maildir._count, host)
        tmp_path = os.path.join(self._path, 'tmp', unique_name)
        try:
            os.stat(tmp_path)
        except FileNotFoundError:
            # Nothing lives at that path yet, so try to claim it.
            Maildir._count += 1
            try:
                return _create_carefully(tmp_path)
            except FileExistsError:
                pass

        # Reached when stat succeeded or the creation hit an existing file.
        raise ExternalClashError('Name clash prevented file creation: %s' %
                                 tmp_path)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def get_bytes(self, key):
        """Return the stored record for *key* as bytes.

        The result is the section before the b'*** EOOH ***' marker followed
        by the body; the section between the marker and the next blank line
        is read but left out. Native line separators are turned into
        newline bytes. A KeyError for an unknown key would have to come from
        ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading b'1,' line that specifies labels.

        # Collect lines up to the end-of-original-headers marker (or EOF).
        eooh_marker = b'*** EOOH ***' + linesep
        header_buf = io.BytesIO()
        while True:
            raw = source.readline()
            if not raw or raw == eooh_marker:
                break
            header_buf.write(raw.replace(linesep, b'\n'))

        # Consume, without keeping, everything up to the first blank line.
        while True:
            raw = source.readline()
            if not raw or raw == linesep:
                break

        remaining = end - source.tell()
        assert remaining >= 0
        payload = source.read(remaining).replace(linesep, b'\n')
        return header_buf.getvalue() + payload
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def get_message(self, key):
        """Build and return the BabylMessage stored under *key*.

        The record boundaries come from ``self._lookup(key)``; a KeyError for
        an unknown key would have to be raised there (not visible here).
        The text before the '*** EOOH ***' marker plus the body becomes the
        message itself, while the section between the marker and the next
        blank line is attached through ``set_visible``.
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading '1,' line that specifies labels.

        # First section: copy lines until the end-of-original-headers marker.
        eooh_marker = '*** EOOH ***' + os.linesep
        header_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == eooh_marker:
                break
            header_buf.write(raw.replace(os.linesep, '\n'))

        # Second section: copy lines until the first empty line.
        visible_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == os.linesep:
                break
            visible_buf.write(raw.replace(os.linesep, '\n'))

        # Whatever remains before the record's end offset is the body.
        remaining = end - source.tell()
        payload = source.read(remaining).replace(os.linesep, '\n')

        message = BabylMessage(header_buf.getvalue() + payload)
        message.set_visible(visible_buf.getvalue())
        if key in self._labels:
            message.set_labels(self._labels[key])
        return message
项目:empyrion-python-api    作者:huhlig    | 项目源码 | 文件源码
def get_message(self, key):
        """Build and return the BabylMessage stored under *key*.

        The record boundaries come from ``self._lookup(key)``; a KeyError for
        an unknown key would have to be raised there (not visible here).
        The text before the '*** EOOH ***' marker plus the body becomes the
        message itself, while the section between the marker and the next
        blank line is attached through ``set_visible``.
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading '1,' line that specifies labels.

        # First section: copy lines until the end-of-original-headers marker.
        eooh_marker = '*** EOOH ***' + os.linesep
        header_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == eooh_marker:
                break
            header_buf.write(raw.replace(os.linesep, '\n'))

        # Second section: copy lines until the first empty line.
        visible_buf = StringIO.StringIO()
        for raw in iter(source.readline, ''):
            if raw == os.linesep:
                break
            visible_buf.write(raw.replace(os.linesep, '\n'))

        # Whatever remains before the record's end offset is the body.
        remaining = end - source.tell()
        payload = source.read(remaining).replace(os.linesep, '\n')

        message = BabylMessage(header_buf.getvalue() + payload)
        message.set_visible(visible_buf.getvalue())
        if key in self._labels:
            message.set_labels(self._labels[key])
        return message
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _create_tmp(self):
        """Create a uniquely named file under the 'tmp' subdirectory and return it.

        The name combines the current time, the process id, the class-wide
        ``Maildir._count`` counter and the host name. The file is produced by
        ``_create_carefully``; if something already exists at the chosen
        path, ExternalClashError is raised instead.
        """
        timestamp = time.time()
        # '/' and ':' in the host name are swapped for their octal escapes
        # before the host name is embedded in the file name.
        host = socket.gethostname().replace('/', r'\057').replace(':', r'\072')
        seconds = int(timestamp)
        micros = int(timestamp % 1 * 1e6)
        unique_name = "%s.M%sP%sQ%s.%s" % (seconds, micros, os.getpid(),
                                           Maildir._count, host)
        tmp_path = os.path.join(self._path, 'tmp', unique_name)
        try:
            os.stat(tmp_path)
        except FileNotFoundError:
            # Nothing lives at that path yet, so try to claim it.
            Maildir._count += 1
            try:
                return _create_carefully(tmp_path)
            except FileExistsError:
                pass

        # Reached when stat succeeded or the creation hit an existing file.
        raise ExternalClashError('Name clash prevented file creation: %s' %
                                 tmp_path)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def get_bytes(self, key):
        """Return the stored record for *key* as bytes.

        The result is the section before the b'*** EOOH ***' marker followed
        by the body; the section between the marker and the next blank line
        is read but left out. Native line separators are turned into
        newline bytes. A KeyError for an unknown key would have to come from
        ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading b'1,' line that specifies labels.

        # Collect lines up to the end-of-original-headers marker (or EOF).
        eooh_marker = b'*** EOOH ***' + linesep
        header_buf = io.BytesIO()
        while True:
            raw = source.readline()
            if not raw or raw == eooh_marker:
                break
            header_buf.write(raw.replace(linesep, b'\n'))

        # Consume, without keeping, everything up to the first blank line.
        while True:
            raw = source.readline()
            if not raw or raw == linesep:
                break

        remaining = end - source.tell()
        assert remaining >= 0
        payload = source.read(remaining).replace(linesep, b'\n')
        return header_buf.getvalue() + payload
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _dump_message(self, message, target, mangle_from_=False):
        # Most files are opened in binary mode to allow predictable seeking.
        # To get native line endings on disk, the user-friendly \n line endings
        # used in strings and by email.Message are translated here.
        """Dump message contents to target file."""
        if isinstance(message, email.message.Message):
            buffer = StringIO.StringIO()
            gen = email.generator.Generator(buffer, mangle_from_, 0)
            gen.flatten(message)
            buffer.seek(0)
            target.write(buffer.read().replace('\n', os.linesep))
        elif isinstance(message, str):
            if mangle_from_:
                message = message.replace('\nFrom ', '\n>From ')
            message = message.replace('\n', os.linesep)
            target.write(message)
        elif hasattr(message, 'read'):
            while True:
                line = message.readline()
                if line == '':
                    break
                if mangle_from_ and line.startswith('From '):
                    line = '>From ' + line[5:]
                line = line.replace('\n', os.linesep)
                target.write(line)
        else:
            raise TypeError('Invalid message type: %s' % type(message))
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _create_tmp(self):
        """Create a file in the tmp subdirectory and open and return it.

        The file name combines the current time, the process id, the
        class-wide ``Maildir._count`` counter and the host name. The file
        itself is produced by ``_create_carefully``.

        Raises ExternalClashError when something already exists at the
        chosen path; any OSError other than ENOENT (from stat) or EEXIST
        (from creation) propagates unchanged.
        """
        now = time.time()
        # '/' and ':' in the host name are swapped for their octal escapes
        # before the host name is embedded in the file name.
        hostname = socket.gethostname()
        hostname = hostname.replace('/', r'\057').replace(':', r'\072')
        uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
                                    Maildir._count, hostname)
        path = os.path.join(self._path, 'tmp', uniq)
        try:
            os.stat(path)
        # 'except ... as' is accepted by Python 2.6+ and Python 3, unlike
        # the legacy comma form.
        except OSError as stat_err:
            if stat_err.errno != errno.ENOENT:
                raise
            # Nothing lives at that path yet, so try to claim it.
            Maildir._count += 1
            try:
                return _create_carefully(path)
            except OSError as create_err:
                if create_err.errno != errno.EEXIST:
                    raise

        # Fall through to here if stat succeeded or open raised EEXIST.
        raise ExternalClashError('Name clash prevented file creation: %s' %
                                 path)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_message(self, key):
        """Return the message stored under *key*, built by ``self._message_factory``.

        The record's first line is kept apart and, minus its first five
        characters (presumably the 'From ' prefix), handed to ``set_from``.
        A KeyError for an unknown key would have to come from
        ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        envelope = source.readline().replace(os.linesep, '')
        raw = source.read(end - source.tell())
        message = self._message_factory(raw.replace(os.linesep, '\n'))
        message.set_from(envelope[5:])
        return message
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_string(self, key, from_=False):
        """Return the text stored under *key* with newline line endings.

        Unless *from_* is true, the record's first line is consumed and left
        out of the result. A KeyError for an unknown key would have to come
        from ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        if not from_:
            source.readline()  # Drop the first line of the record.
        remaining = end - source.tell()
        return source.read(remaining).replace(os.linesep, '\n')
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_file(self, key):
        """Return a StringIO holding the message for *key* with native line endings.

        The text comes from ``self.get_string(key)``; any KeyError would
        originate there.
        """
        text = self.get_string(key)
        native_text = text.replace('\n', os.linesep)
        return StringIO.StringIO(native_text)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _dump_message(self, message, target, mangle_from_=False):
        # Most files are opened in binary mode to allow predictable seeking.
        # To get native line endings on disk, the user-friendly \n line endings
        # used in strings and by email.Message are translated here.
        """Dump message contents to target file."""
        if isinstance(message, email.message.Message):
            buffer = StringIO.StringIO()
            gen = email.generator.Generator(buffer, mangle_from_, 0)
            gen.flatten(message)
            buffer.seek(0)
            data = buffer.read().replace('\n', os.linesep)
            target.write(data)
            if self._append_newline and not data.endswith(os.linesep):
                # Make sure the message ends with a newline
                target.write(os.linesep)
        elif isinstance(message, str):
            if mangle_from_:
                message = message.replace('\nFrom ', '\n>From ')
            message = message.replace('\n', os.linesep)
            target.write(message)
            if self._append_newline and not message.endswith(os.linesep):
                # Make sure the message ends with a newline
                target.write(os.linesep)
        elif hasattr(message, 'read'):
            lastline = None
            while True:
                line = message.readline()
                if line == '':
                    break
                if mangle_from_ and line.startswith('From '):
                    line = '>From ' + line[5:]
                line = line.replace('\n', os.linesep)
                target.write(line)
                lastline = line
            if self._append_newline and lastline and not lastline.endswith(os.linesep):
                # Make sure the message ends with a newline
                target.write(os.linesep)
        else:
            raise TypeError('Invalid message type: %s' % type(message))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _create_tmp(self):
        """Create a file in the tmp subdirectory and open and return it.

        The file name combines the current time, the process id, the
        class-wide ``Maildir._count`` counter and the host name. The file
        itself is produced by ``_create_carefully``.

        Raises ExternalClashError when something already exists at the
        chosen path; any OSError other than ENOENT (from stat) or EEXIST
        (from creation) propagates unchanged.
        """
        now = time.time()
        # '/' and ':' in the host name are swapped for their octal escapes
        # before the host name is embedded in the file name.
        hostname = socket.gethostname()
        hostname = hostname.replace('/', r'\057').replace(':', r'\072')
        uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
                                    Maildir._count, hostname)
        path = os.path.join(self._path, 'tmp', uniq)
        try:
            os.stat(path)
        # 'except ... as' is accepted by Python 2.6+ and Python 3, unlike
        # the legacy comma form.
        except OSError as stat_err:
            if stat_err.errno != errno.ENOENT:
                raise
            # Nothing lives at that path yet, so try to claim it.
            Maildir._count += 1
            try:
                return _create_carefully(path)
            except OSError as create_err:
                if create_err.errno != errno.EEXIST:
                    raise

        # Fall through to here if stat succeeded or open raised EEXIST.
        raise ExternalClashError('Name clash prevented file creation: %s' %
                                 path)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def get_message(self, key):
        """Return the message stored under *key*, built by ``self._message_factory``.

        The record's first line is kept apart and, minus its first five
        characters (presumably the 'From ' prefix), handed to ``set_from``.
        A KeyError for an unknown key would have to come from
        ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        envelope = source.readline().replace(os.linesep, '')
        raw = source.read(end - source.tell())
        message = self._message_factory(raw.replace(os.linesep, '\n'))
        message.set_from(envelope[5:])
        return message
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def get_string(self, key, from_=False):
        """Return the text stored under *key* with newline line endings.

        Unless *from_* is true, the record's first line is consumed and left
        out of the result. A KeyError for an unknown key would have to come
        from ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        if not from_:
            source.readline()  # Drop the first line of the record.
        remaining = end - source.tell()
        return source.read(remaining).replace(os.linesep, '\n')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def get_file(self, key):
        """Return a StringIO holding the message for *key* with native line endings.

        The text comes from ``self.get_string(key)``; any KeyError would
        originate there.
        """
        text = self.get_string(key)
        native_text = text.replace('\n', os.linesep)
        return StringIO.StringIO(native_text)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def get_bytes(self, key):
        """Return the bytes of the file for *key* with newline line endings.

        The file is located by joining ``self._path`` with the result of
        ``self._lookup(key)``; a KeyError for an unknown key would have to
        come from that lookup (not visible here).
        """
        file_path = os.path.join(self._path, self._lookup(key))
        # The context manager closes the handle whether or not read() succeeds.
        with open(file_path, 'rb') as handle:
            return handle.read().replace(linesep, b'\n')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _create_tmp(self):
        """Create a uniquely named file under the 'tmp' subdirectory and return it.

        The name combines the current time, the process id, the class-wide
        ``Maildir._count`` counter and the host name. The file is produced by
        ``_create_carefully``. ExternalClashError is raised when something
        already exists at the chosen path; any OSError other than ENOENT
        (from stat) or EEXIST (from creation) propagates unchanged.
        """
        timestamp = time.time()
        # '/' and ':' in the host name are swapped for their octal escapes
        # before the host name is embedded in the file name.
        host = socket.gethostname().replace('/', r'\057').replace(':', r'\072')
        seconds = int(timestamp)
        micros = int(timestamp % 1 * 1e6)
        unique_name = "%s.M%sP%sQ%s.%s" % (seconds, micros, os.getpid(),
                                           Maildir._count, host)
        tmp_path = os.path.join(self._path, 'tmp', unique_name)
        try:
            os.stat(tmp_path)
        except OSError as stat_err:
            if stat_err.errno != errno.ENOENT:
                raise
            # Nothing lives at that path yet, so try to claim it.
            Maildir._count += 1
            try:
                return _create_carefully(tmp_path)
            except OSError as create_err:
                if create_err.errno != errno.EEXIST:
                    raise

        # Reached when stat succeeded or the creation reported EEXIST.
        raise ExternalClashError('Name clash prevented file creation: %s' %
                                 tmp_path)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def get_message(self, key):
        """Return the message stored under *key*, built by ``self._message_factory``.

        The record's first line is kept apart; with its first five bytes
        removed (presumably the b'From ' prefix) and decoded as ASCII, it is
        handed to ``set_from``. A KeyError for an unknown key would have to
        come from ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        envelope = source.readline().replace(linesep, b'')
        raw = source.read(end - source.tell())
        message = self._message_factory(raw.replace(linesep, b'\n'))
        sender = envelope[5:].decode('ascii')
        message.set_from(sender)
        return message
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def get_bytes(self, key, from_=False):
        """Return the bytes stored under *key* with newline line endings.

        Unless *from_* is true, the record's first line is consumed and left
        out of the result. A KeyError for an unknown key would have to come
        from ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        if not from_:
            source.readline()  # Drop the first line of the record.
        remaining = end - source.tell()
        return source.read(remaining).replace(linesep, b'\n')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def get_message(self, key):
        """Build and return the BabylMessage stored under *key*.

        The record boundaries come from ``self._lookup(key)``; a KeyError for
        an unknown key would have to be raised there (not visible here).
        The bytes before the b'*** EOOH ***' marker plus the body become the
        message itself, while the section between the marker and the next
        blank line is attached through ``set_visible``.
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        source.readline()  # Discard the leading b'1,' line that specifies labels.

        # First section: copy lines until the end-of-original-headers marker.
        eooh_marker = b'*** EOOH ***' + linesep
        header_buf = io.BytesIO()
        while True:
            raw = source.readline()
            if not raw or raw == eooh_marker:
                break
            header_buf.write(raw.replace(linesep, b'\n'))

        # Second section: copy lines until the first empty line.
        visible_buf = io.BytesIO()
        while True:
            raw = source.readline()
            if not raw or raw == linesep:
                break
            visible_buf.write(raw.replace(linesep, b'\n'))

        # Read up to the record's stop offset; that remainder is the body.
        remaining = end - source.tell()
        assert remaining >= 0
        payload = source.read(remaining).replace(linesep, b'\n')

        message = BabylMessage(header_buf.getvalue() + payload)
        message.set_visible(visible_buf.getvalue())
        if key in self._labels:
            message.set_labels(self._labels[key])
        return message
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def get_file(self, key):
        """Return a BytesIO holding the message for *key* with native line endings.

        The data comes from ``self.get_bytes(key)``; any KeyError would
        originate there.
        """
        data = self.get_bytes(key)
        native_data = data.replace(b'\n', linesep)
        return io.BytesIO(native_data)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _dump_message(self, message, target, mangle_from_=False):
        # Most files are opened in binary mode to allow predictable seeking.
        # To get native line endings on disk, the user-friendly \n line endings
        # used in strings and by email.Message are translated here.
        """Dump message contents to target file."""
        if isinstance(message, email.message.Message):
            buffer = StringIO.StringIO()
            gen = email.generator.Generator(buffer, mangle_from_, 0)
            gen.flatten(message)
            buffer.seek(0)
            data = buffer.read().replace('\n', os.linesep)
            target.write(data)
            if self._append_newline and not data.endswith(os.linesep):
                # Make sure the message ends with a newline
                target.write(os.linesep)
        elif isinstance(message, str):
            if mangle_from_:
                message = message.replace('\nFrom ', '\n>From ')
            message = message.replace('\n', os.linesep)
            target.write(message)
            if self._append_newline and not message.endswith(os.linesep):
                # Make sure the message ends with a newline
                target.write(os.linesep)
        elif hasattr(message, 'read'):
            lastline = None
            while True:
                line = message.readline()
                if line == '':
                    break
                if mangle_from_ and line.startswith('From '):
                    line = '>From ' + line[5:]
                line = line.replace('\n', os.linesep)
                target.write(line)
                lastline = line
            if self._append_newline and lastline and not lastline.endswith(os.linesep):
                # Make sure the message ends with a newline
                target.write(os.linesep)
        else:
            raise TypeError('Invalid message type: %s' % type(message))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _create_tmp(self):
        """Create a file in the tmp subdirectory and open and return it.

        The file name combines the current time, the process id, the
        class-wide ``Maildir._count`` counter and the host name. The file
        itself is produced by ``_create_carefully``.

        Raises ExternalClashError when something already exists at the
        chosen path; any OSError other than ENOENT (from stat) or EEXIST
        (from creation) propagates unchanged.
        """
        now = time.time()
        # '/' and ':' in the host name are swapped for their octal escapes
        # before the host name is embedded in the file name.
        hostname = socket.gethostname()
        hostname = hostname.replace('/', r'\057').replace(':', r'\072')
        uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
                                    Maildir._count, hostname)
        path = os.path.join(self._path, 'tmp', uniq)
        try:
            os.stat(path)
        # 'except ... as' is accepted by Python 2.6+ and Python 3, unlike
        # the legacy comma form.
        except OSError as stat_err:
            if stat_err.errno != errno.ENOENT:
                raise
            # Nothing lives at that path yet, so try to claim it.
            Maildir._count += 1
            try:
                return _create_carefully(path)
            except OSError as create_err:
                if create_err.errno != errno.EEXIST:
                    raise

        # Fall through to here if stat succeeded or open raised EEXIST.
        raise ExternalClashError('Name clash prevented file creation: %s' %
                                 path)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def get_message(self, key):
        """Return the message stored under *key*, built by ``self._message_factory``.

        The record's first line is kept apart and, minus its first five
        characters (presumably the 'From ' prefix), handed to ``set_from``.
        A KeyError for an unknown key would have to come from
        ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        envelope = source.readline().replace(os.linesep, '')
        raw = source.read(end - source.tell())
        message = self._message_factory(raw.replace(os.linesep, '\n'))
        message.set_from(envelope[5:])
        return message
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def get_string(self, key, from_=False):
        """Return the text stored under *key* with newline line endings.

        Unless *from_* is true, the record's first line is consumed and left
        out of the result. A KeyError for an unknown key would have to come
        from ``self._lookup`` (not visible here).
        """
        begin, end = self._lookup(key)
        source = self._file
        source.seek(begin)
        if not from_:
            source.readline()  # Drop the first line of the record.
        remaining = end - source.tell()
        return source.read(remaining).replace(os.linesep, '\n')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def get_file(self, key):
        """Return a StringIO holding the message for *key* with native line endings.

        The text comes from ``self.get_string(key)``; any KeyError would
        originate there.
        """
        text = self.get_string(key)
        native_text = text.replace('\n', os.linesep)
        return StringIO.StringIO(native_text)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _dump_message(self, message, target, mangle_from_=False):
        # Most files are opened in binary mode to allow predictable seeking.
        # To get native line endings on disk, the user-friendly \n line endings
        # used in strings and by email.Message are translated here.
        """Dump message contents to target file."""
        if isinstance(message, email.message.Message):
            buffer = StringIO.StringIO()
            gen = email.generator.Generator(buffer, mangle_from_, 0)
            gen.flatten(message)
            buffer.seek(0)
            data = buffer.read().replace('\n', os.linesep)
            target.write(data)
            if self._append_newline and not data.endswith(os.linesep):
                # Make sure the message ends with a newline
                target.write(os.linesep)
        elif isinstance(message, str):
            if mangle_from_:
                message = message.replace('\nFrom ', '\n>From ')
            message = message.replace('\n', os.linesep)
            target.write(message)
            if self._append_newline and not message.endswith(os.linesep):
                # Make sure the message ends with a newline
                target.write(os.linesep)
        elif hasattr(message, 'read'):
            lastline = None
            while True:
                line = message.readline()
                if line == '':
                    break
                if mangle_from_ and line.startswith('From '):
                    line = '>From ' + line[5:]
                line = line.replace('\n', os.linesep)
                target.write(line)
                lastline = line
            if self._append_newline and lastline and not lastline.endswith(os.linesep):
                # Make sure the message ends with a newline
                target.write(os.linesep)
        else:
            raise TypeError('Invalid message type: %s' % type(message))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _create_tmp(self):
        """Create a file in the tmp subdirectory and open and return it.

        The file name is assembled from the current time (whole seconds and
        microseconds), the process id, the class-wide ``Maildir._count``
        counter and the host name.

        Returns whatever ``_create_carefully(path)`` returns.

        Raises ExternalClashError when ``os.stat`` shows the path already
        exists or when creation fails with EEXIST.  Any other OSError from
        ``os.stat`` or ``_create_carefully`` is re-raised unchanged.
        """
        now = time.time()
        hostname = socket.gethostname()
        # '/' and ':' are replaced by their literal octal escapes so they do
        # not end up verbatim in the file name.
        if '/' in hostname:
            hostname = hostname.replace('/', r'\057')
        if ':' in hostname:
            hostname = hostname.replace(':', r'\072')
        uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
                                    Maildir._count, hostname)
        path = os.path.join(self._path, 'tmp', uniq)
        try:
            os.stat(path)
        except OSError as stat_error:
            # Only "no such file" means the name is free; anything else is
            # an unexpected failure and propagates.
            if stat_error.errno != errno.ENOENT:
                raise
            Maildir._count += 1
            try:
                return _create_carefully(path)
            except OSError as create_error:
                if create_error.errno != errno.EEXIST:
                    raise

        # Fall through to here if stat succeeded or open raised EEXIST.
        raise ExternalClashError('Name clash prevented file creation: %s' %
                                 path)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def get_message(self, key):
        """Return a Message representation or raise a KeyError.

        The span for ``key`` comes from ``self._lookup``; a KeyError for an
        unknown key is expected to originate there (not visible here).
        The first line of the span is read separately and handed, minus its
        first five characters (presumably the 'From ' prefix), to the new
        message's ``set_from``.  The rest of the span is passed to
        ``self._message_factory`` with ``os.linesep`` turned into newlines.
        """
        begin, end = self._lookup(key)
        mailbox_file = self._file
        mailbox_file.seek(begin)
        # First line of the entry, stripped of its native line ending.
        envelope = mailbox_file.readline().replace(os.linesep, '')
        raw_text = mailbox_file.read(end - mailbox_file.tell())
        result = self._message_factory(raw_text.replace(os.linesep, '\n'))
        result.set_from(envelope[5:])
        return result
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def get_string(self, key, from_=False):
        """Return a string representation or raise a KeyError.

        The span for ``key`` comes from ``self._lookup``; a KeyError for an
        unknown key is expected to originate there (not visible here).
        Unless ``from_`` is true, the first line of the span is skipped.
        ``os.linesep`` sequences in the result are turned into newlines.
        """
        begin, end = self._lookup(key)
        mailbox_file = self._file
        mailbox_file.seek(begin)
        if not from_:
            # Consume and discard the first line of the entry.
            mailbox_file.readline()
        remaining = end - mailbox_file.tell()
        return mailbox_file.read(remaining).replace(os.linesep, '\n')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def get_file(self, key):
        """Return a file-like representation or raise a KeyError.

        The text from ``self.get_string(key)`` has its newlines converted
        to ``os.linesep`` and is wrapped in a ``StringIO.StringIO``.
        """
        text = self.get_string(key)
        native_text = text.replace('\n', os.linesep)
        return StringIO.StringIO(native_text)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _dump_message(self, message, target, mangle_from_=False):
        """Dump message contents to target file.

        ``message`` may be an ``email.message.Message``, a ``str``, or any
        object exposing a ``read`` attribute (it is consumed line by line
        through ``readline()``).  Any other type raises ``TypeError``.

        Strings and email.Message use the user-friendly newline character;
        every newline is translated to ``os.linesep`` before being written.

        When ``mangle_from_`` is true:
          * for a Message it is handed to ``email.generator.Generator``;
          * for a str, each 'From ' that follows a newline gets a '>' prefix;
          * for a file-like object, each line starting with 'From ' gets a
            '>' prefix.

        When ``self._append_newline`` is true and the written data does not
        already end with ``os.linesep``, one extra ``os.linesep`` is written.
        """
        native_eol = os.linesep

        if isinstance(message, email.message.Message):
            flattened = StringIO.StringIO()
            generator = email.generator.Generator(flattened, mangle_from_, 0)
            generator.flatten(message)
            text = flattened.getvalue().replace('\n', native_eol)
            target.write(text)
            # Make sure the message ends with a newline
            if self._append_newline and not text.endswith(native_eol):
                target.write(native_eol)
            return

        if isinstance(message, str):
            if mangle_from_:
                message = message.replace('\nFrom ', '\n>From ')
            message = message.replace('\n', native_eol)
            target.write(message)
            # Make sure the message ends with a newline
            if self._append_newline and not message.endswith(native_eol):
                target.write(native_eol)
            return

        if not hasattr(message, 'read'):
            raise TypeError('Invalid message type: %s' % type(message))

        # File-like input: copy line by line until readline() yields ''.
        previous = None
        for chunk in iter(message.readline, ''):
            if mangle_from_ and chunk.startswith('From '):
                chunk = '>From ' + chunk[5:]
            chunk = chunk.replace('\n', native_eol)
            target.write(chunk)
            previous = chunk
        # Nothing is appended when no (non-empty) line was written at all.
        if self._append_newline and previous and not previous.endswith(native_eol):
            target.write(native_eol)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _create_tmp(self):
        """Create a file in the tmp subdirectory and open and return it.

        The file name is assembled from the current time (whole seconds and
        microseconds), the process id, the class-wide ``Maildir._count``
        counter and the host name.

        Returns whatever ``_create_carefully(path)`` returns.

        Raises ExternalClashError when ``os.stat`` shows the path already
        exists or when creation fails with EEXIST.  Any other OSError from
        ``os.stat`` or ``_create_carefully`` is re-raised unchanged.
        """
        now = time.time()
        hostname = socket.gethostname()
        # '/' and ':' are replaced by their literal octal escapes so they do
        # not end up verbatim in the file name.
        if '/' in hostname:
            hostname = hostname.replace('/', r'\057')
        if ':' in hostname:
            hostname = hostname.replace(':', r'\072')
        uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
                                    Maildir._count, hostname)
        path = os.path.join(self._path, 'tmp', uniq)
        try:
            os.stat(path)
        except OSError as stat_error:
            # Only "no such file" means the name is free; anything else is
            # an unexpected failure and propagates.
            if stat_error.errno != errno.ENOENT:
                raise
            Maildir._count += 1
            try:
                return _create_carefully(path)
            except OSError as create_error:
                if create_error.errno != errno.EEXIST:
                    raise

        # Fall through to here if stat succeeded or open raised EEXIST.
        raise ExternalClashError('Name clash prevented file creation: %s' %
                                 path)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def get_message(self, key):
        """Return a Message representation or raise a KeyError.

        The span for ``key`` comes from ``self._lookup``; a KeyError for an
        unknown key is expected to originate there (not visible here).
        The first line of the span is read separately and handed, minus its
        first five characters (presumably the 'From ' prefix), to the new
        message's ``set_from``.  The rest of the span is passed to
        ``self._message_factory`` with ``os.linesep`` turned into newlines.
        """
        begin, end = self._lookup(key)
        mailbox_file = self._file
        mailbox_file.seek(begin)
        # First line of the entry, stripped of its native line ending.
        envelope = mailbox_file.readline().replace(os.linesep, '')
        raw_text = mailbox_file.read(end - mailbox_file.tell())
        result = self._message_factory(raw_text.replace(os.linesep, '\n'))
        result.set_from(envelope[5:])
        return result
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def get_string(self, key, from_=False):
        """Return a string representation or raise a KeyError.

        The span for ``key`` comes from ``self._lookup``; a KeyError for an
        unknown key is expected to originate there (not visible here).
        Unless ``from_`` is true, the first line of the span is skipped.
        ``os.linesep`` sequences in the result are turned into newlines.
        """
        begin, end = self._lookup(key)
        mailbox_file = self._file
        mailbox_file.seek(begin)
        if not from_:
            # Consume and discard the first line of the entry.
            mailbox_file.readline()
        remaining = end - mailbox_file.tell()
        return mailbox_file.read(remaining).replace(os.linesep, '\n')
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def get_file(self, key):
        """Return a file-like representation or raise a KeyError.

        The text from ``self.get_string(key)`` has its newlines converted
        to ``os.linesep`` and is wrapped in a ``StringIO.StringIO``.
        """
        text = self.get_string(key)
        native_text = text.replace('\n', os.linesep)
        return StringIO.StringIO(native_text)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _dump_message(self, message, target, mangle_from_=False):
        """Dump message contents to target file.

        ``message`` may be an ``email.message.Message``, a ``str``, or any
        object exposing a ``read`` attribute (it is consumed line by line
        through ``readline()``).  Any other type raises ``TypeError``.

        Strings and email.Message use the user-friendly newline character;
        every newline is translated to ``os.linesep`` before being written.

        When ``mangle_from_`` is true:
          * for a Message it is handed to ``email.generator.Generator``;
          * for a str, each 'From ' that follows a newline gets a '>' prefix;
          * for a file-like object, each line starting with 'From ' gets a
            '>' prefix.

        When ``self._append_newline`` is true and the written data does not
        already end with ``os.linesep``, one extra ``os.linesep`` is written.
        """
        native_eol = os.linesep

        if isinstance(message, email.message.Message):
            flattened = StringIO.StringIO()
            generator = email.generator.Generator(flattened, mangle_from_, 0)
            generator.flatten(message)
            text = flattened.getvalue().replace('\n', native_eol)
            target.write(text)
            # Make sure the message ends with a newline
            if self._append_newline and not text.endswith(native_eol):
                target.write(native_eol)
            return

        if isinstance(message, str):
            if mangle_from_:
                message = message.replace('\nFrom ', '\n>From ')
            message = message.replace('\n', native_eol)
            target.write(message)
            # Make sure the message ends with a newline
            if self._append_newline and not message.endswith(native_eol):
                target.write(native_eol)
            return

        if not hasattr(message, 'read'):
            raise TypeError('Invalid message type: %s' % type(message))

        # File-like input: copy line by line until readline() yields ''.
        previous = None
        for chunk in iter(message.readline, ''):
            if mangle_from_ and chunk.startswith('From '):
                chunk = '>From ' + chunk[5:]
            chunk = chunk.replace('\n', native_eol)
            target.write(chunk)
            previous = chunk
        # Nothing is appended when no (non-empty) line was written at all.
        if self._append_newline and previous and not previous.endswith(native_eol):
            target.write(native_eol)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _create_tmp(self):
        """Create a file in the tmp subdirectory and open and return it.

        The file name is assembled from the current time (whole seconds and
        microseconds), the process id, the class-wide ``Maildir._count``
        counter and the host name.

        Returns whatever ``_create_carefully(path)`` returns.

        Raises ExternalClashError when ``os.stat`` shows the path already
        exists or when creation fails with EEXIST.  Any other OSError from
        ``os.stat`` or ``_create_carefully`` is re-raised unchanged.
        """
        now = time.time()
        hostname = socket.gethostname()
        # '/' and ':' are replaced by their literal octal escapes so they do
        # not end up verbatim in the file name.
        if '/' in hostname:
            hostname = hostname.replace('/', r'\057')
        if ':' in hostname:
            hostname = hostname.replace(':', r'\072')
        uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
                                    Maildir._count, hostname)
        path = os.path.join(self._path, 'tmp', uniq)
        try:
            os.stat(path)
        except OSError as stat_error:
            # Only "no such file" means the name is free; anything else is
            # an unexpected failure and propagates.
            if stat_error.errno != errno.ENOENT:
                raise
            Maildir._count += 1
            try:
                return _create_carefully(path)
            except OSError as create_error:
                if create_error.errno != errno.EEXIST:
                    raise

        # Fall through to here if stat succeeded or open raised EEXIST.
        raise ExternalClashError('Name clash prevented file creation: %s' %
                                 path)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def get_message(self, key):
        """Return a Message representation or raise a KeyError.

        The span for ``key`` comes from ``self._lookup``; a KeyError for an
        unknown key is expected to originate there (not visible here).
        The first line of the span is read separately and handed, minus its
        first five characters (presumably the 'From ' prefix), to the new
        message's ``set_from``.  The rest of the span is passed to
        ``self._message_factory`` with ``os.linesep`` turned into newlines.
        """
        begin, end = self._lookup(key)
        mailbox_file = self._file
        mailbox_file.seek(begin)
        # First line of the entry, stripped of its native line ending.
        envelope = mailbox_file.readline().replace(os.linesep, '')
        raw_text = mailbox_file.read(end - mailbox_file.tell())
        result = self._message_factory(raw_text.replace(os.linesep, '\n'))
        result.set_from(envelope[5:])
        return result
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def get_string(self, key, from_=False):
        """Return a string representation or raise a KeyError.

        The span for ``key`` comes from ``self._lookup``; a KeyError for an
        unknown key is expected to originate there (not visible here).
        Unless ``from_`` is true, the first line of the span is skipped.
        ``os.linesep`` sequences in the result are turned into newlines.
        """
        begin, end = self._lookup(key)
        mailbox_file = self._file
        mailbox_file.seek(begin)
        if not from_:
            # Consume and discard the first line of the entry.
            mailbox_file.readline()
        remaining = end - mailbox_file.tell()
        return mailbox_file.read(remaining).replace(os.linesep, '\n')
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def get_file(self, key):
        """Return a file-like representation or raise a KeyError.

        The text from ``self.get_string(key)`` has its newlines converted
        to ``os.linesep`` and is wrapped in a ``StringIO.StringIO``.
        """
        text = self.get_string(key)
        native_text = text.replace('\n', os.linesep)
        return StringIO.StringIO(native_text)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def get_bytes(self, key):
        """Return a bytes representation or raise a KeyError.

        Reads the whole file found at ``self._path`` joined with
        ``self._lookup(key)`` in binary mode and replaces every occurrence
        of the module-level ``linesep`` (presumably the byte-encoded
        platform line separator -- defined outside this view) with a single
        LF byte.  A KeyError for an unknown key is expected to come from
        ``self._lookup``.
        """
        message_path = os.path.join(self._path, self._lookup(key))
        with open(message_path, 'rb') as handle:
            raw = handle.read()
        return raw.replace(linesep, b'\n')