Python magic 模块,open() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用magic.open()

项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def open(self):
            self.__inPos = 0
            self.__inPosOld = 0
            self.__outFile = None
            self.__current = DirHasher.FileIndex.Stat()
            try:
                if os.path.exists(self.__cachePath):
                    self.__inFile = open(self.__cachePath, "rb")
                    sig = self.__inFile.read(4)
                    if sig == DirHasher.FileIndex.SIGNATURE:
                        self.__mismatch = False
                        self.__inPos = self.__inPosOld = 4
                        self.__readEntry() # prefetch first entry
                    else:
                        logging.getLogger(__name__).info(
                            "Wrong signature at '%s': %s", self.__cachePath, sig)
                        self.__inFile.close()
                        self.__inFile = None
                        self.__mismatch = True
                else:
                    self.__inFile = None
                    self.__mismatch = True
            except OSError as e:
                raise BuildError("Error opening hash cache: " + str(e))
项目:ApkParser    作者:yigitozgumus    | 项目源码 | 文件源码
def parse_icon(self, icon_path=None):
        """
        parse icon.
        :param icon_path: icon storage path
        """
        if not icon_path:
            icon_path = os.path.dirname(os.path.abspath(__file__))

        pkg_name_path = os.path.join(icon_path, self.package)
        if not os.path.exists(pkg_name_path):
            os.mkdir(pkg_name_path)

        aapt_line = "aapt dump badging %s | grep 'application-icon' | awk -F ':' '{print $2}'" % self.get_filename()
        parse_icon_rt = os.popen(aapt_line).read()
        icon_paths = [icon.replace("'", '') for icon in parse_icon_rt.split('\n') if icon]

        zfile = zipfile.ZipFile(StringIO.StringIO(self.__raw), mode='r')
        for icon in icon_paths:
            icon_name = icon.replace('/', '_')
            data = zfile.read(icon)
            with open(os.path.join(pkg_name_path, icon_name), 'w+b') as icon_file:
                icon_file.write(data)
        print "APK ICON in: %s" % pkg_name_path
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def readrewritelist(rewritelist):
    ## rewrite is a hash. Key is sha256 of the file.
    rewrite = {}
    try:
        rewritefile = open(rewritelist, 'r')
        rewritelines = rewritefile.readlines()
        rewritefile.close()
        for r in rewritelines:
            rs = r.strip().split()
            ## format error, bail out
            if len(rs) != 7:
                return {}
            (package, version, filename, origin, sha256, newp, newv) = rs
            ## dupe, skip
            if sha256 in rewrite:
                continue
            rewrite[sha256] = {'package': package, 'version': version, 'filename': filename, 'origin': origin, 'newpackage': newp, 'newversion': newv}
    except:
        return {}
    return rewrite

## split on the special characters, plus remove special control characters that are
## at the beginning and end of the string in escaped form.
## Return a list of strings.
项目:analyst-scripts    作者:Te-k    | 项目源码 | 文件源码
def get_filetype(data):
    """There are two versions of python-magic floating around, and annoyingly, the interface
    changed between versions, so we try one method and if it fails, then we try the other.
    NOTE: you may need to alter the magic_file for your system to point to the magic file."""
    if sys.modules.has_key('magic'):
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            return ms.buffer(data)
        except:
            try:
                return magic.from_buffer(data)
            except magic.MagicException:
                magic_custom = magic.Magic(magic_file='C:\windows\system32\magic')
                return magic_custom.from_buffer(data)
    return ''
项目:Tacita    作者:Quantika14    | 项目源码 | 文件源码
def parse_icon(self, icon_path=None):
        """
        parse icon.
        :param icon_path: icon storage path
        """
        if not icon_path:
            icon_path = os.path.dirname(os.path.abspath(__file__))

        pkg_name_path = os.path.join(icon_path, self.package)
        if not os.path.exists(pkg_name_path):
            os.mkdir(pkg_name_path)

        aapt_line = "aapt dump badging %s | grep 'application-icon' | awk -F ':' '{print $2}'" % self.get_filename()
        parse_icon_rt = os.popen(aapt_line).read()
        icon_paths = [icon.replace("'", '') for icon in parse_icon_rt.split('\n') if icon]

        zfile = zipfile.ZipFile(StringIO.StringIO(self.__raw), mode='r')
        for icon in icon_paths:
            icon_name = icon.replace('/', '_')
            data = zfile.read(icon)
            with open(os.path.join(pkg_name_path, icon_name), 'w+b') as icon_file:
                icon_file.write(data)
        print "APK ICON in: %s" % pkg_name_path
项目:cuckoodroid-2.0    作者:idanr1986    | 项目源码 | 文件源码
def _get_filetype(self, data):
        """Gets filetype, uses libmagic if available.
        @param data: data to be analyzed.
        @return: file type or None.
        """
        if not HAVE_MAGIC:
            return None

        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            file_type = ms.buffer(data)
        except:
            try:
                file_type = magic.from_buffer(data)
            except Exception:
                return None
        finally:
            try:
                ms.close()
            except:
                pass

        return file_type
项目:firmflaws    作者:Ganapati    | 项目源码 | 文件源码
def magic(indata, mime=False):
        """
        Performs file magic while maintaining compatibility with different
        libraries.
        """

        try:
            if mime:
                mymagic = magic.open(magic.MAGIC_MIME_TYPE)
            else:
                mymagic = magic.open(magic.MAGIC_NONE)
            mymagic.load()
        except AttributeError:
            mymagic = magic.Magic(mime)
            mymagic.file = mymagic.from_file
        return mymagic.file(indata)
项目:rucio    作者:rucio01    | 项目源码 | 文件源码
def smart_open(filename):
    '''
    Returns an open file object if `filename` is plain text, else assumes
    it is a bzip2 compressed file and returns a file-like object to
    handle it.
    '''
    if isplaintext(filename):
        f = open(filename, 'rt')
    else:
        file_type = mimetype(filename)
        if file_type.find('gzip') > -1:
            f = gzip.GzipFile(filename, 'rt')
        elif file_type.find('bzip2') > -1:
            f = bz2file.open(filename, 'rt')
        else:
            pass  # Not supported format
    return f
项目:rucio    作者:rucio01    | 项目源码 | 文件源码
def srm_download_to_file(url, file_):
    '''
    Download the file in `url` storing it in the `file_` file-like
    object.
    '''
    logger = logging.getLogger('dumper.__init__')
    ctx = gfal2.creat_context()  # pylint: disable=no-member
    infile = ctx.open(url, 'r')

    try:
        chunk = infile.read(CHUNK_SIZE)
    except GError as e:
        if e[1] == 70:
            logger.debug('GError(70) raised, using GRIDFTP PLUGIN:STAT_ON_OPEN=False workarround to download %s', url)
            ctx.set_opt_boolean('GRIDFTP PLUGIN', 'STAT_ON_OPEN', False)
            infile = ctx.open(url, 'r')
            chunk = infile.read(CHUNK_SIZE)
        else:
            raise

    while chunk:
        file_.write(chunk)
        chunk = infile.read(CHUNK_SIZE)
项目:auto_mal    作者:0xhughes    | 项目源码 | 文件源码
def _hardcode_setup():
    # A user may want to hardcode in and out paths for their analysis files. If no config file exists, it will ask if you want to store your in/out options in one and use them from then on out.
    sane = 0
    home_dir = str(os.path.expanduser('~'))
    input_conf = raw_input('-- : --  Please enter full path to input sample directory: ')
    output_conf = raw_input('-- : --  Please enter full path to sample/analysis output directory: ')
    if not os.path.exists(input_conf) or not os.path.exists(output_conf):
        sys.exit("-- : --  Invalid paths detected. Please check input. Run again to re-enter paths.")
    print '-- : --  Is this input directory correct "'+str(input_conf)+'"?'
    print '-- : --  Is this output directory correct "'+str(output_conf)+'"?'
    while sane == 0:
        dir_choice = raw_input('-- : --  Yes or No: ')
        if dir_choice == 'Yes':
            conf_file = open(home_dir+'/automal_conf.conf', 'w')
            conf_file.write('ipath='+str(input_conf)+'\n')
            conf_file.write('opath='+str(output_conf)+'\n')
            conf_file.close()
            sane = 1
        if dir_choice == 'No':
            sys.exit('-- : --  Exited on incorrect paths. Run again to re-enter.')
        if dir_choice != 'No' and dir_choice != 'Yes':
            print '-- : --  Invalid choice, try again. Yes or No.'
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def _get_guest_digital_signers(self):
        retdata = dict()
        cert_data = dict()
        cert_info = os.path.join(CUCKOO_ROOT, "storage", "analyses",
                                 str(self.results["info"]["id"]), "aux",
                                 "DigiSig.json")

        if os.path.exists(cert_info):
            with open(cert_info, "r") as cert_file:
                buf = cert_file.read()
            if buf:
                cert_data = json.loads(buf)

        if cert_data:
            retdata = {
                "aux_sha1": cert_data["sha1"],
                "aux_timestamp": cert_data["timestamp"],
                "aux_valid": cert_data["valid"],
                "aux_error": cert_data["error"],
                "aux_error_desc": cert_data["error_desc"],
                "aux_signers": cert_data["signers"]
            }

        return retdata
项目:diffoscope    作者:ReproducibleBuilds    | 项目源码 | 文件源码
def fuzzy_hash(self):
            if not hasattr(self, '_fuzzy_hash'):
                # tlsh is not meaningful with files smaller than 512 bytes
                if os.stat(self.path).st_size >= 512:
                    h = tlsh.Tlsh()
                    with open(self.path, 'rb') as f:
                        for buf in iter(lambda: f.read(32768), b''):
                            h.update(buf)
                    h.final()
                    try:
                        self._fuzzy_hash = h.hexdigest()
                    except ValueError:
                        # File must contain a certain amount of randomness.
                        self._fuzzy_hash = None
                else:
                    self._fuzzy_hash = None
            return self._fuzzy_hash
项目:diffoscope    作者:ReproducibleBuilds    | 项目源码 | 文件源码
def has_same_content_as(self, other):
        logger.debug('File.has_same_content: %s %s', self, other)
        if os.path.isdir(self.path) or os.path.isdir(other.path):
            return False
        # try comparing small files directly first
        try:
            my_size = os.path.getsize(self.path)
            other_size = os.path.getsize(other.path)
        except OSError:
            # files not readable (e.g. broken symlinks) or something else,
            # just assume they are different
            return False
        if my_size == other_size and my_size <= SMALL_FILE_THRESHOLD:
            try:
                with profile('command', 'cmp (internal)'):
                    with open(self.path, 'rb') as file1, open(other.path, 'rb') as file2:
                        return file1.read() == file2.read()
            except OSError:
                # one or both files could not be opened for some reason,
                # assume they are different
                return False

        return self.cmp_external(other)
项目:firmanal    作者:kyechou    | 项目源码 | 文件源码
def magic(indata, mime=False):
        """
        Performs file magic while maintaining compatibility with different
        libraries.
        """

        try:
            if mime:
                mymagic = magic.open(magic.MAGIC_MIME_TYPE)
            else:
                mymagic = magic.open(magic.MAGIC_NONE)
            mymagic.load()
        except AttributeError:
            mymagic = magic.Magic(mime)
            mymagic.file = mymagic.from_file
        return mymagic.file(indata)
项目:Snakepit    作者:K4lium    | 项目源码 | 文件源码
def get_type(self):
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            file_type = ms.file(self.path)
        except:
            try:
                file_type = magic.from_file(self.path)
            except:
                try:
                    import subprocess
                    file_process = subprocess.Popen(['file', '-b', self.path], stdout = subprocess.PIPE)
                    file_type = file_process.stdout.read().strip()
                except:
                    return ''
        finally:
            try:
                ms.close()
            except:
                pass

        return file_type
项目:Snakepit    作者:K4lium    | 项目源码 | 文件源码
def get_type(data):
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except:
        try:
            file_type = magic.from_buffer(data)
        except:
            return ''
    finally:
        try:
            ms.close()
        except:
            pass

    return file_type
项目:Overlord    作者:TSDBBench    | 项目源码 | 文件源码
def generate_html(p, templateFile, templateDict, outputFile, overwrite, logger):
    if not Util.check_file_exists(templateFile):
        logger.error("Template file does not exist: '%s'" %(templateFile))
        os._exit(-1)
    try:
        template = jinja2.Environment(loader = jinja2.FileSystemLoader(searchpath=os.path.split(templateFile)[0])).get_template(os.path.split(templateFile)[1])
    except Exception, e:
        logger.error("Failed load template file '%s'" %(templateFile), exc_info=True)
        os._exit(-1)
    html = bokeh.embed.file_html(models=p, resources=bokeh.resources.INLINE, title=templateDict["title"] , template=template, template_variables=templateDict)
    if Util.check_file_exists(outputFile) and not overwrite:
        logger.error("Html file does exist: '%s'. Delete or use overwrite flag." %(outputFile))
        os._exit(-1)
    try:
        file = open(outputFile,"w")
        file.write(html)
        file.close()
    except Exception, e:
        logger.error("Error while writing html file '%s'" %(outputFile), exc_info=True)
        os._exit(-1)

# generates bokeh histogram_data
# gets data from every "LatencyList"
# data2 is just data/2.0
# commented out code is old and better to read but much slower due to "key not in" - if
项目:Overlord    作者:TSDBBench    | 项目源码 | 文件源码
def openCompressedFile(ycsbfile, dict, key, decompress, overwrite, logger):
    try:
        file = gzip.open(ycsbfile,"r")
        dict[key]=cPickle.load(file)
        file.close()
    except Exception, e:
        logger.error("Can't open '%s'. Is it really a compressed .ydc file?" %(ycsbfile), exc_info=True)
        os._exit(-1)
    # if you truly just want to decompress it, stop after saving plain ycsb file
    if decompress:
        try:
            newFileName=os.path.splitext(os.path.basename(ycsbfile))[0]+".log"
            if (not Util.check_file_exists(newFileName) or  overwrite) and os.access(".", os.W_OK):
                if key in dict.keys() and dict[key] != None:
                    decompressFile(dict[key], newFileName, logger)
                else:
                    logger.error("Dictionary does not have filecontent or is null." , exc_info=True)
                    os._exit(-1)
            else:
                logger.error("Can't create '%s' to write. Does it already exist?" %(newFileName), exc_info=True)
                os._exit(-1)
        except Exception, e:
            logger.error("Can't open '%s'." %("%s.log.log" %(os.path.basename(ycsbfile))), exc_info=True)
            os._exit(-1)
项目:acbs    作者:AOSC-Dev    | 项目源码 | 文件源码
def file_type(file_loc):
    try:
        import magic
    except:
        print('[W] ACBS cannot find libmagic bindings, will use bundled one instead.')
        import lib.magic as magic
    mco = magic.open(magic.MIME_TYPE | magic.MAGIC_SYMLINK)
    mco.load()
    try:
        tp = mco.file(file_loc)
        tp_list = tp.decode('utf-8').split('/')
    except:
        print('[W] Unable to determine the file type!')
        return ['unknown', 'unknown']
    return tp_list
项目:acbs    作者:AOSC-Dev    | 项目源码 | 文件源码
def file_type_full(file_loc):
    try:
        import magic
    except:
        print('[W] ACBS cannot find libmagic bindings, will use bundled one instead.')
        import lib.magic as magic
    mco = magic.open(magic.NONE | magic.MAGIC_SYMLINK)
    mco.load()
    try:
        tp = mco.file(file_loc)
    except:
        print('[W] Unable to determine the file type!')
        return 'data'
    return tp.decode('utf-8')
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def hashFile(path):
    m = hashlib.sha1()
    try:
        with open(path, 'rb', buffering=0) as f:
            buf = f.read(16384)
            while len(buf) > 0:
                m.update(buf)
                buf = f.read(16384)
    except OSError as e:
        logging.getLogger(__name__).warning("Cannot hash file: %s", str(e))
    return m.digest()
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def open(self):
            pass
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def hashDirectory(self, path):
        self.__index.open()
        try:
            return self.__hashDir(os.fsencode(path))
        finally:
            self.__index.close()
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def hashPath(self, path):
        path = os.fsencode(path)
        try:
            s = os.lstat(path)
        except OSError as err:
            logging.getLogger(__name__).warning("Cannot stat '%s': %s", path, str(err))
            return b''

        self.__index.open()
        try:
            return self.__hashEntry(path, b'', s)
        finally:
            self.__index.close()
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def _process_cache(self, split="\n", rstrip=True):
        try:
            ftype = magic.from_file(self.cache, mime=True)
        except AttributeError:
            try:
                mag = magic.open(magic.MAGIC_MIME)
                mag.load()
                ftype = mag.file(self.cache)
            except AttributeError as e:
                raise RuntimeError('unable to detect cached file type')

        if PYVERSION < 3:
            ftype = ftype.decode('utf-8')

        if ftype.startswith('application/x-gzip') or ftype.startswith('application/gzip'):
            from csirtg_smrt.decoders.zgzip import get_lines
            for l in get_lines(self.cache, split=split):
                yield l

            return

        if ftype == "application/zip":
            from csirtg_smrt.decoders.zzip import get_lines
            for l in get_lines(self.cache, split=split):
                yield l

            return

        # all others, mostly txt, etc...
        with open(self.cache) as f:
            for l in f:
                yield l
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def _cache_write(self, s):
        with open(self.cache, 'wb') as f:
            auth = False
            if self.username:
                auth = (self.username, self.password)

            resp = self._cache_refresh(s, auth)
            if not resp:
                return

            for block in resp.iter_content(1024):
                f.write(block)
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def get_mimetype(f):
    try:
        ftype = magic.from_file(f, mime=True)
    except AttributeError:
        try:
            mag = magic.open(magic.MAGIC_MIME)
            mag.load()
            ftype = mag.file(f)
        except AttributeError as e:
            raise RuntimeError('unable to detect cached file type')

    if PYVERSION < 3:
        ftype = ftype.decode('utf-8')

    return ftype
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def get_type(f, mime=None):
    if not mime:
        mime = get_mimetype(f)

    if isinstance(f, str):
        f = open(f)

    t = None
    for tt in TESTS:
        f.seek(0)
        t = tt(f, mime)
        if t:
            return t
项目:guest-images    作者:S2E    | 项目源码 | 文件源码
def detect_magic():
    global g_m
    if hasattr(magic, 'open'):
        g_m = magic.open(magic.MAGIC_SYMLINK)
        g_m.load()
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def gethash(path, filename):
    scanfile = open("%s/%s" % (path, filename), 'r')
    h = hashlib.new('sha256')
    scanfile.seek(0)
    hashdata = scanfile.read(10000000)
    while hashdata != '':
        h.update(hashdata)
        hashdata = scanfile.read(10000000)
    scanfile.close()
    return h.hexdigest()

## method to compare binaries. Returns the amount of bytes that differ
## according to bsdiff, or 0 if the files are identical
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def main(argv):
    parser = OptionParser()
    parser.add_option("-f", "--filedir", action="store", dest="filedir", help="path to directory containing files to unpack", metavar="DIR")

    (options, args) = parser.parse_args()
    if options.filedir == None:
        parser.error("Specify dir with files")
    else:
        try:
            filelist = open(os.path.join(options.filedir, "LIST")).readlines()
        except:
            parser.error("'LIST' not found in file dir")

    ## first process the LIST file
    pkgmeta = []
    for unpackfile in filelist:
        try:
            unpacks = unpackfile.strip().split()
            if len(unpacks) == 3:
                origin = "unknown"
                (package, version, filename) = unpacks
            else:
                (package, version, filename, origin) = unpacks
            pkgmeta.append((options.filedir, filename))
        except Exception, e:
            # oops, something went wrong
            print >>sys.stderr, e

    pool = multiprocessing.Pool()
    unpackresults = pool.map(unpack, pkgmeta, 1)
    pool.terminate()
    for i in unpackresults:
        if i != None:
            (filename, result) = i
            if not result:
                print "corrupt archive: %s" % filename
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def scanArchitecture(filename, tags, cursor, conn, filehashes, blacklist=[], scanenv={}, scandebug=False, unpacktempdir=None):
    if not 'elf' in tags:
        return
    archres = elfcheck.getArchitecture(filename, tags)
    if archres != None:
        return (['architecture'], archres)

## search markers for various open source programs
## This search is not accurate, but might come in handy in some situations
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def searchUnpackLzip(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
    hints = {}
    if not 'lzip' in offsets:
        return ([], blacklist, [], hints)
    if offsets['lzip'] == []:
        return ([], blacklist, [], hints)
    filesize = os.stat(filename).st_size
    if filesize < 5:
        return ([], blacklist, [], hints)
    diroffsets = []
    tags = []
    counter = 1
    for offset in offsets['lzip']:
        blacklistoffset = extractor.inblacklist(offset, blacklist)
        if blacklistoffset != None:
            continue
        ## sanity check, only versions 0 or 1 are supported
        lzipfile = open(filename, 'rb')
        lzipfile.seek(offset+4)
        lzipversion = lzipfile.read(1)
        lzipfile.close()
        if struct.unpack('<B', lzipversion)[0] > 1:
            continue
        tmpdir = dirsetup(tempdir, filename, "lzip", counter)
        (res, lzipsize) = unpackLzip(filename, offset, tmpdir)
        if res != None:
            diroffsets.append((res, offset, lzipsize))
            blacklist.append((offset, offset+lzipsize))
            counter = counter + 1
            if offset == 0 and lzipsize == filesize:
                tags.append("compressed")
                tags.append("lzip")
        else:
            ## cleanup
            os.rmdir(tmpdir)
    return (diroffsets, blacklist, tags, hints)
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def gzipcrc32(filename):
    datafile = open(filename, 'rb')
    datafile.seek(0)
    databuffer = datafile.read(10000000)
    crc32 = binascii.crc32('')
    while databuffer != '':
        crc32 = binascii.crc32(databuffer, crc32)
        databuffer = datafile.read(10000000)
    datafile.close()
    crc32 = crc32 & 0xffffffff
    return crc32
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def searchUnpackKnownGzip(filename, tempdir=None, scanenv={}, debug=False):
    ## first check if the file actually could be a valid gzip file
    gzipfile = open(filename, 'rb')
    gzipfile.seek(0)
    gzipheader = gzipfile.read(3)
    gzipfile.close()
    if gzipheader != fsmagic.fsmagic['gzip']:
        return ([], [], [], {})

    ## then try unpacking it.
    res = searchUnpackGzip(filename, tempdir, [], {'gzip': [0]}, scanenv, debug)
    (diroffsets, blacklist, newtags, hints) = res

    failed = False
    ## there were results, so check if they were successful
    if diroffsets != []:
        if len(diroffsets) != 1:
            failed = True
        else:
            (dirpath, startoffset, endoffset) = diroffsets[0]
            if startoffset != 0 or endoffset != os.stat(filename).st_size:
                failed = True

        if failed:
            for i in diroffsets:
                (dirpath, startoffset, endoffset) = i
                try:
                    shutil.rmtree(dirpath)
                except:
                    pass
            return ([], [], [], {})
        else:
            return (diroffsets, blacklist, newtags, hints)
    return ([], [], [], {})
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def searchUnpackKnownBzip2(filename, tempdir=None, scanenv={}, debug=False):
    ## first check if the file actually could be a valid gzip file
    bzip2file = open(filename, 'rb')
    bzip2file.seek(0)
    bzip2header = bzip2file.read(3)
    bzip2file.close()
    if bzip2header != fsmagic.fsmagic['bz2']:
        return ([], [], [], {})

    ## then try unpacking it.
    res = searchUnpackBzip2(filename, tempdir, [], {'bz2': [0]}, scanenv, debug)
    (diroffsets, blacklist, newtags, hints) = res

    failed = False
    ## there were results, so check if they were successful
    if diroffsets != []:
        if len(diroffsets) != 1:
            failed = True
        else:
            (dirpath, startoffset, endoffset) = diroffsets[0]
            if startoffset != 0 or endoffset != os.stat(filename).st_size:
                failed = True

        if failed:
            for i in diroffsets:
                (dirpath, startoffset, endoffset) = i
                try:
                    shutil.rmtree(dirpath)
                except:
                    pass
            return ([], [], [], {})
        else:
            return (diroffsets, blacklist, newtags, hints)
    return ([], [], [], {})

## search and unpack bzip2 compressed files
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def searchUnpackRZIP(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
    hints = {}
    if not 'rzip' in offsets:
        return ([], blacklist, [], hints)
    if offsets['rzip'] == []:
        return ([], blacklist, [], hints)
    if offsets['rzip'][0] != 0:
        return ([], blacklist, [], hints)
    if os.stat(filename).st_size < 10:
        return ([], blacklist, [], hints)
    diroffsets = []
    tags = []
    offset = 0

    rzipfile = open(filename, 'rb')
    rzipfile.seek(0)
    rzipdata = rzipfile.read(10)
    rzipfile.close()

    rzipsize = struct.unpack('>L', rzipdata[6:10])[0]

    blacklistoffset = extractor.inblacklist(offset, blacklist)
    if blacklistoffset != None:
        return (diroffsets, blacklist, tags, hints)

    tmpdir = dirsetup(tempdir, filename, "rzip", 1)
    res = unpackRZIP(filename, offset, rzipsize, tmpdir)
    if res != None:
        rzipdir = res
        diroffsets.append((rzipdir, offset, 0))
        #blacklist.append((offset, offset + unpackrzipsize))
        #if offset == 0:
        #   tags.append("compressed")
        #   tags.append("rzip")
    else:
        ## cleanup
        os.rmdir(tmpdir)

    return (diroffsets, blacklist, tags, hints)
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def searchUnpackAndroidSparse(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
    hints = {}
    if not 'android-sparse' in offsets:
        return ([], blacklist, [], hints)
    if offsets['android-sparse'] == []:
        return ([], blacklist, [], hints)

    diroffsets = []
    counter = 1
    tags = []
    for offset in offsets['android-sparse']:
        blacklistoffset = extractor.inblacklist(offset, blacklist)
        if blacklistoffset != None:
            continue
        ## first see if the major version is correct
        sparsefile = open(filename, 'rb')
        sparsefile.seek(offset+4)
        sparsedata = sparsefile.read(2)
        sparsefile.close()
        if len(sparsedata) != 2:
            break
        majorversion = struct.unpack('<H', sparsedata)[0]
        if not majorversion == 1:
            continue

        tmpdir = dirsetup(tempdir, filename, "android-sparse", counter)
        res = unpackAndroidSparse(filename, offset, tmpdir)
        if res != None:
            (sparsesize, sparsedir) = res
            diroffsets.append((sparsedir, offset, sparsesize))

            blacklist.append((offset, offset + sparsesize))
            counter = counter + 1
        else:
            ## cleanup
            os.rmdir(tmpdir)
    return (diroffsets, blacklist, tags, hints)
项目:binaryanalysis    作者:armijnhemel    | 项目源码 | 文件源码
def searchUnpackIHex(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
    hints = {}
    tags = []
    diroffsets = []
    counter = 1
    filesize = os.stat(filename).st_size

    tmpdir = dirsetup(tempdir, filename, "ihex", counter)
    tmpfile = tempfile.mkstemp(dir=tmpdir)
    datafile = open(filename, 'r')
    foundend = False
    offset = 0
    for d in datafile:
        if foundend:
            os.fdopen(tmpfile[0]).close()
            datafile.close()
            os.rmdir(tmpdir)
            return (diroffsets, blacklist, tags, hints)
        b = d.strip()
        if not b.startswith(':'):
            if not b.startswith('#'):
                break
        if len(b) < 3:
            break
        bytecount = ord(b[1:3].decode('hex'))
        address = struct.unpack('>H', b[3:7].decode('hex'))
        recordtype = ord(b[7:9].decode('hex'))
        if recordtype == 1:
            foundend = True
            break
        if recordtype != 0:
            continue
        databytes = b[9:9+bytecount*2].decode('hex')
        os.write(tmpfile[0], databytes)
    os.fdopen(tmpfile[0]).close()
    datafile.close()
    diroffsets.append((tmpdir, offset, filesize))
    blacklist.append((offset, offset + filesize))
    return (diroffsets, blacklist, tags, hints)

## sometimes MP3 audio files are embedded into binary blobs
项目:cuckoodroid-2.0    作者:idanr1986    | 项目源码 | 文件源码
def run(self):
        ret = []
        source = open(self.filepath, "rb").read()

        # Get rid of superfluous comments.
        source = re.sub("/\\*.*?\\*/", "", source, flags=re.S)

        for script in re.findall(self.script_re, source, re.I | re.S):
            try:
                x = bs4.BeautifulSoup(script, "html.parser")
                language = x.script.attrs.get("language", "").lower()
            except:
                language = None

            # We can't rely on bs4 or any other HTML/XML parser to provide us
            # with the raw content of the xml tag as they decode html entities
            # and all that, leaving us with a corrupted string.
            source = re.match("<.*>(.*)</.*>$", script, re.S).group(0)

            # Decode JScript.Encode encoding.
            if language in ("jscript.encode", "vbscript.encode"):
                source = self.decode(source)

            ret.append(to_unicode(source))

        return ret
项目:cuckoodroid-2.0    作者:idanr1986    | 项目源码 | 文件源码
def _get_keys(self):
        """Get any embedded plaintext public and/or private keys."""
        buf = open(self.file_path).read()
        ret = set()
        ret.update(re.findall(self.PUBKEY_RE, buf))
        ret.update(re.findall(self.PRIVKEY_RE, buf))
        return list(ret)
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def get_filetype(data):
    """There are two versions of python-magic floating around, and annoyingly, the interface
        changed between versions, so we try one method and if it fails, then we try the other"""
        if sys.modules.has_key('magic'):
            try:
                ms = magic.open(magic.MAGIC_NONE)
                        ms.load()
                        return ms.buffer(data)
                except:
                    return magic.from_buffer(data)
项目:firmflaws    作者:Ganapati    | 项目源码 | 文件源码
def io_dd(indir, offset, size, outdir):
        """
        Given a path to a target file, extract size bytes from specified offset
        to given output file.
        """
        if not size:
            return

        with open(indir, "rb") as ifp:
            with open(outdir, "wb") as ofp:
                ifp.seek(offset, 0)
                ofp.write(ifp.read(size))
项目:firmflaws    作者:Ganapati    | 项目源码 | 文件源码
def io_md5(target):
        """
        Performs MD5 with a block size of 64kb.
        """
        blocksize = 65536
        hasher = hashlib.md5()

        with open(target, 'rb') as ifp:
            buf = ifp.read(blocksize)
            while buf:
                hasher.update(buf)
                buf = ifp.read(blocksize)
            return hasher.hexdigest()
项目:rucio    作者:rucio01    | 项目源码 | 文件源码
def http_download(url, filename):
    '''
    Download the file in `url` storing it in the path given by `filename`.
    '''
    with open(filename, 'w') as f:
        http_download_to_file(url, f)
项目:rucio    作者:rucio01    | 项目源码 | 文件源码
def srm_download(url, filename):
    '''
    Download the file in `url` storing it in the path given by `filename`.
    '''
    with open(filename, 'w') as f:
        srm_download_to_file(url, f)
项目:auto_mal    作者:0xhughes    | 项目源码 | 文件源码
def _get_hc_dirs():
    try:
        read_conf = open(str(os.path.expanduser('~'))+'/automal_conf.conf', 'r').readlines()
    except IOError:
        sys.exit("-- : --  Unable to access conf. file. Run and re-enter new conf. file paths.")
    for line in read_conf:
        if line.startswith('ipath'):
            input_path = str(line.split('=')[1]).strip()
        if line.startswith('opath'):
            output_path = str(line.split('=')[1]).strip()
    return(input_path, output_path)
项目:auto_mal    作者:0xhughes    | 项目源码 | 文件源码
def _get_file_type(full_targ_path):
    # This function takes the full path of a target sample and determines/returns the file type via python-magic.
    try:
        magicObj = magic.open(magic.MAGIC_NONE)
        magicObj.load()
        magic_out = str(magicObj.file(full_targ_path))
    except AttributeError:
        magic_out = str(magic.from_file(full_targ_path))

    return(magic_out)
项目:auto_mal    作者:0xhughes    | 项目源码 | 文件源码
def _swf_analysis(full_targ_path):
    # This function calls swftools, tools, and flasm against SWF samples to extract data and/or performs analysis as needed.
    command_out = subprocess.Popen(["swfdump", "-a", full_targ_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
    command2_out = subprocess.Popen(["swfextract", full_targ_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
    command2_list = command2_out.split('\n')
    command_out_list = command_out.split('\n')

    swf_ioc_res = ""

    for out in command_out_list:
        strOut = str(out)
        ioc_list = ["http", "www", ".com", ".net", ".info", "GetVariable", "GetURL", 'String:"_post"', 'String:"send"', "\\\\", "pushstring", "url.split", ".php", "urlmon", ".exe"]
        for indi in ioc_list:
            if indi in strOut:
                swf_ioc_res = "Present"

    if len(swf_ioc_res) == 0:
        swf_ioc_res = "None"

    extract_list_fns = []
    for out in command2_list:
        if "JPEG" in out:
            j_id = out.rfind(' ')+1
            j_id = int(out[j_id:len(out)])
            # Sometimes picture extraction doesn't occur... correctly, so we suppress the output. If we get it, great, if we don't whatever, for now.
            os_null = open(os.devnull, 'wb')
            subprocess.Popen(['swfextract', full_targ_path, '-j', str(j_id), '-o', '/tmp/automal/'+str(j_id)+'.jpg'], stdout=os_null, stderr=os_null)
            subprocess.Popen(['swfextract', full_targ_path, '-p', str(j_id), '-o', '/tmp/automal/'+str(j_id)+'.png'], stdout=os_null, stderr=os_null)
            extract_list_fns.append('/tmp/automal/'+str(j_id))
    return(command_out, extract_list_fns, command2_out, swf_ioc_res)
项目:auto_mal    作者:0xhughes    | 项目源码 | 文件源码
def _c_sample_out_dir(targ, automal_dir):
    # When we analyze samples, a output directory named after the MD5 hash of the sample is created and/or used for the samples specific exports, putput info, etc.
    out_md5 = str(hashlib.md5(targ).hexdigest())
    out_full_path = automal_dir+'/'+out_md5
    if not os.path.exists(out_full_path):
        os.makedirs(out_full_path)
    out_file_Obj = open(out_full_path+'/Output.txt', 'a')
    return (out_file_Obj, out_full_path, out_md5)