Python thread 模块,stopped_on_line() 实例源码

我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用thread.stopped_on_line()

项目:ApiRestPythonTest    作者:rvfvazquez    | 项目源码 | 文件源码
def command_auto_resume(self):
        tid = read_int(self.conn)
        THREADS_LOCK.acquire()
        thread = THREADS[tid]
        THREADS_LOCK.release()

        stepping = thread.stepping
        if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line): 
            report_step_finished(tid)
        else:
            self.command_resume_all()
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def __init__(self, id = None):
        if id is not None:
            self.id = id 
        else:
            self.id = thread.get_ident()
        self._events = {'call' : self.handle_call, 
                        'line' : self.handle_line, 
                        'return' : self.handle_return, 
                        'exception' : self.handle_exception,
                        'c_call' : self.handle_c_call,
                        'c_return' : self.handle_c_return,
                        'c_exception' : self.handle_c_exception,
                       }
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False
        self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.prev_trace_func = None
        self.trace_func_stack = []
        self.reported_process_loaded = False
        self.django_stepping = None
        self.is_sending = False

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        if sys.platform == 'cli':
            self.frames = []
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def block(self, block_lambda, keep_stopped_on_line = False):
        """blocks the current thread until the debugger resumes it"""
        assert not self._is_blocked
        #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident())    # we should only ever block ourselves

        # send thread frames before we block
        self.enum_thread_frames_locally()

        if not keep_stopped_on_line:
            self.stopped_on_line = self.cur_frame.f_lineno

        # need to synchronize w/ sending the reason we're blocking
        self._block_starting_lock.acquire()
        self._is_blocked = True
        block_lambda()
        self._block_starting_lock.release()

        while not DETACHED:
            self._block_lock.acquire()
            if self.unblock_work is None:
                break

            # the debugger wants us to do something, do it, and then block again
            self._is_working = True
            self.unblock_work()
            self.unblock_work = None
            self._is_working = False

        self._block_starting_lock.acquire()
        assert self._is_blocked
        self._is_blocked = False
        self._block_starting_lock.release()
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def command_auto_resume(self):
        tid = read_int(self.conn)
        THREADS_LOCK.acquire()
        thread = THREADS[tid]
        THREADS_LOCK.release()

        stepping = thread.stepping
        if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line): 
            report_step_finished(tid)
        else:
            self.command_resume_all()
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def __init__(self, id = None):
        if id is not None:
            self.id = id 
        else:
            self.id = thread.get_ident()
        self._events = {'call' : self.handle_call, 
                        'line' : self.handle_line, 
                        'return' : self.handle_return, 
                        'exception' : self.handle_exception,
                        'c_call' : self.handle_c_call,
                        'c_return' : self.handle_c_return,
                        'c_exception' : self.handle_c_exception,
                       }
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False
        self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.prev_trace_func = None
        self.trace_func_stack = []
        self.reported_process_loaded = False
        self.django_stepping = None
        self.is_sending = False

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        if sys.platform == 'cli':
            self.frames = []
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def block(self, block_lambda, keep_stopped_on_line = False):
        """blocks the current thread until the debugger resumes it"""
        assert not self._is_blocked
        #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident())    # we should only ever block ourselves

        # send thread frames before we block
        self.enum_thread_frames_locally()

        if not keep_stopped_on_line:
            self.stopped_on_line = self.cur_frame.f_lineno

        # need to synchronize w/ sending the reason we're blocking
        self._block_starting_lock.acquire()
        self._is_blocked = True
        block_lambda()
        self._block_starting_lock.release()

        while not DETACHED:
            self._block_lock.acquire()
            if self.unblock_work is None:
                break

            # the debugger wants us to do something, do it, and then block again
            self._is_working = True
            self.unblock_work()
            self.unblock_work = None
            self._is_working = False

        self._block_starting_lock.acquire()
        assert self._is_blocked
        self._is_blocked = False
        self._block_starting_lock.release()
项目:HomeAutomation    作者:gs2671    | 项目源码 | 文件源码
def __init__(self, id = None):
        if id is not None:
            self.id = id 
        else:
            self.id = thread.get_ident()
        self._events = {'call' : self.handle_call, 
                        'line' : self.handle_line, 
                        'return' : self.handle_return, 
                        'exception' : self.handle_exception,
                        'c_call' : self.handle_c_call,
                        'c_return' : self.handle_c_return,
                        'c_exception' : self.handle_c_exception,
                       }
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False
        self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.prev_trace_func = None
        self.trace_func_stack = []
        self.reported_process_loaded = False
        self.django_stepping = None
        self.is_sending = False

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        if sys.platform == 'cli':
            self.frames = []
项目:HomeAutomation    作者:gs2671    | 项目源码 | 文件源码
def block(self, block_lambda, keep_stopped_on_line = False):
        """blocks the current thread until the debugger resumes it"""
        assert not self._is_blocked
        #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident())    # we should only ever block ourselves

        # send thread frames before we block
        self.enum_thread_frames_locally()

        if not keep_stopped_on_line:
            self.stopped_on_line = self.cur_frame.f_lineno

        # need to synchronize w/ sending the reason we're blocking
        self._block_starting_lock.acquire()
        self._is_blocked = True
        block_lambda()
        self._block_starting_lock.release()

        while not DETACHED:
            self._block_lock.acquire()
            if self.unblock_work is None:
                break

            # the debugger wants us to do something, do it, and then block again
            self._is_working = True
            self.unblock_work()
            self.unblock_work = None
            self._is_working = False

        self._block_starting_lock.acquire()
        assert self._is_blocked
        self._is_blocked = False
        self._block_starting_lock.release()
项目:HomeAutomation    作者:gs2671    | 项目源码 | 文件源码
def command_auto_resume(self):
        tid = read_int(self.conn)
        THREADS_LOCK.acquire()
        thread = THREADS[tid]
        THREADS_LOCK.release()

        stepping = thread.stepping
        if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line): 
            report_step_finished(tid)
        else:
            self.command_resume_all()
项目:xidian-sfweb    作者:Gear420    | 项目源码 | 文件源码
def __init__(self, id = None):
        if id is not None:
            self.id = id 
        else:
            self.id = thread.get_ident()
        self._events = {'call' : self.handle_call, 
                        'line' : self.handle_line, 
                        'return' : self.handle_return, 
                        'exception' : self.handle_exception,
                        'c_call' : self.handle_c_call,
                        'c_return' : self.handle_c_return,
                        'c_exception' : self.handle_c_exception,
                       }
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False
        self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.prev_trace_func = None
        self.trace_func_stack = []
        self.reported_process_loaded = False
        self.django_stepping = None
        self.is_sending = False

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        if sys.platform == 'cli':
            self.frames = []
项目:xidian-sfweb    作者:Gear420    | 项目源码 | 文件源码
def block(self, block_lambda, keep_stopped_on_line = False):
        """blocks the current thread until the debugger resumes it"""
        assert not self._is_blocked
        #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident())    # we should only ever block ourselves

        # send thread frames before we block
        self.enum_thread_frames_locally()

        if not keep_stopped_on_line:
            self.stopped_on_line = self.cur_frame.f_lineno

        # need to synchronize w/ sending the reason we're blocking
        self._block_starting_lock.acquire()
        self._is_blocked = True
        block_lambda()
        self._block_starting_lock.release()

        while not DETACHED:
            self._block_lock.acquire()
            if self.unblock_work is None:
                break

            # the debugger wants us to do something, do it, and then block again
            self._is_working = True
            self.unblock_work()
            self.unblock_work = None
            self._is_working = False

        self._block_starting_lock.acquire()
        assert self._is_blocked
        self._is_blocked = False
        self._block_starting_lock.release()
项目:xidian-sfweb    作者:Gear420    | 项目源码 | 文件源码
def command_auto_resume(self):
        tid = read_int(self.conn)
        THREADS_LOCK.acquire()
        thread = THREADS[tid]
        THREADS_LOCK.release()

        stepping = thread.stepping
        if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line): 
            report_step_finished(tid)
        else:
            self.command_resume_all()
项目:skojjt    作者:martin-green    | 项目源码 | 文件源码
def __init__(self, id = None):
        if id is not None:
            self.id = id 
        else:
            self.id = thread.get_ident()
        self._events = {'call' : self.handle_call, 
                        'line' : self.handle_line, 
                        'return' : self.handle_return, 
                        'exception' : self.handle_exception,
                        'c_call' : self.handle_c_call,
                        'c_return' : self.handle_c_return,
                        'c_exception' : self.handle_c_exception,
                       }
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False
        self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.prev_trace_func = None
        self.trace_func_stack = []
        self.reported_process_loaded = False
        self.django_stepping = None
        self.is_sending = False

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        if sys.platform == 'cli':
            self.frames = []
项目:skojjt    作者:martin-green    | 项目源码 | 文件源码
def block(self, block_lambda, keep_stopped_on_line = False):
        """blocks the current thread until the debugger resumes it"""
        assert not self._is_blocked
        #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident())    # we should only ever block ourselves

        # send thread frames before we block
        self.enum_thread_frames_locally()

        if not keep_stopped_on_line:
            self.stopped_on_line = self.cur_frame.f_lineno

        # need to synchronize w/ sending the reason we're blocking
        self._block_starting_lock.acquire()
        self._is_blocked = True
        block_lambda()
        self._block_starting_lock.release()

        while not DETACHED:
            self._block_lock.acquire()
            if self.unblock_work is None:
                break

            # the debugger wants us to do something, do it, and then block again
            self._is_working = True
            self.unblock_work()
            self.unblock_work = None
            self._is_working = False

        self._block_starting_lock.acquire()
        assert self._is_blocked
        self._is_blocked = False
        self._block_starting_lock.release()
项目:skojjt    作者:martin-green    | 项目源码 | 文件源码
def command_auto_resume(self):
        tid = read_int(self.conn)
        THREADS_LOCK.acquire()
        thread = THREADS[tid]
        THREADS_LOCK.release()

        stepping = thread.stepping
        if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line): 
            report_step_finished(tid)
        else:
            self.command_resume_all()
项目:DjangoWebProject    作者:wrkettlitz    | 项目源码 | 文件源码
def __init__(self, id = None):
        if id is not None:
            self.id = id 
        else:
            self.id = thread.get_ident()
        self._events = {'call' : self.handle_call, 
                        'line' : self.handle_line, 
                        'return' : self.handle_return, 
                        'exception' : self.handle_exception,
                        'c_call' : self.handle_c_call,
                        'c_return' : self.handle_c_return,
                        'c_exception' : self.handle_c_exception,
                       }
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False
        self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.prev_trace_func = None
        self.trace_func_stack = []
        self.reported_process_loaded = False
        self.django_stepping = None
        self.is_sending = False

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        if sys.platform == 'cli':
            self.frames = []
项目:DjangoWebProject    作者:wrkettlitz    | 项目源码 | 文件源码
def block(self, block_lambda, keep_stopped_on_line = False):
        """blocks the current thread until the debugger resumes it"""
        assert not self._is_blocked
        #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident())    # we should only ever block ourselves

        # send thread frames before we block
        self.enum_thread_frames_locally()

        if not keep_stopped_on_line:
            self.stopped_on_line = self.cur_frame.f_lineno

        # need to synchronize w/ sending the reason we're blocking
        self._block_starting_lock.acquire()
        self._is_blocked = True
        block_lambda()
        self._block_starting_lock.release()

        while not DETACHED:
            self._block_lock.acquire()
            if self.unblock_work is None:
                break

            # the debugger wants us to do something, do it, and then block again
            self._is_working = True
            self.unblock_work()
            self.unblock_work = None
            self._is_working = False

        self._block_starting_lock.acquire()
        assert self._is_blocked
        self._is_blocked = False
        self._block_starting_lock.release()
项目:DjangoWebProject    作者:wrkettlitz    | 项目源码 | 文件源码
def command_auto_resume(self):
        tid = read_int(self.conn)
        THREADS_LOCK.acquire()
        thread = THREADS[tid]
        THREADS_LOCK.release()

        stepping = thread.stepping
        if ((stepping == STEPPING_OVER or stepping == STEPPING_INTO) and thread.cur_frame.f_lineno != thread.stopped_on_line): 
            report_step_finished(tid)
        else:
            self.command_resume_all()
项目:ApiRestPythonTest    作者:rvfvazquez    | 项目源码 | 文件源码
def __init__(self, id = None):
        if id is not None:
            self.id = id 
        else:
            self.id = thread.get_ident()
        self._events = {'call' : self.handle_call, 
                        'line' : self.handle_line, 
                        'return' : self.handle_return, 
                        'exception' : self.handle_exception,
                        'c_call' : self.handle_c_call,
                        'c_return' : self.handle_c_return,
                        'c_exception' : self.handle_c_exception,
                       }
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False
        self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.prev_trace_func = None
        self.trace_func_stack = []
        self.reported_process_loaded = False
        self.django_stepping = None
        self.is_sending = False

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        if sys.platform == 'cli':
            self.frames = []
项目:ApiRestPythonTest    作者:rvfvazquez    | 项目源码 | 文件源码
def block(self, block_lambda, keep_stopped_on_line = False):
        """blocks the current thread until the debugger resumes it"""
        assert not self._is_blocked
        #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident())    # we should only ever block ourselves

        # send thread frames before we block
        self.enum_thread_frames_locally()

        if not keep_stopped_on_line:
            self.stopped_on_line = self.cur_frame.f_lineno

        # need to synchronize w/ sending the reason we're blocking
        self._block_starting_lock.acquire()
        self._is_blocked = True
        block_lambda()
        self._block_starting_lock.release()

        while not DETACHED:
            self._block_lock.acquire()
            if self.unblock_work is None:
                break

            # the debugger wants us to do something, do it, and then block again
            self._is_working = True
            self.unblock_work()
            self.unblock_work = None
            self._is_working = False

        self._block_starting_lock.acquire()
        assert self._is_blocked
        self._is_blocked = False
        self._block_starting_lock.release()