Python paramiko 模块,ServerInterface() 实例源码

我们从Python开源项目中,提取了以下1个代码示例,用于说明如何使用paramiko.ServerInterface()

项目:cloudshell-cli    作者:QualiSystems    | 项目源码 | 文件源码
def __init__(self,
                 listen_addr='127.0.0.1',
                 port=0,
                 server_key_string=None,
                 user2key=None,
                 user2password=None,
                 fake_device=None,
                 enable_sftp=False,
                 enable_scp=False,
                 *largs, **kwargs):
        """

        :param listen_addr: str : which local IP to listen on, default 127.0.0.1
        :param port: int : listening port for the SSH server, or 0 to let the system choose; if 0 was used, check the .port field for the assigned port
        :param server_key_string: str : private key for the server's identity; leave blank to generate
        :param user2key: dict[str, PKey] : mapping from each username to the correct public or private PKey
        :param user2password: dict[str, str] : mapping from each username to the correct password
        :param fake_device: FakeDevice : an object that responds to commands
        :param enable_sftp: bool : whether to respond to SFTP requests
        :param enable_scp: bool : whether to respond to SCP requests
        :param largs:
        :param kwargs:
        """

        paramiko.ServerInterface.__init__(self)
        self.enable_sftp = enable_sftp
        self.enable_scp = enable_scp
        self.user2key = user2key or {}
        self.user2password = user2password or {}
        self.fake_device = fake_device or DefaultFakeDevice()
        self.server_key = RSAKey.from_private_key(StringIO(server_key_string)) if server_key_string else RSAKey.generate(2048)
        self.reads = {}
        self.filename2stringio = {}
        self.channelid2scpfilename = {}
        self.channelid2subsystem = {}
        self.channels = set()
        self.scpchannelid2command = {}
        self.channelid2event = {}

        self.listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listensock.bind((listen_addr, port))
        self.listensock.listen(5)
        self.port = self.listensock.getsockname()[1]

        t = threading.Thread(target=self.sockthread)
        t.setDaemon(True)
        t.start()