Python paramiko 模块:DSSKey() 实例源码

我们从 Python 开源项目中提取了以下 7 个代码示例,用于说明如何使用 paramiko.DSSKey()。

项目:jumpserver-python-sdk    作者:jumpserver    | 项目源码 | 文件源码
def from_string(cls, key_string):
        """Build a paramiko private key object from its text form.

        The text is parsed as an RSA key first; if paramiko rejects it with
        ``SSHException`` it is parsed again as a DSS key.

        :param key_string: private key material as a string
        :return: the parsed key object, or ``None`` when both attempts fail
        """
        try:
            return paramiko.RSAKey(file_obj=StringIO(key_string))
        except paramiko.SSHException:
            # Not accepted as RSA; fall through to the DSS attempt.
            pass

        try:
            return paramiko.DSSKey(file_obj=StringIO(key_string))
        except paramiko.SSHException:
            return None
项目:coco    作者:jumpserver    | 项目源码 | 文件源码
def ssh_key_gen(length=2048, type='rsa', password=None, username='jumpserver', hostname=None):
    """Generate an SSH private/public key pair with paramiko.

    :param length: key size in bits handed to the paramiko key generator
    :param type: key algorithm, either ``'rsa'`` or ``'dsa'``
    :param password: optional passphrase used when writing the private key
    :param username: user name forwarded to ``ssh_pubkey_gen``
    :param hostname: host name forwarded to ``ssh_pubkey_gen``; defaults to
        ``os.uname()[1]`` when omitted
    :return: tuple ``(private_key, public_key)``
    :raises IOError: if ``type`` is unsupported, or if generating or
        serializing the key raises ``IOError``
    """
    # Validate the algorithm before entering the try block: raising inside it
    # would be caught by the handler below and replaced with the generic
    # message, hiding the actual cause from the caller.
    if type not in ('rsa', 'dsa'):
        raise IOError('SSH private key must be `rsa` or `dsa`')

    if hostname is None:
        hostname = os.uname()[1]

    f = StringIO()

    try:
        if type == 'rsa':
            private_key_obj = paramiko.RSAKey.generate(length)
        else:
            private_key_obj = paramiko.DSSKey.generate(length)
        private_key_obj.write_private_key(f, password=password)
        private_key = f.getvalue()
        public_key = ssh_pubkey_gen(private_key_obj, username=username, hostname=hostname)
        return private_key, public_key
    except IOError:
        raise IOError('These is error when generate ssh key.')
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_4_dict_set(self):
        """Check dict-style assignment on a ``HostKeys`` loaded from a file.

        Assigns a full key mapping to one host and a single key to another,
        then verifies the entry counts and the fingerprints of the stored
        RSA and DSS keys.
        """
        known_hosts = paramiko.HostKeys('hostfile.temp')
        rsa_key = paramiko.RSAKey(data=decodebytes(keyblob))
        dss_key = paramiko.DSSKey(data=decodebytes(keyblob_dss))

        # Whole-mapping assignment for one host, per-type assignment for the other.
        known_hosts['secure.example.com'] = {'ssh-rsa': rsa_key, 'ssh-dss': dss_key}
        known_hosts['fake.example.com'] = {}
        known_hosts['fake.example.com']['ssh-rsa'] = rsa_key

        self.assertEqual(3, len(known_hosts))
        entries = list(known_hosts.values())
        for position, expected_count in enumerate((2, 1, 1)):
            self.assertEqual(expected_count, len(entries[position]))

        secure_entry = known_hosts['secure.example.com']
        rsa_fp = hexlify(secure_entry['ssh-rsa'].get_fingerprint()).upper()
        self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', rsa_fp)
        dss_fp = hexlify(known_hosts['secure.example.com']['ssh-dss'].get_fingerprint()).upper()
        self.assertEqual(b'4478F0B9A23CC5182009FF755BC1D26C', dss_fp)
项目:coco    作者:jumpserver    | 项目源码 | 文件源码
def ssh_key_string_to_obj(text):
    """Parse private key text into a paramiko key object.

    The text is tried as an RSA key first and then as a DSS key.

    :param text: private key material as a string
    :return: the first key object that parses, or ``None`` if paramiko
        rejects the text for both key types with ``SSHException``
    """
    for key_cls in (paramiko.RSAKey, paramiko.DSSKey):
        try:
            # Each attempt gets its own stream: a shared one is already
            # consumed by the first parser, so the second attempt would see
            # no data and could never succeed.
            return key_cls.from_private_key(StringIO(text))
        except paramiko.SSHException:
            continue
    return None
项目:wetland    作者:ohmyadd    | 项目源码 | 文件源码
def handle(self):
        """Serve one incoming connection as an SSH server.

        Wraps ``self.request`` in a paramiko ``Transport``, loads the RSA and
        DSA host keys and the banner from the ``ssh`` section of the server
        config, registers the SFTP subsystem, creates the per-client network
        object and then accepts channels until none remain.

        The network object and the ``docker_trans`` transport of the SSH
        server object are released on every exit path once they exist.
        """
        transport = paramiko.Transport(self.request)

        rsafile = self.server.cfg.get("ssh", "private_rsa")
        dsafile = self.server.cfg.get("ssh", "private_dsa")
        rsakey = paramiko.RSAKey(filename=rsafile)
        dsakey = paramiko.DSSKey(filename=dsafile)
        transport.add_server_key(rsakey)
        transport.add_server_key(dsakey)

        transport.local_version = self.server.cfg.get("ssh", "banner")

        transport.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                        sftpServer.sftp_server)
        nw = network.network(self.client_address[0],
                    self.server.cfg.get("wetland", "docker_addr"))
        nw.create()

        sServer = sshServer.ssh_server(transport=transport, network=nw)

        try:
            try:
                transport.start_server(server=sServer)
            except paramiko.SSHException:
                # Failed negotiation is not reported; the finally clause
                # below still releases what was created above.
                return

            while True:
                transport.accept(60)
                # no channel left
                if not transport._channels.values():
                    break
        except Exception as e:
            # print(...) with a single argument behaves the same under
            # Python 2 and Python 3.
            print(e)
        finally:
            nw.delete()
            sServer.docker_trans.close()
项目:BigBrotherBot-For-UrT43    作者:ptitbigorneau    | 项目源码 | 文件源码
def get_private_key(self, password=None):
        """Load the private key stored at ``self.private_key_file``.

        The key class is chosen from the first line of the file: ``DSA`` in
        that line selects ``paramiko.DSSKey``, otherwise ``RSA`` selects
        ``paramiko.RSAKey``.

        :param password: optional passphrase passed to ``from_private_key``
        :return: the loaded key, or ``None`` when no key file is configured
        :raises ValueError: if the first line names neither DSA nor RSA
        """
        key_path = self.private_key_file
        if key_path is None:
            return None

        with open(key_path, "r") as header_file:
            first_line = header_file.readline()

        if 'DSA' in first_line:
            loader = paramiko.DSSKey
        elif 'RSA' in first_line:
            loader = paramiko.RSAKey
        else:
            raise ValueError("can't identify private key type")

        # Re-open so the loader reads the file from its beginning.
        with open(key_path, "r") as key_file:
            return loader.from_private_key(key_file, password=password)
项目:netconf-proxy    作者:fortinet-solutions-cse    | 项目源码 | 文件源码
def get_user_auth_keys(self, username):
        """Return the public keys listed in the user's authorized_keys file.

        Results are cached in ``self.users_keys``; a user with no readable
        file, or no usable entries, is cached as an empty list.

        Only ``ssh-rsa`` and ``ssh-dss`` entries are loaded. Blank lines,
        comments, entries of any other form and entries whose key data cannot
        be decoded are skipped, so a single bad line does not abort parsing.

        :param username: account name whose ``~/.ssh/authorized_keys`` is read
        :return: list of key objects (possibly empty)
        """
        if username in self.users_keys:
            return self.users_keys[username]

        import base64

        keys = self.users_keys[username] = []

        userdir = os.path.expanduser("~" + username)
        keyfile = os.path.join(userdir, ".ssh/authorized_keys")
        if not os.path.exists(keyfile):
            return keys

        supported = ("ssh-rsa", "ssh-dss")
        with open(keyfile) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                values = line.split()

                # The key type may be preceded by an options field, so look
                # for the first supported type token that is followed by
                # data. Lines without one (numeric "bits exponent modulus"
                # entries, other key types, truncated lines) are skipped.
                ktype = data = None
                for index, token in enumerate(values[:-1]):
                    if token in supported:
                        ktype, data = token, values[index + 1]
                        break
                if ktype is None:
                    continue

                try:
                    blob = base64.decodebytes(data.encode('ascii'))
                    if ktype == "ssh-rsa":
                        key = ssh.RSAKey(data=blob)
                    else:
                        key = ssh.DSSKey(data=blob)
                except (ValueError, ssh.SSHException):
                    # Undecodable base64 or a blob the key class rejects.
                    continue
                keys.append(key)
        return keys