Python magic 模块,from_buffer() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用magic.from_buffer()

项目:sqlalchemy-media    作者:pylover    | 项目源码 | 文件源码
def magic_mime_from_buffer(buffer: bytes) -> str:
    """
    Detect the mimetype of a byte buffer using the ``magic`` library.

    .. warning:: :exc:`.OptionalPackageRequirementError` will be raised if ``python-magic`` is not installed.

    :param buffer: leading bytes (header) of the file to inspect.

    :return: The mimetype
    """

    if magic is not None:
        return magic.from_buffer(buffer, mime=True)

    # The optional dependency is not available.
    raise OptionalPackageRequirementError('python-magic')  # pragma: no cover


# wand image
项目:sample-platform    作者:CCExtractor    | 项目源码 | 文件源码
def validate_file(form, field):
    """
    Reject uploads whose extension or sniffed MIME type is forbidden.

    Three checks are made, each raising ``ValidationError`` on a hit: the
    filename's own extension, the MIME type ``magic`` detects from the first
    1024 bytes, and the extension ``mimetypes`` associates with that MIME type.

    :param form: the form being validated (not used here).
    :param field: field whose ``data`` is the uploaded file; it must expose
        ``filename``, ``read`` and ``seek``.
    :raises ValidationError: if any check matches a forbidden entry.
    """
    upload = field.data

    # File cannot end with a forbidden extension
    file_extension = os.path.splitext(upload.filename)[1]
    if file_extension:
        forbidden_ext = ForbiddenExtension.query.filter(
            ForbiddenExtension.extension == file_extension[1:]).first()
        if forbidden_ext is not None:
            raise ValidationError('Extension not allowed')

    try:
        mimetype = magic.from_buffer(upload.read(1024), mime=True)
    finally:
        # File pointer returns to the beginning even if detection fails,
        # so later readers still see the whole file.
        upload.seek(0, 0)

    # Check for permitted mimetype
    forbidden_mime = ForbiddenMimeType.query.filter(
        ForbiddenMimeType.mimetype == mimetype).first()
    if forbidden_mime is not None:
        raise ValidationError('File MimeType not allowed')

    # The content may imply a different extension than the filename claims.
    extension = mimetypes.guess_extension(mimetype)
    if extension is not None:
        forbidden_real = ForbiddenExtension.query.filter(
            ForbiddenExtension.extension == extension[1:]).first()
        if forbidden_real is not None:
            raise ValidationError('Extension not allowed')
项目:cloudstrype    作者:btimby    | 项目源码 | 文件源码
def write(self, data):
    """
    Write data to multiple clouds.

    Uses a write-through buffer to ensure each chunk is the proper size.
    Close flushes the remainder of the buffer.

    On the first block the MIME type is also sniffed from the data; it
    replaces ``self.mime`` when no type is set yet or when the sniffed type
    is something other than the generic ``application/octet-stream``.

    :param data: bytes to append.
    :raises IOError: if the file has already been closed.
    """
    if self._closed:
        raise IOError('I/O operation on closed file.')
    if self._size == 0:
        # First block. See if we can get a more specific mime type by
        # examining the data. self.mime is determined by the filename,
        # mime is determined by magic.
        mime = magic.from_buffer(data, mime=True)
        # Keep the filename-derived type when magic only yields the
        # generic fallback.
        if not self.mime or mime != 'application/octet-stream':
            self.mime = mime
    self._size += len(data)
    self._md5.update(data)
    self._sha1.update(data)
    self._write_chunk(data)
项目:cloudstrype    作者:btimby    | 项目源码 | 文件源码
def write(self, data):
    """
    Write data to multiple clouds.

    Uses a write-through buffer to ensure each chunk is the proper size.
    Close flushes the remainder of the buffer.

    On the first block the MIME type is also sniffed from the data; it
    replaces ``self.mime`` when no type is set yet or when the sniffed type
    is something other than the generic ``application/octet-stream``.

    :param data: bytes to append.
    :raises IOError: if the file has already been closed.
    """
    if self._closed:
        raise IOError('I/O operation on closed file.')
    if self._size == 0:
        # First block. See if we can get a more specific mime type by
        # examining the data. self.mime is determined by the filename,
        # mime is determined by magic.
        mime = magic.from_buffer(data, mime=True)
        # Keep the filename-derived type when magic only yields the
        # generic fallback.
        if not self.mime or mime != 'application/octet-stream':
            self.mime = mime
    self._size += len(data)
    self._md5.update(data)
    self._sha1.update(data)
    self._write_chunk(data)
项目:cloudstrype    作者:btimby    | 项目源码 | 文件源码
def write(self, data):
    """
    Write data to multiple clouds.

    Uses a write-through buffer to ensure each chunk is the proper size.
    Close flushes the remainder of the buffer.

    On the first block the MIME type is also sniffed from the data; it
    replaces ``self.mime`` when no type is set yet or when the sniffed type
    is something other than the generic ``application/octet-stream``.

    :param data: bytes to append.
    :raises IOError: if the file has already been closed.
    """
    if self._closed:
        raise IOError('I/O operation on closed file.')
    if self._size == 0:
        # First block. See if we can get a more specific mime type by
        # examining the data. self.mime is determined by the filename,
        # mime is determined by magic.
        mime = magic.from_buffer(data, mime=True)
        # Keep the filename-derived type when magic only yields the
        # generic fallback.
        if not self.mime or mime != 'application/octet-stream':
            self.mime = mime
    self._size += len(data)
    self._md5.update(data)
    self._sha1.update(data)
    self._write_chunk(data)
项目:multiuploader    作者:vinaypost    | 项目源码 | 文件源码
def clean_file(self):
    """
    Validate the uploaded ``file`` field and return it.

    When ``self.check_extension`` is set, the filename extension must match
    the ``acceptFileTypes`` pattern (case-insensitively). When
    ``self.check_content_type`` is set, the MIME type sniffed from the first
    1024 bytes must be listed in ``allowedContentTypes`` and the file must
    not exceed ``maxFileSize``.

    :return: the uploaded file object from ``cleaned_data``.
    :raises forms.ValidationError: with code-like messages
        ``'acceptFileTypes'`` or ``'maxFileSize'``.
    """
    content = self.cleaned_data[u'file']
    extension = os.path.splitext(content.name)[1]

    if self.check_extension:
        if re.match(self._options['acceptFileTypes'], extension, flags=re.I) is None:
            raise forms.ValidationError('acceptFileTypes')

    if self.check_content_type:
        content_type = magic.from_buffer(content.read(1024), mime=True)
        # Rewind so whoever consumes the upload next starts at byte 0.
        content.seek(0)
        if content_type.lower() not in self._options['allowedContentTypes']:
            raise forms.ValidationError("acceptFileTypes")
        # Use the public ``size`` attribute rather than the private ``_size``.
        if content.size > self._options['maxFileSize']:
            raise forms.ValidationError("maxFileSize")

    return content
项目:S4    作者:MichaelAquilina    | 项目源码 | 文件源码
def load_index(self):
    """
    Fetch the index object from the bucket and decode it into a dict.

    The payload's MIME type decides the decoding: plain text is parsed as
    JSON directly, zlib data is decompressed first, and an empty object
    yields an empty dict. A ``ClientError`` from the request also yields an
    empty dict.

    :raises ValueError: if the payload has any other content type.
    """
    try:
        response = self.boto.get_object(
            Bucket=self.bucket,
            Key=self.index_path(),
        )
        raw = response['Body'].read()
        kind = magic.from_buffer(raw, mime=True)

        if kind == 'application/x-empty':
            return {}
        if kind == 'text/plain':
            logger.debug('Detected plain text encoding for index')
        elif kind == 'application/zlib':
            logger.debug('Detected zlib encoding for index')
            raw = zlib.decompress(raw)
        else:
            raise ValueError('Unknown content type for index', kind)

        # Both supported encodings end up as UTF-8 JSON text.
        return json.loads(raw.decode('utf-8'))
    except ClientError:
        return {}
项目:analyst-scripts    作者:Te-k    | 项目源码 | 文件源码
def get_filetype(data):
    """Return libmagic's description of ``data``, or '' if magic is not loaded.

    There are two versions of python-magic floating around, and annoyingly, the interface
    changed between versions, so we try one method and if it fails, then we try the other.
    NOTE: you may need to alter the magic_file for your system to point to the magic file."""
    # ``dict.has_key`` no longer exists on Python 3; ``in`` works everywhere.
    if 'magic' not in sys.modules:
        return ''
    try:
        # First interface: magic.open(...).buffer(...)
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        return ms.buffer(data)
    except Exception:
        try:
            # Second interface: module-level from_buffer()
            return magic.from_buffer(data)
        except magic.MagicException:
            # Raw string: same value as before, without invalid escape sequences.
            magic_custom = magic.Magic(magic_file=r'C:\windows\system32\magic')
            return magic_custom.from_buffer(data)
项目:munch-core    作者:crunchmail    | 项目源码 | 文件源码
def store(self):
    """
    Validate ``self.data`` and save it as an ``Image`` named by its SHA-1.

    :return: the absolute URL of the saved image.
    :raises TooBigMedia: if the data length is at or above ``MAX_SIZE``.
    :raises InvalidMimeType: if the sniffed MIME type is not allowed.
    """
    payload = self.data
    if len(payload) >= self.MAX_SIZE:
        raise TooBigMedia(self.identifying_name, self.MAX_SIZE)

    detected = magic.from_buffer(payload, mime=True)
    if detected not in self.allowed_mimetypes:
        raise InvalidMimeType(detected)

    self.extension = mimetypes.guess_extension(detected)
    # weirdness from mimetypes
    if self.extension == '.jpe':
        self.extension = '.jpeg'

    # The file name is the content hash followed by the extension.
    digest = hashlib.sha1(payload).hexdigest()
    target_name = '{}{}'.format(digest, self.extension)

    img = Image(organization=self.organization)
    img.file.save(target_name, ContentFile(payload))
    return img.get_absolute_url()
项目:daisychain    作者:daisychainme    | 项目源码 | 文件源码
def new_video(self, fb_user, fields):
    """
    Post the video in ``fields['video']`` to the user's Facebook videos.

    Missing ``message`` and ``title`` entries are filled with empty strings.
    The upload is named with a random UUID plus the subtype of the sniffed
    MIME type. Any exception during the request is logged, not raised, and
    the video file is closed afterwards.
    """
    for key in ('message', 'title'):
        if key not in fields:
            fields[key] = ''

    video_file = fields['video']
    video_file.seek(0)
    mime_type = magic.from_buffer(video_file.read(), mime=True)
    video_file.seek(0)

    upload_name = str(uuid4()) + '.' + mime_type.split('/')[1]
    post_data = [
        ('access_token', (None, fb_user.access_token)),
        ('source', (upload_name, video_file)),
        ('message', (None, fields['message'])),
    ]

    try:
        fb_request_url = Config.get("API_BASE_URI_VIDEO") + "/me/videos"
        requests.post(fb_request_url, files=post_data)
    except Exception:
        log.error(_("A failure occurred while posting on Facebook : "
                    "called with data: {}".format(post_data)))
    video_file.close()
项目:cuckoodroid-2.0    作者:idanr1986    | 项目源码 | 文件源码
def _get_filetype(self, data):
    """Gets filetype, uses libmagic if available.

    Tries the ``magic.open(...).buffer(...)`` interface first and falls back
    to ``magic.from_buffer`` if that fails.
    @param data: data to be analyzed.
    @return: file type or None.
    """
    if not HAVE_MAGIC:
        return None

    # Stays None when magic.open() itself fails, so cleanup knows there is
    # nothing to close (instead of relying on a swallowed NameError).
    ms = None
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except Exception:
        try:
            file_type = magic.from_buffer(data)
        except Exception:
            return None
    finally:
        if ms is not None:
            try:
                ms.close()
            except Exception:
                # Best effort: a failed close must not mask the result.
                pass

    return file_type
项目:django-upload-validator    作者:mckinseyacademy    | 项目源码 | 文件源码
def check_word_or_excel(self, fileobj, detected_type, extension):
    """
    Return the proper mimetype for Word or Excel files.

    The textual description ``magic`` gives for the first ``READ_SIZE``
    bytes is searched for Word and Excel markers. For generic Office
    descriptions (or an ``application/vnd.ms-office`` detection) the file
    extension decides. Otherwise ``detected_type`` is returned unchanged.
    The file object is rewound before returning.
    """
    word_markers = ['Microsoft Word', 'Microsoft Office Word', 'Microsoft Macintosh Word']
    excel_markers = ['Microsoft Excel', 'Microsoft Office Excel', 'Microsoft Macintosh Excel']
    office_markers = ['Microsoft OOXML']

    description = magic.from_buffer(fileobj.read(READ_SIZE))
    fileobj.seek(0)

    def mentions(markers):
        # True when any marker occurs in the magic description.
        return any(marker in description for marker in markers)

    if mentions(word_markers):
        return 'application/msword'
    if mentions(excel_markers):
        return 'application/vnd.ms-excel'
    if mentions(office_markers) or detected_type == 'application/vnd.ms-office':
        if extension in ('.doc', '.docx'):
            return 'application/msword'
        if extension in ('.xls', '.xlsx'):
            return 'application/vnd.ms-excel'

    return detected_type
项目:maltindex    作者:joxeankoret    | 项目源码 | 文件源码
def run(self, directory):
  """Walk ``directory`` (following links) and index executable files.

  Each file's type is sniffed from its first 1024 bytes. Files for which
  ``is_executable`` accepts that type are hashed with MD5 and passed to
  ``do_run`` unless the hash is already indexed. Files that cannot be read
  are logged and skipped.
  """
  for root, dirs, files in os.walk(directory, followlinks=True):
    for name in files:
      filename = os.path.join(root, name)
      try:
        # Binary mode and a context manager: magic wants raw bytes and the
        # handle must not leak while walking large trees.
        with open(filename, "rb") as handle:
          file_type = magic.from_buffer(handle.read(1024))
      except Exception:
        log("Error reading file %s: %s" % (filename, str(sys.exc_info()[1])))
        continue

      if is_executable(file_type):
        with open(filename, "rb") as handle:
          md5_hash = md5(handle.read()).hexdigest()
        if not self.is_file_indexed(md5_hash):
          self.do_run(filename, file_type)
        else:
          log("File already indexed %s" % name)

#-------------------------------------------------------------------------------
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_html_to_md(self):
    """HTML to MD: convert demo.html and check the response and cleanup."""

    self.set_original_document_from_file('demo.html')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'html',
        'to': 'md',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    self.assertEqual(magic.from_buffer(converted, mime=True), 'text/x-c++')
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_html_to_rst(self):
    """HTML to RST: convert demo.html and check the response and cleanup."""

    self.set_original_document_from_file('demo.html')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'html',
        'to': 'rst',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    assert magic.from_buffer(converted, mime=True) == 'text/x-c'
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_html_to_docx(self):
    """HTML to DOCX: convert demo.html and check the response and cleanup."""

    self.set_original_document_from_file('demo.html')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'html',
        'to': 'docx',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' in response.content_type
    self.assertEqual(
        magic.from_buffer(converted, mime=True),
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    )
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []

    #
    # From MD
    #
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_md_to_html(self):
    """MD to HTML: convert demo.md and check the response and cleanup."""

    self.set_original_document_from_file('demo.md')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'md',
        'to': 'html',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'text/html' in response.content_type
    assert magic.from_buffer(converted, mime=True) == 'text/html'
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_md_to_rst(self):
    """MD to RST: convert demo.md and check the response and cleanup."""

    self.set_original_document_from_file('demo.md')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'md',
        'to': 'rst',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    assert magic.from_buffer(converted, mime=True) == 'text/x-c'
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_rst_to_html(self):
    """RST to HTML: convert demo.rst and check the response and cleanup."""

    self.set_original_document_from_file('demo.rst')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'html',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'text/html' in response.content_type
    assert magic.from_buffer(converted, mime=True) == 'text/html'
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_rst_to_md(self):
    """RST to MD: convert demo.rst and check the response and cleanup."""

    self.set_original_document_from_file('demo.rst')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'md',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    assert magic.from_buffer(converted, mime=True) == 'text/plain'
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_rst_to_docx(self):
    """RST to DOCX: convert demo.rst and check the response and cleanup."""

    self.set_original_document_from_file('demo.rst')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'docx',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data
    docx_mime = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert docx_mime in response.content_type
    assert magic.from_buffer(converted, mime=True) == docx_mime
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []

    #
    # From DOCX
    #
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_docx_to_html(self):
    """DOCX to HTML: convert demo.docx and check the response and cleanup."""

    self.set_original_document_from_file('demo.docx')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'docx',
        'to': 'html',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'text/html' in response.content_type
    assert magic.from_buffer(converted, mime=True) == 'text/html'
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_rst_to_html(self):
    """RST to HTML: convert demo.rst and check the response and cleanup."""

    self.set_original_document_from_file('demo.rst')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'html',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'text/html' in response.content_type
    assert magic.from_buffer(converted, mime=True) == 'text/html'
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_docx_to_rst(self):
    """DOCX to RST: convert demo.docx and check the response and cleanup."""

    self.set_original_document_from_file('demo.docx')

    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'docx',
        'to': 'rst',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    assert magic.from_buffer(converted, mime=True) == 'text/plain'
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
项目:chalupas    作者:Antojitos    | 项目源码 | 文件源码
def test_convert_from_string(self):
    """Convert from string: post the document as a plain string value."""

    self.set_original_document_from_string('#1')

    form = {
        'document': self.original_document_content_string,
        'from': 'md',
        'to': 'rst',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    # Status, declared type, sniffed type, then an empty conversion folder.
    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    assert magic.from_buffer(converted, mime=True) == 'text/plain'
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def _get_filetype(data):
    """Gets filetype, uses libmagic if available.

    Tries the ``magic.open(...).buffer(...)`` interface first and falls back
    to ``magic.from_buffer`` if that fails.
    @param data: data to be analyzed.
    @return: file type or None.
    """
    if not HAVE_MAGIC:
        return None

    # Stays None when magic.open() itself fails, so cleanup knows there is
    # nothing to close (instead of relying on a swallowed NameError).
    ms = None
    try:
        ms = magic.open(magic.MAGIC_SYMLINK)
        ms.load()
        file_type = ms.buffer(data)
    except Exception:
        try:
            file_type = magic.from_buffer(data)
        except Exception:
            return None
    finally:
        if ms is not None:
            try:
                ms.close()
            except Exception:
                # Best effort: a failed close must not mask the result.
                pass

    return file_type
项目:wypok    作者:arturtamborski    | 项目源码 | 文件源码
def __call__(self, data):
    """
    Validate an uploaded file's size and sniffed content type.

    :param data: uploaded file exposing ``size``, ``read`` and ``seek``.
    :raises ValidationError: with code ``max_size``, ``min_size`` or
        ``content_type`` when the matching configured limit is violated.
        Limits that are ``None`` (or an empty ``content_types``) are skipped.
    """
    if self.max_size is not None and data.size > self.max_size:
        params = {
            'max_size': filesizeformat(self.max_size),
            'size': filesizeformat(data.size),
        }
        raise ValidationError(self.error_messages['max_size'], 'max_size', params)

    if self.min_size is not None and data.size < self.min_size:
        params = {
            # Was ``self.mix_size``: a typo that raised AttributeError
            # instead of the intended validation error.
            'min_size': filesizeformat(self.min_size),
            'size': filesizeformat(data.size)
        }
        raise ValidationError(self.error_messages['min_size'], 'min_size', params)

    if self.content_types:
        content_type = magic.from_buffer(data.read(), mime=True)
        data.seek(0)  # seek to start for future mime checks by django

        if content_type not in self.content_types:
            params = {
                'content_type': content_type
            }
            raise ValidationError(self.error_messages['content_type'], 'content_type', params)
项目:auto-tagging-image-search    作者:OsamaJBR    | 项目源码 | 文件源码
def upload_image():
    """
    Store an uploaded image and queue it for processing.

    Reads the ``name`` form field and the ``data`` file from the request.
    The file is written under the configured ``storage/imagesdir`` and its
    path is added to the ``images`` queue.

    :return: an ``HTTPResponse`` with status 200 on success, 400 for missing
        fields or a non-image file, 500 if queuing fails.
    """
    name = request.forms.get('name')
    data = request.files.get('data')
    if not (name and data and data.file):
        return HTTPResponse(status=400,body=json.dumps({'error' : 'missing fields'}))

    raw = data.file.read()
    images_dir = config.get('storage','imagesdir')
    save_path = "{path}/{file}".format(path=images_dir, file=data.filename)
    if not os.path.exists(images_dir):
        # Create the storage directory itself. Creating ``save_path`` made a
        # directory where the image file was about to be written.
        os.makedirs(images_dir)
    if 'image' not in magic.from_buffer(raw):
        return HTTPResponse(status=400,body=json.dumps({'error' : 'file type is not allowed'}))
    # Image content is binary, so write it in binary mode.
    with open(save_path,'wb') as open_file:
        open_file.write(raw)
    if queue.add_to_queue(queue_name='images',image=save_path):
        return HTTPResponse(status=200,body=json.dumps({'status' : 'Image Stored'}))
    return HTTPResponse(status=500,body=json.dumps({'error' : 'Internal Server Error'}))
项目:Snakepit    作者:K4lium    | 项目源码 | 文件源码
def get_type(data):
    """
    Return libmagic's description of ``data``, or '' if it cannot be determined.

    Tries the ``magic.open(...).buffer(...)`` interface first and falls back
    to ``magic.from_buffer`` if that fails.
    """
    # Stays None when magic.open() itself fails, so cleanup knows there is
    # nothing to close (instead of relying on a swallowed NameError).
    ms = None
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except Exception:
        try:
            file_type = magic.from_buffer(data)
        except Exception:
            return ''
    finally:
        if ms is not None:
            try:
                ms.close()
            except Exception:
                # Best effort: a failed close must not mask the result.
                pass

    return file_type
项目:a4-opin    作者:liqd    | 项目源码 | 文件源码
def validate_file_type_and_size(upload):
    """
    Check an upload against the configured file formats and a 5 MB limit.

    Both problems are collected so the user sees every error at once.

    :param upload: uploaded file exposing ``read``, ``seek`` and ``size``.
    :return: the same ``upload``, rewound to the start.
    :raises ValidationError: wrapping one error per failed check.
    """
    file_max_mb = 5
    max_size = file_max_mb*10**6
    fileformats = settings.FILE_ALIASES['*']['fileformats']
    allowed_mimetypes = [mimetype for name, mimetype in fileformats]
    names = [name for name, mimetype in fileformats]

    errors = []

    filetype = magic.from_buffer(upload.read(), mime=True)
    # Rewind: the whole file was consumed for sniffing, and the caller gets
    # this same object back.
    upload.seek(0)
    if filetype.lower() not in allowed_mimetypes:
        msg = _(
            'Unsupported file format. Supported formats are {}.'.format(
                ', '.join(names)
            )
        )
        errors.append(ValidationError(msg))
    if upload.size > max_size:
        msg = _('File should be at most {} MB'.format(file_max_mb))
        errors.append(ValidationError(msg))
    if errors:
        raise ValidationError(errors)
    return upload
项目:PyCIRCLeanMail    作者:CIRCL    | 项目源码 | 文件源码
def __init__(self, file_obj, orig_filename=None):
    """
    Wrap raw file content and derive its name, extension and MIME type.

    :param file_obj: raw bytes of the file; wrapped in a ``BytesIO``.
    :param orig_filename: original file name, if known. Without it the
        final name is ``'unknownfile.bin'`` and the extension is ``None``.
    """
    self.file_obj = BytesIO(file_obj)
    self.orig_filename = orig_filename
    if self.orig_filename:
        self.final_filename = self.orig_filename
    else:
        self.final_filename = 'unknownfile.bin'
    self.log_details = {'origFilename': self.orig_filename}
    self.log_string = ''
    if self.orig_filename:
        self.extension = os.path.splitext(self.orig_filename)[1]
    else:
        self.extension = None

    try:
        mt = magic.from_buffer(self.file_obj.getvalue(), mime=True)
    except UnicodeEncodeError as e:
        # FIXME: The encoding of the file is broken (possibly UTF-16)
        mt = ''
        self.log_details.update({'UnicodeError': e})
    # ``mt`` may or may not have a ``decode`` method depending on its type;
    # keep it as-is when decoding is unavailable or fails.
    try:
        self.mimetype = mt.decode("utf-8")
    except (AttributeError, UnicodeError):
        self.mimetype = mt

    if self.mimetype and '/' in self.mimetype:
        # Split on the first slash only so an unexpected extra slash cannot
        # break the two-value unpacking.
        self.main_type, self.sub_type = self.mimetype.split('/', 1)
    else:
        self.main_type = ''
        self.sub_type = ''
项目:transfer    作者:viur-framework    | 项目源码 | 文件源码
def genReqStr( params ):
        """
        Build a multipart/mixed request body from a dict of parameters.

        Values that have ``name``, ``read`` and ``mimetype`` attributes are
        treated as files; lists produce one part per element; anything else
        becomes a single part.

        NOTE(review): this uses the ``unicode`` builtin and ``.decode`` on the
        file name, so it appears to target Python 2 - confirm before running
        it on Python 3.

        :param params: mapping of field name to value.
        :return: tuple ``(body, boundary)``, both byte strings.
        """
        # Boundary: "---" followed by 13 random ASCII letters/digits.
        boundary_str = "---"+''.join( [ random.choice(string.ascii_lowercase+string.ascii_uppercase + string.digits) for x in range(13) ] )
        boundary = boundary_str.encode("UTF-8")
        res = b'Content-Type: multipart/mixed; boundary="'+boundary+b'"\nMIME-Version: 1.0\n'
        res += b'\n--'+boundary
        for(key, value) in list(params.items()):
            if all( [x in dir( value ) for x in ["name", "read", "mimetype"] ] ): #File
                dataVal = value.read()
                type = value.mimetype
                # Only sniff the content when the declared type is the generic one.
                if type=="application/octet-stream":
                    type = magic.from_buffer(dataVal, mime=True)
                res += b'\nContent-Type: '+type.encode("UTF-8")+b'\nMIME-Version: 1.0\nContent-Disposition: form-data; name="'+key.encode("UTF-8")+b'"; filename="'+os.path.basename(value.name).decode(sys.getfilesystemencoding()).encode("UTF-8")+b'"\n\n'
                res += dataVal
                res += b'\n--'+boundary
            elif isinstance( value, list ):
                # One part per list element, all under the same field name.
                for val in value:
                    res += b'\nContent-Type: application/octet-stream\nMIME-Version: 1.0\nContent-Disposition: form-data; name="'+key.encode("UTF-8")+b'"\n\n'
                    if isinstance( val, unicode ):
                        res += val.encode("UTF-8")
                    else:
                        res += str(val)
                    res += b'\n--'+boundary
            else:
                res += b'\nContent-Type: application/octet-stream\nMIME-Version: 1.0\nContent-Disposition: form-data; name="'+key.encode("UTF-8")+b'"\n\n'
                if isinstance( value, unicode ):
                    res += unicode( value ).encode("UTF-8")
                else:
                    res += str( value )
                res += b'\n--'+boundary
        # Turn the last "--boundary" into the closing "--boundary--".
        res += b'--\n'
        return( res, boundary )
项目:alexandriadocs    作者:srtab    | 项目源码 | 文件源码
def __call__(self, value):
    """
    Validate the MIME type sniffed from the first 1024 bytes of ``value``.

    :param value: uploaded file exposing ``read`` and ``seek``.
    :raises ValidationError: if ``allowed_mimetypes`` is non-empty and the
        detected type is not in it.
    """
    # Check the content type
    mimetype = magic.from_buffer(value.read(1024), mime=True)
    # Rewind so later consumers of the file start from the beginning.
    value.seek(0)
    if self.allowed_mimetypes and mimetype not in self.allowed_mimetypes:
        message = self.mime_message % {
            'mimetype': mimetype,
            'allowed_mimetypes': ', '.join(self.allowed_mimetypes)
        }
        raise ValidationError(message)
项目:django-content-gallery    作者:Kemaweyan    | 项目源码 | 文件源码
def create_in_memory_image(image, name, size):
    """
    Resizes the image and saves it as InMemoryUploadedFile object
    Returns the InMemoryUploadedFile object with the image data

    :param image: source image passed to ``image_resize``.
    :param name: file name given to the resulting uploaded file.
    :param size: target size passed to ``image_resize``.
    """
    output = io.BytesIO()  # create an io object
    # resize the image and save it to the io object
    image_resize(image, output, size)
    content = output.getvalue()
    # get MIME type of the image
    mime = magic.from_buffer(content, mime=True)
    # create InMemoryUploadedFile using data from the io; the size is the
    # number of content bytes (sys.getsizeof reports the Python object's
    # memory footprint, which is not the file size)
    return uploadedfile.InMemoryUploadedFile(output, 'ImageField', name,
        mime, len(content), None)
项目:ijust_server    作者:koala-team    | 项目源码 | 文件源码
def validate_file(self):
    """Return True if the MIME type sniffed from the first 16 bytes of
    ``self.body`` is in ``self.allowed_extensions``; the stream is rewound."""
    stream = self.body.data
    head = stream.read(16)
    stream.seek(0)

    return magic.from_buffer(head, mime=True) in self.allowed_extensions
项目:ijust_server    作者:koala-team    | 项目源码 | 文件源码
def validate_file(self):
    """Return True if the MIME type sniffed from the first 16 bytes of
    ``self.testcase`` is in ``self.allowed_extensions``; the stream is rewound."""
    stream = self.testcase.data
    head = stream.read(16)
    stream.seek(0)

    return magic.from_buffer(head, mime=True) in self.allowed_extensions
项目:ijust_server    作者:koala-team    | 项目源码 | 文件源码
def validate_file(self):
    """Return True if the first 16 bytes of ``self.code`` sniff as a
    ``text/*`` MIME type; the stream is rewound."""
    stream = self.code.data
    head = stream.read(16)
    stream.seek(0)

    detected = magic.from_buffer(head, mime=True)
    return detected.startswith('text/')
项目:omnic    作者:michaelpb    | 项目源码 | 文件源码
def detect(self, path):
    """
    Return ``DIRECTORY`` for a directory, a ``TypeString`` of the MIME type
    sniffed from the first 128 bytes for a file, or ``None`` when the type
    is empty or listed in ``UNKNOWN_MIMETYPE``.
    """
    if os.path.isdir(path):
        return DIRECTORY

    with open(path, 'rb') as fd:
        head = fd.read(128)
        mimetype = magic.from_buffer(head, mime=True)

    if not mimetype or mimetype in UNKNOWN_MIMETYPE:
        return None
    return TypeString(mimetype)
项目:munch-core    作者:crunchmail    | 项目源码 | 文件源码
def mimetype(self):
    """ Readable mimetype, guessed from the first 1024 bytes of the file """
    self.file.open()
    head = self.file.read(1024)
    return magic.from_buffer(head, mime=True)
项目:panoptes-python-client    作者:zooniverse    | 项目源码 | 文件源码
def add_location(self, location):
    """
    Add a media location to this subject.

    - **location** can be an open :py:class:`file` object, a path to a
      local file, or a :py:class:`dict` containing MIME types and URLs for
      remote media.

    A dict is recorded as-is with no media data. Otherwise the content is
    read, its media type detected, and the file object is closed afterwards
    (including one passed in by the caller).

    Examples::

        subject.add_location(my_file)
        subject.add_location('/data/image.jpg')
        subject.add_location({'image/png': 'https://example.com/image.png'})
    """
    location_type = type(location)

    if location_type is dict:
        self.locations.append(location)
        self._media_files.append(None)
        return

    if location_type in (str,) + _OLD_STR_TYPES:
        # A path: open it ourselves.
        handle = open(location, 'rb')
    else:
        handle = location

    try:
        payload = handle.read()
        if MEDIA_TYPE_DETECTION == 'magic':
            detected = magic.from_buffer(payload, mime=True)
        else:
            detected = 'image/{}'.format(imghdr.what(None, payload))
        self.locations.append(detected)
        self._media_files.append(payload)
    finally:
        handle.close()
项目:daisychain    作者:daisychainme    | 项目源码 | 文件源码
def new_picture(self, fb_user, fields):
    """
    Post the picture in ``fields['image']`` to the user's Facebook photos.

    :param fb_user: object providing ``access_token`` and ``username``.
    :param fields: mapping with an ``'image'`` file object and an optional
        ``'message'`` used as both caption and message (defaults to ``''``
        and is written back into ``fields``).

    Posting is best-effort: any failure while talking to Facebook is
    logged and swallowed. The image file is always closed, and the
    trigger is fired afterwards whether or not the post succeeded.
    """
    if 'message' not in fields:
        fields['message'] = ''

    image_file = fields['image']
    try:
        image_file.seek(0)
        mime_type = magic.from_buffer(image_file.read(), mime=True)

        # Rewind so the upload below sends the file from its beginning.
        image_file.seek(0)

        # The upload gets a random name whose extension is the mime subtype.
        upload_name = str(uuid4()) + '.' + mime_type.split('/')[1]
        post_data = [('access_token', (None, fb_user.access_token)),
                     ('source', (upload_name, image_file)),
                     ('caption', (None, fields['message'])),
                     ('message', (None, fields['message']))]

        # Stays None when the request itself never produced a response, so
        # the failure report below cannot hit an unbound name.
        resp = None
        published = False
        try:
            fb_request_url = Config.get("API_BASE_URI") + "/{}/photos".format(fb_user.username)
            resp = requests.post(fb_request_url, files=post_data)
            if resp.ok:
                body = resp.json()
                if "post_id" in body:
                    # Translate the template first, then fill it in, so the
                    # message id matches the translation catalogue.
                    log.info(_("A new Post with ID {} published!").format(body['post_id']))
                    published = True
        except Exception:
            # Deliberate best-effort: the failure is reported just below.
            published = False

        if not published:
            if resp is None:
                response_body = None
            else:
                try:
                    response_body = resp.json()
                except ValueError:
                    # Error responses are not guaranteed to be JSON.
                    response_body = resp.text
            # NOTE(review): post_data contains the user's access token, so
            # it ends up in the log -- consider redacting it.
            log.error(_("A failure occurred while posting on Facebook : "
                        "called with data: {}\n response: {}").format(post_data,
                                                                      response_body))
    finally:
        image_file.close()

    webhook_data = {
        "time": 0,
        "id": "101915710270588",
        "changed_fields": ["statuses"],
        "uid": fb_user.username
    }
    self.fire_trigger(webhook_data)
项目:django-upload-validator    作者:mckinseyacademy    | 项目源码 | 文件源码
def __call__(self, fileobj):
    """
    Validate an uploaded file by detected mime type and by extension.

    The first ``READ_SIZE`` bytes are passed to ``magic.from_buffer`` and
    the file is rewound afterwards. Raises ``ValidationError`` with code
    ``'invalid_type'`` when the detected mime is not in
    ``self.allowed_mimes``, or with code ``'invalid_extension'`` when
    ``self.allowed_exts`` is non-empty and does not contain the
    (lower-cased) extension of ``fileobj.name``.
    """
    header = fileobj.read(READ_SIZE)
    detected_type = magic.from_buffer(header, mime=True)
    extension = os.path.splitext(fileobj.name.lower())[1]

    # rewind, so that a valid file can be consumed later on
    # without the caller having to reset the position
    fileobj.seek(0)

    # some libmagic versions report only a generic mime for Office
    # documents; refine those using further detection details
    generic_mimes = ('application/octet-stream', 'application/vnd.ms-office')
    if detected_type in generic_mimes:
        detected_type = self.check_word_or_excel(fileobj, detected_type, extension)

    if detected_type not in self.allowed_mimes:
        # the feedback message shows only the subtype part of each mime
        allowed_types = (mime.split('/')[1] for mime in self.allowed_mimes)

        raise ValidationError(
            message=self.type_message,
            params={
                'detected_type': detected_type,
                'allowed_types': ', '.join(allowed_types)
            },
            code='invalid_type'
        )

    if not self.allowed_exts or extension in self.allowed_exts:
        return

    raise ValidationError(
        message=self.extension_message,
        params={
            'extension': extension,
            'allowed_extensions': ', '.join(self.allowed_exts)
        },
        code='invalid_extension'
    )
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def get_filetype(data):
    """There are two versions of python-magic floating around, and annoyingly, the interface
    changed between versions, so we try one method and if it fails, then we try the other.

    :param data: buffer whose content should be identified.
    :return: whatever the available magic binding reports for ``data``, or
        ``None`` when no ``magic`` module has been imported.
    """
    # `in` works on both Python 2 and 3; dict.has_key() is Python 2 only.
    if 'magic' not in sys.modules:
        return None
    try:
        # Interface exposing magic.open()/load()/buffer().
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        return ms.buffer(data)
    except Exception:
        # Fall back to the other interface. Catching Exception (not a bare
        # except) lets KeyboardInterrupt/SystemExit propagate.
        return magic.from_buffer(data)
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def MIME_TYPE(data, mime=True):
    """Return ``magic.from_buffer(data, mime=mime)``, or ``"none/none"``
    when that call raises ``magic.MagicException``."""
    try:
        detected = magic.from_buffer(data, mime=mime)
    except magic.MagicException:
        detected = "none/none"
    return detected
项目:papis    作者:alejandrogallo    | 项目源码 | 文件源码
def file_is(file_description, fmt):
    """Check whether a file or an in-memory buffer is a `fmt` document.

    :param file_description: Full path of a file (``str``) or a buffer
        holding the file content (``bytes``).
    :param fmt: Format name looked for, case-insensitively, inside the
        detected mimetype. It is interpolated into a regular expression
        unescaped.
    :returns: True if the detected mimetype contains `fmt`, False
        otherwise -- including when `file_description` is neither a
        ``str`` nor ``bytes``.

    """
    if not isinstance(file_description, (str, bytes)):
        # Without this guard no branch below would run and the final
        # return would fail on an unbound variable.
        return False

    import magic
    logger.debug("Checking filetype")
    pattern = r".*%s.*" % fmt

    if isinstance(file_description, str):
        # file_description is a path on disk
        detected = magic.from_file(file_description, mime=True)
        matched = re.match(pattern, detected, re.IGNORECASE)
        if matched:
            logger.debug(
                "File %s appears to be of type %s" % (file_description, fmt)
            )
    else:
        # file_description is a buffer; match case-insensitively here too,
        # so that paths and buffers are treated alike
        detected = magic.from_buffer(file_description, mime=True)
        matched = re.match(pattern, detected, re.IGNORECASE)
        if matched:
            logger.debug(
                "Buffer appears to be of type %s" % (fmt)
            )
    return bool(matched)
项目:pyhashdd    作者:hashdd    | 项目源码 | 文件源码
def process(self):
    """Return what ``magic.from_buffer`` reports for ``self.buffer``."""
    detected = magic.from_buffer(self.buffer)
    return detected
项目:ndeftool    作者:nfcpy    | 项目源码 | 文件源码
def pack_file(f):
    """Read all of ``f`` and wrap it in an ``ndef.Record``.

    The record type is the mimetype detected from the content; the record
    name is ``f.name`` (``'<stdin>'`` when ``f`` has no such attribute)
    cut to at most 255 characters.
    """
    content = f.read()
    content_type = magic.from_buffer(content, mime=1)
    source_name = getattr(f, 'name', '<stdin>')
    return ndef.Record(content_type, source_name[:255], content)
项目:curl2share    作者:cuongnv23    | 项目源码 | 文件源码
def mimetype(file_header):
    """
    Detect the mime type from the leading bytes of a file.
    file_header: first bytes of the file content
    """
    detected = magic.from_buffer(file_header, mime=True)
    return detected
项目:curl2share    作者:cuongnv23    | 项目源码 | 文件源码
def mimetype(file_path):
    """
    Detect mime type by reading first 1024 bytes of file
    file_path: file to detect mime type
    """
    # Open via a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(file_path, 'rb') as source:
        # the first 1024 bytes are enough to determine the mime type
        header = source.read(1024)
    return magic.from_buffer(header, mime=True)
项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet    作者:DeligenceTechnologies    | 项目源码 | 文件源码
def _magic_get_file_type(f, _):
    """Detect the mime type of ``f`` from its first 1024 bytes.

    ``f`` is rewound to position 0 afterwards; the second argument is
    ignored. The detected type is passed through ``maybe_decode`` before
    being returned.
    """
    header = f.read(1024)
    detected = magic.from_buffer(header, mime=True)
    f.seek(0)
    return maybe_decode(detected)