Python re 模块,compile() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用re.compile()

项目:django-heartbeat    作者:pbs    | 项目源码 | 文件源码
def get_client_ip(request):
    """Return the first usable public IPv4 address from the access route.

    A route with exactly one entry is returned unchanged. Otherwise the
    entries are scanned in order and the first one that is non-empty and
    does not match the exclusion pattern below is returned. Nothing is
    returned (``None``) when every entry is excluded.
    """
    route = get_access_route(request)
    if len(route) == 1:
        return route[0]

    expression = """
        (^(?!(?:[0-9]{1,3}\.){3}[0-9]{1,3}$).*$)|  # will match non valid ipV4
        (^127\.0\.0\.1)|  # will match 127.0.0.1
        (^10\.)|  # will match 10.0.0.0 - 10.255.255.255 IP-s
        (^172\.1[6-9]\.)|  # will match 172.16.0.0 - 172.19.255.255 IP-s
        (^172\.2[0-9]\.)|  # will match 172.20.0.0 - 172.29.255.255 IP-s
        (^172\.3[0-1]\.)|  # will match 172.30.0.0 - 172.31.255.255 IP-s
        (^192\.168\.)  # will match 192.168.0.0 - 192.168.255.255 IP-s
    """
    excluded = re.compile(expression, re.X)
    for candidate in route:
        # The first X_FORWARDED_FOR value may be empty; such entries are
        # skipped together with everything the exclusion pattern matches.
        if candidate and not excluded.search(candidate):
            return candidate
项目:MIT-CS-lectures    作者:William-Python-King    | 项目源码 | 文件源码
def _readRegExp(regExp):
    """
    Consume and return the leading part of the module buffer that matches
    regExp, ignoring any white space in front of it.

    The matched text (including the skipped white space) is removed from
    _buffer; the returned string has the leading white space stripped.
    Raise an EOFError if isEmpty() reports that nothing is left, and a
    ValueError if the buffer does not start with a match for 'regExp'.
    """
    global _buffer
    if isEmpty():
        raise EOFError()
    found = re.compile(r'^\s*' + regExp).search(_buffer)
    if found is None:
        raise ValueError()
    _buffer = _buffer[found.end():]
    return found.group().lstrip()

#-----------------------------------------------------------------------
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def run(self):
        """Look up the FortiGuard web-filter category of the observable.

        Only the 'domain' and 'url' data types are handled; anything else is
        passed to self.notSupported(). The category is scraped from the
        response body with a regular expression and reported as
        ``{'category': <text>}``. A response without a "Category: ..." text
        is routed through self.unexpectedError() like any other ValueError
        instead of crashing on ``None.group``.
        """
        Analyzer.run(self)

        if self.data_type == 'domain' or self.data_type == 'url':
            try:
                pattern = re.compile(r"(?:Category: )([\w\s]+)")
                baseurl = 'http://www.fortiguard.com/webfilter?q='
                url = baseurl + self.getData()
                req = requests.get(url)
                # NOTE(review): req.content is bytes on Python 3 while the
                # pattern is text -- confirm the interpreter this runs on.
                category_match = re.search(pattern, req.content, flags=0)
                if category_match is None:
                    raise ValueError(
                        'Category not found in response for %s' % url)
                self.report({
                    'category': category_match.group(1)
                })
            except ValueError as e:
                self.unexpectedError(e)
        else:
            self.notSupported()
项目:txt2evernote    作者:Xunius    | 项目源码 | 文件源码
def findHeadings(self, lines, struc):
        """Split `lines` into sections that each start at a heading line.

        The heading pattern is built from self.textparser._h_re_base and the
        last element of `struc`. For every heading found, a TextContainer is
        created from the heading line, `struc` and the [start, end] index
        pair of the section (end is the next heading or len(lines)).
        """
        total = len(lines)
        heading_re = re.compile(self.textparser._h_re_base % struc[-1],
                                re.X | re.M)

        # Indices of all heading lines, closed off by a sentinel at the end.
        starts = [idx for idx, text in enumerate(lines)
                  if heading_re.match(text)]
        starts.append(total)

        containers = []
        for begin, end in zip(starts, starts[1:]):
            span = [begin, end]
            #--------Use heading line as container name--------
            containers.append(TextContainer(lines[begin], struc, span))

        return containers
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def getDetailList(self,content):
        """Extract (href, title) pairs from `content` and process them in threads.

        The page text is also dumped to 'file.txt' (gbk encoded). One thread
        running workthread(item, self.user_agent, self.path) is started per
        match, and all of them are joined before returning.
        """
        link_re = re.compile(
            r'<h2><a target="_blank" href="(.*?)" title="(.*?)"', re.S)
        matches = link_re.findall(content)

        with open('file.txt','w',encoding='gbk') as dump:
            dump.write(content)

        if not matches:
            print('???????..............')

        workers = []
        for entry in matches:
            worker = threading.Thread(
                target=workthread,
                args=(entry, self.user_agent, self.path))
            workers.append(worker)
            worker.start()

        # Wait until every download thread has finished.
        for worker in workers:
            worker.join()
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def __init__(
            self, index_url="https://pypi.python.org/simple", hosts=('*',),
            ca_bundle=None, verify_ssl=True, *args, **kw
            ):
        """Set up the index URL, host filter, URL caches and URL opener.

        `index_url` is stored with a guaranteed trailing slash. `hosts` are
        passed through translate() and joined into one regular expression
        whose ``match`` method becomes ``self.allows``. An SSL-verifying
        opener is used only when verification is requested, ssl_support
        reports it as available, and a CA bundle is given or can be found.
        Remaining arguments are forwarded to Environment.__init__.
        """
        Environment.__init__(self, *args, **kw)
        if index_url.endswith('/'):
            self.index_url = index_url
        else:
            self.index_url = index_url + "/"
        self.scanned_urls = {}
        self.fetched_urls = {}
        self.package_pages = {}
        host_patterns = [translate(host) for host in hosts]
        self.allows = re.compile('|'.join(host_patterns)).match
        self.to_scan = []
        if (verify_ssl
                and ssl_support.is_available
                and (ca_bundle or ssl_support.find_ca_bundle())):
            self.opener = ssl_support.opener_for(ca_bundle)
        else:
            self.opener = urllib.request.urlopen
项目:subtitle-synchronization    作者:AlbertoSabater    | 项目源码 | 文件源码
def getAudio(freq, audio_files=None):
    """Extract a WAV track with ffmpeg from each .mkv/.avi file in DATA_DIR.

    Args:
        freq: sample rate handed to ffmpeg's ``-ar`` option; it is also part
            of the output file name.
        audio_files: optional collection of file names without extension.
            When truthy, only those videos are converted.

    Returns:
        List of output paths, ``DATA_DIR + name + '_' + str(freq) + '.wav'``,
        one per converted video in processing order.
    """
    # '[mkv|avi]' was a character class (any ONE of those letters or '|'
    # after a dot); a group anchored at the end matches the real extensions.
    video_re = re.compile(r'.*\.(mkv|avi)$')
    files = [f for f in os.listdir(DATA_DIR) if video_re.match(f)]

    if audio_files:
        files = [f for f in files if os.path.splitext(f)[0] in audio_files]

    audio_dirs = []
    for f in files:
        name = os.path.splitext(f)[0]
        source = DATA_DIR + f
        target = DATA_DIR + name + '_' + str(freq) + '.wav'
        # Argument list without a shell: file names containing spaces or
        # shell metacharacters are passed to ffmpeg verbatim.
        subprocess.call([
            "ffmpeg", "-i", source,
            "-ab", "160k", "-ac", "2", "-ar", str(freq), "-vn", target,
        ])
        audio_dirs.append(target)

    return audio_dirs

# Convert timestamp to seconds
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def getRegEx(pattern):
    """Return a compiled 'regular expression' object for an OSC address-pattern.
    """
    # Rewrite OSC-address syntax as python 're' syntax. Order matters: the
    # literal '.', '(' and ')' are escaped before '*' is expanded to '.*'.
    substitutions = (
        (".", r"\."),
        ("(", r"\("),
        (")", r"\)"),
        ("*", r".*"),
    )
    for plain, escaped in substitutions:
        pattern = pattern.replace(plain, escaped)

    # OSCtrans handles the rest: '?' to '.' and '{,}' to '(|)'
    return re.compile(pattern.translate(OSCtrans))

######
#
# OSCMultiClient class
#
######
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def getRegEx(pattern):
    """Return a compiled 'regular expression' object for an OSC address-pattern.
    """
    # Rewrite OSC-address syntax as python 're' syntax. Order matters: the
    # literal '.', '(' and ')' are escaped before '*' is expanded to '.*'.
    substitutions = (
        (".", r"\."),
        ("(", r"\("),
        (")", r"\)"),
        ("*", r".*"),
    )
    for plain, escaped in substitutions:
        pattern = pattern.replace(plain, escaped)

    # OSCtrans handles the rest: '?' to '.' and '{,}' to '(|)'
    return re.compile(pattern.translate(OSCtrans))

######
#
# OSCMultiClient class
#
######
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def getDetailList(self,content):
        """Extract (href, title) pairs from `content` and process them in threads.

        The page text is also dumped to 'file.txt' (gbk encoded). One thread
        running workthread(item, self.user_agent, self.path) is started per
        match, and all of them are joined before returning.
        """
        link_re = re.compile(
            r'<h2><a target="_blank" href="(.*?)" title="(.*?)"', re.S)
        matches = link_re.findall(content)

        with open('file.txt','w',encoding='gbk') as dump:
            dump.write(content)

        if not matches:
            print('???????..............')

        workers = []
        for entry in matches:
            worker = threading.Thread(
                target=workthread,
                args=(entry, self.user_agent, self.path))
            workers.append(worker)
            worker.start()

        # Wait until every download thread has finished.
        for worker in workers:
            worker.join()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def characterErrorsUCS2(self, data):
        """Append "invalid-codepoint" to self.errors for invalid characters in data.

        Every match of invalid_unicode_re is inspected. A match that starts a
        surrogate pair is decoded and reported only if the resulting code
        point is listed in non_bmp_invalid_codepoints; any other match is
        reported unconditionally.
        """
        # Someone picked the wrong compile option
        # You lose
        skip = False
        for match in invalid_unicode_re.finditer(data):
            # NOTE(review): once skip is True nothing below runs again, so it
            # is never reset and all later matches are ignored -- confirm
            # whether only the pair's second half was meant to be skipped.
            if skip:
                continue
            codepoint = ord(match.group())
            pos = match.start()
            # Pretty sure there should be endianness issues here
            if _utils.isSurrogatePair(data[pos:pos + 2]):
                # We have a surrogate pair!
                char_val = _utils.surrogatePairToCodepoint(data[pos:pos + 2])
                if char_val in non_bmp_invalid_codepoints:
                    self.errors.append("invalid-codepoint")
                skip = True
            elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
                  pos == len(data) - 1):
                # Lone surrogate as the very last character of data.
                self.errors.append("invalid-codepoint")
            else:
                skip = False
                self.errors.append("invalid-codepoint")
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def run_script(self, script_name, namespace):
        """Execute the script `script_name` of this distribution in `namespace`.

        The script is looked up as the metadata entry 'scripts/<script_name>'.
        If a file exists at self._fn(self.egg_info, script) its source is
        executed; otherwise the metadata text (line endings normalised to LF)
        is executed after being registered in linecache.cache under that
        file name. ``namespace['__file__']`` is set to the file name first.

        Raises:
            ResolutionError: if the distribution has no such script.

        Note: this runs the script with exec() by design.
        """
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            # Close the handle deterministically instead of leaving it to
            # the garbage collector.
            with open(script_filename) as script_file:
                source = script_file.read()
            code = compile(source, script_filename, 'exec')
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            code = compile(script_text, script_filename, 'exec')
        exec(code, namespace, namespace)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _needs_hiding(mod_name):
    """
    Tell whether `mod_name` is one of the hidden packages or a submodule of one.

    >>> _needs_hiding('setuptools')
    True
    >>> _needs_hiding('pkg_resources')
    True
    >>> _needs_hiding('setuptools_plugin')
    False
    >>> _needs_hiding('setuptools.__init__')
    True
    >>> _needs_hiding('distutils')
    True
    >>> _needs_hiding('os')
    False
    >>> _needs_hiding('Cython')
    True
    """
    # The package name must be followed by a dot or the end of the string,
    # so look-alikes such as 'setuptools_plugin' are not hidden.
    hidden = re.match(
        r'(setuptools|pkg_resources|distutils|Cython)(\.|$)', mod_name)
    return hidden is not None
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def byte_compile(self, to_compile):
        """Byte-compile the files in `to_compile` with distutils.

        Does nothing except emit a warning when sys.dont_write_bytecode is
        set. Otherwise the files are compiled unoptimized and, if
        self.optimize is truthy, once more at that optimization level. Log
        verbosity is lowered by one for the duration and restored afterwards.
        """
        if sys.dont_write_bytecode:
            self.warn('byte-compiling is disabled, skipping.')
            return

        from distutils.util import byte_compile as compile_files

        try:
            # try to make the byte compile messages quieter
            log.set_verbosity(self.verbose - 1)

            compile_files(
                to_compile, optimize=0, force=1, dry_run=self.dry_run)
            level = self.optimize
            if level:
                compile_files(
                    to_compile, optimize=level, force=1,
                    dry_run=self.dry_run)
        finally:
            # restore original verbosity
            log.set_verbosity(self.verbose)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Register a warnings filter built from the given criteria.

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
                 (compiled case-insensitively)
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- forwarded to _add_filter; if true, the filter is appended
                to the list of filters

    Arguments are validated with assert statements.
    """
    import re
    valid_actions = ("error", "ignore", "always", "default", "module",
                     "once")
    assert action in valid_actions, "invalid action: %r" % (action,)
    assert isinstance(message, str), "message must be a string"
    assert isinstance(category, type), "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, str), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
           "lineno must be an int >= 0"
    message_re = re.compile(message, re.I)
    module_re = re.compile(module)
    _add_filter(action, message_re, category, module_re, lineno,
                append=append)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def getDetailList(self,content):
        """Find (href, title) pairs in `content` and hand each to getDetailPic.

        The page text is also dumped to 'file.txt' (gbk encoded) before the
        search. A placeholder message is printed when nothing matches.
        """
        # HTML attributes are separated by whitespace. The original pattern
        # joined 'href="..."' and 'title=' with nothing in between, so it
        # could not match well-formed markup.
        pattern = re.compile(
            r'<h2><a target="_blank" href="(.*?)"\s+title="(.*?)">', re.S)

        # The context manager closes the dump even if write() raises, e.g.
        # on characters that cannot be encoded as gbk.
        with open('file.txt', 'w', encoding='gbk') as dump:
            dump.write(content)

        result = pattern.findall(content)
        if not result:
            print('???????..............')
        for item in result:
            self.getDetailPic(item)
项目:Blender-power-sequencer    作者:GDquest    | 项目源码 | 文件源码
def create_text_file(self, name):
        """
        Create a new text file, change its name and return it
        Args:
        - name, a string to use as the new text file's name

        After bpy.ops.text.new() the entry of bpy.data.texts with the highest
        'Text.NNN' suffix (or plain 'Text' if there is none) is renamed.
        NOTE(review): assumes that entry is the one just created -- confirm.
        """
        import re
        bpy.ops.text.new()
        # The dot is escaped so that only names shaped like 'Text.001'
        # qualify; unescaped it matched any character in that position.
        re_text = re.compile(r'^Text\.[0-9]{3}$')

        text_name = ''
        max_index = 0
        for text in bpy.data.texts:
            if re_text.match(text.name):
                text_index = int(text.name[-3:])
                if text_index > max_index:
                    max_index = text_index
                    text_name = text.name
        if not text_name:
            text_name = 'Text'

        bpy.data.texts[text_name].name = name
        return bpy.data.texts[name]
项目:Blender-power-sequencer    作者:GDquest    | 项目源码 | 文件源码
def create_text_file(name):
    """
    Create a new text file, change its name and return it
    Args:
    - name, the name of the text file, a non-empty string

    Raises:
    - TypeError if name is empty or not a string

    After bpy.ops.text.new() the entry of bpy.data.texts with the highest
    'Text.NNN' suffix (or plain 'Text' if there is none) is renamed.
    NOTE(review): assumes that entry is the one just created -- confirm.
    """
    # The original test, `not name and isinstance(name, str)`, only rejected
    # the empty string and let every non-string value through.
    if not isinstance(name, str) or not name:
        raise TypeError('The name of the text file has to be a string')

    bpy.ops.text.new()
    import re
    # The dot is escaped so that only names shaped like 'Text.001' qualify.
    re_text = re.compile(r'^Text\.[0-9]{3}$')

    text_name = ''
    max_index = 0
    for text in bpy.data.texts:
        if re_text.match(text.name):
            text_index = int(text.name[-3:])
            if text_index > max_index:
                max_index = text_index
                text_name = text.name
    if not text_name:
        text_name = 'Text'

    bpy.data.texts[text_name].name = name
    return bpy.data.texts[name]
项目:fxnn    作者:khaotik    | 项目源码 | 文件源码
def load_params(self, f_, filter_=None):
        """Load pickled parameter values from `f_` into self._vars_di.

        Args:
            f_: binary file object holding a pickled mapping of parameter
                name to value (each value exposes ``.shape``).
            filter_: optional regular expression; when given, only names
                that fully match it are loaded.

        Raises:
            ValueError: if a loaded value's shape differs from the shape of
                the current value of the same name.
            KeyError: if a loaded name is not present in self._vars_di.

        NOTE(security): pickle.load can execute arbitrary code; only load
        files from a trusted source.
        """
        di = pickle.load(f_)
        pat = None if filter_ is None else re.compile(filter_)
        for k, v in di.items():
            if pat is not None and not pat.fullmatch(k):
                continue
            var = self._vars_di[k]
            p = var.get_value(borrow=True)
            if p.shape != v.shape:
                # "need" is the shape of the existing parameter, "got" the
                # shape found in the file (the original had them swapped).
                raise ValueError(
                    'Shape mismatch, need %s, got %s' % (p.shape, v.shape),
                    p.shape)
            var.set_value(v)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def characterErrorsUCS2(self, data):
        """Append "invalid-codepoint" to self.errors for invalid characters in data.

        Every match of invalid_unicode_re is inspected. A match that starts a
        surrogate pair is decoded and reported only if the resulting code
        point is listed in non_bmp_invalid_codepoints; any other match is
        reported unconditionally.
        """
        # Someone picked the wrong compile option
        # You lose
        skip = False
        for match in invalid_unicode_re.finditer(data):
            # NOTE(review): once skip is True nothing below runs again, so it
            # is never reset and all later matches are ignored -- confirm
            # whether only the pair's second half was meant to be skipped.
            if skip:
                continue
            codepoint = ord(match.group())
            pos = match.start()
            # Pretty sure there should be endianness issues here
            if _utils.isSurrogatePair(data[pos:pos + 2]):
                # We have a surrogate pair!
                char_val = _utils.surrogatePairToCodepoint(data[pos:pos + 2])
                if char_val in non_bmp_invalid_codepoints:
                    self.errors.append("invalid-codepoint")
                skip = True
            elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
                  pos == len(data) - 1):
                # Lone surrogate as the very last character of data.
                self.errors.append("invalid-codepoint")
            else:
                skip = False
                self.errors.append("invalid-codepoint")
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def run_script(self, script_name, namespace):
        """Execute the script `script_name` of this distribution in `namespace`.

        The script is looked up as the metadata entry 'scripts/<script_name>'.
        If a file exists at self._fn(self.egg_info, script) its source is
        executed; otherwise the metadata text (line endings normalised to LF)
        is executed after being registered in linecache.cache under that
        file name. ``namespace['__file__']`` is set to the file name first.

        Raises:
            ResolutionError: if the distribution has no such script.

        Note: this runs the script with exec() by design.
        """
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            # Close the handle deterministically instead of leaving it to
            # the garbage collector.
            with open(script_filename) as script_file:
                source = script_file.read()
            code = compile(source, script_filename, 'exec')
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            code = compile(script_text, script_filename, 'exec')
        exec(code, namespace, namespace)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def get_encodings_from_content(content):
    """Collect the encodings declared inside a document.

    Three declarations are searched for, in this order: a ``charset=``
    inside a ``<meta>`` tag, a ``charset=`` inside a meta ``content=``
    attribute, and the ``encoding=`` of a leading XML declaration. All
    captured values are returned in one list. A DeprecationWarning is
    emitted on every call.

    :param content: string to extract encodings from.
    """
    warnings.warn((
        'In requests 3.0, get_encodings_from_content will be removed. For '
        'more information, please see the discussion on issue #2266. (This'
        ' warning should only appear once.)'),
        DeprecationWarning)

    declaration_patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )

    encodings = []
    for declaration_re in declaration_patterns:
        encodings.extend(declaration_re.findall(content))
    return encodings
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def run_script(self, script_name, namespace):
        """Execute the script `script_name` of this distribution in `namespace`.

        The script is looked up as the metadata entry 'scripts/<script_name>'.
        If a file exists at self._fn(self.egg_info, script) its source is
        executed; otherwise the metadata text (line endings normalised to LF)
        is executed after being registered in linecache.cache under that
        file name. ``namespace['__file__']`` is set to the file name first.

        Raises:
            ResolutionError: if the distribution has no such script.

        Note: this runs the script with exec() by design.
        """
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            # Close the handle deterministically instead of leaving it to
            # the garbage collector.
            with open(script_filename) as script_file:
                source = script_file.read()
            code = compile(source, script_filename, 'exec')
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            code = compile(script_text, script_filename, 'exec')
        exec(code, namespace, namespace)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def get_encodings_from_content(content):
    """Collect the encodings declared inside a document.

    Three declarations are searched for, in this order: a ``charset=``
    inside a ``<meta>`` tag, a ``charset=`` inside a meta ``content=``
    attribute, and the ``encoding=`` of a leading XML declaration. All
    captured values are returned in one list. A DeprecationWarning is
    emitted on every call.

    :param content: string to extract encodings from.
    """
    warnings.warn((
        'In requests 3.0, get_encodings_from_content will be removed. For '
        'more information, please see the discussion on issue #2266. (This'
        ' warning should only appear once.)'),
        DeprecationWarning)

    declaration_patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )

    encodings = []
    for declaration_re in declaration_patterns:
        encodings.extend(declaration_re.findall(content))
    return encodings
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _needs_hiding(mod_name):
    """
    Tell whether `mod_name` is one of the hidden packages or a submodule of one.

    >>> _needs_hiding('setuptools')
    True
    >>> _needs_hiding('pkg_resources')
    True
    >>> _needs_hiding('setuptools_plugin')
    False
    >>> _needs_hiding('setuptools.__init__')
    True
    >>> _needs_hiding('distutils')
    True
    >>> _needs_hiding('os')
    False
    >>> _needs_hiding('Cython')
    True
    """
    # The package name must be followed by a dot or the end of the string,
    # so look-alikes such as 'setuptools_plugin' are not hidden.
    hidden = re.match(
        r'(setuptools|pkg_resources|distutils|Cython)(\.|$)', mod_name)
    return hidden is not None
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def byte_compile(self, to_compile):
        """Byte-compile the files in `to_compile` with distutils.

        Does nothing except emit a warning when sys.dont_write_bytecode is
        set. Otherwise the files are compiled unoptimized and, if
        self.optimize is truthy, once more at that optimization level. Log
        verbosity is lowered by one for the duration and restored afterwards.
        """
        if sys.dont_write_bytecode:
            self.warn('byte-compiling is disabled, skipping.')
            return

        from distutils.util import byte_compile as compile_files

        try:
            # try to make the byte compile messages quieter
            log.set_verbosity(self.verbose - 1)

            compile_files(
                to_compile, optimize=0, force=1, dry_run=self.dry_run)
            level = self.optimize
            if level:
                compile_files(
                    to_compile, optimize=level, force=1,
                    dry_run=self.dry_run)
        finally:
            # restore original verbosity
            log.set_verbosity(self.verbose)
项目:NeoVintageous    作者:NeoVintageous    | 项目源码 | 文件源码
def expect_match(self, pattern, on_error=None):
        """
        Match @pattern at the current `position` or fail.

        On success the `position` advances by the length of the match and
        the match object is returned.

        @pattern
          A regular expression.

        @on_error
          A function that returns an error. When given, the error it returns
          is raised instead of the default ValueError.
        """
        found = re.compile(pattern).match(self.source, self.position)
        if not found:
            if on_error:
                raise on_error()
            raise ValueError('expected match with \'{0}\', at \'{1}\''.format(pattern, self.source[self.position:]))

        self.position += found.end() - found.start()
        return found
项目:NeoVintageous    作者:NeoVintageous    | 项目源码 | 文件源码
def match(self, pattern):
        """
        Try to match @pattern at the current `position`.

        Returns the match object and advances `position` by the length of
        the match; returns None and leaves `position` untouched when the
        pattern does not match there.

        @pattern
          A regular expression.
        """
        found = re.compile(pattern).match(self.source, self.position)
        if found is None:
            return

        self.position += found.end() - found.start()
        return found
项目:Crawler_and_Share    作者:f496328mm    | 项目源码 | 文件源码
def craw_last_index(ptt_class_name):
    """Return the number of the newest index page of a PTT board.

    The board's index.html is fetched and the link of its "previous page"
    button is read. That link is '/bbs/<board>/index<N>.html'; the function
    returns N + 1.

    Args:
        ptt_class_name: board name as used in the URL, e.g. 'Soft_Job'.
    """
    index_url = 'https://www.ptt.cc/bbs/' + ptt_class_name + '/index.html'
    res = requests.get(index_url, verify=True)
    soup3 = BeautifulSoup(res.text, "lxml")

    # The button label had been mangled to '??' by an encoding round-trip,
    # which is not even a valid regex ("nothing to repeat").
    # NOTE(review): restored to the label of PTT's previous-page button
    # (U+4E0A U+9801) -- confirm against the live page.
    previous_page = re.compile(u'\u4e0a\u9801')
    x = soup3('', {'class': "btn wide"}, text=previous_page)
    prefix = '/bbs/' + ptt_class_name + '/index'
    last_index = x[0]['href'].replace(prefix, '')
    last_index = int(last_index.replace('.html', '')) + 1

    return last_index
#--------------------------------------------------------------------------------- 
# ?? ubuntu - crontab-e, ????, ??????? data 
# ?? PTT ????, ???????, ??????, 
# ??????DATA, ???? index ??????, ??????? data,
# ?????, ??????
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_iface_from_addr(addr):
    """Work out on which interface the provided address is configured.

    Every address of every interface reported by netifaces is compared with
    `addr`, after dropping a trailing '%...' part if the address has one.

    Returns the name of the first interface that carries `addr`.
    Raises Exception if no interface does.
    """
    # Compiled once instead of once per address: splits off the '%...'
    # suffix of link-local addresses.
    ll_key = re.compile("(.+)%.*")
    for iface in netifaces.interfaces():
        addresses = netifaces.ifaddresses(iface)
        for inet_type in addresses:
            for _addr in addresses[inet_type]:
                _addr = _addr['addr']
                # link local
                raw = ll_key.match(_addr)
                if raw:
                    _addr = raw.group(1)

                if _addr == addr:
                    log("Address '%s' is configured on iface '%s'" %
                        (addr, iface))
                    return iface

    msg = "Unable to infer net iface on which '%s' is configured" % (addr,)
    raise Exception(msg)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_iface_from_addr(addr):
    """Work out on which interface the provided address is configured.

    Every address of every interface reported by netifaces is compared with
    `addr`, after dropping a trailing '%...' part if the address has one.

    Returns the name of the first interface that carries `addr`.
    Raises Exception if no interface does.
    """
    # Compiled once instead of once per address: splits off the '%...'
    # suffix of link-local addresses.
    ll_key = re.compile("(.+)%.*")
    for iface in netifaces.interfaces():
        addresses = netifaces.ifaddresses(iface)
        for inet_type in addresses:
            for _addr in addresses[inet_type]:
                _addr = _addr['addr']
                # link local
                raw = ll_key.match(_addr)
                if raw:
                    _addr = raw.group(1)

                if _addr == addr:
                    log("Address '%s' is configured on iface '%s'" %
                        (addr, iface))
                    return iface

    msg = "Unable to infer net iface on which '%s' is configured" % (addr,)
    raise Exception(msg)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def parse_treasury_csv_column(column):
    """
    Parse a treasury CSV column into a more human-readable format.

    Columns start with 'RIFLGFC', followed by Y or M (year or month), followed
    by a two-digit number signifying number of years/months, followed by _N.B.
    We only care about the middle two entries, which we turn into a string like
    3month or 30year.

    Raises ValueError if the column name does not have that shape.
    """
    # Every fragment is a raw string, and the dot of the '_N.B' suffix is
    # escaped so that it matches a literal '.' only.
    column_re = re.compile(
        r"^(?P<prefix>RIFLGFC)"
        r"(?P<unit>[YM])"
        r"(?P<periods>[0-9]{2})"
        r"(?P<suffix>_N\.B)$"
    )

    match = column_re.match(column)
    if match is None:
        raise ValueError("Couldn't parse CSV column %r." % column)
    unit, periods = get_unit_and_periods(match.groupdict())

    # Roundtrip through int to coerce '06' into '6'.
    return str(int(periods)) + ('year' if unit == 'Y' else 'month')
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def get_transcript_gc_content(self, transcript_obj):
        """Return the fraction of G/C bases over a transcript's intervals.

        Intervals whose chromosome is not in self.chroms are ignored. The
        count of c/C/g/G characters in each interval's sequence is divided
        by the summed interval lengths; 0 is returned when that sum is not
        positive.
        """
        gc_re = re.compile('[cCgG]')

        gc_count = 0
        total_length = 0
        for interval in transcript_obj.intervals:
            if interval.chrom in self.chroms:
                sequence = self.chroms[interval.chrom][interval.start:interval.end]
                gc_count += len(gc_re.findall(sequence))
                total_length += interval.length

        if total_length > 0:
            return float(gc_count) / float(total_length)
        return 0

# NOTE: these stub classes are necessary to maintain backwards compatibility with old refdata (1.2 or older)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def default(self, line):
        """Execute `line` as a single interactive statement in the current frame.

        A leading '!' is removed first. The statement runs with the frame's
        globals and self.curframe_locals, while sys.stdin, sys.stdout and
        sys.displayhook are temporarily replaced by this object's own.
        Any exception is reported on self.stdout as
        '*** <exception name>: <value>' instead of propagating.
        """
        if line[:1] == '!': line = line[1:]
        locals = self.curframe_locals
        globals = self.curframe.f_globals
        try:
            code = compile(line + '\n', '<stdin>', 'single')
            # Remember the process-wide streams so they can be put back.
            save_stdout = sys.stdout
            save_stdin = sys.stdin
            save_displayhook = sys.displayhook
            try:
                sys.stdin = self.stdin
                sys.stdout = self.stdout
                sys.displayhook = self.displayhook
                exec code in globals, locals
            finally:
                # Restore the streams whether or not the statement raised.
                sys.stdout = save_stdout
                sys.stdin = save_stdin
                sys.displayhook = save_displayhook
        except:
            t, v = sys.exc_info()[:2]
            # The exception "type" may itself be a string rather than a class.
            if type(t) == type(''):
                exc_type_name = t
            else: exc_type_name = t.__name__
            print >>self.stdout, '***', exc_type_name + ':', v
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, tb):
        """Check that the expected exception was raised inside the block.

        Raises self.failureException when nothing was raised, or when the
        exception text does not match self.expected_regexp. Returns False
        for an exception of an unexpected type (so it propagates) and True
        when the expected exception is accepted; in that case the exception
        is kept in self.exception.
        """
        if exc_type is None:
            try:
                exc_name = self.expected.__name__
            except AttributeError:
                exc_name = str(self.expected)
            raise self.failureException(
                "{0} not raised".format(exc_name))

        if not issubclass(exc_type, self.expected):
            # let unexpected exceptions pass through
            return False

        # store for later retrieval
        self.exception = exc_value

        pattern = self.expected_regexp
        if pattern is None:
            return True
        if isinstance(pattern, basestring):
            pattern = re.compile(pattern)
        if pattern.search(str(exc_value)):
            return True
        raise self.failureException('"%s" does not match "%s"' %
                 (pattern.pattern, str(exc_value)))
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def descriptions(self, group_pattern):
        """Return (response, [(group, description), ...]) for matching groups.

        'LIST NEWSGROUPS' is tried first; if its response does not start
        with "215", 'XGTITLE' is issued instead. Lines that do not consist
        of a group name, white space and a description are dropped.
        """
        line_pat = re.compile("^(?P<group>[^ \t]+)[ \t]+(.*)$")
        # Try the more std (acc. to RFC2980) LIST NEWSGROUPS first
        resp, raw_lines = self.longcmd('LIST NEWSGROUPS ' + group_pattern)
        if resp[:3] != "215":
            # Now the deprecated XGTITLE.  This either raises an error
            # or succeeds with the same output structure as LIST
            # NEWSGROUPS.
            resp, raw_lines = self.longcmd('XGTITLE ' + group_pattern)
        parsed = (line_pat.search(raw.strip()) for raw in raw_lines)
        pairs = [found.group(1, 2) for found in parsed if found]
        return resp, pairs
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def xhdr(self, hdr, str, file=None):
        """Issue an XHDR command (optional server extension).  Arguments:
        - hdr: the header type (e.g. 'subject')
        - str: an article nr, a message id, or a range nr1-nr2
        - file: passed through to longcmd
        Returns:
        - resp: server response if successful
        - list: the response lines, where each line that starts with a
          number is replaced in place by a (nr, value) tuple of strings"""

        pat = re.compile('^([0-9]+) ?(.*)\n?')
        resp, lines = self.longcmd('XHDR ' + hdr + ' ' + str, file)
        for index, text in enumerate(lines):
            parsed = pat.match(text)
            if parsed:
                lines[index] = parsed.group(1, 2)
        return resp, lines
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def parse150(resp):
    '''Parse the '150' response for a RETR request.
    Returns the expected transfer size or None; size is not guaranteed to
    be present in the 150 message.
    '''
    if resp[:3] != '150':
        raise error_reply, resp
    global _150_re
    if _150_re is None:
        import re
        _150_re = re.compile("150 .* \((\d+) bytes\)", re.IGNORECASE)
    m = _150_re.match(resp)
    if not m:
        return None
    s = m.group(1)
    try:
        return int(s)
    except (OverflowError, ValueError):
        return long(s)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def parse227(resp):
    '''Parse the '227' response for a PASV request.
    Raises error_proto if it does not contain '(h1,h2,h3,h4,p1,p2)'
    Return ('host.addr.as.numbers', port#) tuple.'''

    if resp[:3] != '227':
        raise error_reply, resp
    global _227_re
    if _227_re is None:
        import re
        _227_re = re.compile(r'(\d+),(\d+),(\d+),(\d+),(\d+),(\d+)')
    m = _227_re.search(resp)
    if not m:
        raise error_proto, resp
    numbers = m.groups()
    host = '.'.join(numbers[:4])
    port = (int(numbers[4]) << 8) + int(numbers[5])
    return host, port
项目:ross    作者:leonardbot    | 项目源码 | 文件源码
def find_templates():
    """
    Load python modules from templates directory and get templates list

    The 'templates' directory is looked up next to the file that defines
    this function. Every '*.py' file in it (dot-files excluded) is imported,
    and every module-level variable whose name ends with 'templates' is
    expected to hold (regex_text, data_func) pairs.

    :return: list of tuples (pairs):
             [(compiled regex, lambda regex_match: return message_data)]
    """
    templates = []
    # str.rstrip() removes a trailing *set of characters*, not a suffix
    # ('happy.py'.rstrip('.py') is 'ha'), so paths and module names are
    # derived with os.path instead.
    package_directory = os.path.dirname(inspect.getsourcefile(lambda: 0))
    templates_directory = os.path.join(package_directory, 'templates')
    for template_file in os.listdir(templates_directory):
        if template_file.startswith('.') or not template_file.endswith('.py'):
            continue
        module_name = os.path.splitext(template_file)[0]
        # Hack for dev development and distutils
        try:
            template_module = importlib.import_module(
                'templates.{}'.format(module_name))
        except ImportError:
            template_module = importlib.import_module(
                'ross.templates.{}'.format(module_name))
        # Iterate through the items of the template module.
        # If a variable name ends with 'templates',
        # extend the templates list with it.
        for (name, content) in template_module.__dict__.items():
            if name.endswith('templates'):
                for (regex_text, data_func) in content:
                    templates.append(
                        (re.compile(regex_text, re.IGNORECASE), data_func))
    return templates
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def list_nics(nic_type=None):
    """Return a list of nics of given type(s)

    :param nic_type: a single interface-name prefix (string), an iterable of
        prefixes, or a false value (the default) to list every interface
        reported by ``ip a``.
    :returns: list of interface names without duplicates, in the order they
        were first seen in the ``ip`` output.
    """
    if isinstance(nic_type, six.string_types):
        int_types = [nic_type]
    else:
        int_types = nic_type

    interfaces = []
    if nic_type:
        for int_type in int_types:
            cmd = ['ip', 'addr', 'show', 'label', int_type + '*']
            ip_output = subprocess.check_output(cmd).decode('UTF-8')
            # Matches "<prefix><digits>.<digits>@..." and captures the part
            # before the '@'. The prefix is compared literally by
            # startswith() below, so it is escaped to be literal here too.
            # Compiled once per prefix rather than once per output line.
            vlan_key = re.compile('.*: (' + re.escape(int_type) +
                                  r'[0-9]+\.[0-9]+)@.*')
            for line in ip_output.split('\n'):
                fields = line.split()
                # Blank / single-token lines carry no interface name.
                if len(fields) < 2 or not fields[1].startswith(int_type):
                    continue
                matched = vlan_key.search(line)
                if matched:
                    iface = matched.group(1)
                else:
                    iface = fields[1].replace(":", "")

                if iface not in interfaces:
                    interfaces.append(iface)
    else:
        cmd = ['ip', 'a']
        ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n')
        ip_output = (line.strip() for line in ip_output if line)

        # Raw string: '\s' in a plain literal is an invalid escape sequence.
        key = re.compile(r'^[0-9]+:\s+(.+):')
        for line in ip_output:
            matched = key.search(line)
            if matched:
                # Drop any "@<parent>" suffix from the captured name.
                iface = matched.group(1).partition("@")[0]
                if iface not in interfaces:
                    interfaces.append(iface)

    return interfaces
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def get_iface_from_addr(addr):
    """Work out on which interface the provided address is configured.

    Walks every interface reported by ``netifaces`` and every address entry
    of every address family on it. The first interface holding an address
    equal to ``addr`` is logged and returned.

    :raises Exception: if no interface carries ``addr``.
    """
    # Addresses of the form "<address>%<suffix>" (the original code labels
    # these "link local") are compared without the "%<suffix>" part.
    scoped = re.compile("(.+)%.*")

    for iface in netifaces.interfaces():
        for entries in netifaces.ifaddresses(iface).values():
            for entry in entries:
                candidate = entry['addr']
                hit = scoped.match(candidate)
                if hit:
                    candidate = hit.group(1)

                if candidate != addr:
                    continue

                log("Address '%s' is configured on iface '%s'" %
                    (addr, iface))
                return iface

    raise Exception(
        "Unable to infer net iface on which '%s' is configured" % (addr))
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
        """Block until the rabbitmq-server units report a clustered status.

        An optional initial pause of ``init_sleep`` seconds happens first
        (skipped when ``init_sleep`` is false). After a config change the
        status message may not switch to non-ready immediately, so waiting
        without the pause could return too early.

        The wait itself is delegated to ``deployment._auto_wait_for_status``
        with a compiled pattern for the exact ready message, the given
        ``timeout`` and only the 'rabbitmq-server' service included.
        """
        if init_sleep:
            time.sleep(init_sleep)

        wait_args = {
            'message': re.compile('^Unit is ready and clustered$'),
            'timeout': timeout,
            'include_only': ['rabbitmq-server'],
        }
        deployment._auto_wait_for_status(**wait_args)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def splituser(host):
    '''Split 'user@host' into (user, host), like urllib.splituser().

    The split happens at the last '@'. When there is no match the result is
    (None, host). Kept locally because six's support of this seems broken.
    '''
    found = re.match('^(.*)@(.*)$', host)
    if found is None:
        return None, host
    return found.group(1), found.group(2)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def splitpasswd(user):
    '''Split 'user:passwd' into (user, passwd), like urllib.splitpasswd().

    The split happens at the first ':'; the password part may contain any
    character, newlines included. Without a ':' the result is (user, None).
    Kept locally because six's support of this is missing.
    '''
    found = re.match('^([^:]*):(.*)$', user, re.S)
    if found is None:
        return user, None
    return found.group(1), found.group(2)
项目:redberry    作者:michaelcho    | 项目源码 | 文件源码
def strip_tags(text, strip_punctuation=False):
        """Return the text content of ``text`` with the HTML markup removed.

        With ``strip_punctuation`` set, every run of characters taken from
        ``p`` is deleted from the extracted text as well.
        """
        # NOTE(review): `p` is not defined in this function; presumably a
        # string of punctuation characters from the enclosing scope - confirm.
        soup = BeautifulSoup(text)

        if not strip_punctuation:
            return soup.get_text()

        punctuation = re.compile('[{}]+'.format(re.escape(p)))
        return punctuation.sub('', soup.get_text())
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def __init__(self):
        """Set up thread state, key-name patterns, callbacks and displays."""
        threading.Thread.__init__(self)
        self.finished = threading.Event()

        # Starting values for the tracked pointer position and modifiers.
        self.mouse_position_x = self.mouse_position_y = 0
        self.ison = dict(shift=False, caps=False)

        # Pre-compiled patterns that are matched against key names.
        self.isshift = re.compile('^Shift')
        self.iscaps = re.compile('^Caps_Lock')
        self.shiftablechar = re.compile(
            '^[a-z0-9]$|^minus$|^equal$'
            '|^bracketleft$|^bracketright$'
            '|^semicolon$|^backslash$|^apostrophe$'
            '|^comma$|^period$|^slash$|^grave$'
        )
        self.logrelease = re.compile('.*')
        self.isspace = re.compile('^space$')

        # Default callbacks: accept one argument, do nothing, return True.
        self.KeyDown = lambda x: True
        self.KeyUp = lambda x: True
        self.MouseAllButtonsDown = lambda x: True
        self.MouseAllButtonsUp = lambda x: True

        self.contextEventMask = [X.KeyPress, X.MotionNotify]

        # Two separate display handles are opened: a local one and one used
        # for recording.
        self.local_dpy = display.Display()
        self.record_dpy = display.Display()
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def _search_for_query(self, query):
        if query in self._search_pattern_cache:
            return self._search_pattern_cache[query]

        # Build pattern: include all characters
        pattern = []
        for c in query:
            # pattern.append('[^{0}]*{0}'.format(re.escape(c)))
            pattern.append('.*?{0}'.format(re.escape(c)))
        pattern = ''.join(pattern)
        search = re.compile(pattern, re.IGNORECASE).search

        self._search_pattern_cache[query] = search
        return search
项目:txt2evernote    作者:Xunius    | 项目源码 | 文件源码
def defSyntax(self):
        '''Define re patterns according to syntax.

        Reads ``self.syntax`` ('markdown' or 'zim') and sets:

        * ``self._img_re``    - compiled pattern for an image reference,
        * ``self._h_re_base`` - verbose header pattern template; its single
          ``%s`` takes the allowed repetition count of the header marker,
        * ``self._all_h_re``  - the template compiled for 1 to 6 markers.

        Raises Exception for any other syntax name.
        '''

        #------------------REGEX patterns------------------
        # The image patterns are compiled without re.L: they contain no
        # locale-sensitive construct (\w, \b, \s, case-insensitivity), so the
        # flag had no effect, and Python 3 rejects it for str patterns.

        if self.syntax=='markdown':

            self._img_re=re.compile(r'^(.*)!\[(.+?)\]\((.+?)\)', re.M)
            # Groups 1-3 belong to the underlined header form, groups 4-6 to
            # the '#'-prefixed form (5 = the '#' run, 6 = the header text).
            self._h_re_base = r'''
            (^(.+)[ \t]*\n(=+|-+)[ \t]*\n+)
            |
            (^(\#{%s})  # \1 = string of #'s
            [ \t]*
            (.+?)       # \2 = Header text
            [ \t]*
            (?<!\\)     # ensure not an escaped trailing '#'
            \#*         # optional closing #'s (not counted)
            \n+
            )
            '''

        elif self.syntax=='zim':

            self._img_re=re.compile(r'^(.*)\{\{(.+?)\}\}(.*)$', re.M)
            self._h_re_base = r'''
                ^(\={%s})  # \1 = string of ='s
                [ \t]*
                (.+?)       # \2 = Header text
                [ \t]*
                \1
                \n+
                '''
        else:
            raise Exception("Unknown syntax %s" %self.syntax)

        # Same compile step for both syntaxes: headers of level 1 to 6.
        self._all_h_re=re.compile(self._h_re_base %'1,6', re.X | re.M)
项目:txt2evernote    作者:Xunius    | 项目源码 | 文件源码
def createNoteBook(title,geeknote=None,verbose=True):
    '''Create a notebook named <title> unless one already exists.

    <title>: notebook name; it is stripped, truncated to
             MAX_NOTEBOOK_TITLE_LEN and freed of markdown header markup.
    <geeknote>: connector object; when None a GeekNoteConnector is created
                and connected.
    <verbose>: accepted for interface compatibility; not used here.

    Return 0 if the notebook already exists or was created, otherwise the
    result of tools.exitErr().
    '''

    #-------------------Trunc title-------------------
    title=title.strip()
    title=truncStr(title,MAX_NOTEBOOK_TITLE_LEN)

    #-------Make sure title doesnt start with #-------
    tp=textparse.TextParser('markdown')
    _h_re=re.compile(tp._h_re_base %'1,', re.X | re.M)
    # Both header forms in the pattern end with a mandatory newline, which
    # strip() has just removed; without re-adding it a single-line title
    # such as '# Foo' would never match and keep its leading '#'.
    m=_h_re.match(title+'\n')
    if m:
        # group(6) is the text of a '#'-style header; it is None when the
        # underlined header form matched, whose text is group(2).
        title=m.group(6) or m.group(2)

    #---------------------Connect---------------------
    if geeknote is None:
        geeknote=GeekNoteConnector()
        geeknote.connectToEvertone()

    #-----------------Check if exists-----------------
    notebooks=geeknote.getEvernote().findNotebooks()
    out.preloader.stop()
    # Compare as unicode on both sides (this module targets Python 2).
    if not isinstance(title,unicode):
        title=unicode(title,'utf8')
    notebooks=[unicode(ii.name,'utf8') for ii in notebooks]

    if title in notebooks:
        out.successMessage('Notebook already exists.')
        return 0

    #---------------------Create----------------------
    out.preloader.setMessage("Creating notebook...")
    result = geeknote.getEvernote().createNotebook(name=title)
    if result:
        out.successMessage("Notebook has been successfully created.")
        return 0

    out.failureMessage("Error while the process "
                       "of creating the notebook.")
    return tools.exitErr()