我们从Python开源项目中，提取了以下29个代码示例，用于说明如何使用operator.not_()。
def delete_proxy(task_id, **config):
    """Remove the corvus proxies for ``config['ports']`` and wait until they stop.

    For every port the corvus configuration file is deleted (if present) and
    every supervisor ``corvus-<port>-*.ini`` file found in ``SUPERVISOR_DIR``
    is removed.  Supervisor is then asked to reread/update, and the function
    polls ``program_running`` until none of the removed programs is running.

    :param task_id: not used by this function.
    :param config: keyword options; ``config['ports']`` is required and the
        whole mapping is forwarded to ``delete_proxy_in_dashboard``.
    :raises TaskException: if the programs are still running after all
        polling attempts.
    """
    # Local import: the original called ``os.sleep``, which does not exist and
    # raised AttributeError on the first unsuccessful poll.
    import time

    logger.info('starting delete proxy')
    ports = config['ports']
    programs = []
    for p in ports:
        conf_file = CORVUS_CONF.format(p)
        if os.path.exists(conf_file):
            os.remove(conf_file)
        for f in os.listdir(SUPERVISOR_DIR):
            if f.startswith('corvus-{}-'.format(p)) and f.endswith('.ini'):
                # Supervisor program name is the file name without ".ini".
                programs.append(f[:-4])
                os.remove(os.path.join(SUPERVISOR_DIR, f))
                logger.info('{} deleted'.format(programs[-1]))
    run('supervisorctl reread')
    run('supervisorctl update {}'.format(' '.join(programs)))
    logger.info('waiting proxy to be stopped')
    # NOTE(review): when no program file matched, ``retry`` is 0, the loop body
    # never runs and the ``else`` clause raises -- confirm this is intended.
    retry = 5 * len(programs)
    for _ in range(retry):
        if all(map(operator.not_, map(program_running, programs))):
            break
        time.sleep(2)
    else:
        raise TaskException('proxies are still running')
    delete_proxy_in_dashboard(config)
    logger.info('successfully delete proxy')
def prev(cls, ea, count):
    """Return the address ``count`` steps before ``ea``.

    A step normally goes to ``address.prev(ea)``.  When ``invalidQ`` flags
    that address, the step goes to the first code reference returned by
    ``xref.up(ea)`` instead.  If more than one code reference exists in that
    situation the choice is ambiguous: the problem is logged and ``None`` is
    returned.

    :param ea: starting address (normalised via ``interface.address.within``).
    :param count: number of steps to take; recursion stops once it is <= 1.
    :return: the resulting address, or ``None`` when the previous address is
        ambiguous.
    """
    ea = interface.address.within(ea)
    isStop = lambda ea: _instruction.feature(ea) & idaapi.CF_STOP == idaapi.CF_STOP
    # presumably true when the address is not code or is a flow-stopping
    # instruction -- depends on utils.compose/utils.fap semantics; verify.
    invalidQ = utils.compose(utils.fap(utils.compose(type.is_code, operator.not_), isStop), any)
    # NOTE(review): len(refs)/refs[0] below need filter() to return a list (Python 2).
    refs = filter(type.is_code, xref.up(ea))
    if len(refs) > 1 and invalidQ(address.prev(ea)):
        logging.fatal("{:s}.prev({:x}, count={:d}) : Unable to determine previous address due to multiple previous references being available : {:s}".format('.'.join((__name__, cls.__name__)), ea, count, ', '.join(__builtin__.map("{:x}".format,refs))))
        return None
    try:
        if invalidQ(address.prev(ea)):
            res = refs[0]
            # This step is not counted against the caller's request.
            count += 1
        else:
            res = address.prev(ea)
    except Exception:
        # Best effort: stay at the current address on any lookup failure.
        # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        res = ea
    return cls.prev(res, count-1) if count > 1 else res
def test_operator(self):
    """Check that the listed ``operator`` functions return real bool singletons."""
    import operator

    # (function, positional arguments, expected singleton)
    leading_cases = (
        (operator.truth, (0,), False),
        (operator.truth, (1,), True),
    )
    for func, args, expected in leading_cases:
        self.assertIs(func(*args), expected)

    # Only the isCallable checks run inside the py3k-warnings context.
    with test_support.check_py3k_warnings():
        for candidate, expected in ((0, False), (len, True)):
            self.assertIs(operator.isCallable(candidate), expected)

    remaining_cases = (
        (operator.isNumberType, (None,), False),
        (operator.isNumberType, (0,), True),
        (operator.not_, (1,), False),
        (operator.not_, (0,), True),
        (operator.isSequenceType, (0,), False),
        (operator.isSequenceType, ([],), True),
        (operator.contains, ([], 1), False),
        (operator.contains, ([1], 1), True),
        (operator.isMappingType, (1,), False),
        (operator.isMappingType, ({},), True),
        (operator.lt, (0, 0), False),
        (operator.lt, (0, 1), True),
        (operator.is_, (True, True), True),
        (operator.is_, (True, False), False),
        (operator.is_not, (True, True), False),
        (operator.is_not, (True, False), True),
    )
    for func, args, expected in remaining_cases:
        self.assertIs(func(*args), expected)
def number_of_args(fn):
    """Return the number of positional arguments ``fn`` accepts.

    Decorated callables are unwrapped through ``__wrapped__`` first.

    :param fn: the callable to inspect.
    :return: the count of positional-only and positional-or-keyword
        parameters, or ``None`` if ``fn`` takes ``*args``.
    :raises NotImplementedError: if ``signature`` cannot introspect ``fn``
        and it is not one of the explicitly listed built-in operators.
    """
    try:
        if hasattr(fn, '__wrapped__'):
            return number_of_args(fn.__wrapped__)
        # Introspect once; the original built the signature twice.
        params = list(signature(fn).parameters.values())
    except ValueError:
        # signatures don't work for built-in operators, so check for a few explicitly
        UNARY_OPS = [len, op.not_, op.truth, op.abs, op.index, op.inv, op.invert, op.neg, op.pos]
        BINARY_OPS = [op.lt, op.le, op.gt, op.ge, op.eq, op.ne, op.is_, op.is_not, op.add, op.and_, op.floordiv, op.lshift, op.mod, op.mul, op.or_, op.pow, op.rshift, op.sub, op.truediv, op.xor, op.concat, op.contains, op.countOf, op.delitem, op.getitem, op.indexOf]
        TERNARY_OPS = [op.setitem]
        for arity, known_ops in enumerate((UNARY_OPS, BINARY_OPS, TERNARY_OPS), 1):
            if fn in known_ops:
                return arity
        raise NotImplementedError("Built-in operator {} not supported".format(fn))

    if any(p.kind == p.VAR_POSITIONAL for p in params):
        return None
    return sum(p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) for p in params)
def add_globals(self):
    """Install the standard procedures (Irish-named Scheme builtins) and return self.

    Everything exported by ``math`` and ``cmath`` is added first, followed by
    the procedure table assembled below.
    """
    import math, cmath, operator as op
    from functools import reduce

    self.update(vars(math))
    self.update(vars(cmath))

    # The table is assembled group by group (keeping the original key order)
    # and installed with a single update() call.
    procedures = {}

    # Arithmetic, logic and comparison.
    procedures.update({
        '+': op.add,
        '-': op.sub,
        '*': op.mul,
        '/': op.itruediv,
        'níl': op.not_,
        'agus': op.and_,
        '>': op.gt,
        '<': op.lt,
        '>=': op.ge,
        '<=': op.le,
        '=': op.eq,
        'mod': op.mod,
        'frmh': cmath.sqrt,
        'dearbhluach': abs,
        'uas': max,
        'íos': min,
        'cothrom_le?': op.eq,
        'ionann?': op.is_,
    })

    # Lists and type predicates.
    procedures.update({
        'fad': len,
        'cons': cons,
        'ceann': lambda x: x[0],
        'tóin': lambda x: x[1:],
        'iarcheangail': op.add,
        'liosta': lambda *x: list(x),
        'liosta?': lambda x: isa(x, list),
        'folamh?': lambda x: x == [],
        'adamh?': lambda x: not (isa(x, list) or (x == None)),
        'boole?': lambda x: isa(x, bool),
    })

    # Higher-order helpers.
    procedures.update({
        'scag': lambda f, x: list(filter(f, x)),
        'cuir_le': lambda proc, l: proc(*l),
        'mapáil': lambda p, x: list(map(p, x)),
    })

    # Loading, ports and evaluation.
    procedures.update({
        'lódáil': lambda fn: load(fn),
        'léigh': lambda f: f.read(),
        'oscail_comhad_ionchuir': open,
        'dún_comhad_ionchuir': lambda p: p.file.close(),
        'oscail_comhad_aschur': lambda f: open(f, 'w'),
        'dún_comhad_aschur': lambda p: p.close(),
        'dac?': lambda x: x is eof_object,
        'luacháil': lambda x: evaluate(x),
        'scríobh': lambda x, port=sys.stdout: port.write(to_string(x) + '\n'),
    })

    self.update(procedures)
    return self
def __not__(self):
    """Apply ``operator.not_`` through ``apply_op1`` and binarize the result."""
    negated = self.apply_op1(operator.not_)
    return negated.binarize()
def mk_operator(which):
    """Return the callable implementing the operator token ``which``.

    Raises ``KeyError`` when the token is not in the table.
    """
    def contained_in(item, container):
        # "x in y" takes its operands in the opposite order to operator.contains.
        return operator.contains(container, item)

    token_table = dict((
        ('+', operator.add),
        ('-', operator.sub),
        ('*', operator.mul),
        ('/', operator.truediv),
        ('^', operator.pow),
        ('&&', operator.and_),
        ('||', operator.or_),
        ('!', operator.not_),
        ('and', operator.and_),
        ('or', operator.or_),
        ('not', operator.not_),
        ('in', contained_in),
        ('=', operator.eq),
        ('<', operator.lt),
        ('<=', operator.le),
        ('>=', operator.ge),
        ('>', operator.gt),
    ))
    return token_table[which]

# Token makers
def test_not(self):
    """Exceptions raised by ``__bool__`` must propagate out of negation."""
    # Both the ``not`` keyword and operator.not_ are exercised.
    import operator

    class BoolFailure(Exception):
        pass

    class Unboolable:
        def __bool__(self):
            raise BoolFailure

    def negate_with_keyword(obj):
        not obj

    for negate in (negate_with_keyword, operator.not_):
        self.assertRaises(BoolFailure, negate, Unboolable())
def test_operator(self):
    """Check that the listed ``operator`` functions return real bool singletons."""
    import operator

    # (function, positional arguments, expected singleton)
    cases = (
        (operator.truth, (0,), False),
        (operator.truth, (1,), True),
        (operator.not_, (1,), False),
        (operator.not_, (0,), True),
        (operator.contains, ([], 1), False),
        (operator.contains, ([1], 1), True),
        (operator.lt, (0, 0), False),
        (operator.lt, (0, 1), True),
        (operator.is_, (True, True), True),
        (operator.is_, (True, False), False),
        (operator.is_not, (True, True), False),
        (operator.is_not, (True, False), True),
    )
    for func, args, expected in cases:
        self.assertIs(func(*args), expected)
def parse_pr_message(message):
    """Split a pull-request message into ``(title, body)``.

    The title is the first line.  The body is everything after it with the
    leading empty lines removed.  An empty message yields ``(None, None)``.
    """
    lines = message.split("\n")
    if not len(message):
        return None, None

    # operator.not_ is true for "" only, so this skips the blank separator
    # lines between the title and the body.
    body_lines = itertools.dropwhile(operator.not_, lines[1:])
    return lines[0], "\n".join(body_lines)
def prevreg(cls, ea, reg, *regs, **modifiers):
    """Walk backwards from ``ea`` to an address that uses one of the registers.

    :param ea: address to start from (the address itself is skipped).
    :param reg: first register to look for.
    :param regs: any additional registers to look for.
    :param modifiers: forwarded to ``interface.regmatch.modifier``; the
        ``count`` entry (default 1) selects how many matches to walk past.
    :return: the matching address, or ``ea`` itself when there is no previous
        address to start walking from.
    :raises ValueError: if the walk ends at ``idaapi.BADADDR`` or, for the
        ``address`` class, before the computed lower bound.
    """
    regs = (reg,) + regs
    count = modifiers.get('count', 1)
    # Printable argument list, used only in the log/error messages below.
    args = ', '.join(["{:x}".format(ea)] + __builtin__.map("\"{:s}\"".format, regs) + __builtin__.map(utils.unbox("{:s}={!r}".format), modifiers.items()))

    # generate each helper using the regmatch class
    iterops = interface.regmatch.modifier(**modifiers)
    uses_register = interface.regmatch.use(regs)

    # if within a function, then sure we're within the chunk's bounds.
    if function.within(ea):
        (start, _) = function.chunk(ea)
        fwithin = functools.partial(operator.le, start)

    # otherwise ensure that we're not in the function and we're a code type.
    else:
        fwithin = utils.compose(utils.fap(utils.compose(function.within, operator.not_), type.is_code), all)

        start = cls.walk(ea, cls.prev, fwithin)
        start = top() if start == idaapi.BADADDR else start

    # define a function for cls.walk to continue looping while
    F = lambda ea: fwithin(ea) and not any(uses_register(ea, opnum) for opnum in iterops(ea))

    # skip the current address
    prevea = cls.prev(ea)
    if prevea is None:
        # FIXME: include registers in message
        logging.fatal("{:s}.prevreg({:s}, ...) : Unable to start walking from previous address. : {:x}".format('.'.join((__name__, cls.__name__)), args, ea))
        return ea

    # now walk while none of our registers match
    res = cls.walk(prevea, cls.prev, F)
    if res == idaapi.BADADDR or (cls == address and res < start):
        # FIXME: include registers in message
        # Plural suffix: the original indexed ('s','') and so wrote
        # "register" for several registers and "registers" for one.
        raise ValueError("{:s}.prevreg({:s}, ...) : Unable to find register{:s} within chunk. {:x}:{:x} : {:x}".format('.'.join((__name__, cls.__name__)), args, ('','s')[len(regs)>1], start, ea, res))

    # recurse if the user specified it
    modifiers['count'] = count - 1
    return cls.prevreg( cls.prev(res), *regs, **modifiers) if count > 1 else res
def nextreg(cls, ea, reg, *regs, **modifiers):
    """Walk forwards from ``ea`` to an address that uses one of the registers.

    :param ea: address to start from (the address itself is skipped).
    :param reg: first register to look for.
    :param regs: any additional registers to look for.
    :param modifiers: forwarded to ``interface.regmatch.modifier``; the
        ``count`` entry (default 1) selects how many matches to walk past.
    :return: the matching address, or ``ea`` itself when there is no next
        address to start walking from.
    :raises ValueError: if the walk ends at ``idaapi.BADADDR`` or, for the
        ``address`` class, at or past the computed upper bound.
    """
    regs = (reg,) + regs
    count = modifiers.get('count',1)
    # Printable argument list, used only in the log/error messages below.
    args = ', '.join(["{:x}".format(ea)] + __builtin__.map("\"{:s}\"".format, regs) + __builtin__.map(utils.unbox("{:s}={!r}".format), modifiers.items()))

    # generate each helper using the regmatch class
    iterops = interface.regmatch.modifier(**modifiers)
    uses_register = interface.regmatch.use(regs)

    # if within a function, then sure we're within the chunk's bounds.
    if function.within(ea):
        (_,end) = function.chunk(ea)
        fwithin = functools.partial(operator.gt, end)

    # otherwise ensure that we're not in a function and we're a code type.
    else:
        fwithin = utils.compose(utils.fap(utils.compose(function.within, operator.not_), type.is_code), all)

        end = cls.walk(ea, cls.next, fwithin)
        end = bottom() if end == idaapi.BADADDR else end

    # define a function for cls.walk to continue looping while
    F = lambda ea: fwithin(ea) and not any(uses_register(ea, opnum) for opnum in iterops(ea))

    # skip the current address
    nextea = cls.next(ea)
    if nextea is None:
        # FIXME: include registers in message
        logging.fatal("{:s}.nextreg({:s}) : Unable to start walking from next address. : {:x}".format('.'.join((__name__, cls.__name__)), args, ea))
        return ea

    # now walk while none of our registers match
    res = cls.walk(nextea, cls.next, F)
    if res == idaapi.BADADDR or (cls == address and res >= end):
        # FIXME: include registers in message
        # Plural suffix: the original indexed ('s','') and so wrote
        # "register" for several registers and "registers" for one.
        raise ValueError("{:s}.nextreg({:s}, ...) : Unable to find register{:s} within chunk {:x}:{:x} : {:x}".format('.'.join((__name__, cls.__name__)), args, ('','s')[len(regs)>1], end, ea, res))

    # recurse if the user specified it
    modifiers['count'] = count - 1
    return cls.nextreg(cls.next(res), *regs, **modifiers) if count > 1 else res
def next(cls, ea, count):
    """Return the address ``count`` steps after ``ea``.

    A step follows the first code reference from ``xref.down(ea)`` when the
    instruction at ``ea`` is a jump, and ``address.next(ea)`` otherwise.
    ``None`` is returned when more than one code reference exists (this case
    is logged) or when ``invalidQ`` flags ``ea`` and it is not a jump.
    """
    ea = interface.address.within(ea)

    def isStop(addr):
        return _instruction.feature(addr) & idaapi.CF_STOP == idaapi.CF_STOP

    # presumably true when the address is not code or is a flow-stopping
    # instruction -- depends on utils.compose/utils.fap semantics; verify.
    not_code = utils.compose(type.is_code, operator.not_)
    invalidQ = utils.compose(utils.fap(not_code, isStop), any)

    refs = filter(type.is_code, xref.down(ea))
    if len(refs) > 1:
        location = '.'.join((__name__, cls.__name__))
        candidates = ', '.join(__builtin__.map("{:x}".format,refs))
        logging.fatal("{:s}.next({:x}, count={:d}) : Unable to determine next address due to multiple xrefs being available : {:s}".format(location, ea, count, candidates))
        return None

    if invalidQ(ea) and not _instruction.is_jmp(ea):
        # logging.fatal("{:s}.next({:x}, count={:d}) : Unable to move to next address. Flow has stopped.".format('.'.join((__name__, cls.__name__)), ea, count))
        return None

    if _instruction.is_jmp(ea):
        res = refs[0]
    else:
        res = address.next(ea)

    if count > 1:
        return cls.next(res, count-1)
    return res
def test_not(self):
    """Exceptions raised by ``__nonzero__`` must propagate out of negation."""
    # Both the ``not`` keyword and operator.not_ are exercised.
    import operator

    class TruthFailure(Exception):
        pass

    class Untestable:
        def __nonzero__(self):
            raise TruthFailure

    def negate_with_keyword(obj):
        not obj

    for negate in (negate_with_keyword, operator.not_):
        self.assertRaises(TruthFailure, negate, Untestable())
def standard_env():
    """Build and return an ``Env`` holding the standard procedures.

    The environment receives every name exported by ``math`` plus the table
    of arithmetic, comparison, list and predicate procedures defined below.
    """
    import math
    import operator
    from functools import reduce

    env = Env()
    env.update(vars(math))
    env.update({
        # Variadic arithmetic: fold the operator over all arguments.
        '+': lambda *args: reduce(lambda x, y: x + y, args),
        '-': lambda *args: reduce(lambda x, y: x - y, args),
        '*': lambda *args: reduce(lambda x, y: x * y, args),
        '/': lambda *args: reduce(lambda x, y: x / y, args),
        '>': lambda x, y: x > y,
        '>=': lambda x, y: x >= y,
        '<': lambda x, y: x < y,
        '<=': lambda x, y: x <= y,
        '=': lambda x, y: x == y,
        'abs': abs,
        'append': lambda *args: reduce(lambda x, y: x + y, args),
        'begin': lambda *x: x[-1],
        'car': lambda x: x[0],
        'cdr': lambda x: x[1:],
        'cons': lambda x, y: [x] + y,
        # Identity comparison.  The original bound this to isinstance(x, y),
        # which raises TypeError for ordinary values such as (eq? 1 1).
        'eq?': operator.is_,
        'equal?': lambda x, y: x == y,
        'length': len,
        'list': lambda *x: list(x),
        'list?': lambda x: isinstance(x, list),
        'map': map,
        'min': min,
        'max': max,
        'not': operator.not_,
        'null?': lambda x: x == [],
        'number?': lambda x: isinstance(x, Number),
        'procedure': callable,
        'round': round,
        'symbol': lambda x: isinstance(x, Symbol),
        # Predicate spellings matching the other "?" names in this table;
        # the un-suffixed keys above are kept for existing callers.
        'procedure?': callable,
        'symbol?': lambda x: isinstance(x, Symbol),
    })
    return env
def opstr_to_op(opstr):
    """Return the ``operator`` function for the operator string ``opstr``.

    :param opstr: one of ``<=``, ``>=``, ``<``, ``>``, ``=``, ``-``, ``+``,
        ``/``, ``*``, ``and``, ``or``, ``not``.
    :raises Exception: if ``opstr`` is not one of the supported strings.
    """
    table = {
        "<=": operator.le,
        ">=": operator.ge,
        "<": operator.lt,
        ">": operator.gt,
        "=": operator.eq,
        "-": operator.sub,
        "+": operator.add,
        # operator.div only exists on Python 2; fall back to truediv so that
        # "/" no longer raises AttributeError on Python 3.
        "/": getattr(operator, "div", operator.truediv),
        "*": operator.mul,
        "and": operator.and_,
        "or": operator.or_,
        "not": operator.not_,
    }
    try:
        return table[opstr]
    except (KeyError, TypeError):
        # TypeError covers unhashable input, which the original if/elif chain
        # also reported as unsupported.
        raise Exception("Opstr not supported: {}".format(opstr))
def make_env():
    '''
    Build and return an Environment with basic procedures.

    The environment receives every name exported by ``math``, ``cmath`` and
    ``itertools``, then the procedure table defined below.
    '''
    import math
    import cmath
    import itertools
    import operator as op
    # reduce is not a builtin on Python 3; import it here so the variadic
    # arithmetic lambdas below cannot fail with NameError.
    from functools import reduce

    env = Environment()
    env.update(vars(math))
    env.update(vars(cmath))
    env.update(vars(itertools))
    env.update({
        '>': op.gt,
        '<': op.lt,
        '>=': op.ge,
        '<=': op.le,
        '=': op.eq,
        '>>': op.rshift,
        '<<': op.lshift,
        '+': lambda *x: reduce(op.add, x, 0),
        '-': lambda *x: x[0] - sum(x[1:]),
        '*': lambda *x: reduce(op.mul, x, 1),
        '/': lambda *x: reduce(op.truediv, (x[1:]), x[0]),
        '//': lambda *x: reduce(op.floordiv, (x[1:]), x[0]),
        '%': op.mod,
        'abs': abs,
        'append': op.add,
        'apply': lambda proc,l: proc(*l),
        'begin': lambda *x: x[-1],
        'bool?': lambda x: isinstance(x, bool),
        'call/cc': callcc,
        'car': lambda x: x[0],
        'cdr': lambda x: x[1:],
        'cons': lambda x,y: [x] + y,
        'filter': lambda f,l: list(filter(f, l)),
        'length': len,
        'list': lambda *x: list(x),
        'list?': lambda x: isinstance(x,list),
        # Map can be defined in the stdlib, though it will max out python's recursion depth
        'map': lambda f,l: list(map(f, l)),
        'max': max,
        'min': min,
        'not': op.not_,
        # bool is excluded explicitly because it is a subclass of int.
        'number?': lambda x: not isinstance(x, bool) and isinstance(x, int) or isinstance(x, float) or isinstance(x, complex),
        'or': op.or_,
        'proc?': callable,
        'range': lambda *x: list(range(x[0], x[1])) if len(x) > 1 else list(range(x[0])),
        'round': round,
        'str?': lambda x: isinstance(x, str),
        'sum': lambda x: sum(x),
    })
    return env

# Create a global env for `gazeval()` to access