Python magic 模块,from_file() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用magic.from_file()

项目:S4    作者:MichaelAquilina    | 项目源码 | 文件源码
def _load_index(self):
    """Load the JSON index stored at ``self.index_path()``.

    Returns an empty dict when the index file does not exist. Otherwise the
    MIME type reported by libmagic selects the opener: ``open`` for plain
    text, ``gzip.open`` for gzip.

    Raises:
        ValueError: if the detected MIME type is neither plain text nor gzip.
    """
    location = self.index_path()
    if not os.path.exists(location):
        return {}

    detected = magic.from_file(location, mime=True)
    if detected == 'text/plain':
        logger.debug('Detected plaintext encoding for reading index')
        opener = open
    elif detected in ('application/gzip', 'application/x-gzip'):
        logger.debug('Detected gzip encoding for reading index')
        opener = gzip.open
    else:
        raise ValueError('Index is of unknown type', detected)

    # Both openers accept text mode, so the JSON parser always sees str.
    with opener(location, 'rt') as handle:
        return json.load(handle)
项目:refextract    作者:inspirehep    | 项目源码 | 文件源码
def get_plaintext_document_body(fpath, keep_layout=False):
    """Given a file-path to a full-text, return a list of unicode strings
       whereby each string is a line of the fulltext.
       In the case of a plain-text document, this simply means reading the
       contents in from the file and decoding each line as UTF-8. In the
       case of a PDF, the document is converted to plaintext via
       convert_PDF_to_plaintext.
       It raises UnknownDocumentTypeError if the document is not a PDF or
       plain text.
       @param fpath: (string) - the path to the fulltext file
       @param keep_layout: passed through to convert_PDF_to_plaintext
       @return: (list) of strings - each string being a line in the document.
    """
    mime_type = magic.from_file(fpath, mime=True)

    if mime_type == "text/plain":
        # Read raw bytes and decode explicitly: lines read in text mode are
        # already str on Python 3 and have no .decode() method.
        with open(fpath, "rb") as f:
            return [line.decode("utf-8") for line in f]

    if mime_type == "application/pdf":
        return convert_PDF_to_plaintext(fpath, keep_layout)

    raise UnknownDocumentTypeError(mime_type)
项目:malgazer    作者:keithjjones    | 项目源码 | 文件源码
def __init__(self, filename):
    """
    Build a file object for a malware sample.

    :param filename:  The file name of the available malware sample.
    :raises ValueError: if ``filename`` does not exist on disk.
    """
    sample_present = os.path.exists(filename)
    if not sample_present:
        message = "File {0} does not exist!".format(filename)
        raise ValueError(message)

    # Members start out empty/zeroed.
    self.running_entropy_data = None
    self.running_entropy_window_size = 0
    self.file_size = 0
    self.parsedfile = None

    # Sample-specific state; the libmagic description is recorded before
    # the file is read and its type parsed.
    self.filename = filename
    self.data = list()
    self.filetype = magic.from_file(self.filename)
    self._read_file()
    self._parse_file_type()
项目:Snakepit    作者:K4lium    | 项目源码 | 文件源码
def get_type(self):
    """Return libmagic's textual description of ``self.path``.

    Three strategies are tried in order, each one only if the previous
    failed: the ``magic.open`` cookie API, ``magic.from_file``, and finally
    the ``file -b`` command. Returns ``''`` when all of them fail.
    """
    ms = None
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.file(self.path)
    # `except Exception` rather than a bare except, so KeyboardInterrupt and
    # SystemExit are no longer swallowed by the fallback chain.
    except Exception:
        try:
            file_type = magic.from_file(self.path)
        except Exception:
            try:
                import subprocess
                file_process = subprocess.Popen(['file', '-b', self.path], stdout=subprocess.PIPE)
                # communicate() drains stdout and waits for the child, so no
                # zombie process is left behind.
                file_type = file_process.communicate()[0].strip()
            except Exception:
                return ''
    finally:
        # The cookie only exists if magic.open() succeeded.
        if ms is not None:
            try:
                ms.close()
            except Exception:
                pass

    return file_type
项目:open-wob-api    作者:openstate    | 项目源码 | 文件源码
def file_parser(fname, pages=None):
    if magic.from_file(fname, mime=True) == 'application/pdf':
        try:
            text_array = []
            d = pdf.Document(fname)
            for i, p in enumerate(d, start=1):
                for f in p:
                    for b in f:
                        for l in b:
                            text_array.append(l.text.encode('UTF-8'))

                if i == pages:  # break after x pages
                    break

            print "Processed %i pages" % (i)
            return '\n'.join(text_array)
        except Exception as e:
            print "PDF Parser Exception: ", e
    else:
        try:
            content = parser.from_file(fname)['content']
            return (content or '').encode('UTF-8')
        except Exception as e:
            print "File Parser Exception: ", e
项目:ehForwarderBot    作者:blueset    | 项目源码 | 文件源码
def save_file(self, msg, msg_type):
    """Store a message attachment under ``storage/<channel_id>``.

    ``msg['Text']`` is called with the destination path (presumably it
    writes the payload there -- confirm against the caller). The stored
    file is then renamed to carry an extension derived from its detected
    MIME type.

    Returns a ``(full_path, mime)`` tuple.
    """
    storage_dir = os.path.join("storage", self.channel_id)
    if not os.path.exists(storage_dir):
        os.makedirs(storage_dir)

    stem = "%s_%s_%s" % (msg_type, msg['NewMsgId'], int(time.time()))
    raw_path = os.path.join(storage_dir, stem)
    msg['Text'](raw_path)

    mime = magic.from_file(raw_path, mime=True)
    if isinstance(mime, bytes):
        mime = mime.decode()

    guess_ext = mimetypes.guess_extension(mime) or ".unknown"
    if guess_ext == ".unknown":
        self.logger.warning("File %s with mime %s has no matching extensions.", raw_path, mime)

    # JPEGs always get ".jpeg", whatever mimetypes guessed.
    if mime == "image/jpeg":
        ext = ".jpeg"
    else:
        ext = guess_ext

    final_path = "%s%s" % (raw_path, ext)
    os.rename(raw_path, final_path)
    self.logger.info("File saved from WeChat\nFull path: %s\nMIME: %s", final_path, mime)
    return final_path, mime
项目:gibbersense    作者:smxlabs    | 项目源码 | 文件源码
def file_magic(in_file):


   print "\n\t\tFile Type :", magic.from_file(in_file)
项目:polichombr    作者:ANSSI-FR    | 项目源码 | 文件源码
def do_sample_type_detect(datafile):
    """
        Identify the type of `datafile` with libmagic.

        Returns a `(mime_type, description)` tuple: the MIME type first,
        then the default textual description.
    """
    return (
        magic.from_file(datafile, mime=True),
        magic.from_file(datafile),
    )
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def _process_cache(self, split="\n", rstrip=True):
    """Yield the lines of the cached file ``self.cache``.

    The MIME type is detected with ``magic.from_file`` and, if the module
    lacks that function, with the ``magic.open`` cookie API. Gzip and zip
    caches are read through the matching csirtg_smrt decoder; everything
    else is iterated as a plain text file.

    ``split`` is forwarded to the decoders. ``rstrip`` is accepted but not
    used by this block.

    Raises:
        RuntimeError: if neither magic API is available.
    """
    try:
        mime = magic.from_file(self.cache, mime=True)
    except AttributeError:
        try:
            cookie = magic.open(magic.MAGIC_MIME)
            cookie.load()
            mime = cookie.file(self.cache)
        except AttributeError:
            raise RuntimeError('unable to detect cached file type')

    if PYVERSION < 3:
        mime = mime.decode('utf-8')

    # Pick a decoder for compressed caches; None means plain text.
    if mime.startswith(('application/x-gzip', 'application/gzip')):
        from csirtg_smrt.decoders.zgzip import get_lines as read_lines
    elif mime == "application/zip":
        from csirtg_smrt.decoders.zzip import get_lines as read_lines
    else:
        read_lines = None

    if read_lines is not None:
        for line in read_lines(self.cache, split=split):
            yield line
        return

    # all others, mostly txt, etc...
    with open(self.cache) as handle:
        for line in handle:
            yield line
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def get_mimetype(f):
    """Return the MIME type of the file at path ``f``.

    Uses ``magic.from_file`` when the installed module provides it and the
    ``magic.open`` cookie API otherwise. On Python 2 the result is decoded
    to unicode.

    Raises:
        RuntimeError: if neither magic API is available.
    """
    try:
        detected = magic.from_file(f, mime=True)
    except AttributeError:
        try:
            cookie = magic.open(magic.MAGIC_MIME)
            cookie.load()
            detected = cookie.file(f)
        except AttributeError:
            raise RuntimeError('unable to detect cached file type')

    return detected.decode('utf-8') if PYVERSION < 3 else detected
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def preprocess(sample):
    """Preprocess files after upload.

    If the uploaded sample is a ZIP archive whose MIME type is not listed
    in ``skip_mimes``, every member is extracted, written to the samples
    directory under its SHA-256 digest (unless already present) and
    recorded as a child ``Sample`` of the upload.

    :param sample: :class:`~app.models.Sample`
    :return: ``None``
    """
    cfg = current_app.config
    archive_path = os.path.join(cfg['APP_UPLOADS_SAMPLES'], sample.sha256)
    if not zipfile.is_zipfile(archive_path):
        return None

    mt = magic.from_file(archive_path, mime=True)
    if mt in skip_mimes:
        return None

    current_app.log.debug('Extracting {}'.format(archive_path))
    # The context manager closes the archive even if extraction fails.
    with zipfile.ZipFile(archive_path) as zfile:
        for zipfo in zfile.namelist():
            if zfile.getinfo(zipfo).compress_type == 99:  # PK compat. v5.1
                pwd = '-p{}'.format(cfg['INFECTED_PASSWD'])
                # Always point 7z at the archive itself. The archive path
                # used to be overwritten with the previous member's path, so
                # every member after the first was extracted from the wrong
                # file.
                # NOTE(review): no member name is passed to 7z, so `-so`
                # presumably streams every member -- confirm this is intended.
                with popen('7z', 'e', '-so', pwd, archive_path) as zproc:
                    buf, _stderr = zproc.communicate()
            else:
                buf = zfile.read(zipfo,
                                 pwd=bytes(cfg['INFECTED_PASSWD'], 'utf-8'))
            digests = get_hashes(buf)
            member_path = os.path.join(cfg['APP_UPLOADS_SAMPLES'],
                                       digests.sha256)
            if not os.path.isfile(member_path):
                with open(member_path, 'wb') as wf:
                    wf.write(buf)
            s = Sample(user_id=sample.user_id, filename=zipfo,
                       parent_id=sample.id,
                       md5=digests.md5, sha1=digests.sha1,
                       sha256=digests.sha256, sha512=digests.sha512,
                       ctph=digests.ctph)
            db.session.add(s)
            db.session.commit()
项目:style50    作者:cs50    | 项目源码 | 文件源码
def _check(self, file):
    """
    Pick the check matching `file` and return the result of running it on
    the file's contents.

    The check is looked up by extension in `self.extension_map`; when the
    extension is unknown, the first `self.magic_map` entry whose key occurs
    in libmagic's description is used instead. Raises Error when the file
    is missing, its type is unknown or it cannot be decoded as text.
    """
    if not os.path.exists(file):
        raise Error("file \"{}\" not found".format(file))

    suffix = os.path.splitext(file)[1]
    try:
        # Drop the leading dot before the lookup.
        check = self.extension_map[suffix[1:]]
    except KeyError:
        description = magic.from_file(file)
        for keyword, candidate in self.magic_map.items():
            if keyword in description:
                check = candidate
                break
        else:
            raise Error("unknown file type \"{}\", skipping...".format(file))

    try:
        with open(file) as handle:
            code = handle.read()
    except UnicodeDecodeError:
        raise Error("file does not seem to contain text, skipping...")

    # Ensure we don't warn about adding trailing newline
    if code and not code.endswith('\n'):
        code += '\n'

    return check(code)
项目:oclubs    作者:SHSIDers    | 项目源码 | 文件源码
def handle(cls, user, club, file):
    """Store an uploaded file for ``club`` and return the created record.

    The upload is saved to a randomly named temporary file, its MIME type
    is taken from the file contents, and ``cls._thumb`` writes the
    permanent copy. The temporary file is always removed.

    Raises:
        UploadNotSupported: if the MIME type is not in ``cls._mimedict``.
    """
    stored_name = os.urandom(8).encode('hex')
    scratch_path = os.path.join('/tmp', stored_name)
    file.save(scratch_path)

    try:
        # Don't use mimetypes.guess_type(temppath) -- Faked extensions
        mime = magic.from_file(scratch_path, mime=True)
        if mime not in cls._mimedict:
            raise UploadNotSupported

        stored_name += cls._mimedict[mime]
        target_path = cls.mk_internal_path(stored_name)
        target_dir = os.path.dirname(target_path)
        if not os.path.isdir(target_dir):
            os.makedirs(target_dir, 0o755)

        # resize to 600, 450
        cls._thumb(scratch_path, target_path)
        fs.watch(target_path)
    finally:
        os.remove(scratch_path)

    record = cls.new()
    record.club = club
    record.uploader = user
    record._location = stored_name
    record.mime = mime
    return record.create()
项目:validatemyfile    作者:daisieh    | 项目源码 | 文件源码
def check(filepath):
    """Return True when the MIME type libmagic reports for ``filepath``
    matches the pattern ``application/pdf`` at its start, else False."""
    detected = magic.from_file(filepath, mime=True)
    return re.match('application/pdf', detected) is not None
项目:guest-images    作者:S2E    | 项目源码 | 文件源码
def get_magic(filename):
    """Describe ``filename`` through the module-level ``g_m`` object when
    it is set (truthy), otherwise through ``magic.from_file``."""
    return g_m.file(filename) if g_m else magic.from_file(filename)
项目:PeekabooAV    作者:scVENUS    | 项目源码 | 文件源码
def guess_mime_type_from_file_contents(file_path):
    """Get the MIME type from the file's magic bytes.

    Returns ``None`` when libmagic reports an empty result.
    """
    return magic.from_file(file_path, mime=True) or None
项目:fame    作者:certsocietegenerale    | 项目源码 | 文件源码
def _compute_default_properties(self):
    """Fill in the default properties derived from ``self['filepath']``:
    names, libmagic description, MIME type, an empty analysis list and a
    per-module antivirus status, then call ``self._set_type()``."""
    path = self['filepath']
    self['names'] = [os.path.basename(path)]
    self['detailed_type'] = magic.from_file(path)
    self['mime'] = magic.from_file(path, mime=True)
    self['analysis'] = []

    # Init antivirus status: every module starts out as False.
    self['antivirus'] = {}
    for av_module in dispatcher.get_antivirus_modules():
        self['antivirus'][av_module.name] = False

    self._set_type()

    # Convert mime/types into clearer type
项目:web_develop    作者:dongweiming    | 项目源码 | 文件源码
def create_by_old_paste(cls, filehash):
    """Build a ``cls`` instance for the stored file identified by
    ``filehash``, using its detected MIME type and on-disk size."""
    location = get_file_path(filehash)
    detected_mime = magic.from_file(location, mime=True)
    byte_count = os.stat(location).st_size
    return cls(filehash, detected_mime, byte_count, filehash=filehash)
项目:web_develop    作者:dongweiming    | 项目源码 | 文件源码
def create_by_old_paste(cls, filehash, symlink):
    """Build a ``cls`` instance for the stored file identified by
    ``filehash``, using its detected MIME type and on-disk size, and
    passing ``symlink`` through to the constructor."""
    location = get_file_path(filehash)
    detected_mime = magic.from_file(location, mime=True)
    byte_count = os.stat(location).st_size
    return cls(filehash, detected_mime, byte_count,
               filehash=filehash, symlink=symlink)
项目:web_develop    作者:dongweiming    | 项目源码 | 文件源码
def create_by_old_paste(cls, filehash):
    """Create an instance from an already stored file.

    The file is located via ``get_file_path(filehash)``; its MIME type comes
    from libmagic and its size from ``os.stat``.
    """
    stored_path = get_file_path(filehash)
    mime_name = magic.from_file(stored_path, mime=True)
    stat_result = os.stat(stored_path)
    return cls(filehash, mime_name, stat_result.st_size, filehash=filehash)
项目:web_develop    作者:dongweiming    | 项目源码 | 文件源码
def create_by_old_paste(cls, filehash):
    """Return ``cls(filehash, mimetype, size, filehash=filehash)`` for the
    file that ``get_file_path(filehash)`` points at."""
    path_on_disk = get_file_path(filehash)
    kind = magic.from_file(path_on_disk, mime=True)
    n_bytes = os.stat(path_on_disk).st_size

    instance = cls(filehash, kind, n_bytes, filehash=filehash)
    return instance
项目:web_develop    作者:dongweiming    | 项目源码 | 文件源码
def create_by_old_paste(cls, filehash):
    """Wrap a previously stored file in a new ``cls`` object.

    MIME type and size are read from the file found through
    ``get_file_path(filehash)``.
    """
    source = get_file_path(filehash)
    source_mime = magic.from_file(source, mime=True)
    source_size = os.stat(source).st_size
    return cls(filehash, source_mime, source_size, filehash=filehash)
项目:OneNet    作者:image-science-lab    | 项目源码 | 文件源码
def load_pickle(pickle_path, dataset_path):
    """Return the image-path index, building and caching it if needed.

    When ``pickle_path`` exists it is unpickled and returned as is.
    Otherwise ``dataset_path`` is walked, every ``*.JPEG`` file is
    collected, and the resulting ``{'image_path': <paths>}`` dict is
    pickled to ``pickle_path`` before being returned.

    NOTE: ``pickle.load`` can execute arbitrary code from the file; only
    point this at caches you created yourself.
    """
    if os.path.exists(pickle_path):
        # `with` closes the handle; it was previously left open.
        with open(pickle_path, "rb") as fh:
            return pickle.load(fh)

    image_files = []
    for dirpath, _, _ in os.walk(dataset_path):
        # may be JPEG, depending on your image files
        # (a content check such as magic.from_file(name, mime=True) ==
        # 'image/jpeg' could be added here to filter out mislabelled files)
        image_files.append(glob.glob(os.path.join(dirpath, '*.JPEG')))

    # Flatten the per-directory lists into one array of paths.
    if image_files:
        image_files = np.hstack(image_files)

    dataset_filenames = {'image_path': image_files}
    with open(pickle_path, "wb") as fh:
        pickle.dump(dataset_filenames, fh)
    return dataset_filenames


# return a pd object
项目:find_pe_caves    作者:marcoramilli    | 项目源码 | 文件源码
def get_executables(files):
    """
    Return the entries of `files` whose libmagic description contains the
    word "executable", in their original order.
    """
    return [path for path in files if "executable" in magic.from_file(path)]
项目:loris-redux    作者:jpstroop    | 项目源码 | 文件源码
def _get_and_cache(file_path, supported_formats):
    """Map the MIME type of ``file_path`` to a format and remember it.

    The format is looked up in ``supported_formats`` and stored in
    ``MagicCharacterizerMixin._cache`` under ``file_path``.

    Raises:
        UnsupportedFormat: if the MIME type has no entry in
            ``supported_formats``.
    """
    mime_type = from_file(file_path, mime=True)
    try:
        fmt = supported_formats[mime_type]
        MagicCharacterizerMixin._cache[file_path] = fmt
    except KeyError:
        template = '{0} characterized as {1} format, which is not supported'
        raise UnsupportedFormat(template.format(file_path, mime_type),
                                http_status_code=500)
    return fmt
项目:SSMA    作者:secrary    | 项目源码 | 文件源码
def file_info(self, report):
    """Describe ``self.filename`` as a list of printable lines.

    The list holds name, size, MIME type, MD5 and SHA1, plus an ssdeep
    line when ``ssdeep_r`` is truthy. When ``report`` equals ``"output"``
    an empty string is returned instead (the file is still read first).
    """
    with open(self.filename, 'rb') as handle:
        payload = handle.read()

    if report == "output":
        return ""

    info = [
        "File: {}".format(self.filename),
        "Size: {} bytes".format(os.path.getsize(self.filename)),
        "Type: {}".format(magic.from_file(self.filename, mime=True)),
        "MD5: {}".format(hashlib.md5(payload).hexdigest()),
        "SHA1: {}".format(hashlib.sha1(payload).hexdigest()),
    ]
    if ssdeep_r:
        info.append("ssdeep: {}".format(self.get_ssdeep()))
    return info
项目:SSMA    作者:secrary    | 项目源码 | 文件源码
def file_info(filename):
    """Describe ``filename`` as a list of printable lines: name, size,
    MIME type, MD5 and SHA1, plus an ssdeep line when ``ssdeep_r`` is
    truthy."""
    with open(filename, 'rb') as handle:
        payload = handle.read()

    info = [
        "File: {}".format(filename),
        "Size: {} bytes".format(os.path.getsize(filename)),
        "Type: {}".format(magic.from_file(filename, mime=True)),
        "MD5:  {}".format(hashlib.md5(payload).hexdigest()),
        "SHA1: {}".format(hashlib.sha1(payload).hexdigest()),
    ]
    if ssdeep_r:
        info.append("ssdeep: {}".format(ssdeep.hash_from_file(filename)))
    return info
项目:nemesis    作者:openstack    | 项目源码 | 文件源码
def post_file():
    """Handle a file upload request.

    The uploaded ``file`` form field is written to a temporary file under
    ``/tmp``, hashed, sized and typed; a database row is created or renewed
    from those values; the file is uploaded to swift; and a notification is
    sent to the worker queue. Returns the row as JSON.

    Raises:
        BadRequestException: if the request has no ``file`` upload field.
    """
    file_uuid = secure_filename(str(uuid.uuid4()))
    filename = '/tmp/%s' % file_uuid

    try:
        upload = request.files['file']
    except Exception:
        raise BadRequestException("Not a valid multipart upload form with "
                                  "key named file.")

    try:
        if 'Content-Range' in request.headers:
            # Extract starting byte from Content-Range header string.
            range_str = request.headers['Content-Range']
            start_bytes = int(range_str.split(' ')[1].split('-')[0])

            # Append chunk to the file on disk, or create new. Binary mode:
            # the stream yields bytes, which a text-mode file rejects on
            # Python 3.
            # NOTE(review): in append mode every write lands at the end of
            # the file regardless of seek(), and `filename` is unique per
            # request -- confirm chunked uploads behave as intended.
            with open(filename, 'ab') as f:
                f.seek(start_bytes)
                f.write(upload.stream.read())

        else:
            # This is not a chunked request, so just save the whole file.
            upload.save(filename)

        # Generate hash of file, and create new, or renew existing db row.
        file_hashes = get_all_hashes(filename)
        file_size = os.path.getsize(filename)
        file_type = magic.from_file(filename, mime=True)
        record = create_or_renew_by_hash(file_hashes, file_size, file_type)
        file_id = record.file_id
        file_dict = record.to_dict()

        # Upload to swift.
        upload_to_swift(filename, file_uuid)
    finally:
        # Remove the local temp file even when a step above failed, so
        # failed requests do not pile up files in /tmp.
        if os.path.exists(filename):
            os.remove(filename)

    # Send message to worker queue with file details.
    worker_msg = {"file_uuid": file_uuid, "file_id": file_id}
    submit_worker_notification(worker_msg)

    return jsonify(file_dict)
项目:STAR-SEQR    作者:ExpressionAnalysis    | 项目源码 | 文件源码
def maybe_gunzip(fname, base, ext):
    """Decompress ``fname`` when libmagic describes it as gzip.

    Returns the path of the decompressed copy (named by
    ``safe_fname(base, ext)``), or ``fname`` unchanged when it is empty or
    not gzip.
    """
    if not fname or 'gzip' not in magic.from_file(fname):
        return fname

    started = time.time()
    print("Gunzip file " + str(fname))
    unpacked = safe_fname(base, ext)
    sh("gunzip", fname, "-c >", unpacked)
    print("Gunzip took %g seconds" % (time.time() - started))
    return unpacked
项目:earthio    作者:ContinuumIO    | 项目源码 | 文件源码
def get_filetype(fpath):
    """Return the MIME type libmagic detects for the file at ``fpath``."""
    mime_string = magic.from_file(fpath, mime=True)
    return mime_string
项目:papis    作者:alejandrogallo    | 项目源码 | 文件源码
def file_is(file_description, fmt):
    """Tell whether a file or buffer is a `fmt` document.

    :file_description: Full path of a file (``str``) or a buffer holding
        the data (``bytes``).
    :fmt: Text searched for in the detected MIME type. It is inserted into
        a regular expression unescaped and matched case-insensitively.
    :returns: True if the MIME type contains `fmt`, False otherwise --
        including when `file_description` is neither ``str`` nor ``bytes``.

    """
    import magic
    logger.debug("Checking filetype")
    pattern = r".*%s.*" % fmt
    # Default for unsupported input types; without it the final return
    # raised UnboundLocalError.
    result = None
    if isinstance(file_description, str):
        # This means that the file_description is a string
        result = re.match(
            pattern, magic.from_file(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "File %s appears to be of type %s" % (file_description, fmt)
            )
    elif isinstance(file_description, bytes):
        # Suppose that file_description is a buffer. Matched with the same
        # flags as the path branch so both treat `fmt` identically.
        result = re.match(
            pattern, magic.from_buffer(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "Buffer appears to be of type %s" % (fmt)
            )
    return bool(result)
项目:firmflaws    作者:Ganapati    | 项目源码 | 文件源码
def register_files(self):
    """Register every regular file below ``self.extracted_path``.

    Each file is identified by the MD5 of ``"<name>:<content>"``. A file
    whose hash is already known is linked to ``self.firmware``; otherwise a
    new ``FileModel`` is created, typed with libmagic and passed to
    ``self.find_loots``.
    """
    print("Start registering files")
    for root, dirs, files in os.walk(self.extracted_path):
        for file in files:
            full_path = os.path.join(root, file)
            if not os.path.isfile(full_path):
                continue
            path = full_path.replace(self.extracted_path, "")
            with open(full_path, "rb") as fd:
                content = fd.read()
            hash_content = "%s:%s" % (file, content)
            digest = hashlib.md5(hash_content.encode('utf-8')).hexdigest()
            try:
                file_obj = FileModel.objects.get(hash=digest)
                file_obj.firmware.add(self.firmware)
                file_obj.save()
            except FileModel.DoesNotExist:
                try:
                    file_obj = FileModel()
                    file_obj.filepath = full_path
                    file_obj.hash = digest
                    file_obj.filesize = len(content)
                    file_obj.filename = path
                    file_obj.save()
                    file_obj.firmware.add(self.firmware)
                    file_obj.file_type = magic.from_file(full_path)
                    file_obj.save()
                    self.find_loots(file_obj)
                    # Performance tweak
                    # NOTE(review): nb_loots is assigned after the last
                    # save() in this block -- confirm it is persisted
                    # elsewhere.
                    file_obj.nb_loots = file_obj.loots.all().count()
                # `except Exception` rather than a bare except, so Ctrl-C
                # (KeyboardInterrupt) can stop a long registration run.
                except Exception:
                    # NOTE(review): this fallback value is not saved here.
                    file_obj.file_type = "unknown"

    print("Files registered")
项目:gallery    作者:liam-middlebrook    | 项目源码 | 文件源码
def parse_file_info(file_path, dir_path):
    """Dispatch ``file_path`` to the handler registered for its MIME type.

    Looks the detected MIME type up in ``file_mimetype_relation`` and
    returns the handler's result for ``(file_path, dir_path)``, or ``None``
    when no handler is registered. Prints debugging output along the way.
    """
    print("entering parse_file_info")
    mime_type = magic.from_file(file_path, mime=True)
    print(mime_type)
    print(file_path)

    if mime_type not in file_mimetype_relation:
        return None

    handler = file_mimetype_relation[mime_type]
    return handler(file_path, dir_path)
项目:auto_mal    作者:0xhughes    | 项目源码 | 文件源码
def _get_file_type(full_targ_path):
    """Return the file type of a target sample as a string.

    The ``magic.open`` cookie API is tried first; when the installed module
    lacks it (AttributeError), ``magic.from_file`` is used instead.
    """
    try:
        cookie = magic.open(magic.MAGIC_NONE)
        cookie.load()
        description = cookie.file(full_targ_path)
    except AttributeError:
        description = magic.from_file(full_targ_path)

    return str(description)
项目:auto_mal    作者:0xhughes    | 项目源码 | 文件源码
def _get_file_type(full_targ_path):
    """Return the MIME type of a target sample as a string, printing it.

    A ``magic.Magic`` object bound to the GnuWin32 magic database is used;
    on AttributeError the module-level ``magic.from_file`` is used instead
    and the result is printed with an error marker.
    """
    try:
        detector = magic.Magic(magic_file=r'C:/Program Files (x86)/GnuWin32/share/misc/magic', mime=True)
        detected = str(detector.from_file(full_targ_path))
        print(detected)
    except AttributeError:
        detected = str(magic.from_file(full_targ_path))
        print(detected+" ERROR?!?!?!!?")

    return detected
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def get_type(self):
    """Get the textual file type description (not the MIME type).

    Tries the ``magic.open`` cookie API (following symlinks), then
    ``magic.from_file``, and finally the ``file -b -L`` command.
    @return: file type, or None if every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_SYMLINK)
            ms.load()
            file_type = ms.file(self.file_path)
        # `except Exception` rather than a bare except, so
        # KeyboardInterrupt and SystemExit are not swallowed.
        except Exception:
            try:
                file_type = magic.from_file(self.file_path)
            except Exception:
                pass
        finally:
            # The cookie only exists if magic.open() succeeded.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(["file", "-b", "-L", self.file_path],
                                 stdout=subprocess.PIPE)
            # communicate() drains stdout and waits for the child, so no
            # zombie process is left behind.
            file_type = p.communicate()[0].strip()
        except Exception:
            pass

    return file_type
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def get_content_type(self):
    """Get MIME content file type (example: image/jpeg).

    Tries the ``magic.open`` cookie API (following symlinks), then
    ``magic.from_file(..., mime=True)``, and finally the
    ``file -b -L --mime-type`` command.
    @return: file content type, or None if every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_MIME|magic.MAGIC_SYMLINK)
            ms.load()
            file_type = ms.file(self.file_path)
        # `except Exception` rather than a bare except, so
        # KeyboardInterrupt and SystemExit are not swallowed.
        except Exception:
            try:
                file_type = magic.from_file(self.file_path, mime=True)
            except Exception:
                pass
        finally:
            # The cookie only exists if magic.open() succeeded.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(["file", "-b", "-L", "--mime-type", self.file_path],
                                 stdout=subprocess.PIPE)
            # communicate() drains stdout and waits for the child, so no
            # zombie process is left behind.
            file_type = p.communicate()[0].strip()
        except Exception:
            pass

    return file_type
项目:ph0neutria    作者:phage-nz    | 项目源码 | 文件源码
def processDownload(tmpFilePath, fileName, fileUrl):
    """Vet a downloaded temporary file and upload it to Viper.

    The file is rejected (and deleted) when it is larger than 10 MB, when
    its SHA-256 is not accepted by ``isAcceptedHash``, or when its detected
    MIME type is not in the accepted list. Otherwise it is moved into the
    output folder under its hash, uploaded, added to the hash cache and
    removed again.

    Returns ``False`` on rejection, else the result of ``uploadToViper``.
    """
    logging.info('Downloaded as temporary file: {0}. Beginning processing...'.format(tmpFilePath))

    # Size in whole megabytes (bytes shifted right by 20 bits).
    sizeInMb = os.path.getsize(tmpFilePath) >> 20
    if sizeInMb > 10:
        logging.error('File is {0}MB. Too large to process.'.format(sizeInMb))
        cleanUp(tmpFilePath)
        return False

    fileHash = sha256SumFile(tmpFilePath)
    if not isAcceptedHash(fileHash):
        cleanUp(tmpFilePath)
        return False

    filePath = os.path.join(baseConfig.outputFolder, fileHash)
    os.rename(tmpFilePath, filePath)

    acceptedTypes = [
        'application/octet-stream',
        'application/x-dosexec',
        'application/x-msdownload',
        'application/x-ms-installer',
        'application/pdf',
        'application/x-pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.template',
        'application/vnd.ms-word.document.macroEnabled',
        'application/vnd.ms-excel',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.template',
        'application/vnd.ms-excel.sheet.macroEnabled',
        'application/vnd.ms-excel.template.macroEnabled',
        'application/vnd.ms-excel.addin.macroEnabled',
        'application/vnd.ms-excel.sheet.binary.macroEnabled',
        'application/x-shockwave-flash',
    ]

    # Trust only the content type of the downloaded file.
    mimeType = magic.from_file(filePath, mime=True)
    if mimeType not in acceptedTypes:
        logging.error('Detected non-binary or executable file type ({0}). Skipping: {1}'.format(mimeType, filePath))
        cleanUp(filePath)
        return False

    logging.info('File with hash: {0} identified as type: {1}'.format(fileHash, mimeType))

    uploaded = uploadToViper(filePath, fileName, fileHash, fileUrl)

    addToHashCache(fileHash)
    cleanUp(filePath)

    return uploaded
项目:elm-doc    作者:ento    | 项目源码 | 文件源码
def validate_elm_make(ctx, param, value):
    """Validate the elm-make path option.

    ``None`` passes through untouched. Otherwise ``value`` must resolve to
    an existing file, directly or via ``shutil.which``, and that file must
    not be detected as text. Returns ``value`` when valid.

    Raises:
        click.BadParameter: if the file cannot be found, or if it is a text
            file rather than the real binary.
    """
    if value is None:
        return value

    resolved = os.path.realpath(value)
    if not os.path.isfile(resolved):
        resolved = shutil.which(value)

    if resolved is None or not os.path.isfile(resolved):
        raise click.BadParameter('{} not found'.format(value))

    detected = magic.from_file(resolved, mime=True)
    if detected.startswith('text'):
        # Point the user at where the real binary would sit relative to
        # the text file that was found.
        suggestion = os.path.normpath(
            os.path.join(
                os.path.dirname(resolved),
                os.pardir,
                'elm',
                'Elm-Platform',
                '*',
                '.cabal-sandbox',
                'bin',
                'elm-make'))
        raise click.BadParameter('''should be the real elm-make binary; this looks like a text file.
if you installed Elm through npm, then try {}'''.format(suggestion))

    return value
项目:open-syllabus-project    作者:davidmcclure    | 项目源码 | 文件源码
def libmagic_file_type(self):

    """
    Returns:
        str: The MIME type libmagic detects for `self.path`.
    """

    detected = magic.from_file(self.path, mime=True)
    return detected
项目:cuckoo-ioc    作者:FafnerKeyZee    | 项目源码 | 文件源码
def get_type(self):
    """Get the textual file type description (not the MIME type).

    Tries the ``magic.open`` cookie API, then ``magic.from_file``, and
    finally the ``file -b`` command; failures of the last two are logged at
    debug level.
    @return: file type, or None if every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            file_type = ms.file(self.file_path)
        # `except Exception` rather than a bare except, so
        # KeyboardInterrupt and SystemExit are not swallowed.
        except Exception:
            try:
                file_type = magic.from_file(self.file_path)
            except Exception as e:
                log.debug("Error getting magic from file %s: %s",
                          self.file_path, e)
        finally:
            # The cookie only exists if magic.open() succeeded.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(["file", "-b", self.file_path],
                                 stdout=subprocess.PIPE)
            # communicate() drains stdout and waits for the child, so no
            # zombie process is left behind.
            file_type = p.communicate()[0].strip()
        except Exception as e:
            log.debug("Error running file(1) on %s: %s",
                      self.file_path, e)

    return file_type
项目:cuckoo-ioc    作者:FafnerKeyZee    | 项目源码 | 文件源码
def get_content_type(self):
    """Get MIME content file type (example: image/jpeg).

    Tries the ``magic.open`` cookie API, then
    ``magic.from_file(..., mime=True)``, and finally the
    ``file -b --mime-type`` command.
    @return: file content type, or None if every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_MIME)
            ms.load()
            file_type = ms.file(self.file_path)
        # `except Exception` rather than a bare except, so
        # KeyboardInterrupt and SystemExit are not swallowed.
        except Exception:
            try:
                file_type = magic.from_file(self.file_path, mime=True)
            except Exception:
                pass
        finally:
            # The cookie only exists if magic.open() succeeded.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            args = ["file", "-b", "--mime-type", self.file_path]
            file_type = subprocess.check_output(args).strip()
        except Exception:
            pass

    return file_type
项目:dircast    作者:calpaterson    | 项目源码 | 文件源码
def guess_mimetype(path):
    """Return the MIME type of ``path`` as text.

    ``audio/x-m4a`` is reported as ``audio/mp4``; every other type is
    returned as detected.
    """
    magic_mimetype = magic.from_file(str(path), mime=True)
    # Depending on the python-magic version the result is bytes or str.
    # Normalise to text first, so the comparison below works for both and
    # .decode() is never called on a str.
    if isinstance(magic_mimetype, bytes):
        magic_mimetype = magic_mimetype.decode("utf-8")
    if magic_mimetype == "audio/x-m4a":
        return "audio/mp4"
    return magic_mimetype
项目:BASS    作者:Cisco-Talos    | 项目源码 | 文件源码
def inspect(self, sample):
    """Store libmagic's description and MIME type of ``sample.path`` in
    ``sample.info[self.NAME]`` under the keys ``"magic"`` and ``"mime"``."""
    description = magic.from_file(sample.path)
    mime_type = magic.from_file(sample.path, mime = True)
    sample.info[self.NAME] = {"magic": description, "mime": mime_type}
项目:Snakepit    作者:K4lium    | 项目源码 | 文件源码
def get_mime(self):
    """Return the MIME type of ``self.path``.

    Tries the ``magic.open`` cookie API first and a ``magic.Magic``
    instance second. Returns ``''`` when both fail.
    """
    ms = None
    try:
        ms = magic.open(magic.MIME)
        ms.load()
        mime_type = ms.file(self.path)
    # `except Exception` rather than a bare except, so KeyboardInterrupt and
    # SystemExit are not swallowed.
    except Exception:
        try:
            mime = magic.Magic(mime=True)
            mime_type = mime.from_file(self.path)
        except Exception:
            return ''
    finally:
        # Release the cookie if one was opened (it was previously never
        # closed), matching what get_type does.
        if ms is not None:
            try:
                ms.close()
            except Exception:
                pass

    return mime_type
项目:file-metadata    作者:pywikibot-catfiles    | 项目源码 | 文件源码
def mime(self):
    """Return the MIME type of the file named by ``self.fetch('filename')``.

    Works with either of the two modules that install as ``magic``.

    Raises:
        ImportError: if the imported ``magic`` module offers neither
            ``from_file`` nor ``open``.
    """
    has_from_file = hasattr(magic, "from_file")
    if not has_from_file and not hasattr(magic, "open"):
        raise ImportError(
            'The `magic` module that was found is not the expected pypi '
            'package python-magic (https://pypi.python.org/pypi/python-magic) '
            'nor file\'s (http://www.darwinsys.com/file/) package.')

    if has_from_file:
        # Use https://pypi.python.org/pypi/python-magic
        return magic.from_file(self.fetch('filename'), mime=True)

    # Use the python-magic library in distro repos from the `file`
    # command - http://www.darwinsys.com/file/
    cookie = magic.open(magic.MAGIC_MIME)
    cookie.load()
    return cookie.file(self.fetch('filename'))
项目:hackpad-migrator    作者:Stek-io    | 项目源码 | 文件源码
def create_pads_from_files(job_id, attachment, email, client_id, client_secret):
    """Create one pad per HTML file found in a zipped attachment.

    The attachment is unzipped into ``./data/<name>``, where ``<name>`` is the
    part of ``attachment`` between ``attachments/`` and the ``.zip`` suffix.
    Every entry of that directory whose detected MIME type is ``text/html``
    is handed to ``insert_pad_from_file``; other entries are logged and
    ignored.

    Args:
        job_id: Identifier forwarded to ``insert_pad_from_file`` and
            ``email_error``.
        attachment: Path of the zip file; must match
            ``.+attachments/<name>.zip``.
        email: Address used only in the log message.
        client_id: Hackpad consumer key.
        client_secret: Hackpad consumer secret.

    Returns:
        A ``(pads_created, pads_skipped)`` tuple.

    Raises:
        AttributeError: If ``attachment`` does not match the expected
            ``attachments/<name>.zip`` pattern.
    """
    logging.info("Opening attached zip %s.", attachment)
    # Raw string: "\." inside a plain literal is an invalid escape sequence.
    m = re.search(r'^.+attachments/(.+)\.zip$', attachment)
    directory = './data/' + m.group(1)
    unzip_attachment(attachment, directory)
    files = os.listdir(directory)

    hackpad = Hackpad(api_scheme = os.getenv('HACKPAD_API_SCHEME') or 'http',
                      api_domain = os.getenv('HACKPAD_API_DOMAIN') or 'hackpad.dev',
                      sub_domain = os.getenv('HACKPAD_SUB_DOMAIN') or '',
                      consumer_key = client_id,
                      consumer_secret = client_secret)

    pads_created = pads_skipped = 0

    for file_name in files:
        file_path = directory + '/' + file_name
        # check if it is really an html file
        file_type = magic.from_file(file_path, mime=True)
        if file_type != 'text/html':
            logging.info('Invalid file type for file %s :%s', file_path, file_type)
            continue

        # The context manager closes the handle even if the import raises.
        with open(file_path) as fh:
            logging.info('importing for %s: %s', email, file_name)

            if insert_pad_from_file(job_id, hackpad, fh, file_name, client_id, client_secret):
                pads_created += 1
            else:
                pads_skipped += 1

    # Check if all files are imported. Entries rejected above for their MIME
    # type are counted in neither total, so they also trigger this report.
    if pads_created + pads_skipped != len(files):
        email_error("Not all files were processed", job_id)

    return pads_created, pads_skipped
项目:mailnex    作者:linsam    | 项目源码 | 文件源码
def attachFile(attachList, filename, pos=None, replace=False):
    """Add *filename* to *attachList* after printing what is known about it.

    Without *pos* the file is appended. With *pos* (1-based, as shown to the
    user) the file is inserted at that position, or replaces the entry there
    when *replace* is true. An out-of-range *pos* prints a message and leaves
    the list untouched.

    The file is added even when it is missing or unreadable; only a warning
    is printed in that case.
    """
    index = None
    if pos is not None:
        count = len(attachList)
        if pos < 1 or pos > count:
            print("Bad position. {} not between 1 and {}".format(pos, count))
            return
        # Human-facing positions start at 1, list indices at 0.
        index = pos - 1

    try:
        info = os.stat(filename)
    except OSError as err:
        import errno
        # stat failed: tell a missing file apart from any other problem.
        if err.errno == errno.ENOENT:
            print("WARNING: Given file doesn't currently exist. Adding to list anyway. We'll try reading it again when completing the message")
        else:
            print("WARNING: Couldn't get information about the file: %s" % err.strerror)
            print("Adding to list anyway. We'll try reading it again when completing the message.")
    else:
        if os.access(filename, os.R_OK):
            print("Attachment added to list. Raw size is currently %i bytes. Note: we'll actually read the data when completing the message" % info.st_size)
            mtype = magic.from_file(filename, mime=True)
            print("Mime type appears to be %s" % mtype)
        else:
            print("WARNING: Can't read existing file. Adding to list anyway. We'll try again when completing the message.")

    if index is None:
        attachList.append(filename)
    # Equality (not truthiness) is kept on purpose: only values equal to
    # False select insertion.
    elif replace == False:
        attachList.insert(index, filename)
    else:
        attachList[index] = filename
项目:ehForwarderBot    作者:blueset    | 项目源码 | 文件源码
def _download_file(self, tg_msg, file_obj, msg_type):
        """
        Download media file from telegram platform.

        The file is stored under ``storage/<channel_id>/`` with a name built
        from the message type, chat id, message id and the current time, and
        is then renamed to carry an extension guessed from its MIME type.

        Args:
            tg_msg: Telegram message instance
            file_obj: File object
            msg_type: Type of message

        Returns:
            tuple of str[2]: Full path of the file, MIME type

        Raises:
            EFBMessageError: When ``file_obj.file_size`` exceeds
                ``telegram.constants.MAX_FILESIZE_DOWNLOAD``.
        """
        path = os.path.join("storage", self.channel_id)
        if not os.path.exists(path):
            os.makedirs(path)
        size = getattr(file_obj, "file_size", None)
        file_id = file_obj.file_id
        if size and size > telegram.constants.MAX_FILESIZE_DOWNLOAD:
            raise EFBMessageError("Attachment is too large. Maximum 20 MB. (AT01)")
        f = self.bot.bot.getFile(file_id)
        fname = "%s_%s_%s_%s" % (msg_type, tg_msg.chat.id, tg_msg.message_id, int(time.time()))
        fullpath = os.path.join(path, fname)
        f.download(fullpath)
        # Prefer the MIME type supplied with the file object. Fall back to
        # sniffing the downloaded file only when it is missing or empty; a
        # default passed to getattr() would be evaluated eagerly and would
        # not cover an attribute that exists but is None.
        mime = getattr(file_obj, "mime_type", None) or magic.from_file(fullpath, mime=True)
        if isinstance(mime, bytes):
            mime = mime.decode()
        guess_ext = mimetypes.guess_extension(mime) or ".unknown"
        if guess_ext == ".unknown":
            self.logger.warning("File %s with mime %s has no matching extensions.", fullpath, mime)
        # Always use ".jpeg" for JPEG images instead of the guessed extension.
        ext = ".jpeg" if mime == "image/jpeg" else guess_ext
        renamed = "%s%s" % (fullpath, ext)
        os.rename(fullpath, renamed)
        return renamed, mime
项目:hdrnet_legacy    作者:mgharbi    | 项目源码 | 文件源码
def _produce_one_sample(self):
    """Build the tensors for one input/output image pair.

    ``self.path`` names a text file listing one image file name per line.
    Each name is looked up under ``<dir>/input`` and ``<dir>/output``, where
    ``<dir>`` is the directory containing the list file. The number of
    listed files is stored in ``self.nsamples``.

    Bit depth (8 or 16) and image format (JPEG or PNG) are decided once,
    from the first file of each list, and applied to the whole queue.

    Returns:
      A dict with the keys ``lowres_input``, ``lowres_output``,
      ``image_input`` and ``image_output``, each a 3-channel slice of the
      tensors returned by ``self._augment_data``.

    Raises:
      ValueError: If ``check_dir`` rejects the data directory.
    """
    dirname = os.path.dirname(self.path)
    if not check_dir(dirname):
      raise ValueError("Invalid data path.")
    with open(self.path, 'r') as fid:
      # Iterate the file directly; file.xreadlines() exists on Python 2 only.
      flist = [l.strip() for l in fid]

    if self.shuffle:
      random.shuffle(flist)

    input_files = [os.path.join(dirname, 'input', f) for f in flist]
    output_files = [os.path.join(dirname, 'output', f) for f in flist]

    self.nsamples = len(input_files)

    # Octal 0o123 (decimal 83): the 0o prefix parses on Python 2.6+ and 3,
    # whereas the bare leading-zero form is a syntax error on Python 3.
    input_queue, output_queue = tf.train.slice_input_producer(
        [input_files, output_files], shuffle=self.shuffle,
        seed=0o123, num_epochs=self.num_epochs)

    # The libmagic description of the first file is searched for "16-bit";
    # the white level is the maximum value of the chosen integer type.
    if '16-bit' in magic.from_file(input_files[0]):
      input_dtype = tf.uint16
      input_wl = 65535.0
    else:
      input_wl = 255.0
      input_dtype = tf.uint8
    if '16-bit' in magic.from_file(output_files[0]):
      output_dtype = tf.uint16
      output_wl = 65535.0
    else:
      output_wl = 255.0
      output_dtype = tf.uint8

    input_file = tf.read_file(input_queue)
    output_file = tf.read_file(output_queue)

    # Anything that is not ".jpg" is decoded as PNG with the detected dtype.
    if os.path.splitext(input_files[0])[-1] == '.jpg': 
      im_input = tf.image.decode_jpeg(input_file, channels=3)
    else:
      im_input = tf.image.decode_png(input_file, dtype=input_dtype, channels=3)

    if os.path.splitext(output_files[0])[-1] == '.jpg': 
      im_output = tf.image.decode_jpeg(output_file, channels=3)
    else:
      im_output = tf.image.decode_png(output_file, dtype=output_dtype, channels=3)

    # normalize input/output
    sample = {}
    with tf.name_scope('normalize_images'):
      im_input = tf.to_float(im_input)/input_wl
      im_output = tf.to_float(im_output)/output_wl

    # Stack input and output along the channel axis (3 + 3 channels) so that
    # both go through _augment_data together.
    inout = tf.concat([im_input, im_output], 2)
    fullres, inout = self._augment_data(inout, 6)

    sample['lowres_input'] = inout[:, :, :3]
    sample['lowres_output'] = inout[:, :, 3:]
    sample['image_input'] = fullres[:, :, :3]
    sample['image_output'] = fullres[:, :, 3:]
    return sample