Python thread 模块,start_new() 实例源码

我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用thread.start_new()

项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def intercept_threads(for_attach = False):
    """Route low-level thread creation through ``thread_creator``.

    ``thread.start_new_thread`` and its alias ``thread.start_new`` are both
    rebound to ``thread_creator``.  When ``threading`` is already loaded and
    has not been captured in ``_threading`` yet, its private
    ``_start_new_thread`` is rebound as well.

    for_attach -- stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # threading is hot-patched only if something else imported it already
    # (the attach case).  It must not be imported here for the first time:
    # it would then treat the current, short-lived debugger attach thread as
    # the main thread.  A later import needs no patching anyway, because it
    # picks up the thread.start_new_thread installed above.
    if _threading is None:
        if 'threading' in sys.modules:
            import threading
            _threading = threading
            threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach

## Modified parameters by Don Jayamanne
# Accept current Process id to pass back to debugger
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def intercept_threads(for_attach = False):
    """Route low-level thread creation through ``thread_creator``.

    ``thread.start_new_thread`` and its alias ``thread.start_new`` are both
    rebound to ``thread_creator``.  When ``threading`` is already loaded and
    has not been captured in ``_threading`` yet, its private
    ``_start_new_thread`` is rebound as well.

    for_attach -- stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # threading is hot-patched only if something else imported it already
    # (the attach case).  It must not be imported here for the first time:
    # it would then treat the current, short-lived debugger attach thread as
    # the main thread.  A later import needs no patching anyway, because it
    # picks up the thread.start_new_thread installed above.
    if _threading is None:
        if 'threading' in sys.modules:
            import threading
            _threading = threading
            threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach

## Modified parameters by Don Jayamanne
# Accept current Process id to pass back to debugger
项目:HomeAutomation    作者:gs2671    | 项目源码 | 文件源码
def intercept_threads(for_attach = False):
    """Route low-level thread creation through ``thread_creator``.

    ``thread.start_new_thread`` and its alias ``thread.start_new`` are both
    rebound to ``thread_creator``.  When ``threading`` is already loaded and
    has not been captured in ``_threading`` yet, its private
    ``_start_new_thread`` is rebound as well.

    for_attach -- stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # threading is hot-patched only if something else imported it already
    # (the attach case).  It must not be imported here for the first time:
    # it would then treat the current, short-lived debugger attach thread as
    # the main thread.  A later import needs no patching anyway, because it
    # picks up the thread.start_new_thread installed above.
    if _threading is None:
        if 'threading' in sys.modules:
            import threading
            _threading = threading
            threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
项目:xidian-sfweb    作者:Gear420    | 项目源码 | 文件源码
def intercept_threads(for_attach = False):
    """Route low-level thread creation through ``thread_creator``.

    ``thread.start_new_thread`` and its alias ``thread.start_new`` are both
    rebound to ``thread_creator``.  When ``threading`` is already loaded and
    has not been captured in ``_threading`` yet, its private
    ``_start_new_thread`` is rebound as well.

    for_attach -- stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # threading is hot-patched only if something else imported it already
    # (the attach case).  It must not be imported here for the first time:
    # it would then treat the current, short-lived debugger attach thread as
    # the main thread.  A later import needs no patching anyway, because it
    # picks up the thread.start_new_thread installed above.
    if _threading is None:
        if 'threading' in sys.modules:
            import threading
            _threading = threading
            threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
项目:skojjt    作者:martin-green    | 项目源码 | 文件源码
def intercept_threads(for_attach = False):
    """Route low-level thread creation through ``thread_creator``.

    ``thread.start_new_thread`` and its alias ``thread.start_new`` are both
    rebound to ``thread_creator``.  When ``threading`` is already loaded and
    has not been captured in ``_threading`` yet, its private
    ``_start_new_thread`` is rebound as well.

    for_attach -- stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # threading is hot-patched only if something else imported it already
    # (the attach case).  It must not be imported here for the first time:
    # it would then treat the current, short-lived debugger attach thread as
    # the main thread.  A later import needs no patching anyway, because it
    # picks up the thread.start_new_thread installed above.
    if _threading is None:
        if 'threading' in sys.modules:
            import threading
            _threading = threading
            threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
项目:DjangoWebProject    作者:wrkettlitz    | 项目源码 | 文件源码
def intercept_threads(for_attach = False):
    """Route low-level thread creation through ``thread_creator``.

    ``thread.start_new_thread`` and its alias ``thread.start_new`` are both
    rebound to ``thread_creator``.  When ``threading`` is already loaded and
    has not been captured in ``_threading`` yet, its private
    ``_start_new_thread`` is rebound as well.

    for_attach -- stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # threading is hot-patched only if something else imported it already
    # (the attach case).  It must not be imported here for the first time:
    # it would then treat the current, short-lived debugger attach thread as
    # the main thread.  A later import needs no patching anyway, because it
    # picks up the thread.start_new_thread installed above.
    if _threading is None:
        if 'threading' in sys.modules:
            import threading
            _threading = threading
            threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
项目:ApiRestPythonTest    作者:rvfvazquez    | 项目源码 | 文件源码
def intercept_threads(for_attach = False):
    """Route low-level thread creation through ``thread_creator``.

    ``thread.start_new_thread`` and its alias ``thread.start_new`` are both
    rebound to ``thread_creator``.  When ``threading`` is already loaded and
    has not been captured in ``_threading`` yet, its private
    ``_start_new_thread`` is rebound as well.

    for_attach -- stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # threading is hot-patched only if something else imported it already
    # (the attach case).  It must not be imported here for the first time:
    # it would then treat the current, short-lived debugger attach thread as
    # the main thread.  A later import needs no patching anyway, because it
    # picks up the thread.start_new_thread installed above.
    if _threading is None:
        if 'threading' in sys.modules:
            import threading
            _threading = threading
            threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def __init__(self, func, args=(), kwargs={}):
    """Run ``func(*args, **kwargs)`` on a new thread and record the outcome.

    On success the worker stores the return value in ``self.ret`` and sets
    ``self.done`` to 1.  If anything is raised, ``sys.exc_info()`` is stored
    in ``self.exc`` and ``self.done`` is set to 2.  The value returned by
    ``thread.start_new`` is kept in ``self.id``.
    """
    import thread

    def _run_in_background():
        # The bare except catches everything raised by the call; presumably
        # so the owner of this object can inspect the failure later.
        try:
            outcome = func(*args, **kwargs)
            self.ret = outcome
            self.done = 1
        except:
            self.exc = sys.exc_info()
            self.done = 2

    self.id = thread.start_new(_run_in_background, ())
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def detach_process():
    """Flag the debugger as detached and undo its process-wide hooks.

    ``DETACHED`` is always set to True.  Unless ``_INTERCEPTING_FOR_ATTACH``
    is truthy, a ``_DebuggerOutput`` installed as ``sys.stdout`` or
    ``sys.stderr`` is replaced by its ``old_out`` attribute, and
    ``thread.start_new_thread`` / ``thread.start_new`` are rebound to
    ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        # Hooks installed for an attach session are left in place.
        return

    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = thread.start_new = _start_new_thread
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def detach_process():
    """Flag the debugger as detached and undo its process-wide hooks.

    ``DETACHED`` is always set to True.  Unless ``_INTERCEPTING_FOR_ATTACH``
    is truthy, a ``_DebuggerOutput`` installed as ``sys.stdout`` or
    ``sys.stderr`` is replaced by its ``old_out`` attribute, and
    ``thread.start_new_thread`` / ``thread.start_new`` are rebound to
    ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        # Hooks installed for an attach session are left in place.
        return

    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = thread.start_new = _start_new_thread
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def BeginThreadsSimpleMarshal(numThreads, cookie):
    """Start ``numThreads`` threads running ``TestInterpInThread``.

    Every thread receives its own freshly created win32 event together with
    ``cookie``.  This is the simple (but slower) marshalling variant: a
    single interpreter object, with a new stream created per thread.

    Returns the list of event handles, in start order; the threads are
    expected to set them when complete.
    """
    handles = []
    for _ in range(numThreads):
        done_event = win32event.CreateEvent(None, 0, 0, None)
        thread.start_new(TestInterpInThread, (done_event, cookie))
        handles.append(done_event)
    return handles
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def BeginThreadsSimpleMarshal(numThreads, cookie):
    """Start ``numThreads`` threads running ``TestInterpInThread``.

    Every thread receives its own freshly created win32 event together with
    ``cookie``.  This is the simple (but slower) marshalling variant: a
    single interpreter object, with a new stream created per thread.

    Returns the list of event handles, in start order; the threads are
    expected to set them when complete.
    """
    handles = []
    for _ in range(numThreads):
        done_event = win32event.CreateEvent(None, 0, 0, None)
        thread.start_new(TestInterpInThread, (done_event, cookie))
        handles.append(done_event)
    return handles
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def run(self, nworkers):
    """Process the queued work using ``nworkers`` workers.

    Does nothing when ``self.work`` is empty.  Otherwise ``nworkers - 1``
    threads are started on ``self._worker``, the calling thread runs
    ``self._worker`` itself as the remaining worker, and finally
    ``self.todo`` is acquired.
    """
    if self.work:
        for _ in range(nworkers - 1):
            thread.start_new(self._worker, ())
        # The caller doubles as the last worker.
        self._worker()
        self.todo.acquire()


# Main program
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def main():
    """Run the telnet client: ``telnet hostname [port]``.

    The host name is validated with ``gethostbyname``.  The port is taken
    from the second argument, either a decimal number or a TCP service name
    looked up with ``getservbyname`` (default service: ``'telnet'``).  After
    connecting, ``child`` is started on a new thread with the socket and
    ``parent`` runs in the calling thread.

    Usage, host, service and port errors are reported on stderr and exit
    with status 2; a failed connect exits with status 1.
    """
    if len(sys.argv) < 2:
        sys.stderr.write('usage: telnet hostname [port]\n')
        sys.exit(2)
    host = sys.argv[1]
    try:
        # Only an early validity check: connect() below is given the
        # original host string, so the resolved address is not kept.
        gethostbyname(host)
    except error:
        sys.stderr.write(sys.argv[1] + ': bad host name\n')
        sys.exit(2)
    #
    if len(sys.argv) > 2:
        servname = sys.argv[2]
    else:
        servname = 'telnet'
    #
    if '0' <= servname[:1] <= '9':
        # Parsed with int(), not eval(): the text comes straight from the
        # command line and eval() would execute any expression that merely
        # starts with a digit.  Malformed numbers now produce a clean error
        # instead of an uncaught exception.
        try:
            port = int(servname)
        except ValueError:
            sys.stderr.write(servname + ': bad port number\n')
            sys.exit(2)
    else:
        try:
            port = getservbyname(servname, 'tcp')
        except error:
            sys.stderr.write(servname + ': bad tcp service name\n')
            sys.exit(2)
    #
    s = socket(AF_INET, SOCK_STREAM)
    #
    try:
        s.connect((host, port))
    except error as msg:
        sys.stderr.write('connect failed: %r\n' % (msg,))
        sys.exit(1)
    #
    thread.start_new(child, (s,))
    parent(s)
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def startPycheckerRun(self):
    """Start ``self.threadPycheckerRun`` on a new thread.

    Clears ``self.result``, switches the cursor to ``IDC_APPSTARTING``,
    registers ``self.idleHandler`` as an idle handler of the app and then
    launches the thread.
    """
    self.result = None
    busy_cursor = win32api.LoadCursor(0, win32con.IDC_APPSTARTING)
    # The previous cursor is captured but not restored here (the restoring
    # call was already disabled in the original code).
    previous_cursor = win32api.SetCursor(busy_cursor)
    app = win32ui.GetApp()
    app.AddIdleHandler(self.idleHandler)
    import thread
    thread.start_new(self.threadPycheckerRun, ())
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def __init__(self, *args):
    """Initialise the output window and start its collector thread.

    All positional arguments are forwarded to
    ``winout.WindowOutput.__init__``.  A new win32 event is stored in
    ``self.hStopThread`` and handed, together with this object, to
    ``CollectorThread`` running on a new thread.
    """
    winout.WindowOutput.__init__(self, *args)
    self.hStopThread = win32event.CreateEvent(None, 0, 0, None)
    collector_args = (self.hStopThread, self)
    thread.start_new(CollectorThread, collector_args)
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def BeginThreadsSimpleMarshal(numThreads, cookie):
    """Start ``numThreads`` threads running ``TestInterpInThread``.

    Every thread receives its own freshly created win32 event together with
    ``cookie``.  This is the simple (but slower) marshalling variant: a
    single interpreter object, with a new stream created per thread.

    Returns the list of event handles, in start order; the threads are
    expected to set them when complete.
    """
    handles = []
    for _ in range(numThreads):
        done_event = win32event.CreateEvent(None, 0, 0, None)
        thread.start_new(TestInterpInThread, (done_event, cookie))
        handles.append(done_event)
    return handles
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def Is_HTTP_On(on_off):
    """Launch the port-80 ``HTTP`` server thread when *on_off* is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_tcp``
    called with ``('', 80, HTTP)``, ``False`` when *on_off* is ``"OFF"``,
    and ``None`` for any other value.
    """
    if on_off == "ON":
        listener_args = ('', 80, HTTP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif on_off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def Is_HTTPS_On(SSL_On_Off):
    """Launch the port-443 ``DoSSL`` server thread when the switch is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_SSL``
    called with ``('', 443, DoSSL)``, ``False`` when *SSL_On_Off* is
    ``"OFF"``, and ``None`` for any other value.
    """
    if SSL_On_Off == "ON":
        listener_args = ('', 443, DoSSL)
        return thread.start_new(serve_thread_SSL, listener_args)
    elif SSL_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def Is_WPAD_On(on_off):
    """Echo a boolean-like WPAD switch.

    Returns ``True`` when *on_off* equals ``True``, ``False`` when it equals
    ``False`` and ``None`` otherwise.  No listener is started here: the
    ``serve_thread_tcp`` launch on port 3141 was already disabled in the
    original code.
    """
    for flag in (True, False):
        if on_off == flag:
            return flag
    return None

#Function name self-explanatory
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def Is_SMB_On(SMB_On_Off):
    """Launch the SMB server threads on ports 445 and 139 when ``"ON"``.

    The handler is ``SMB1LM`` when the module-level ``LM_On_Off`` equals
    ``True`` and ``SMB1`` otherwise.  Returns a 2-tuple with the results of
    ``thread.start_new`` (port 445 first, then 139), ``False`` when
    *SMB_On_Off* is ``"OFF"``, and ``None`` for any other value.
    """
    if SMB_On_Off == "ON":
        handler = SMB1LM if LM_On_Off == True else SMB1
        started = [thread.start_new(serve_thread_tcp, ('', port, handler))
                   for port in (445, 139)]
        return tuple(started)
    elif SMB_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def Is_Kerberos_On(Krb_On_Off):
    """Launch the port-88 Kerberos server threads when the switch is ``"ON"``.

    Starts ``serve_thread_udp`` with ``KerbUDP`` first and then
    ``serve_thread_tcp`` with ``KerbTCP``, and returns both
    ``thread.start_new`` results as a 2-tuple in that order.  Returns
    ``False`` when *Krb_On_Off* is ``"OFF"`` and ``None`` otherwise.
    """
    if Krb_On_Off == "ON":
        udp_thread = thread.start_new(serve_thread_udp, ('', 88, KerbUDP))
        tcp_thread = thread.start_new(serve_thread_tcp, ('', 88, KerbTCP))
        return udp_thread, tcp_thread
    elif Krb_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def Is_FTP_On(FTP_On_Off):
    """Launch the port-21 ``FTP`` server thread when the switch is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_tcp``
    called with ``('', 21, FTP)``, ``False`` when *FTP_On_Off* is ``"OFF"``,
    and ``None`` for any other value.
    """
    if FTP_On_Off == "ON":
        listener_args = ('', 21, FTP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif FTP_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def Is_POP_On(POP_On_Off):
    """Launch the port-110 ``POP`` server thread when the switch is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_tcp``
    called with ``('', 110, POP)``, ``False`` when *POP_On_Off* is
    ``"OFF"``, and ``None`` for any other value.
    """
    if POP_On_Off == "ON":
        listener_args = ('', 110, POP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif POP_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def Is_LDAP_On(LDAP_On_Off):
    """Launch the port-389 ``LDAP`` server thread when the switch is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_tcp``
    called with ``('', 389, LDAP)``, ``False`` when *LDAP_On_Off* is
    ``"OFF"``, and ``None`` for any other value.
    """
    if LDAP_On_Off == "ON":
        listener_args = ('', 389, LDAP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif LDAP_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def Is_SMTP_On(SMTP_On_Off):
    """Launch the ``ESMTP`` server threads on ports 25 and 587 when ``"ON"``.

    Returns a 2-tuple with the results of ``thread.start_new`` (port 25
    first, then 587), ``False`` when *SMTP_On_Off* is ``"OFF"``, and
    ``None`` for any other value.
    """
    if SMTP_On_Off == "ON":
        started = [thread.start_new(serve_thread_tcp, ('', port, ESMTP))
                   for port in (25, 587)]
        return tuple(started)
    elif SMTP_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def Is_IMAP_On(IMAP_On_Off):
    """Launch the port-143 ``IMAP`` server thread when the switch is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_tcp``
    called with ``('', 143, IMAP)``, ``False`` when *IMAP_On_Off* is
    ``"OFF"``, and ``None`` for any other value.
    """
    if IMAP_On_Off == "ON":
        listener_args = ('', 143, IMAP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif IMAP_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:ahmadnourallah    | 项目源码 | 文件源码
def main():
    """Start ``RunInloop`` on a new thread with ``(Target, Command, Domain)``.

    A ``KeyboardInterrupt`` raised while the thread is being started ends
    the program through ``exit()``.
    """
    try:
        loop_args = (Target, Command, Domain)
        thread.start_new(RunInloop, loop_args)
    except KeyboardInterrupt:
        exit()
项目:HomeAutomation    作者:gs2671    | 项目源码 | 文件源码
def detach_process():
    """Flag the debugger as detached and undo its process-wide hooks.

    ``DETACHED`` is always set to True.  Unless ``_INTERCEPTING_FOR_ATTACH``
    is truthy, a ``_DebuggerOutput`` installed as ``sys.stdout`` or
    ``sys.stderr`` is replaced by its ``old_out`` attribute, and
    ``thread.start_new_thread`` / ``thread.start_new`` are rebound to
    ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        # Hooks installed for an attach session are left in place.
        return

    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = thread.start_new = _start_new_thread
项目:AutoDiff    作者:icewall    | 项目源码 | 文件源码
def intercept_threads(for_attach = False):
    """Route low-level thread creation through ``thread_creator``.

    ``thread.start_new_thread`` and its alias ``thread.start_new`` are both
    rebound to ``thread_creator``.  When ``threading`` has already been
    imported by the process and is not yet bound in this module, it is
    bound to the module-level ``threading`` name and its private
    ``_start_new_thread`` is rebound as well.

    for_attach -- stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    import sys
    global threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread_creator
    thread.start_new = thread_creator

    # threading._start_new_thread needs patching only when threading is
    # already imported (the attach case).  It must not be imported here for
    # the first time: when this runs on a temporary attach thread, threading
    # would record that thread as the main thread.  If threading is imported
    # later it picks up the patched thread.start_new_thread by itself.
    # NOTE(review): the module-level ``threading`` name now stays None until
    # threading is really loaded -- confirm nothing else in the file relies
    # on it being set unconditionally.
    if threading is None and 'threading' in sys.modules:
        import threading
        threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
项目:AutoDiff    作者:icewall    | 项目源码 | 文件源码
def detach_process():
    """Flag the debugger as detached and undo its process-wide hooks.

    ``DETACHED`` is always set to True.  Unless ``_INTERCEPTING_FOR_ATTACH``
    is truthy, a ``_DebuggerOutput`` installed as ``sys.stdout`` or
    ``sys.stderr`` is replaced by its ``old_out`` attribute, and
    ``thread.start_new_thread`` / ``thread.start_new`` are rebound to
    ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        # Hooks installed for an attach session are left in place.
        return

    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = thread.start_new = _start_new_thread
项目:xidian-sfweb    作者:Gear420    | 项目源码 | 文件源码
def detach_process():
    """Flag the debugger as detached and undo its process-wide hooks.

    ``DETACHED`` is always set to True.  Unless ``_INTERCEPTING_FOR_ATTACH``
    is truthy, a ``_DebuggerOutput`` installed as ``sys.stdout`` or
    ``sys.stderr`` is replaced by its ``old_out`` attribute, and
    ``thread.start_new_thread`` / ``thread.start_new`` are rebound to
    ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        # Hooks installed for an attach session are left in place.
        return

    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = thread.start_new = _start_new_thread
项目:skojjt    作者:martin-green    | 项目源码 | 文件源码
def detach_process():
    """Flag the debugger as detached and undo its process-wide hooks.

    ``DETACHED`` is always set to True.  Unless ``_INTERCEPTING_FOR_ATTACH``
    is truthy, a ``_DebuggerOutput`` installed as ``sys.stdout`` or
    ``sys.stderr`` is replaced by its ``old_out`` attribute, and
    ``thread.start_new_thread`` / ``thread.start_new`` are rebound to
    ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        # Hooks installed for an attach session are left in place.
        return

    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = thread.start_new = _start_new_thread
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def Is_HTTP_On(on_off):
    """Launch the port-80 ``HTTP`` server thread when *on_off* is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_tcp``
    called with ``('', 80, HTTP)``, ``False`` when *on_off* is ``"OFF"``,
    and ``None`` for any other value.
    """
    if on_off == "ON":
        listener_args = ('', 80, HTTP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif on_off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def Is_HTTPS_On(SSL_On_Off):
    """Launch the port-443 ``DoSSL`` server thread when the switch is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_SSL``
    called with ``('', 443, DoSSL)``, ``False`` when *SSL_On_Off* is
    ``"OFF"``, and ``None`` for any other value.
    """
    if SSL_On_Off == "ON":
        listener_args = ('', 443, DoSSL)
        return thread.start_new(serve_thread_SSL, listener_args)
    elif SSL_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def Is_WPAD_On(on_off):
    """Echo a boolean-like WPAD switch.

    Returns ``True`` when *on_off* equals ``True``, ``False`` when it equals
    ``False`` and ``None`` otherwise.  No listener is started here: the
    ``serve_thread_tcp`` launch on port 3141 was already disabled in the
    original code.
    """
    for flag in (True, False):
        if on_off == flag:
            return flag
    return None

#Function name self-explanatory
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def Is_SMB_On(SMB_On_Off):
    """Launch the SMB server threads on ports 445 and 139 when ``"ON"``.

    The handler is ``SMB1LM`` when the module-level ``LM_On_Off`` equals
    ``True`` and ``SMB1`` otherwise.  Returns a 2-tuple with the results of
    ``thread.start_new`` (port 445 first, then 139), ``False`` when
    *SMB_On_Off* is ``"OFF"``, and ``None`` for any other value.
    """
    if SMB_On_Off == "ON":
        handler = SMB1LM if LM_On_Off == True else SMB1
        started = [thread.start_new(serve_thread_tcp, ('', port, handler))
                   for port in (445, 139)]
        return tuple(started)
    elif SMB_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def Is_Kerberos_On(Krb_On_Off):
    """Launch the port-88 Kerberos server threads when the switch is ``"ON"``.

    Starts ``serve_thread_udp`` with ``KerbUDP`` first and then
    ``serve_thread_tcp`` with ``KerbTCP``, and returns both
    ``thread.start_new`` results as a 2-tuple in that order.  Returns
    ``False`` when *Krb_On_Off* is ``"OFF"`` and ``None`` otherwise.
    """
    if Krb_On_Off == "ON":
        udp_thread = thread.start_new(serve_thread_udp, ('', 88, KerbUDP))
        tcp_thread = thread.start_new(serve_thread_tcp, ('', 88, KerbTCP))
        return udp_thread, tcp_thread
    elif Krb_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def Is_FTP_On(FTP_On_Off):
    """Launch the port-21 ``FTP`` server thread when the switch is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_tcp``
    called with ``('', 21, FTP)``, ``False`` when *FTP_On_Off* is ``"OFF"``,
    and ``None`` for any other value.
    """
    if FTP_On_Off == "ON":
        listener_args = ('', 21, FTP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif FTP_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def Is_POP_On(POP_On_Off):
    """Launch the port-110 ``POP`` server thread when the switch is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_tcp``
    called with ``('', 110, POP)``, ``False`` when *POP_On_Off* is
    ``"OFF"``, and ``None`` for any other value.
    """
    if POP_On_Off == "ON":
        listener_args = ('', 110, POP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif POP_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def Is_LDAP_On(LDAP_On_Off):
    """Launch the port-389 ``LDAP`` server thread when the switch is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_tcp``
    called with ``('', 389, LDAP)``, ``False`` when *LDAP_On_Off* is
    ``"OFF"``, and ``None`` for any other value.
    """
    if LDAP_On_Off == "ON":
        listener_args = ('', 389, LDAP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif LDAP_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def Is_SMTP_On(SMTP_On_Off):
    """Launch the ``ESMTP`` server threads on ports 25 and 587 when ``"ON"``.

    Returns a 2-tuple with the results of ``thread.start_new`` (port 25
    first, then 587), ``False`` when *SMTP_On_Off* is ``"OFF"``, and
    ``None`` for any other value.
    """
    if SMTP_On_Off == "ON":
        started = [thread.start_new(serve_thread_tcp, ('', port, ESMTP))
                   for port in (25, 587)]
        return tuple(started)
    elif SMTP_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def Is_IMAP_On(IMAP_On_Off):
    """Launch the port-143 ``IMAP`` server thread when the switch is ``"ON"``.

    Returns the result of ``thread.start_new`` for ``serve_thread_tcp``
    called with ``('', 143, IMAP)``, ``False`` when *IMAP_On_Off* is
    ``"OFF"``, and ``None`` for any other value.
    """
    if IMAP_On_Off == "ON":
        listener_args = ('', 143, IMAP)
        return thread.start_new(serve_thread_tcp, listener_args)
    elif IMAP_On_Off == "OFF":
        return False
    return None

#Function name self-explanatory
项目:SEF    作者:hossamhasanin    | 项目源码 | 文件源码
def main():
    """Start ``RunInloop`` on a new thread with ``(Target, Command, Domain)``.

    A ``KeyboardInterrupt`` raised while the thread is being started ends
    the program through ``exit()``.
    """
    try:
        loop_args = (Target, Command, Domain)
        thread.start_new(RunInloop, loop_args)
    except KeyboardInterrupt:
        exit()
项目:DjangoWebProject    作者:wrkettlitz    | 项目源码 | 文件源码
def detach_process():
    """Flag the debugger as detached and undo its process-wide hooks.

    ``DETACHED`` is always set to True.  Unless ``_INTERCEPTING_FOR_ATTACH``
    is truthy, a ``_DebuggerOutput`` installed as ``sys.stdout`` or
    ``sys.stderr`` is replaced by its ``old_out`` attribute, and
    ``thread.start_new_thread`` / ``thread.start_new`` are rebound to
    ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        # Hooks installed for an attach session are left in place.
        return

    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = thread.start_new = _start_new_thread
项目:ApiRestPythonTest    作者:rvfvazquez    | 项目源码 | 文件源码
def detach_process():
    """Flag the debugger as detached and undo its process-wide hooks.

    ``DETACHED`` is always set to True.  Unless ``_INTERCEPTING_FOR_ATTACH``
    is truthy, a ``_DebuggerOutput`` installed as ``sys.stdout`` or
    ``sys.stderr`` is replaced by its ``old_out`` attribute, and
    ``thread.start_new_thread`` / ``thread.start_new`` are rebound to
    ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        # Hooks installed for an attach session are left in place.
        return

    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = thread.start_new = _start_new_thread