Python mimetypes 模块,guess_extension() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 mimetypes.guess_extension()。

项目:Telethon    作者:LonamiWebs    | 项目源码 | 文件源码
def get_extension(media):
    """Return the file extension (leading dot included) for a Telegram media object.

    Photos map to ``'.jpg'``. A ``MessageMediaDocument`` wrapping a
    ``Document`` maps to whatever ``guess_extension`` yields for its MIME
    type, except ``application/octet-stream`` which has no extension.
    Everything else maps to the empty string.
    """
    # Telegram always compresses photos to JPEG
    photo_types = (UserProfilePhoto, ChatPhoto, MessageMediaPhoto)
    if isinstance(media, photo_types):
        return '.jpg'

    # Only documents carry a MIME type to derive an extension from
    is_document = (isinstance(media, MessageMediaDocument)
                   and isinstance(media.document, Document))
    if not is_document:
        return ''

    mime_type = media.document.mime_type
    if mime_type == 'application/octet-stream':
        # Raw bytes: there is no sensible default extension
        return ''

    return guess_extension(mime_type) or ''
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def _get_email_thread_attachment(ticket, email_category=None):
    """Build an attachment containing the rendered email thread of a ticket.

    The ticket's emails are fetched through the ``MailerServiceBase``
    singleton, filtered on their lower-cased category, rendered with
    ``utils.get_email_thread_content`` and base64-encoded.

    :param ticket: ticket whose emails are exported; its ``publicId`` is
        used in the generated file name
    :param email_category: value the lower-cased email category must equal
    :return: dict with the keys ``filetype``, ``content`` (base64) and ``name``
    :raises InternalServerError: when fetching the emails or rendering the
        thread template fails
    """
    try:
        _emails = ImplementationFactory.instance.get_singleton_of(
            'MailerServiceBase'
        ).get_emails(ticket)
    except (KeyError, MailerServiceException) as ex:
        raise InternalServerError(str(ex))

    emails = [email for email in _emails if email.category.lower() == email_category]

    try:
        content, filetype = utils.get_email_thread_content(ticket, emails)
    except (utils.EmailThreadTemplateNotFound, utils.EmailThreadTemplateSyntaxError) as ex:
        raise InternalServerError(str(ex))

    content = base64.b64encode(content)

    # guess_extension() returns None for MIME types it does not know; fall
    # back to no extension instead of embedding the text "None" in the name.
    extension = mimetypes.guess_extension(filetype) or ''
    name = 'ticket_{}_emails_{}{}'.format(
        ticket.publicId,
        datetime.now().strftime('%d-%m-%Y_%H-%M-%S'),
        extension,
    )

    return {'filetype': filetype, 'content': content, 'name': name}
项目:sample-platform    作者:CCExtractor    | 项目源码 | 文件源码
def validate_file(form, field):
        """Reject an uploaded file whose extension or detected MIME type is forbidden.

        Three checks are made: the extension of the submitted file name, the
        MIME type sniffed from the first 1024 bytes, and the extension that
        ``mimetypes`` associates with that sniffed MIME type.

        :param form: the form being validated (unused)
        :param field: field whose ``data`` exposes ``filename``, ``read`` and ``seek``
        :raises ValidationError: when any of the three checks hits a forbidden entry
        """
        # File cannot end with a forbidden extension
        file_extension = os.path.splitext(field.data.filename)[1]
        if file_extension:
            forbidden_ext = ForbiddenExtension.query.filter(
                ForbiddenExtension.extension == file_extension[1:]).first()
            if forbidden_ext is not None:
                raise ValidationError('Extension not allowed')
        mimetype = magic.from_buffer(field.data.read(1024), mime=True)
        # Rewind so later consumers read the file from the beginning
        field.data.seek(0, 0)
        # Check for permitted mimetype
        forbidden_mime = ForbiddenMimeType.query.filter(
            ForbiddenMimeType.mimetype == mimetype).first()
        if forbidden_mime is not None:
            raise ValidationError('File MimeType not allowed')
        # The real content may map to a forbidden extension even when the
        # submitted file name does not
        extension = mimetypes.guess_extension(mimetype)
        if extension is not None:
            forbidden_real = ForbiddenExtension.query.filter(
                ForbiddenExtension.extension == extension[1:]).first()
            if forbidden_real is not None:
                raise ValidationError('Extension not allowed')
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def filename_from_url(url, content_type):
    """Derive a file name from a URL, adding an extension when it has none.

    The name is the last path segment of ``url`` (``'index'`` when the path
    is empty). If that name contains no dot and ``content_type`` is given,
    an extension guessed from the content type is appended.

    :param url: URL the file was fetched from
    :param content_type: value of the Content-Type header, or a false value
    :return: the derived file name
    """
    fn = urlsplit(url).path.rstrip('/')
    fn = os.path.basename(fn) if fn else 'index'
    if '.' not in fn and content_type:
        # Drop parameters such as "; charset=utf-8". The strip() matters:
        # "text/html ; charset=utf-8" would otherwise leave a trailing space
        # and the lookup below would fail.
        content_type = content_type.split(';')[0].strip()
        if content_type == 'text/plain':
            # mimetypes returns '.ksh'
            ext = '.txt'
        else:
            ext = mimetypes.guess_extension(content_type)

        if ext == '.htm':  # Python 3
            ext = '.html'

        if ext:
            fn += ext

    return fn
项目:cuny-bdif    作者:aristotle-tek    | 项目源码 | 文件源码
def __init__(self, config_file=None):
        """Initialize the service and seed its input queue when it is empty.

        Sets up a per-instance log file, builds the ffmpeg command, reads the
        output MIME type/extension and both buckets from ``self.sd``, then
        calls ``queue_files()`` if no message could be read from the input
        queue.

        :param config_file: passed unchanged to the parent initializer
        """
        super(SonOfMMM, self).__init__(config_file)

        # Log to "<instance_id>.log" inside the working directory
        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)

        # An explicit 'ffmpeg_args' option replaces the default argument template
        self.command = (
            '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
            if self.sd.has_option('ffmpeg_args')
            else '/usr/local/bin/ffmpeg -y -i %s %s'
        )

        self.output_mimetype = self.sd.get('output_mimetype')
        # Without an 'output_ext' option the extension is guessed from the MIME type
        self.output_ext = (
            self.sd.get('output_ext')
            if self.sd.has_option('output_ext')
            else mimetypes.guess_extension(self.output_mimetype)
        )

        self.output_bucket = self.sd.get_obj('output_bucket')
        self.input_bucket = self.sd.get_obj('input_bucket')

        # If no message is waiting, create messages for all files in input_bucket
        if not self.input_queue.read(1):
            self.queue_files()
项目:tucluster    作者:JamesRamm    | 项目源码 | 文件源码
def save_zip(self, stream, content_type, name=None):
        '''Save the zip stream to disk and extract its contents.

        :param stream: stream handed to ``self._write`` together with the
            archive path
        :param content_type: MIME type used to pick the archive's file extension
        :param name: base name for the archive; a value from
            ``self._uuidgen()`` is used when omitted
        :return: tuple of the id derived from the extraction directory and
            the name that was used
        '''
        # Make sure the storage path exists
        ensure_dir(self._storage_path)

        # guess_extension() returns None for unknown MIME types; use no
        # extension rather than writing "<name>None" to disk.
        ext = mimetypes.guess_extension(content_type) or ''
        if not name:
            name = str(self._uuidgen())
        fname = '{uuid}{ext}'.format(uuid=name, ext=ext)
        archive_path = os.path.join(self._storage_path, fname)

        self._write(archive_path, stream)
        # extract the zip file
        directory = extract_model(archive_path, name, self._storage_path)
        return fmdb.id_from_path(directory), name
项目:munch-core    作者:crunchmail    | 项目源码 | 文件源码
def store(self):
        """Validate ``self.data`` and save it as an ``Image`` of the organization.

        The data must be smaller than ``MAX_SIZE`` and its sniffed MIME type
        must be one of ``allowed_mimetypes``. The stored file is named after
        the SHA-1 hex digest of the data plus the guessed extension.

        :return: the absolute URL of the saved image
        :raises TooBigMedia: when the data reaches or exceeds ``MAX_SIZE``
        :raises InvalidMimeType: when the sniffed MIME type is not allowed
        """
        if not len(self.data) < self.MAX_SIZE:
            raise TooBigMedia(self.identifying_name, self.MAX_SIZE)

        detected_mime = magic.from_buffer(self.data, mime=True)
        if detected_mime not in self.allowed_mimetypes:
            raise InvalidMimeType(detected_mime)

        guessed = mimetypes.guess_extension(detected_mime)
        # mimetypes may answer '.jpe' for JPEG data; use the common spelling
        self.extension = '.jpeg' if guessed == '.jpe' else guessed

        digest = hashlib.sha1(self.data).hexdigest()
        target_name = '{}{}'.format(digest, self.extension)

        image = Image(organization=self.organization)
        image.file.save(target_name, ContentFile(self.data))
        return image.get_absolute_url()
项目:learneveryword    作者:karan    | 项目源码 | 文件源码
def __init__(self, config_file=None):
        """Set up logging, the ffmpeg command, output settings and buckets.

        Values come from ``self.sd``; when an option is absent a default is
        used (a fixed ffmpeg template, or an extension guessed from the
        output MIME type). Finally ``queue_files()`` is called if nothing
        could be read from the input queue.

        :param config_file: passed unchanged to the parent initializer
        """
        super(SonOfMMM, self).__init__(config_file)

        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)

        options = self.sd

        # Default ffmpeg invocation unless 'ffmpeg_args' overrides the arguments
        ffmpeg_command = '/usr/local/bin/ffmpeg -y -i %s %s'
        if options.has_option('ffmpeg_args'):
            ffmpeg_command = '/usr/local/bin/ffmpeg ' + options.get('ffmpeg_args')
        self.command = ffmpeg_command

        self.output_mimetype = options.get('output_mimetype')
        if not options.has_option('output_ext'):
            # No explicit extension configured: derive it from the MIME type
            self.output_ext = mimetypes.guess_extension(self.output_mimetype)
        else:
            self.output_ext = options.get('output_ext')

        self.output_bucket = options.get_obj('output_bucket')
        self.input_bucket = options.get_obj('input_bucket')

        # When no message is waiting, create messages for all files in input_bucket
        pending = self.input_queue.read(1)
        if not pending:
            self.queue_files()
项目:flux_line_bot    作者:blesscat    | 项目源码 | 文件源码
def fileinfo(self, path):
        """Log the metadata of a file on the robot and open its preview images.

        ``self.robot_obj.file_info(path)`` is expected to return a mapping of
        metadata and a sequence of ``(mime_type, data)`` images. Every image
        whose MIME type maps to a known extension is written to a temporary
        file, and all of them are opened with the ``open`` command.

        :param path: path passed to ``self.robot_obj.file_info``
        """
        info, images = self.robot_obj.file_info(path)
        for key, value in info.items():
            # Long values are truncated to 30 characters to keep the log readable
            shown = value[:30] if len(value) > 30 else value
            logger.info("    :%s => %s", key, shown)
        logger.info("%s", info)

        previews = []
        for img in images:
            ext = mimetypes.guess_extension(img[0])
            if ext:
                # Leaving the with-block closes (and thereby flushes) the file
                # before its path is handed to the viewer; delete=False keeps
                # it on disk afterwards.
                with NamedTemporaryFile(suffix=ext, delete=False) as ntf:
                    ntf.write(img[1])
                previews.append(ntf.name)
        if previews:
            os.system("open " + " ".join(previews))
项目:pytemplate    作者:krotos139    | 项目源码 | 文件源码
def add_media_to_archive(self, media, mime, name=''):
        """Add the file in `media` to the "Pictures" archive folder and register
        it in the manifest file.

        :param media: file-like object; read from the start and closed when
            it has a ``close`` method
        :param mime: MIME type recorded in the manifest and used to guess an
            extension when none is otherwise known
        :param name: base name inside "Pictures/"; when empty and `media` has
            a ``name`` attribute, that name and its extension are used
        :return: the path of the media inside the archive
        """
        extension = None
        if hasattr(media, 'name') and not name:
            name, extension = path.splitext(media.name)

        if not extension:
            # guess_extension() returns None for unknown MIME types; use no
            # extension rather than producing "Pictures/<name>None".
            extension = guess_extension(mime) or ''

        media_path = 'Pictures/%s%s' % (name, extension)
        media.seek(0)
        self.files[media_path] = media.read(-1)
        if hasattr(media, 'close'):
            media.close()

        files_node = self.manifest.getElementsByTagName('manifest:manifest')[0]
        node = self.create_node(self.manifest, 'manifest:file-entry', files_node)
        node.setAttribute('manifest:full-path', media_path)
        node.setAttribute('manifest:media-type', mime)

        return media_path
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def filename_from_url(url, content_type):
    """Return a file name for ``url``, with an extension based on ``content_type``.

    The base name is the final path component of the URL, or ``'index'``
    when the URL has no path. An extension is only appended when the base
    name has no dot and a content type is supplied.

    :param url: source URL
    :param content_type: Content-Type header value (may carry parameters), or None
    :return: the resulting file name
    """
    fn = urlsplit(url).path.rstrip('/')
    fn = os.path.basename(fn) if fn else 'index'
    if '.' not in fn and content_type:
        # Keep only the media type; strip() handles headers written as
        # "type/subtype ; param=value", which would otherwise never match.
        content_type = content_type.split(';')[0].strip()
        if content_type == 'text/plain':
            # mimetypes returns '.ksh'
            ext = '.txt'
        else:
            ext = mimetypes.guess_extension(content_type)

        if ext == '.htm':  # Python 3
            ext = '.html'

        if ext:
            fn += ext

    return fn
项目:xd    作者:century-arcade    | 项目源码 | 文件源码
def generate_email_files(msg):
    """Yield ``(filename, data, timestamp)`` for each file carried by an email.

    Multipart container parts are skipped. Parts without a file name get a
    generated one (``part-NNN`` plus a guessed extension, ``.bin`` when
    nothing can be guessed). Parts whose name ends in ``.zip`` are expanded
    through ``generate_zip_files`` instead of being yielded themselves.

    :param msg: parsed email message with a "Date" header
    """
    upload_date = time.mktime(email.utils.parsedate(msg["Date"]))
    counter = 1
    for part in msg.walk():
        # multipart/* parts are just containers
        if part.get_content_maintype() == 'multipart':
            continue

        # NOTE: a real application should sanitize the supplied file name so
        # an email cannot be used to overwrite important files
        filename = part.get_filename()
        if not filename:
            # Fall back to a generic bag-of-bits extension
            ext = mimetypes.guess_extension(part.get_content_type()) or '.bin'
            filename = 'part-%03d%s' % (counter, ext)
        counter += 1

        data = part.get_payload(decode=True)
        if parse_pathname(filename).ext != '.zip':
            yield filename, data, upload_date
            continue

        for zipfn, zipdata, zipdt in generate_zip_files(data):
            yield zipfn, zipdata, zipdt
项目:Chromium_DepotTools    作者:p07r0457    | 项目源码 | 文件源码
def __init__(self, config_file=None):
        """Initialize the service and seed its input queue when it is empty.

        Sets up a per-instance log file, builds the ffmpeg command, reads the
        output MIME type/extension and both buckets from ``self.sd``, then
        calls ``queue_files()`` if no message could be read from the input
        queue.

        :param config_file: passed unchanged to ``Service.__init__``
        """
        Service.__init__(self, config_file)

        # Log to "<instance_id>.log" inside the working directory
        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)

        # An explicit 'ffmpeg_args' option replaces the default argument template
        self.command = (
            '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
            if self.sd.has_option('ffmpeg_args')
            else '/usr/local/bin/ffmpeg -y -i %s %s'
        )

        self.output_mimetype = self.sd.get('output_mimetype')
        # Without an 'output_ext' option the extension is guessed from the MIME type
        self.output_ext = (
            self.sd.get('output_ext')
            if self.sd.has_option('output_ext')
            else mimetypes.guess_extension(self.output_mimetype)
        )

        self.output_bucket = self.sd.get_obj('output_bucket')
        self.input_bucket = self.sd.get_obj('input_bucket')

        # If no message is waiting, create messages for all files in input_bucket
        if not self.input_queue.read(1):
            self.queue_files()
项目:node-gn    作者:Shouqun    | 项目源码 | 文件源码
def __init__(self, config_file=None):
        """Set up logging, the ffmpeg command, output settings and buckets.

        Values come from ``self.sd``; when an option is absent a default is
        used (a fixed ffmpeg template, or an extension guessed from the
        output MIME type). Finally ``queue_files()`` is called if nothing
        could be read from the input queue.

        :param config_file: passed unchanged to ``Service.__init__``
        """
        Service.__init__(self, config_file)

        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)

        options = self.sd

        # Default ffmpeg invocation unless 'ffmpeg_args' overrides the arguments
        ffmpeg_command = '/usr/local/bin/ffmpeg -y -i %s %s'
        if options.has_option('ffmpeg_args'):
            ffmpeg_command = '/usr/local/bin/ffmpeg ' + options.get('ffmpeg_args')
        self.command = ffmpeg_command

        self.output_mimetype = options.get('output_mimetype')
        if not options.has_option('output_ext'):
            # No explicit extension configured: derive it from the MIME type
            self.output_ext = mimetypes.guess_extension(self.output_mimetype)
        else:
            self.output_ext = options.get('output_ext')

        self.output_bucket = options.get_obj('output_bucket')
        self.input_bucket = options.get_obj('input_bucket')

        # When no message is waiting, create messages for all files in input_bucket
        pending = self.input_queue.read(1)
        if not pending:
            self.queue_files()
项目:arachas    作者:GwentAPI    | 项目源码 | 文件源码
def run(self):
        """Worker loop: download each ``(name, url)`` pair taken from ``self.imageQueue``.

        A response with status 200 is streamed to
        ``./<IMAGE_FOLDER>/<name><extension>``, where the extension is guessed
        from the Content-Type header. Every item taken from the queue is
        marked done, even when the download raises, so a ``join()`` on the
        queue cannot wait forever on a failed item.
        """
        while True:
            # The name will be used for saving the file
            name, url = self.imageQueue.get()
            try:
                res = requests.get(url, headers=HEADERS, timeout=TIMEOUT, stream=True)

                if res.status_code == 200:
                    # The header may be missing or carry parameters
                    # ("image/jpeg; charset=..."); keep only the media type.
                    content_type = res.headers.get('content-type', '')
                    mime_type = content_type.split(';')[0].strip()
                    # guess_extension() returns None for unknown types;
                    # save without an extension in that case.
                    extension = mimetypes.guess_extension(mime_type) or ''

                    filepath = os.path.join('./' + IMAGE_FOLDER + '/' + name + extension)
                    with open(filepath, 'wb') as f:
                        # Stream the files.
                        for chunk in res:
                            f.write(chunk)
            finally:
                # Notify that we have finished one task.
                self.imageQueue.task_done()

# Function to retrieve a list of URLs for every page of cards.
# The url parameter is the entry point of the website from which the information is extracted.
项目:alfred-ec2    作者:SoMuchToGrok    | 项目源码 | 文件源码
def __init__(self, config_file=None):
        """Initialize the service and seed its input queue when it is empty.

        Sets up a per-instance log file, builds the ffmpeg command, reads the
        output MIME type/extension and both buckets from ``self.sd``, then
        calls ``queue_files()`` if no message could be read from the input
        queue.

        :param config_file: passed unchanged to the parent initializer
        """
        super(SonOfMMM, self).__init__(config_file)

        # Log to "<instance_id>.log" inside the working directory
        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)

        # An explicit 'ffmpeg_args' option replaces the default argument template
        self.command = (
            '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
            if self.sd.has_option('ffmpeg_args')
            else '/usr/local/bin/ffmpeg -y -i %s %s'
        )

        self.output_mimetype = self.sd.get('output_mimetype')
        # Without an 'output_ext' option the extension is guessed from the MIME type
        self.output_ext = (
            self.sd.get('output_ext')
            if self.sd.has_option('output_ext')
            else mimetypes.guess_extension(self.output_mimetype)
        )

        self.output_bucket = self.sd.get_obj('output_bucket')
        self.input_bucket = self.sd.get_obj('input_bucket')

        # If no message is waiting, create messages for all files in input_bucket
        if not self.input_queue.read(1):
            self.queue_files()
项目:python-flask-security    作者:weinbergdavid    | 项目源码 | 文件源码
def filename_from_url(url, content_type):
    """Build a file name from the last path segment of ``url``.

    When the segment has no dot and ``content_type`` is truthy, an extension
    guessed from the content type is appended. ``'index'`` is used as the
    name for URLs without a path.

    :param url: URL to take the name from
    :param content_type: Content-Type header value, possibly with parameters
    :return: file name, with an extension when one could be determined
    """
    fn = urlsplit(url).path.rstrip('/')
    fn = os.path.basename(fn) if fn else 'index'
    if '.' not in fn and content_type:
        # Discard parameters and surrounding whitespace; without strip() a
        # header like "text/html ; charset=utf-8" would not be recognized.
        content_type = content_type.split(';')[0].strip()
        if content_type == 'text/plain':
            # mimetypes returns '.ksh'
            ext = '.txt'
        else:
            ext = mimetypes.guess_extension(content_type)

        if ext == '.htm':  # Python 3
            ext = '.html'

        if ext:
            fn += ext

    return fn
项目:depot_tools    作者:webrtc-uwp    | 项目源码 | 文件源码
def __init__(self, config_file=None):
        """Set up logging, the ffmpeg command, output settings and buckets.

        Values come from ``self.sd``; when an option is absent a default is
        used (a fixed ffmpeg template, or an extension guessed from the
        output MIME type). Finally ``queue_files()`` is called if nothing
        could be read from the input queue.

        :param config_file: passed unchanged to ``Service.__init__``
        """
        Service.__init__(self, config_file)

        self.log_file = '%s.log' % self.instance_id
        self.log_path = os.path.join(self.working_dir, self.log_file)
        boto.set_file_logger(self.name, self.log_path)

        options = self.sd

        # Default ffmpeg invocation unless 'ffmpeg_args' overrides the arguments
        ffmpeg_command = '/usr/local/bin/ffmpeg -y -i %s %s'
        if options.has_option('ffmpeg_args'):
            ffmpeg_command = '/usr/local/bin/ffmpeg ' + options.get('ffmpeg_args')
        self.command = ffmpeg_command

        self.output_mimetype = options.get('output_mimetype')
        if not options.has_option('output_ext'):
            # No explicit extension configured: derive it from the MIME type
            self.output_ext = mimetypes.guess_extension(self.output_mimetype)
        else:
            self.output_ext = options.get('output_ext')

        self.output_bucket = options.get_obj('output_bucket')
        self.input_bucket = options.get_obj('input_bucket')

        # When no message is waiting, create messages for all files in input_bucket
        pending = self.input_queue.read(1)
        if not pending:
            self.queue_files()
项目:BitBot    作者:crack00r    | 项目源码 | 文件源码
def get_extension(media):
    """Return the file extension for a Telegram media object.

    Photos give ``'.jpg'``; a ``MessageMediaDocument`` gives the extension
    guessed from its document's MIME type (``''`` when none can be guessed);
    any other object gives ``None``.
    """
    # Telegram always compresses photos to JPEG
    if isinstance(media, (UserProfilePhoto, ChatPhoto, MessageMediaPhoto)):
        return '.jpg'

    if not isinstance(media, MessageMediaDocument):
        return None

    # Documents carry a MIME type from which the extension can be guessed
    return guess_extension(media.document.mime_type) or ''
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def parse_attachment(part):
    """
        Describe one email part as an attachment

        :param `Message` part: A `Message`
        :rtype: dict
        :return: A dict with the keys `content_type`, `content` and `filename`
    """
    content_type = part.get_content_type()

    # Embedded messages and delivery reports are kept as their full text;
    # every other part is reduced to its decoded payload
    if content_type.lower() in ('message/rfc822', 'message/delivery-status'):
        content = str(part)
    else:
        content = part.get_payload(decode=True)

    filename = part.get_filename()
    if not filename:
        # No name supplied: use the SHA-1 of the content, plus an extension
        # when the content type maps to one
        filename = hashlib.sha1(content).hexdigest()
        extension = mimetypes.guess_extension(content_type) if content_type else None
        if extension:
            filename += extension

    return {
        'content_type': content_type,
        'content': content,
        'filename': get_valid_filename(utils.decode_every_charset_in_the_world(filename)),
    }
项目:embeddeddata    作者:toolforge    | 项目源码 | 文件源码
def remux_detect(f):
    """Stream-copy ``f`` with ffmpeg into a temporary file and report its size.

    The temporary file's suffix comes from the detected MIME type of ``f``.
    Returns ``(size, False)`` when ffmpeg produced a non-empty file, and
    ``None`` otherwise.
    """
    from detection.utils import filetype

    f = os.path.abspath(f)
    mime = filetype(f)

    guessed = mimetypes.guess_extension(mime, strict=False)
    if guessed:
        suffix = guessed[1:] if guessed.startswith('.') else guessed
        if suffix == 'ogx':
            suffix = 'ogg'
    else:
        # naive fallback: take the subtype of the MIME type, minus any "x-" prefix
        suffix = mime.split('/')[1]
        if suffix[:2] == 'x-':
            suffix = suffix[2:]

    with tempfile.NamedTemporaryFile(suffix='.'+suffix) as tmp:
        command = [
            'ffmpeg',
            '-loglevel', 'warning',
            '-y',
            '-i', f,
            '-c', 'copy',
            tmp.name,
        ]
        subprocess.call(command)

        size = os.path.getsize(tmp.name)
        if size:
            return size, False
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def save(self, image_stream, image_content_type):
        """Write an uploaded image to the storage directory and return its file name.

        The name is a fresh value from ``self._uuidgen()`` followed by the
        extension guessed from ``image_content_type``; when the type is
        unknown, no extension is appended.

        :param image_stream: object with a ``read(size)`` method yielding the image bytes
        :param image_content_type: MIME type of the image
        :return: the file name (not the full path) of the stored image
        """
        # guess_extension() returns None for unknown MIME types; avoid
        # storing the file as "<uuid>None".
        ext = mimetypes.guess_extension(image_content_type) or ''
        name = '{uuid}{ext}'.format(uuid=self._uuidgen(), ext=ext)
        image_path = os.path.join(self._storage_path, name)

        with self._fopen(image_path, 'wb') as image_file:
            # Copy in fixed-size chunks so the whole upload is never held in memory
            while True:
                chunk = image_stream.read(self._CHUNK_SIZE_BYTES)
                if not chunk:
                    break

                image_file.write(chunk)

        return name
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def upload(url):
    """Fetch the media behind ``url`` and re-upload it to pomf.se.

    URLs matching YouTube are downloaded with youtube-dl, URLs matching a
    host reported by ``cclive --support`` are downloaded with cclive, and
    everything else is fetched directly over HTTP into a temporary file.

    :param url: address of the media to mirror
    :return: the resulting pomf.se URL, or a string starting with
        ``"Error: "`` when anything fails
    """
    # Static pipeline (contains no caller-supplied text): list what cclive
    # supports, joined with '|' so it can be used as a regex alternation.
    cclive = subprocess.Popen("cclive --support | xargs | tr ' ' '|'", stdout=subprocess.PIPE, shell=True)
    (cclive_formats, err) = cclive.communicate()

    re_youtube = r"youtube|youtu\.be|yooouuutuuube"
    search = ".*(?:{}|{}).*".format(re_youtube, cclive_formats)
    temp = None
    try:
        if re.match(search, url, re.I):
            if re.match(".*(?:{}).*".format(re_youtube), url, re.I):
                # Argument list without a shell: the URL is passed as a
                # single argument and can never be interpreted as shell syntax.
                cmd = ["youtube-dl", "--quiet", "--recode-video", "webm",
                       "--format", "webm/mp4", "--output", "/tmp/%(id)s.webm", url]
                # Wait for the download to finish before the file is opened below
                subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()
                yt = r".*(?:youtube.*?(?:v=|/v/)|youtu\.be/|yooouuutuuube.*?id=)([-_a-zA-Z0-9]+).*"
                file = "/tmp/{}.webm".format(re.match(yt, url, re.I).group(1))
            else:
                cmd = ["cclive", "--quiet", "-f", "fmt43_360p", url,
                       "--O", "/tmp/pomf.webm", "--exec", "echo -n %f"]
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
                # cclive echoes the path of the downloaded file on stdout
                (file, err) = p.communicate()
        else:
            mime_type = guess_type(url)[0]
            # Both guesses may fail; fall back to a suffix-less temp file
            extension = (guess_extension(mime_type) if mime_type else None) or ''
            if extension == '.jpe':
                # Exact match only: a substring replace would turn '.jpeg' into '.jpgg'
                extension = '.jpg'
            temp = tempfile.NamedTemporaryFile(suffix=extension)
            temp.write(requests.get(url).content)
            # Push buffered bytes to disk before the file is reopened by name
            temp.flush()
            file = temp.name

        with open(file, "rb") as fh:
            content = requests.post(url="http://pomf.se/upload.php", files={"files[]": fh})
        if not content.status_code // 100 == 2:
            raise Exception("Unexpected response {}".format(content))
        return "http://a.pomf.se/{}".format(content.json()["files"][0]["url"])
    except Exception as e:
        return "Error: {}".format(e)
    finally:
        # Closing the temporary file also removes it from disk
        if temp is not None:
            temp.close()
项目:discordbot.py    作者:rauenzi    | 项目源码 | 文件源码
async def downloadImage(url, folder, name, loop, chunkSize=20):
    """Download the image at ``url`` to ``<folder>/<name><extension>``.

    Declared ``async`` because the body awaits aiohttp calls; a plain
    ``def`` containing ``async with``/``await`` does not compile.

    :param url: address of the image
    :param folder: directory the file is written to
    :param name: file name without extension
    :param loop: event loop handed to the aiohttp session
    :param chunkSize: number of bytes read from the response per iteration
    :return: dict with the boolean flags ``canAccessURL``, ``isImage`` and
        ``fileSaved``
    """
    result = {'canAccessURL': False, 'isImage': False, 'fileSaved': False}
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
        'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
        'Accept-Encoding': 'none',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive'}
    async with aiohttp.ClientSession(loop=loop) as session:
        with aiohttp.Timeout(10, loop=session.loop):
            async with session.get(url, headers=headers) as response:
                # A missing header is treated like a non-image response
                content_type = response.headers.get('content-type', '')
                if response.status == 200:
                    result['canAccessURL'] = True
                if "image" in content_type:
                    result['isImage'] = True
                if not result['canAccessURL'] or not result['isImage']:
                    return result
                # Strip parameters ("image/png; charset=...") before the
                # lookup, and fall back to no extension when the type is unknown.
                mime_type = content_type.split(';')[0].strip()
                extension = mimetypes.guess_extension(mime_type) or ''
                if extension == '.jpe':
                    extension = '.jpg'

                with open(folder + "/" + name + extension, 'wb') as fd:
                    while True:
                        chunk = await response.content.read(chunkSize)
                        if not chunk:
                            break
                        fd.write(chunk)
                result['fileSaved'] = True
                return result
项目:discordbot.py    作者:rauenzi    | 项目源码 | 文件源码
async def downloadImage(url, folder, name, loop, chunkSize=20):
        """Download the image at ``url`` to ``<folder>/<name><extension>``.

        Declared ``async`` because the body awaits aiohttp calls; a plain
        ``def`` containing ``async with``/``await`` does not compile.

        :param url: address of the image
        :param folder: directory the file is written to
        :param name: file name without extension
        :param loop: event loop handed to the aiohttp session
        :param chunkSize: number of bytes read from the response per iteration
        :return: dict with the boolean flags ``canAccessURL``, ``isImage``
            and ``fileSaved``
        """
        result = {'canAccessURL': False, 'isImage': False, 'fileSaved': False}
        headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
            'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
            'Accept-Encoding': 'none',
            'Accept-Language': 'en-US,en;q=0.8',
            'Connection': 'keep-alive'}
        async with aiohttp.ClientSession(loop=loop) as session:
            with aiohttp.Timeout(10, loop=session.loop):
                async with session.get(url, headers=headers) as response:
                    # A missing header is treated like a non-image response
                    content_type = response.headers.get('content-type', '')
                    if response.status == 200:
                        result['canAccessURL'] = True
                    if "image" in content_type:
                        result['isImage'] = True
                    if not result['canAccessURL'] or not result['isImage']:
                        return result
                    # Strip parameters ("image/png; charset=...") before the
                    # lookup, and fall back to no extension when the type is
                    # unknown.
                    mime_type = content_type.split(';')[0].strip()
                    extension = mimetypes.guess_extension(mime_type) or ''
                    if extension == '.jpe':
                        extension = '.jpg'

                    with open(folder + "/" + name + extension, 'wb') as fd:
                        while True:
                            chunk = await response.content.read(chunkSize)
                            if not chunk:
                                break
                            fd.write(chunk)
                    result['fileSaved'] = True
                    return result
项目:hangoutsbot    作者:das7pad    | 项目源码 | 文件源码
def upload_image(self, image_uri, sync, username, userid, channel_name):
        """Download an image with the API token and forward it to the internal chat.

        The file name is taken from ``image_uri``. When its extension does not
        agree with the one guessed from the response's content type, the
        guessed extension is appended to the name.

        :param image_uri: address of the image to download
        :param sync: object providing ``hangoutid`` and ``_bridgeinstance``
        :param username: value sent as ``source_user``
        :param userid: value sent as ``source_uid``
        :param channel_name: value sent as ``source_title``
        """
        token = self.apikey
        logger.info('downloading %s', image_uri)
        filename = os.path.basename(image_uri)
        request = urllib.request.Request(image_uri)
        request.add_header("Authorization", "Bearer %s" % token)
        image_response = urllib.request.urlopen(request)
        content_type = image_response.info().get_content_type()

        # guess_extension() returns None for unknown content types; use an
        # empty extension then, so the code below logs the mismatch and
        # leaves the file name untouched instead of raising.
        guessed = mimetypes.guess_extension(content_type)
        filename_extension = guessed.lower() if guessed else ""  # includes the leading "."
        physical_extension = "." + filename.rsplit(".", 1).pop().lower()

        if physical_extension == filename_extension:
            pass
        elif filename_extension == ".jpe" and physical_extension in [ ".jpg", ".jpeg", ".jpe", ".jif", ".jfif" ]:
            # account for the mimetypes idiosyncrasy of returning jpe for a valid jpeg
            pass
        else:
            logger.warning("unable to determine extension: {} {}".format(filename_extension, physical_extension))
            filename += filename_extension

        logger.info('uploading as %s', filename)
        image_id = yield from self.bot._client.upload_image(image_response, filename=filename)

        logger.info('sending HO message, image_id: %s', image_id)
        yield from sync._bridgeinstance._send_to_internal_chat(
            sync.hangoutid,
            "shared media from slack",
            {   "sync": sync,
                "source_user": username,
                "source_uid": userid,
                "source_title": channel_name },
            image_id=image_id )
项目:Python-Scripts    作者:amitsagtani97    | 项目源码 | 文件源码
def download(url, filename):
    """Download ``url`` and save it as ``filename`` plus a guessed extension.

    The extension is derived from the response's Content-Type header. When
    the header is missing or its type is unknown, the file is saved under
    ``filename`` unchanged.

    :param url: address to fetch
    :param filename: target path without extension
    """
    response = requests.get(url)
    # Keep only the media type: parameters such as "; charset=utf-8" would
    # make the lookup fail.
    content_type = response.headers.get('content-type', '')
    mime_type = content_type.split(';')[0].strip()
    # guess_extension() returns None for unknown types
    file_extension = mimetypes.guess_extension(mime_type) or ''
    with open(filename + file_extension, 'wb') as out_file:
        out_file.write(response.content)
项目:omnic    作者:michaelpb    | 项目源码 | 文件源码
def __init__(self, s):
        """Parse a type string of the form ``FORMAT[:arg1,arg2,...]``.

        ``FORMAT`` is treated as a MIME type when it contains a slash, as a
        file extension when it is upper-case, and as a qualifier otherwise.

        :param s: the type string to parse
        """
        self.str = s

        # Anything after the first ':' is a comma-separated argument list
        head, colon, tail = s.partition(':')
        if colon:
            self.ts_format = head
            self.arguments = tuple(tail.split(','))
        else:
            self.ts_format = s
            self.arguments = tuple()

        # Defaults; overwritten below depending on the kind of format
        self.is_qualifier = False
        self.mimetype = None
        self.extension = None

        fmt = self.ts_format
        if '/' in fmt:
            # MIME type given: derive the upper-case extension when known
            self.mimetype = fmt
            guessed = mimetypes.guess_extension(self.mimetype)
            if guessed:
                self.extension = guessed.strip('.').upper()
        elif fmt.isupper():
            # Extension given: derive the MIME type from a dummy file name
            self.extension = fmt
            dummy_name = 'fn.%s' % self.extension
            self.mimetype, _ = mimetypes.guess_type(dummy_name)  # discard encoding
        else:
            # Qualifier: neither MIME type nor extension can be determined
            self.is_qualifier = True
项目:osp-scraper    作者:opensyllabus    | 项目源码 | 文件源码
def guess_extension(mimetype):
    """guess a file extension from mimetype, without leading `.`

    Parameters after `;` (e.g. `; charset=utf-8`) and surrounding
    whitespace are ignored. `htm` is reported as `html`.

    Returns `unknown` if an extension could not be guessed
    """
    # strip() keeps "text/html ; charset=utf-8" from failing the lookup
    media_type = mimetype.split(';')[0].strip()
    x = (mimetypes.guess_extension(media_type) or '.unknown')[1:]
    return x if x != 'htm' else 'html'
项目:aioworkers    作者:aioworkers    | 项目源码 | 文件源码
def load_conf(self, fd, *, path=None, mime_type=None, response=None):
        """Load a config with the loader matching its path suffix or MIME type.

        The loader is looked up in ``registry`` by, in order: the suffix of
        ``path``, the MIME type itself, the extension guessed from the MIME
        type. For an ``HTTPResponse`` the MIME type is taken from its
        Content-Type header.

        :param fd: open file object to load from, or None
        :param path: path of the config; decides the loader when given
        :param mime_type: MIME type of the config
        :param response: HTTP response whose body holds the config
        :return: whatever the selected loader returns
        :raises LookupError: when neither a path nor a MIME type is available
        :raises NotImplementedError: when no loader is registered for the MIME type
        """
        if isinstance(response, http.client.HTTPResponse):
            url = URL(response.geturl())
            self.uris.append(url)
            mime_type = response.headers.get('Content-Type')
            if mime_type:
                # Drop parameters such as "; charset=utf-8"
                mime_type = mime_type.split(';')[0].strip()
            logger.info('Config found: {} [{}]'.format(url, mime_type))
        if path:
            loader = registry.get(path.suffix)
            path = path.absolute()
            self.uris.append(path)
            logger.info('Config found: {}'.format(path))
        elif mime_type in registry:
            loader = registry.get(mime_type)
        elif not mime_type:
            # Checked before guessing: guess_extension() fails on None
            raise LookupError('Not found mime_type %s' % mime_type)
        else:
            extension = mimetypes.guess_extension(mime_type)
            if extension not in registry:
                # NotImplemented is a constant, not an exception; raising it
                # would itself fail with a TypeError.
                raise NotImplementedError('No loader for mime_type %s' % mime_type)
            loader = registry.get(extension)
        if response is not None:
            return loader.load_bytes(response.read())
        elif fd is None:
            return loader.load_path(path)
        with fd:
            return loader.load_fd(fd)
项目:ecs    作者:ecs-org    | 项目源码 | 文件源码
def get_filename(self):
        """Return a slugified download file name for this document.

        The name joins the doctype name (``'Unterlage'`` when there is no
        doctype), the document name, the version and the local date with
        dashes. It is prefixed with the parent object's file name slice when
        the parent provides one, and followed by an extension derived from
        the MIME type (``.bin`` when unknown).
        """
        if self.mimetype != 'application/vnd.ms-excel':
            ext = mimetypes.guess_extension(self.mimetype) or '.bin'
        else:
            # HACK: we want .xls not .xlb for excel files
            ext = '.xls'

        doctype_label = self.doctype.name if self.doctype else 'Unterlage'
        date_label = timezone.localtime(self.date).strftime('%Y.%m.%d')
        name_slices = [doctype_label, self.name, self.version, date_label]

        parent = self.parent_object
        if parent and hasattr(parent, 'get_filename_slice'):
            name_slices.insert(0, parent.get_filename_slice())

        name = slugify('-'.join(name_slices))
        return ''.join((name, ext))
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def ext_from_mimetype(mimetype):
    """Return the extension (with leading dot) for ``mimetype``, or None if unknown."""
    extension = mimetypes.guess_extension(mimetype)
    return extension
项目:pelisalacarta-ce    作者:pelisalacarta-ce    | 项目源码 | 文件源码
def __get_download_filename__(self):
        """Choose the file name and extension of the download.

        Candidates, in order of preference: the Content-Disposition header,
        the MIME type from Content-Type (extension only), and the URL path.
        ``self.remote_filename`` always receives the chosen name;
        ``self._filename`` only when it is not set already.
        """
        # Get file name and extension from the Content-Disposition header.
        # The dispositions are tried one after the other, so a match for
        # "attachment" is no longer thrown away by the "inline" check.
        cd_filename, cd_ext = "", ""
        disposition = self.response_headers.get("content-disposition", "")
        if "filename" in disposition:
            for kind in ("attachment", "inline"):
                if kind not in disposition:
                    continue
                found = re.compile(kind + "; filename ?= ?[\"|']?([^\"']+)[\"|']?").match(disposition)
                # A header that mentions the keyword but does not follow the
                # expected layout is ignored rather than crashing on None
                if found:
                    cd_filename, cd_ext = os.path.splitext(urllib.unquote_plus(found.group(1)))
                    break

        url_filename, url_ext = os.path.splitext(urllib.unquote_plus(filetools.basename(urlparse.urlparse(self.url)[2])))
        if self.response_headers.get("content-type", "application/octet-stream") != "application/octet-stream":
            mime_ext = mimetypes.guess_extension(self.response_headers.get("content-type"))
        else:
            mime_ext = ""

        # Pick the most suitable name
        if cd_filename:
            self.remote_filename = cd_filename
            if not self._filename:
                self._filename = cd_filename

        elif url_filename:
            self.remote_filename = url_filename
            if not self._filename:
                self._filename = url_filename

        # Pick the most suitable extension
        if cd_ext:
            if cd_ext not in self._filename: self._filename += cd_ext
            if self.remote_filename: self.remote_filename += cd_ext
        elif mime_ext:
            if mime_ext not in self._filename: self._filename += mime_ext
            if self.remote_filename: self.remote_filename += mime_ext
        elif url_ext:
            if url_ext not in self._filename: self._filename += url_ext
            if self.remote_filename: self.remote_filename += url_ext
项目:RedditDownloader    作者:shadowmoose    | 项目源码 | 文件源码
def handle(url, data):
    """Download the top image that newspaper locates for *url*.

    ``data`` is a dict read for the keys ``'user_agent'`` (sent as the
    User-Agent header), ``'single_file'`` (a %-format string that receives
    the file extension) and ``'parent_dir'`` (created if it is missing).

    Returns the path of the image file on success, or ``False`` when no
    image was found, the extension could not be determined, the server did
    not answer with HTTP 200, or any exception was raised along the way.
    """
    try:
        config = Config()
        config.browser_user_agent = data['user_agent']
        article = Article(url, config)
        article.download()
        article.parse()
        if article.top_image:
            print('\t\tNewspaper located image: %s' % article.top_image)

            r = requests.get(article.top_image, headers={'User-Agent': data['user_agent']}, stream=True)
            if r.status_code == 200:
                # guess_extension() only matches a bare "type/subtype", so drop
                # any parameters such as "; charset=UTF-8" before the lookup.
                content_type = r.headers.get('content-type', '')
                mime_type = content_type.split(';', 1)[0].strip()
                ext = mimetypes.guess_extension(mime_type)
                if not ext:
                    print('\t\tNewsPaper Error locating file MIME Type: %s' % url)
                    return False
                # Normalise the JPEG variants (.jpe, .jpeg, ...) to .jpg.
                if '.jp' in ext:
                    ext = '.jpg'
                path = data['single_file'] % ext
                if not os.path.isfile(path):
                    if not os.path.isdir(data['parent_dir']):
                        print("\t\t+Building dir: %s" % data['parent_dir'])
                        os.makedirs(data['parent_dir'])# Parent dir for the full filepath is supplied already.
                    with open(path, 'wb') as f:
                        # Let urllib3 undo gzip/deflate so the file holds raw image bytes.
                        r.raw.decode_content = True
                        shutil.copyfileobj(r.raw, f)
                return path
            else:
                print('\t\tError Reading Image: %s responded with code %i!' % (url, r.status_code) )
                return False
    except Exception as e:
        # Best-effort handler: report the failure and fall through to False.
        print('\t\t"Newspaper" Generic handler failed. '+(str(e).strip()) )
    return False
项目:flux_line_bot    作者:blesscat    | 项目源码 | 文件源码
def play_info(self):
        """Log the play metadata and show the preview images.

        Metadata key/value pairs from ``self.robot_obj.play_info()`` are
        logged. Every image whose MIME type maps to a known extension is
        written to a persistent temporary file, and the files are then
        handed to the external ``open`` command.
        """
        metadata, images = self.robot_obj.play_info()
        logger.info("Metadata:")
        for k, v in metadata.items():
            logger.info("  %s=%s", k, v)
        tempfiles = []
        if images:
            for mime, buf in images:
                ext = mimetypes.guess_extension(mime)
                if ext:
                    # NOTE(review): the suffix is always ".jpg"; the guessed
                    # extension only filters out unknown MIME types.
                    # Close the file before launching the viewer so the data
                    # is flushed to disk; delete=False keeps it afterwards.
                    with NamedTemporaryFile(suffix=".jpg", delete=False) as ntf:
                        ntf.write(buf)
                    tempfiles.append(ntf)
            os.system("open " + " ".join([n.name for n in tempfiles]))
项目:flux_line_bot    作者:blesscat    | 项目源码 | 文件源码
def scan_oneshot(self, filename=None):
        """Take a one-shot capture and show the resulting images.

        Every image returned by ``self.task.oneshot()`` whose MIME type maps
        to a known extension is written to a persistent temporary file; the
        files are then handed to the external ``open`` command.
        ``filename`` is accepted but not used by this method.
        """
        images = self.task.oneshot()
        tempfiles = []
        for mime, buf in images:
            ext = mimetypes.guess_extension(mime)
            if ext:
                # NOTE(review): the suffix is always ".jpg"; the guessed
                # extension only filters out unknown MIME types.
                # Close the file before launching the viewer so the data is
                # flushed to disk; delete=False keeps it afterwards.
                with NamedTemporaryFile(suffix=".jpg", delete=False) as ntf:
                    ntf.write(buf)
                tempfiles.append(ntf)

        os.system("open " + " ".join([n.name for n in tempfiles]))
项目:flux_line_bot    作者:blesscat    | 项目源码 | 文件源码
def scanimages(self, filename=None):
        """Fetch the scan images and show them.

        Every image returned by ``self.task.scanimages()`` whose MIME type
        maps to a known extension is written to a persistent temporary file;
        the files are then handed to the external ``open`` command.
        ``filename`` is accepted but not used by this method.
        """
        images = self.task.scanimages()
        tempfiles = []
        for mime, buf in images:
            ext = mimetypes.guess_extension(mime)
            if ext:
                # NOTE(review): the suffix is always ".jpg"; the guessed
                # extension only filters out unknown MIME types.
                # Close the file before launching the viewer so the data is
                # flushed to disk; delete=False keeps it afterwards.
                with NamedTemporaryFile(suffix=".jpg", delete=False) as ntf:
                    ntf.write(buf)
                tempfiles.append(ntf)

        os.system("open " + " ".join([n.name for n in tempfiles]))
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def get_filename(self, content_type):
        """Build a file name from ``self._basename`` and a Content-Type value.

        Only the media type in front of the first ``;`` is used; parameters
        such as ``charset`` are ignored.

        Returns ``None`` when no base name is set or when no extension is
        known for the media type, otherwise ``<basename><extension>``.
        """
        if not self._basename:
            return None

        # Plain string handling instead of map()/len(): map() returns an
        # iterator on Python 3 and can neither be measured nor indexed.
        mime_type = content_type.split(";", 1)[0].strip()
        if not mime_type:
            return None

        extension = mimetypes.guess_extension(mime_type)
        if not extension:
            return None

        return "%s%s" % (self._basename, extension)
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def _find_attachments_in_email(mesg, expand_attachment, atts):
    """Collect the attachments of *mesg* into the list *atts*.

    Each stored attachment is appended to ``atts`` as a tuple
    ``(tempfile_path, filename, content_type)``. When ``expand_attachment``
    is true, ``text/*`` parts whose body starts with one of ``EMAIL_MAGIC``
    are parsed as nested e-mails and searched recursively. Parts whose
    content type is in ``SAFE_MEDIA_TYPE`` are skipped. Nothing is collected
    from a ``multipart/related`` message that holds an ``ActiveMime``
    payload.
    """
    # MHTML detection
    is_related = (mesg.get_content_maintype() == "multipart"
                  and mesg.get_content_subtype() == "related")
    if is_related:
        for candidate in mesg.walk():
            if not candidate.is_multipart():
                body = candidate.get_payload(decode=True)
                if isinstance(body, str) and body.startswith('ActiveMime'):
                    return

    for part in mesg.walk():
        mime_type = part.get_content_type()
        if part.is_multipart():
            continue
        body = part.get_payload(decode=True)

        if expand_attachment and mime_type.startswith('text/'):
            stripped = body.lstrip(" \t\r\n")
            if any(stripped.startswith(magic) for magic in EMAIL_MAGIC):
                # The text body is itself an e-mail: descend into it instead
                # of storing it as an attachment.
                nested = email.message_from_string(stripped)
                _find_attachments_in_email(nested, expand_attachment, atts)
                continue

        if mime_type in SAFE_MEDIA_TYPE:
            continue

        name = part.get_filename()
        if name is not None:
            # Sanitize the header value
            name = utils.get_filename_from_path(_decode_header(name))
        else:
            name = '<unknown>' + (mimetypes.guess_extension(mime_type) or '')
        stored_path = utils.store_temp_file(body, name)
        atts.append((stored_path, name, mime_type))
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def get_extension(attachment):
    """Return the file extension to use for an e-mail attachment.

    The extension comes from the attachment's file name when it has one,
    otherwise it is guessed from the content type. ``'.bin'`` is returned
    when neither yields an extension, and ``None`` when *attachment* lacks
    the methods that are called on it.
    """
    try:
        name = attachment.get_filename()
        if not name:
            found = mimetypes.guess_extension(attachment.get_content_type())
        else:
            found = os.path.splitext(name)[1]
    except AttributeError:
        return None

    if found:
        return found
    return '.bin'
项目:malyzer    作者:srozb    | 项目源码 | 文件源码
def _determineExtension(determined_type):
    """Return the file extension to use for *determined_type*.

    An entry in ``type_override`` wins. Otherwise the extension is guessed
    from the MIME type, and ``config.default_ext`` is used when the guess
    fails or yields nothing.
    """
    default_ext = config.default_ext
    if determined_type in type_override:
        return type_override[determined_type]
    try:
        guessed = guess_extension(determined_type)
    except Exception:
        # Best effort: an unusable type value falls back to the default.
        guessed = None
    # guess_extension() returns None for unknown types; do not let that
    # replace the configured default.
    return guessed or default_ext
项目:fame_modules    作者:certsocietegenerale    | 项目源码 | 文件源码
def each(self, target):
        """Extract every non-container part of the e-mail file *target*.

        Each part is written into a directory obtained from ``tempdir()``
        and registered through ``self.add_extracted_file``. Parts without a
        usable file name are stored as ``part-NNN<ext>``, where the
        extension is guessed from the content type (``.bin`` as fallback).
        """
        with open(target) as fp:
            msg = email.message_from_file(fp)

        path_temp = tempdir()
        counter = 1
        for part in msg.walk():
            # multipart/* are just containers
            if part.get_content_maintype() == 'multipart':
                continue
            payload = part.get_payload(decode=True)
            if payload is None:
                # Container parts (e.g. message/rfc822) carry no payload of
                # their own; walk() visits their sub-parts separately.
                continue
            filename = part.get_filename()
            if filename:
                # The name comes from the (untrusted) message: keep only the
                # final path component so it cannot escape path_temp.
                filename = os.path.basename(filename.replace('\\', '/'))
                if filename in ('.', '..'):
                    filename = ''
            if not filename:
                ext = mimetypes.guess_extension(part.get_content_type())
                if not ext:
                    # Use a generic bag-of-bits extension
                    ext = '.bin'
                filename = 'part-%03d%s' % (counter, ext)
            counter += 1
            filepath = os.path.join(path_temp, filename)
            with open(filepath, 'wb') as out:
                out.write(payload)
            self.add_extracted_file(filepath)
项目:Mastodon.py    作者:halcy    | 项目源码 | 文件源码
def media_post(self, media_file, mime_type=None, description=None):
        """
        Post an image. `media_file` can either be image data or
        a file name. If image data is passed directly, the mime
        type has to be specified manually, otherwise, it is
        determined from the file name.

        Throws a `MastodonIllegalArgumentError` if the mime type of the
        passed data or file can not be determined properly.

        Returns a `media dict`_. This contains the id that can be used in
        status_post to attach the media file to a toot.
        """
        opened_here = False
        if mime_type is None and os.path.isfile(media_file):
            mime_type = mimetypes.guess_type(media_file)[0]
            media_file = open(media_file, 'rb')
            opened_here = True

        try:
            if mime_type is None:
                raise MastodonIllegalArgumentError('Could not determine mime type'
                                                   ' or data passed directly '
                                                   'without mime type.')

            random_suffix = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
            # guess_extension() returns None for mime types it does not know;
            # fall back to no extension rather than failing on str + None.
            extension = mimetypes.guess_extension(mime_type) or ''
            file_name = "mastodonpyupload_" + str(time.time()) + "_" + random_suffix + extension

            media_file_description = (file_name, media_file, mime_type)
            return self.__api_request('POST', '/api/v1/media',
                                      files={'file': media_file_description},
                                      params={'description': description})
        finally:
            # Only close handles this method opened itself; data or objects
            # supplied by the caller are left untouched.
            if opened_here:
                media_file.close()

    ###
    # Writing data: Domain blocks
    ###
项目:pygear    作者:amir-khakshour    | 项目源码 | 文件源码
def get_buffer_extension(buffer):
    """Return a file extension for the content held in *buffer*.

    The MIME type reported by ``get_mime_type_buffer`` is mapped to an
    extension; ``'.png'`` is returned when no extension is known for it.
    """
    mime_type = get_mime_type_buffer(buffer)
    guessed = mimetypes.guess_extension(mime_type)
    if guessed:
        return guessed
    return '.png'
项目:interfax-python    作者:interfax    | 项目源码 | 文件源码
def _init_document(self, data, mime_type):
        """Upload the data using the documents API."""
        filename = 'upload-{0}{1}'.format(uuid4(), guess_extension(mime_type))
        document = self.client.documents.create(filename, len(data))

        cursor = 0

        while cursor < len(data):
            chunk = data[cursor:cursor + self.chunk_size]

            document.upload(cursor, cursor + len(chunk) - 1, chunk)
            cursor += len(chunk)

        self._init_url(document.uri)
项目:ohlife_replacement_server    作者:paulx3    | 项目源码 | 文件源码
def email_login():
    """
    Log a user in from a POSTed HTML form field and stash the entry.

    On POST, the ``html`` form field is parsed; the signed ``save_key`` it
    contains is verified (24 hour maximum age) and used to look the user
    up by session id. The first uploaded file, if any, is saved under a
    uuid-based name that is stored in the session together with the entry
    text and the user id.

    :return: a redirect to ``protected_save`` in every case
    """
    if flask.request.method == 'POST':
        content = flask.request.form.get("html")
        if content is not None:
            print(content)
            soup = BeautifulSoup(content, "html.parser")
            save_key = soup.find(id="save_key").text.strip()
            # session_id will expire after 24 hours
            session_id = save_signer.unsign(save_key, max_age=86400)
            session_id = bytes.decode(session_id)
            user = User.query.filter_by(session_id=session_id).first_or_404()
            # try to save the attachment file
            limit_counter = 0
            try:
                for attachment in flask.request.files:
                    # Only the first uploaded file is kept.
                    if limit_counter >= 1:
                        break
                    upload = flask.request.files[attachment]
                    # guess_extension() returns None for unknown mime types;
                    # save without an extension instead of failing on
                    # str + None.
                    extension = guess_extension(upload.mimetype) or ''
                    file_name = str(uuid.uuid1()) + extension
                    upload.save(file_name)
                    flask.session["file_name"] = file_name
                    limit_counter += 1
            except AttributeError:
                flask.session["file_name"] = ""
            flask.session["entry"] = soup.select('div[style]')[0].text
            flask.session["user_real_id"] = user.user_id
            # after login flask_login will push user_id into session and this user_id is our session_id
            # as the get_id method in User model returns user's session id
            flask_login.login_user(user)
            return flask.redirect(flask.url_for('protected_save'))
    return flask.redirect(flask.url_for('protected_save'))
项目:teleflask    作者:luckydonald    | 项目源码 | 文件源码
def prepare_file(self):
        """
        This sets `self.file` to a fitting :class:`InputFile`
        or a fitting subclass (:class:`InputFileFromDisk`, :class:`InputFileFromURL`)

        When `self.file_content` is set, a file name is derived from
        `self.file_path` or `self.file_url` (default: "file"), and the
        suffix is taken from `self.file_mime` when that maps to a known
        extension, else from the path/url, else ".blob".

        :return: Nothing
        """
        if self.file_content:
            file_name = "file"
            file_suffix = ".blob"
            if self.file_path:
                file_name = os.path.basename(os.path.normpath(self.file_path))  # http://stackoverflow.com/a/3925147
                file_name, file_suffix = os.path.splitext(file_name)  # http://stackoverflow.com/a/541394/3423324
            elif self.file_url:
                from urllib.parse import urlparse  # http://stackoverflow.com/a/18727481/3423324
                url = urlparse(self.file_url)
                file_name = os.path.basename(url.path)
                file_name, file_suffix = os.path.splitext(file_name)
            # end if
            if self.file_mime:
                import mimetypes
                guessed_suffix = mimetypes.guess_extension(self.file_mime)
                # guess_extension() returns None for unknown mime types; only
                # override the suffix taken from the path/url when the guess
                # actually produced something.
                if guessed_suffix:
                    file_suffix = '.jpg' if guessed_suffix == '.jpe' else guessed_suffix  # .jpe -> .jpg
                # end if
            # end if
            if not file_suffix or not file_suffix.strip().lstrip("."):
                logger.debug("file_suffix was empty. Using '.blob'")
                file_suffix = ".blob"
            # end if
            file_name = "{filename}{suffix}".format(filename=file_name, suffix=file_suffix)
            self.file = InputFile(self.file_content, file_name=file_name, file_mime=self.file_mime)
        elif self.file_path:
            self.file = InputFileFromDisk(self.file_path, file_mime=self.file_mime)
        elif self.file_url:
            self.file = InputFileFromURL(self.file_url, file_mime=self.file_mime)
        # end if
    # end def prepare_file
项目:teleflask    作者:luckydonald    | 项目源码 | 文件源码
def send(self, sender: PytgbotApiBot, receiver, reply_id)->PytgbotApiMessage:
        """
        Send this message's file as a photo via `sender.send_photo`.

        `self.receiver` and `self.reply_id`, when set, take precedence over
        the `receiver` and `reply_id` arguments. If the file name does not
        already end in a known image suffix, one is appended, derived from
        the file's mime type.

        :return: whatever `sender.send_photo` returns
        :raises TgApiServerException: re-raised after `should_backoff` has inspected it
        """
        image_suffixes = (".jpg", ".jpeg", ".gif", ".png", ".tif", ".bmp")
        if self.receiver:
            receiver = self.receiver
        # end if
        if self.reply_id is not DEFAULT_MESSAGE_ID:
            reply_id = self.reply_id
        # end if
        self.prepare_file()
        assert isinstance(self.file, (InputFile, InputFileFromDisk, InputFileFromURL))
        if not self.file.file_name.endswith(image_suffixes):
            if self.file.file_mime in ["image/jpg", "image/jpeg", "image/jpe"]:  # manually, to avoid .jpe ending.
                self.file.file_name+=".jpg"
            else:
                import mimetypes
                # guess_extension() cannot handle None, so skip it when no mime type is set.
                ext = mimetypes.guess_extension(self.file.file_mime) if self.file.file_mime else None  # automatically
                if ext not in image_suffixes:
                    ext = ".unknown-file-type.png"  # At least we can try setting it as .png
                self.file.file_name += ext
        try:
            return sender.send_photo(
                receiver, self.file, caption=self.caption, reply_to_message_id=reply_id, reply_markup=self.reply_markup,
                disable_notification = self.disable_notification
            )
        except TgApiServerException as e:
            should_backoff(e)  # checks if it should raise an DoRetryException
            raise  # else it just raises as usual
        # end try
    # end def send
# end class PhotoMessage
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _download_http_url(link, session, temp_dir, hashes):
    """Download link url into temp_dir using provided session

    The file name comes from the Content-Disposition header when it
    supplies one, otherwise from ``link.filename``. A missing extension is
    filled in from the Content-Type or, after a redirect, from the final
    URL.

    Returns a ``(file_path, content_type)`` tuple. ``requests.HTTPError``
    is logged and re-raised.
    """
    target_url = link.url.split('#', 1)[0]
    try:
        resp = session.get(
            target_url,
            # We use Accept-Encoding: identity here because requests
            # defaults to accepting compressed responses. This breaks in
            # a variety of ways depending on how the server is configured.
            # - Some servers will notice that the file isn't a compressible
            #   file and will leave the file alone and with an empty
            #   Content-Encoding
            # - Some servers will notice that the file is already
            #   compressed and will leave the file alone and will add a
            #   Content-Encoding: gzip header
            # - Some servers won't notice anything at all and will take
            #   a file that's already been compressed and compress it again
            #   and set the Content-Encoding: gzip header
            # By setting this to request only the identity encoding We're
            # hoping to eliminate the third case. Hopefully there does not
            # exist a server which when given a file will notice it is
            # already compressed and that you're not asking for a
            # compressed file and will then decompress it before sending
            # because if that's the case I don't think it'll ever be
            # possible to make this work.
            headers={"Accept-Encoding": "identity"},
            stream=True,
        )
        resp.raise_for_status()
    except requests.HTTPError as exc:
        logger.critical(
            "HTTP error %s while getting %s", exc.response.status_code, link,
        )
        raise

    content_type = resp.headers.get('content-type', '')
    filename = link.filename  # fallback
    # Have a look at the Content-Disposition header for a better guess
    content_disposition = resp.headers.get('content-disposition')
    if content_disposition:
        _type, params = cgi.parse_header(content_disposition)
        # The header is controlled by the server: keep only the final path
        # component so a value such as "../../x" cannot place the download
        # outside temp_dir.
        # We use ``or`` here because we don't want to use an "empty" value
        # from the filename param.
        filename = os.path.basename(params.get('filename') or '') or filename
    ext = splitext(filename)[1]
    if not ext:
        ext = mimetypes.guess_extension(content_type)
        if ext:
            filename += ext
    if not ext and link.url != resp.url:
        # Redirected: the final URL may carry the extension.
        ext = os.path.splitext(resp.url)[1]
        if ext:
            filename += ext
    file_path = os.path.join(temp_dir, filename)
    with open(file_path, 'wb') as content_file:
        _download_url(resp, link, content_file, hashes)
    return file_path, content_type
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _download_http_url(link, session, temp_dir, hashes):
    """Download link url into temp_dir using provided session

    The file name comes from the Content-Disposition header when it
    supplies one, otherwise from ``link.filename``. A missing extension is
    filled in from the Content-Type or, after a redirect, from the final
    URL.

    Returns a ``(file_path, content_type)`` tuple. ``requests.HTTPError``
    is logged and re-raised.
    """
    target_url = link.url.split('#', 1)[0]
    try:
        resp = session.get(
            target_url,
            # We use Accept-Encoding: identity here because requests
            # defaults to accepting compressed responses. This breaks in
            # a variety of ways depending on how the server is configured.
            # - Some servers will notice that the file isn't a compressible
            #   file and will leave the file alone and with an empty
            #   Content-Encoding
            # - Some servers will notice that the file is already
            #   compressed and will leave the file alone and will add a
            #   Content-Encoding: gzip header
            # - Some servers won't notice anything at all and will take
            #   a file that's already been compressed and compress it again
            #   and set the Content-Encoding: gzip header
            # By setting this to request only the identity encoding We're
            # hoping to eliminate the third case. Hopefully there does not
            # exist a server which when given a file will notice it is
            # already compressed and that you're not asking for a
            # compressed file and will then decompress it before sending
            # because if that's the case I don't think it'll ever be
            # possible to make this work.
            headers={"Accept-Encoding": "identity"},
            stream=True,
        )
        resp.raise_for_status()
    except requests.HTTPError as exc:
        logger.critical(
            "HTTP error %s while getting %s", exc.response.status_code, link,
        )
        raise

    content_type = resp.headers.get('content-type', '')
    filename = link.filename  # fallback
    # Have a look at the Content-Disposition header for a better guess
    content_disposition = resp.headers.get('content-disposition')
    if content_disposition:
        _type, params = cgi.parse_header(content_disposition)
        # The header is controlled by the server: keep only the final path
        # component so a value such as "../../x" cannot place the download
        # outside temp_dir.
        # We use ``or`` here because we don't want to use an "empty" value
        # from the filename param.
        filename = os.path.basename(params.get('filename') or '') or filename
    ext = splitext(filename)[1]
    if not ext:
        ext = mimetypes.guess_extension(content_type)
        if ext:
            filename += ext
    if not ext and link.url != resp.url:
        # Redirected: the final URL may carry the extension.
        ext = os.path.splitext(resp.url)[1]
        if ext:
            filename += ext
    file_path = os.path.join(temp_dir, filename)
    with open(file_path, 'wb') as content_file:
        _download_url(resp, link, content_file, hashes)
    return file_path, content_type