Python ansible.module_utils.basic 模块,AnsibleModule() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ansible.module_utils.basic.AnsibleModule()

项目:iosxr-ansible    作者:ios-xr    | 项目源码 | 文件源码
def main():
    """Run a fixed set of IOS-XR show commands and return their output.

    Every command string becomes a key of the module result. The value is
    the stringified command response split on the escaped newline marker
    (a backslash followed by the letter n).
    """
    argument_spec = dict(provider=dict(required=True))
    argument_spec.update(iosxr_argument_spec)
    module = AnsibleModule(argument_spec=argument_spec)

    # make sure "terminal length 0" is set on XR console
    show_commands = (
        "show platform",
        "show version",
        "show inventory all",
        "show memory summary",
        "show install active",
        "show filesystem",
        "show media",
        "show route",
        "show running-config",
        "show arp",
        "show ipv4 int brief",
        "show ipv6 int brief",
    )

    facts = dict(changed=False)
    for show_command in show_commands:
        raw_output = run_commands(module, show_command)
        facts[show_command] = str(raw_output).split(r"\n")
    return module.exit_json(**facts)
项目:iosxr-ansible    作者:ios-xr    | 项目源码 | 文件源码
def main():
    """Load a named configuration on IOS-XR and return the pending diff.

    Enters configuration mode, loads the file given by ``cfgname`` and runs
    "show commit changes diff". The response is returned as ``stdout`` and,
    stringified and split on the escaped newline marker, as ``stdout_lines``.
    """
    argument_spec = dict(
        provider=dict(required=True),
        cfgname=dict(required=True),
    )
    argument_spec.update(iosxr_argument_spec)
    module = AnsibleModule(argument_spec=argument_spec)

    cfg_name = module.params["cfgname"]

    rc, _out, err = exec_command(module, "configure terminal")
    if rc != 0:
        module.fail_json(
            msg="unable to enter configuration mode",
            err=to_text(err, errors="surrogate_or_strict"),
        )

    response = run_commands(
        module,
        ["load " + cfg_name, "show commit changes diff"],
    )

    result = dict(
        changed=False,
        stdout=response,
        stdout_lines=str(response).split(r"\n"),
    )
    module.exit_json(**result)
项目:linchpin    作者:CentOS-PaaS-SIG    | 项目源码 | 文件源码
def main():
    """Look up a credential file and return it through the module result.

    The AnsibleModule instance is stored in the module-level ``module``
    global. The result always reports ``changed=True`` and echoes the
    module parameters together with the output and path from ``get_cred``.
    """
    global module

    argument_spec = {
        'filename': {'required': True, 'aliases': ['name']},
        'cred_type': {'required': False, 'aliases': ['credential_type']},
        'cred_path': {'required': True, 'aliases': ['credential_store']},
        'driver': {'required': True, 'aliases': ['driver_type']},
    }
    module = AnsibleModule(
        argument_spec=argument_spec,
        required_one_of=[],
        supports_check_mode=True,
    )

    params = module.params
    filename = params["filename"]
    cred_type = params["cred_type"]  # noqa F841
    cred_path = params["cred_path"]
    driver_type = params["driver"]  # noqa F841

    output, path = get_cred(filename, cred_path)
    module.exit_json(
        changed=True,
        output=output,
        params=module.params,
        path=path,
    )
项目:pinder    作者:dotwaffle    | 项目源码 | 文件源码
def main():
    """Mark this ASN's side of each peering session as ready in pinder.

    For every entry in ``sessions`` the function sets ``sender_is_ready``
    or ``receiver_is_ready`` depending on which side carries ``asn``, PUTs
    the updated session to the pinder API, and collects the JSON replies.

    If neither side of a session matches ``asn`` the module fails through
    ``module.fail_json`` so Ansible receives a structured error rather
    than an unhandled exception traceback.
    """
    module = AnsibleModule(
        argument_spec=dict(
            sessions=dict(type='list', required=True),
            asn=dict(type='int', required=True),
        ),
        supports_check_mode=True
    )
    sessions = module.params['sessions']
    asn = module.params['asn']

    pinder = hammock.Hammock('http://127.0.0.1:8000', append_slash=True)
    result = []
    for session in sessions:
        if session['sender']['isp']['asn'] == asn:
            session['sender_is_ready'] = True
        elif session['receiver']['asn'] == asn:
            session['receiver_is_ready'] = True
        else:
            # Report the failure the Ansible way instead of raising a bare
            # Exception, which would surface as a module traceback.
            module.fail_json(
                msg="Couldn't figure out if I was the sender or the receiver")
        r = pinder.api.requests(session['id']).PUT(data=session)
        result.append(r.json())
    module.exit_json(sessions=result, msg=str(result))
项目:ansible-playbook    作者:fnordpipe    | 项目源码 | 文件源码
def __init__(self):
        """Parse the module arguments and create the Xen xl client.

        The parsed parameters are copied into ``self.args`` (with
        ``template_args`` exposed under the key ``templateArgs``), and
        ``self.xl`` is built from the name, config and template values.
        """
        argument_spec = dict(
            config=dict(required=True, type='path'),
            state=dict(type='str', default='started'),
            name=dict(required=True, type='str'),
            template=dict(type='str', default='gentoo-overlay'),
            template_args=dict(type='str'),
        )
        self.module = AnsibleModule(
            argument_spec=argument_spec,
            supports_check_mode=False,
        )

        params = self.module.params
        self.args = {
            'name': params.get('name'),
            'config': params.get('config'),
            'state': params.get('state'),
            'template': params.get('template'),
            'templateArgs': params.get('template_args'),
        }

        self.xl = XenXlClient(
            self.args['name'],
            self.args['config'],
            self.args['template'],
        )
项目:ansible-playbook    作者:fnordpipe    | 项目源码 | 文件源码
def __init__(self):
        """Parse the ``user`` argument and look up the user's home directory.

        Stores the home directory in ``self.userInfo['home']``. If the
        password database has no entry for the user, the module fails
        with a "user (...) not found" message.
        """
        self.module = AnsibleModule(
            argument_spec=dict(user=dict(required=True, type='str')),
            supports_check_mode=False,
        )

        self.args = {'user': self.module.params.get('user')}

        try:
            home_dir = pwd.getpwnam(self.args['user']).pw_dir
            self.userInfo = {'home': home_dir}
        except KeyError:
            # pwd.getpwnam raises KeyError for an unknown user name.
            self.module.fail_json(
                msg='user (%s) not found' % (self.args['user'])
            )
项目:ansible-playbook    作者:fnordpipe    | 项目源码 | 文件源码
def __init__(self):
        """Parse the crt/ca/dest path arguments and reset the changed flag.

        The module is created with the common file arguments enabled and
        without check-mode support. The three path parameters are copied
        into ``self.args``.
        """
        argument_spec = dict(
            crt=dict(required=True, type='path'),
            ca=dict(required=True, type='path'),
            dest=dict(required=True, type='path'),
        )
        self.module = AnsibleModule(
            argument_spec=argument_spec,
            add_file_common_args=True,
            supports_check_mode=False,
        )

        params = self.module.params
        self.args = {
            'crt': params.get('crt'),
            'ca': params.get('ca'),
            'dest': params.get('dest'),
        }
        self.changed = False
项目:ansible-pacemaker    作者:redhat-openstack    | 项目源码 | 文件源码
def test__clone_resource__happy(self,
                                    has_type,
                                    clone_resource_expected_count,
                                    clone_resource_current_count):
        """A clone resource whose current count equals the expected count
        makes ``is_resource_active`` exit once and never fail."""
        module_stub = create_autospec(AnsibleModule).return_value
        module_stub.params = {"resource": "haproxy", "max_wait": "5"}

        # Expected and current counts agree (3 == 3).
        clone_resource_expected_count.return_value = 3
        clone_resource_current_count.return_value = 3
        has_type.return_value = pacemaker_is_active.Clone(module_stub,
                                                          'haproxy')

        pacemaker_is_active.is_resource_active(module_stub)

        self.assertEqual(0, module_stub.fail_json.call_count)
        self.assertEqual(1, module_stub.exit_json.call_count)
项目:ansible-pacemaker    作者:redhat-openstack    | 项目源码 | 文件源码
def test__master_resource__happy(self,
                                     has_type,
                                     master_resource_expected_count,
                                     master_resource_current_count):
        """A master resource whose current count equals the expected count
        makes ``is_resource_active`` exit once and never fail."""
        module_stub = create_autospec(AnsibleModule).return_value
        module_stub.params = {"resource": "galera", "max_wait": "5"}

        # Expected and current counts agree (3 == 3).
        master_resource_expected_count.return_value = 3
        master_resource_current_count.return_value = 3
        has_type.return_value = pacemaker_is_active.Master(module_stub,
                                                           'galera')

        pacemaker_is_active.is_resource_active(module_stub)

        self.assertEqual(0, module_stub.fail_json.call_count)
        self.assertEqual(1, module_stub.exit_json.call_count)
项目:ansible-pacemaker    作者:redhat-openstack    | 项目源码 | 文件源码
def test__primitive_resource__happy(self,
                                        has_type,
                                        primitive_resource_current_count):
        """A primitive resource reporting a current count of 1 makes
        ``is_resource_active`` exit once and never fail."""
        resource_name = "openstack-cinder-volume"
        module_stub = create_autospec(AnsibleModule).return_value
        module_stub.params = {"resource": resource_name, "max_wait": "5"}

        primitive_resource_current_count.return_value = 1
        has_type.return_value = pacemaker_is_active.Primitive(module_stub,
                                                              resource_name)

        pacemaker_is_active.is_resource_active(module_stub)

        self.assertEqual(0, module_stub.fail_json.call_count)
        self.assertEqual(1, module_stub.exit_json.call_count)
项目:ansible-pacemaker    作者:redhat-openstack    | 项目源码 | 文件源码
def test__primitive_resource__happy(self,
                                        has_type,
                                        primitive_resource_expected_count,
                                        primitive_resource_current_count):
        """A primitive resource whose current count stays at 0 while 1 is
        expected makes ``is_resource_active`` fail once and never exit."""
        resource_name = 'openstack-cinder-volume'
        module_stub = create_autospec(AnsibleModule).return_value
        module_stub.params = {"resource": resource_name, "max_wait": "3"}

        # Three polls: the expected count is always 1, the current always 0.
        primitive_resource_expected_count.side_effect = [1, 1, 1]
        primitive_resource_current_count.side_effect = [0, 0, 0]
        has_type.return_value = pacemaker_is_active.Primitive(module_stub,
                                                              resource_name)

        pacemaker_is_active.is_resource_active(module_stub)

        self.assertEqual(1, module_stub.fail_json.call_count)
        self.assertEqual(0, module_stub.exit_json.call_count)
项目:Learning-Ansible-2-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def main():
    """Check whether a user exists and report the outcome to Ansible.

    ``User.check_if_user_exists()`` returns only ``(success, message)``,
    so the success result carries just the message. The original passed
    ``uid`` and ``gid`` to ``exit_json``, but neither name is defined in
    this function, so the success path raised NameError.
    """
    # Parsing argument file
    module = AnsibleModule(
        argument_spec = dict(
            user = dict(required=True)
        )
    )
    user = module.params.get('user')

    chkusr = User(user)
    success, ret_msg = chkusr.check_if_user_exists()

    # Error handling and JSON return
    if success:
        module.exit_json(msg=ret_msg)
    else:
        module.fail_json(msg=ret_msg)
项目:Learning-Ansible-2-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def main():
    """Check a user through ``CheckUser`` and report message, uid and gid.

    Exits successfully with the message, uid and gid when the check
    succeeds; otherwise fails the module with the message.
    """
    # Parse the module arguments.
    argument_spec = dict(user=dict(required=True))
    module = AnsibleModule(argument_spec=argument_spec)

    checker = CheckUser(module.params.get('user'))
    success, ret_msg, uid, gid = checker.check_user()

    # Report the failure first, the success otherwise.
    if not success:
        module.fail_json(msg=ret_msg)
    else:
        module.exit_json(msg=ret_msg, uid=uid, gid=gid)
项目:Learning-Ansible-2-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def main():
    """Report whether a user exists in the password database.

    Exits successfully with "User ... exists" when ``pwd.getpwnam`` finds
    the user, and fails the module with "User ... does not exists" when
    the lookup raises KeyError.
    """
    # Parse the module arguments.
    argument_spec = dict(user=dict(required=True))
    module = AnsibleModule(argument_spec=argument_spec)
    user = module.params.get('user')

    # Look the user up; KeyError means there is no such entry.
    try:
        pwd.getpwnam(user)
    except KeyError:
        found = False
        ret_msg = 'User %s does not exists' % user
    else:
        found = True
        ret_msg = 'User %s exists' % user

    # Report the failure first, the success otherwise.
    if not found:
        module.fail_json(msg=ret_msg)
    else:
        module.exit_json(msg=ret_msg)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def main():
    """Entry point: validate arguments, require dopy, and run ``core``.

    At least one of ``id`` or ``name`` must be supplied. Any exception
    raised by ``core`` is reported through ``fail_json`` together with
    the formatted traceback.
    """
    argument_spec = dict(
        state=dict(choices=['present', 'absent'], default='present'),
        api_token=dict(aliases=['API_TOKEN'], no_log=True),
        name=dict(type='str'),
        id=dict(aliases=['droplet_id'], type='int'),
        ip=dict(type='str'),
    )
    module = AnsibleModule(
        argument_spec=argument_spec,
        required_one_of=(['id', 'name'],),
    )

    if not HAS_DOPY:
        module.fail_json(msg='dopy required for this module')

    try:
        core(module)
    except (DoError, Exception) as exc:
        module.fail_json(msg=str(exc), exception=traceback.format_exc())
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def main():
    """Entry point: validate arguments and dispatch to ``handle_request``.

    A ``DOBlockStorageException`` is reported with its message; a
    ``KeyError`` is reported as "Unable to load <message>".
    """
    argument_spec = dict(
        state=dict(choices=['present', 'absent'], required=True),
        command=dict(choices=['create', 'attach'], required=True),
        api_token=dict(aliases=['API_TOKEN'], no_log=True),
        block_size=dict(type='int'),
        volume_name=dict(type='str', required=True),
        description=dict(type='str'),
        region=dict(type='str', required=True),
        droplet_id=dict(type='int'),
        timeout=dict(type='int', default=10),
    )
    module = AnsibleModule(argument_spec=argument_spec)

    try:
        handle_request(module)
    except DOBlockStorageException:
        storage_error = get_exception()
        module.fail_json(msg=storage_error.message)
    except KeyError:
        key_error = get_exception()
        module.fail_json(msg='Unable to load %s' % key_error.message)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def main():
    """Entry point: validate arguments, require dopy, and run ``core``.

    At least one of ``id`` or ``name`` must be supplied. Any exception
    raised by ``core`` is reported through ``fail_json`` together with
    the formatted traceback.
    """
    argument_spec = dict(
        state=dict(choices=['present', 'absent'], default='present'),
        client_id=dict(aliases=['CLIENT_ID'], no_log=True),
        api_key=dict(aliases=['API_KEY'], no_log=True),
        name=dict(type='str'),
        id=dict(aliases=['droplet_id'], type='int'),
        ssh_pub_key=dict(type='str'),
    )
    module = AnsibleModule(
        argument_spec=argument_spec,
        required_one_of=(['id', 'name'],),
    )

    if not HAS_DOPY:
        module.fail_json(msg='dopy required for this module')

    try:
        core(module)
    except (DoError, Exception) as exc:
        module.fail_json(msg=str(exc), exception=traceback.format_exc())
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def main():
    """Entry point: parse the key-management arguments and enforce state.

    The dictionary returned by ``enforce_state`` is passed unchanged to
    ``exit_json``. Check mode is supported.
    """
    argument_spec = dict(
        user=dict(required=True, type='str'),
        key=dict(required=True, type='str'),
        path=dict(required=False, type='str'),
        manage_dir=dict(required=False, type='bool', default=True),
        state=dict(default='present', choices=['absent', 'present']),
        key_options=dict(required=False, type='str'),
        unique=dict(default=False, type='bool'),
        exclusive=dict(default=False, type='bool'),
        validate_certs=dict(default=True, type='bool'),
    )
    module = AnsibleModule(
        argument_spec=argument_spec,
        supports_check_mode=True,
    )

    outcome = enforce_state(module, module.params)
    module.exit_json(**outcome)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def main():
    """Entry point: open a boto3 STS client and request a session token.

    Fails the module when boto3 is unavailable or no region is given.
    """
    sts_options = dict(
        duration_seconds=dict(required=False, default=None, type='int'),
        mfa_serial_number=dict(required=False, default=None),
        mfa_token=dict(required=False, default=None),
    )
    argument_spec = ec2_argument_spec()
    argument_spec.update(sts_options)

    module = AnsibleModule(argument_spec=argument_spec)

    if not HAS_BOTO3:
        module.fail_json(msg='boto3 and botocore are required.')

    region, ec2_url, aws_connect_kwargs = get_aws_connection_info(
        module, boto3=True)
    if not region:
        module.fail_json(msg="region must be specified")
    else:
        connection = boto3_conn(
            module,
            conn_type='client',
            resource='sts',
            region=region,
            endpoint=ec2_url,
            **aws_connect_kwargs
        )

    get_session_token(connection, module)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def main():
    """Entry point: connect to the boto VPC API and list VPCs.

    Fails the module when boto is unavailable, when no region is given,
    or when the connection attempt raises.

    The connection handler catches ``Exception`` rather than the Python 2
    only ``StandardError``. On Python 3 that name does not exist, so the
    original ``except`` clause itself raised NameError whenever the
    connection failed.
    """
    argument_spec = ec2_argument_spec()
    argument_spec.update(
        dict(
            filters = dict(default=None, type='dict')
        )
    )

    module = AnsibleModule(argument_spec=argument_spec)

    if not HAS_BOTO:
        module.fail_json(msg='boto required for this module')

    region, ec2_url, aws_connect_params = get_aws_connection_info(module)

    if region:
        try:
            connection = connect_to_aws(boto.vpc, region, **aws_connect_params)
        except (boto.exception.NoAuthHandlerFound, Exception) as e:
            module.fail_json(msg=str(e))
    else:
        module.fail_json(msg="region must be specified")

    list_ec2_vpcs(connection, module)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def main():
    """Entry point: connect to the boto VPC API and list route tables.

    Fails the module when boto is unavailable, when no region is given,
    or when the connection attempt raises one of the handled errors.
    """
    argument_spec = ec2_argument_spec()
    argument_spec.update(dict(filters=dict(default=None, type='dict')))

    module = AnsibleModule(argument_spec=argument_spec)

    if not HAS_BOTO:
        module.fail_json(msg='boto required for this module')

    region, ec2_url, aws_connect_params = get_aws_connection_info(module)

    if not region:
        module.fail_json(msg="region must be specified")
    else:
        try:
            connection = connect_to_aws(
                boto.vpc, region, **aws_connect_params)
        except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as exc:
            module.fail_json(msg=str(exc))

    list_ec2_vpc_route_tables(connection, module)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def main():
    """Entry point: connect to the boto VPC API and list subnets.

    Fails the module when boto is unavailable, when no region is given,
    or when the connection attempt raises one of the handled errors.
    """
    argument_spec = ec2_argument_spec()
    argument_spec.update(dict(filters=dict(default=None, type='dict')))

    module = AnsibleModule(argument_spec=argument_spec)

    if not HAS_BOTO:
        module.fail_json(msg='boto required for this module')

    region, ec2_url, aws_connect_params = get_aws_connection_info(module)

    if not region:
        module.fail_json(msg="region must be specified")
    else:
        try:
            connection = connect_to_aws(
                boto.vpc, region, **aws_connect_params)
        except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as exc:
            module.fail_json(msg=str(exc))

    list_ec2_vpc_subnets(connection, module)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def main():
    """Entry point: connect to the boto EC2 API and list instances.

    Fails the module when boto is unavailable, when no region is given,
    or when the connection attempt raises one of the handled errors.
    """
    argument_spec = ec2_argument_spec()
    argument_spec.update(dict(filters=dict(default=None, type='dict')))

    module = AnsibleModule(argument_spec=argument_spec)

    if not HAS_BOTO:
        module.fail_json(msg='boto required for this module')

    region, ec2_url, aws_connect_params = get_aws_connection_info(module)

    if not region:
        module.fail_json(msg="region must be specified")
    else:
        try:
            connection = connect_to_aws(
                boto.ec2, region, **aws_connect_params)
        except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as exc:
            module.fail_json(msg=str(exc))

    list_ec2_instances(connection, module)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def __new__(cls, module):
        """Create an instance of the platform-specific subclass.

        ``load_platform_subclass()`` is not used because the choice on
        Linux depends on whether the `timedatectl` command is available.

        Args:
            module: The AnsibleModule.
        """
        if get_platform() != 'Linux':
            # Not supported yet
            return super(Timezone, Timezone).__new__(Timezone)

        if module.get_bin_path('timedatectl') is not None:
            return super(Timezone, SystemdTimezone).__new__(SystemdTimezone)

        return super(Timezone, NosystemdTimezone).__new__(NosystemdTimezone)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def __init__(self, module):
        """Set up the instance from the module's parameters.

        Args:
            module: The AnsibleModule.
        """
        super(Timezone, self).__init__()
        self.msg = []
        # `self.value` maps each parameter to its value per phase. Only the
        # "planned" phase is known here; the original comment says that
        # `self.check()` fills in the rest.
        self.value = dict()
        for param_name in module.argument_spec:
            planned = module.params[param_name]
            if planned is None:
                continue
            self.value[param_name] = dict(planned=planned)
        self.module = module
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def main():
    '''Entry point: build an annotation, post it and return the reply.

    A request exception fails the module with "Request Failed"; on
    success the decoded JSON reply is returned with ``changed=True``.
    '''
    argument_spec = dict(
        start=dict(required=False, type='int'),
        stop=dict(required=False, type='int'),
        category=dict(required=True),
        title=dict(required=True),
        description=dict(required=True),
        duration=dict(required=False, type='int'),
        api_key=dict(required=True, no_log=True),
    )
    module = AnsibleModule(argument_spec=argument_spec)

    annotation = create_annotation(module)
    api_key = module.params['api_key']
    try:
        response = post_annotation(annotation, api_key)
    except requests.exceptions.RequestException:
        failure = get_exception()
        module.fail_json(msg='Request Failed', reason=failure)
    module.exit_json(changed=True, annotation=response.json())
项目:fortimanager-ansible    作者:networktocode    | 项目源码 | 文件源码
def config_absent(self, module, proposed, existing):
        """
        Build the module result for ``state: absent``.

        When ``existing`` is truthy the item is deleted through
        ``self.config_delete`` using ``proposed["policyid"]`` and the result
        is marked as changed. Otherwise nothing is deleted.

        :param module: The AnsibleModule instance.
        :param proposed: The proposed config; its "policyid" entry selects the item to delete.
        :param existing: The existing configuration for the item on the FortiManager.
        :return: A dictionary with the keys "changed", "config" and "existing".
        """
        if not existing:
            # Nothing to remove: report an unchanged, empty config.
            return {"changed": False, "config": {}, "existing": existing}

        removed = self.config_delete(module, proposed["policyid"])
        return {"changed": True, "config": removed, "existing": existing}
项目:fortimanager-ansible    作者:networktocode    | 项目源码 | 文件源码
def config_absent(self, module, proposed, existing):
        """
        Build the module result for ``state: absent``.

        When ``existing`` is truthy the item is deleted through
        ``self.config_delete`` using ``existing["seq-num"]`` and the result
        is marked as changed. Otherwise nothing is deleted and "existing"
        is reported as an empty dictionary.

        :param module: The AnsibleModule instance.
        :param proposed: The proposed config (not read by this function).
        :param existing: The existing configuration for the item on the FortiManager.
        :return: A dictionary with the keys "changed", "config" and "existing".
        """
        if not existing:
            # Nothing to remove: normalise a falsy "existing" to {}.
            return {"changed": False, "config": {}, "existing": {}}

        # The route is identified by its sequence number.
        removed = self.config_delete(module, existing["seq-num"])
        return {"changed": True, "config": removed, "existing": existing}
项目:cinch    作者:RedHatQE    | 项目源码 | 文件源码
def main():
    """Update the Jenkins update-center XML file with the requested entry.

    Parses ``hudson.model.UpdateCenter.xml`` under ``jenkins_home``. The
    id "default" goes through ``set_default``; any other id goes through
    ``append``. The file is rewritten only when a change was reported.
    """
    argument_spec = {
        'jenkins_home': {'default': '/var/lib/jenkins'},
        'update_center_id': {'required': True},
        'update_center_url': {'required': True},
    }
    module = AnsibleModule(argument_spec=argument_spec)
    params = module.params

    config_path = os.path.join(params['jenkins_home'],
                               'hudson.model.UpdateCenter.xml')
    tree = ET.parse(config_path)

    if params['update_center_id'] != 'default':
        changed = append(tree,
                         params['update_center_id'],
                         params['update_center_url'])
    else:
        changed = set_default(tree, params['update_center_url'])

    if changed:
        tree.write(config_path, encoding='UTF-8')
    module.exit_json(changed=changed)
项目:cinch    作者:RedHatQE    | 项目源码 | 文件源码
def main():
    """Update the Jenkins update-center XML file with the requested entry.

    Parses ``hudson.model.UpdateCenter.xml`` under ``jenkins_home``. The
    id "default" goes through ``set_default``; any other id goes through
    ``append``. The file is rewritten only when a change was reported.
    """
    argument_spec = {
        'jenkins_home': {'default': '/var/lib/jenkins'},
        'update_center_id': {'required': True},
        'update_center_url': {'required': True},
    }
    module = AnsibleModule(argument_spec=argument_spec)
    params = module.params

    config_path = os.path.join(params['jenkins_home'],
                               'hudson.model.UpdateCenter.xml')
    tree = ET.parse(config_path)

    if params['update_center_id'] != 'default':
        changed = append(tree,
                         params['update_center_id'],
                         params['update_center_url'])
    else:
        changed = set_default(tree, params['update_center_url'])

    if changed:
        tree.write(config_path, encoding='UTF-8')
    module.exit_json(changed=changed)
项目:ovirt-ansible-example    作者:machacekondra    | 项目源码 | 文件源码
def main():
    """Return the oVirt templates matching ``pattern`` as Ansible facts.

    The result is published under ``ansible_facts.ovirt_templates``. Any
    exception is reported through ``fail_json`` with the traceback.

    ``connection`` starts as ``None`` and is closed only when it was
    actually created. If ``create_connection`` raised, the original
    ``finally`` clause hit an unbound ``connection`` name.
    """
    argument_spec = ovirt_full_argument_spec(
        pattern=dict(default='', required=False),
    )
    module = AnsibleModule(argument_spec)
    check_sdk(module)

    connection = None
    try:
        connection = create_connection(module.params.pop('auth'))
        templates_service = connection.system_service().templates_service()
        templates = templates_service.list(search=module.params['pattern'])
        module.exit_json(
            changed=False,
            ansible_facts=dict(
                ovirt_templates=[
                    get_dict_of_struct(c) for c in templates
                ],
            ),
        )
    except Exception as e:
        module.fail_json(msg=str(e), exception=traceback.format_exc())
    finally:
        # Close only a connection that was successfully opened.
        if connection is not None:
            connection.close(logout=False)
项目:ovirt-ansible-example    作者:machacekondra    | 项目源码 | 文件源码
def main():
    """Return the oVirt clusters matching ``pattern`` as Ansible facts.

    The result is published under ``ansible_facts.ovirt_clusters``. Any
    exception is reported through ``fail_json`` with the traceback.

    ``connection`` starts as ``None`` and is closed only when it was
    actually created. If ``create_connection`` raised, the original
    ``finally`` clause hit an unbound ``connection`` name.
    """
    argument_spec = ovirt_full_argument_spec(
        pattern=dict(default='', required=False),
    )
    module = AnsibleModule(argument_spec)
    check_sdk(module)

    connection = None
    try:
        connection = create_connection(module.params.pop('auth'))
        clusters_service = connection.system_service().clusters_service()
        clusters = clusters_service.list(search=module.params['pattern'])
        module.exit_json(
            changed=False,
            ansible_facts=dict(
                ovirt_clusters=[
                    get_dict_of_struct(c) for c in clusters
                ],
            ),
        )
    except Exception as e:
        module.fail_json(msg=str(e), exception=traceback.format_exc())
    finally:
        # Close only a connection that was successfully opened.
        if connection is not None:
            connection.close(logout=False)
项目:ovirt-ansible-example    作者:machacekondra    | 项目源码 | 文件源码
def main():
    """Return the oVirt storage domains matching ``pattern`` as facts.

    The result is published under ``ansible_facts.ovirt_storage_domains``.
    Any exception is reported through ``fail_json`` with the traceback.

    ``connection`` starts as ``None`` and is closed only when it was
    actually created. If ``create_connection`` raised, the original
    ``finally`` clause hit an unbound ``connection`` name.
    """
    argument_spec = ovirt_full_argument_spec(
        pattern=dict(default='', required=False),
    )
    module = AnsibleModule(argument_spec)
    check_sdk(module)

    connection = None
    try:
        connection = create_connection(module.params.pop('auth'))
        storage_domains_service = connection.system_service().storage_domains_service()
        storage_domains = storage_domains_service.list(search=module.params['pattern'])
        module.exit_json(
            changed=False,
            ansible_facts=dict(
                ovirt_storage_domains=[
                    get_dict_of_struct(c) for c in storage_domains
                ],
            ),
        )
    except Exception as e:
        module.fail_json(msg=str(e), exception=traceback.format_exc())
    finally:
        # Close only a connection that was successfully opened.
        if connection is not None:
            connection.close(logout=False)
项目:ovirt-ansible-example    作者:machacekondra    | 项目源码 | 文件源码
def main():
    """Return the oVirt VM pools matching ``pattern`` as Ansible facts.

    The result is published under ``ansible_facts.ovirt_vm_pools``. Any
    exception is reported through ``fail_json`` with the traceback.

    ``connection`` starts as ``None`` and is closed only when it was
    actually created. If ``create_connection`` raised, the original
    ``finally`` clause hit an unbound ``connection`` name.
    """
    argument_spec = ovirt_full_argument_spec(
        pattern=dict(default='', required=False),
    )
    module = AnsibleModule(argument_spec)
    check_sdk(module)

    connection = None
    try:
        connection = create_connection(module.params.pop('auth'))
        vmpools_service = connection.system_service().vm_pools_service()
        vmpools = vmpools_service.list(search=module.params['pattern'])
        module.exit_json(
            changed=False,
            ansible_facts=dict(
                ovirt_vm_pools=[
                    get_dict_of_struct(c) for c in vmpools
                ],
            ),
        )
    except Exception as e:
        module.fail_json(msg=str(e), exception=traceback.format_exc())
    finally:
        # Close only a connection that was successfully opened.
        if connection is not None:
            connection.close(logout=False)
项目:ovirt-ansible-example    作者:machacekondra    | 项目源码 | 文件源码
def main():
    """Return the oVirt users matching ``pattern`` as Ansible facts.

    The result is published under ``ansible_facts.ovirt_users``. Any
    exception is reported through ``fail_json`` with the traceback.

    ``connection`` starts as ``None`` and is closed only when it was
    actually created. If ``create_connection`` raised, the original
    ``finally`` clause hit an unbound ``connection`` name.
    """
    argument_spec = ovirt_full_argument_spec(
        pattern=dict(default='', required=False),
    )
    module = AnsibleModule(argument_spec)
    check_sdk(module)

    connection = None
    try:
        connection = create_connection(module.params.pop('auth'))
        users_service = connection.system_service().users_service()
        users = users_service.list(search=module.params['pattern'])
        module.exit_json(
            changed=False,
            ansible_facts=dict(
                ovirt_users=[
                    get_dict_of_struct(c) for c in users
                ],
            ),
        )
    except Exception as e:
        module.fail_json(msg=str(e), exception=traceback.format_exc())
    finally:
        # Close only a connection that was successfully opened.
        if connection is not None:
            connection.close(logout=False)
项目:ovirt-ansible-example    作者:machacekondra    | 项目源码 | 文件源码
def main():
    """Return the oVirt networks matching ``pattern`` as Ansible facts.

    The result is published under ``ansible_facts.ovirt_networks``. Any
    exception is reported through ``fail_json`` with the traceback.

    ``connection`` starts as ``None`` and is closed only when it was
    actually created. If ``create_connection`` raised, the original
    ``finally`` clause hit an unbound ``connection`` name.
    """
    argument_spec = ovirt_full_argument_spec(
        pattern=dict(default='', required=False),
    )
    module = AnsibleModule(argument_spec)
    check_sdk(module)

    connection = None
    try:
        connection = create_connection(module.params.pop('auth'))
        networks_service = connection.system_service().networks_service()
        networks = networks_service.list(search=module.params['pattern'])
        module.exit_json(
            changed=False,
            ansible_facts=dict(
                ovirt_networks=[
                    get_dict_of_struct(c) for c in networks
                ],
            ),
        )
    except Exception as e:
        module.fail_json(msg=str(e), exception=traceback.format_exc())
    finally:
        # Close only a connection that was successfully opened.
        if connection is not None:
            connection.close(logout=False)
项目:ansible-insights-client    作者:RedHatInsights    | 项目源码 | 文件源码
def main():
    """Report the NVR obtained from ``falafel`` at the given egg path.

    The ``egg_path`` parameter is appended to ``sys.path`` before
    ``falafel`` is imported, so the import can resolve from that location.
    """
    from ansible.module_utils.basic import AnsibleModule

    argument_spec = {
        "egg_path": {"required": True, "type": "path"},
    }
    module = AnsibleModule(argument_spec)

    egg_path = module.params["egg_path"]
    sys.path.append(egg_path)

    # Imported only after sys.path has been extended.
    import falafel
    module.exit_json(nvr=falafel.get_nvr())
项目:iosxr-ansible    作者:ios-xr    | 项目源码 | 文件源码
def main():
    """Clear the IOS-XR log, answering the confirmation prompt with "y".

    The command response is returned unchanged as both ``stdout`` and
    ``stdout_lines``; the result always reports ``changed=True``.
    """
    argument_spec = dict(provider=dict(required=True))
    argument_spec.update(iosxr_argument_spec)
    module = AnsibleModule(argument_spec=argument_spec)

    clear_logging = {
        "command": "clear logging",
        "prompt": CLI_PROMPT_RE,
        "answer": "y",
    }
    response = run_commands(module, clear_logging)

    result = dict(changed=True, stdout=response, stdout_lines=response)
    return module.exit_json(**result)
项目:iosxr-ansible    作者:ios-xr    | 项目源码 | 文件源码
def main ():
    """Reload an IOS-XR device after explicit confirmation.

    Unless ``confirm`` equals "yes" the module exits with
    "reload aborted". Otherwise a "reload" command is built, optionally
    with "location <location>" and "force", and sent with "y" as the
    answer to the confirmation prompt. The response is returned as
    ``stdout`` and, stringified and split on the escaped newline marker,
    as ``stdout_lines``.

    The ``location`` check uses ``is not None`` (the identity test PEP 8
    recommends) instead of ``!= None``.
    """
    spec = dict (provider = dict (required = True),
                 confirm= dict (required = True),
                 location= dict (required = False, default = None),
                 force= dict (required = False, type="bool", default = False))
    spec.update (iosxr_argument_spec)
    module = AnsibleModule (argument_spec = spec)

    args = module.params
    force = args["force"]
    location = args["location"]

    result = dict (changed = False)
    if args["confirm"] != "yes":
        result["stdout"] = "reload aborted"
        module.exit_json (**result)

    # setup reload command expecting specific prompt to return
    reload_command = "reload "
    if location is not None:
        reload_command = reload_command + "location %s " % location
    if force is True:
        reload_command = reload_command + "force "
    command = {"command": reload_command,
               "prompt": CLI_PROMPT_RE,
               "answer": "y"}
    response = run_commands (module, command)

    result["stdout"] = response
    result["stdout_lines"] = str (result["stdout"]).split (r"\n")
    module.exit_json (**result)
项目:iosxr-ansible    作者:ios-xr    | 项目源码 | 文件源码
def main ():
    """Run the install operation selected by ``state`` on 64-bit IOS-XR.

    Fails on legacy (32-bit) IOS-XR and when another install operation is
    in progress. Otherwise ``state`` selects one of the ``install_*``
    handlers, which is called with the module, ``pkgpath`` and
    ``pkgname``; its result dictionary is passed to ``exit_json``.

    The ``provider`` option is declared with ``required`` (the original
    misspelled the key as ``requreid``, so the flag had no effect).
    """
    spec = dict (provider = dict (required = True),
                 pkgpath= dict (required = False, default = None),
                 pkgname = dict (required = True, default = None),
                 state = dict (required = False, default = "present",
                               choices = ["present",
                                          "absent",
                                          "updated",
                                          "activated",
                                          "deactivated",
                                          "committed"]))
    spec.update (iosxr_argument_spec)

    module = AnsibleModule (argument_spec = spec)

    args = module.params
    state = args["state"]

    # cannot run on classic XR
    if is_legacy_iosxr (module):
        module.fail_json (msg =
                          "use 'xr32_install_package' on 32-bit IOS-XR instead")

    # make sure no other install in progress
    if is_install_in_progress (module):
        module.fail_json (msg = "other install operation in progress")

    # Dispatch table: one handler per supported state.
    install = {
        "present":     install_add,
        "absent":      install_remove,
        "updated":     install_update,
        "activated":   install_activate,
        "deactivated": install_deactivate,
        "committed":   install_commit
    }
    result = install[state] (module, args["pkgpath"], args["pkgname"])

    module.exit_json (**result)
项目:iosxr-ansible    作者:ios-xr    | 项目源码 | 文件源码
def main():
    """Entry point: import an RSA authentication key and show the result.

    Module parameters:
        provider: connection details (required).
        keyfile:  appended to the "crypto key import authentication rsa"
                  command (required).

    Always exits with ``changed=True``; ``stdout`` holds the output of the
    final show command and ``stdout_lines`` its string form split on the
    literal two-character sequence backslash-n.
    """
    arg_spec = dict(provider=dict(required=True),
                    keyfile=dict(required=True))
    arg_spec.update(iosxr_argument_spec)
    module = AnsibleModule(argument_spec=arg_spec)

    key_path = module.params["keyfile"]
    show_cmd = "show crypto key authentication rsa"

    before = run_commands(module, show_cmd)

    import_cmd = "crypto key import authentication rsa " + key_path
    # When the first show response mentions "authentication", send the
    # import together with a "yes" answer for the prompt (presumably an
    # overwrite confirmation for an existing key -- confirm on device).
    needs_answer = "authentication" in before[0]
    crypto_command = (
        {"command": import_cmd, "prompt": CLI_PROMPT_RE, "answer": "yes"}
        if needs_answer
        else import_cmd
    )
    run_commands(module, crypto_command)

    # now show the new key
    after = run_commands(module, show_cmd)

    module.exit_json(changed=True,
                     stdout=after,
                     stdout_lines=str(after).split(r"\n"))
项目:iosxr-ansible    作者:ios-xr    | 项目源码 | 文件源码
def main():
    """Entry point: clear the device configuration with "commit replace".

    Module parameters:
        provider: connection details (required).
        confirm:  must equal "yes" for anything to happen (required).

    Exits early with stdout "clear configs aborted" when ``confirm`` is not
    "yes". Fails when "configure terminal" returns a non-zero code.
    Otherwise sends "commit replace" answering "yes" to its prompt, without
    checking that command's return code, and exits with stdout
    "all configs cleared".
    """
    arg_spec = dict(provider=dict(required=True),
                    confirm=dict(required=True))
    arg_spec.update(iosxr_argument_spec)
    module = AnsibleModule(argument_spec=arg_spec)

    if module.params["confirm"] != "yes":
        module.exit_json(changed=False, stdout="clear configs aborted")

    rc, _out, err = exec_command(module, "configure terminal")
    if rc != 0:
        module.fail_json(msg="unable to enter configuration mode",
                         err=to_text(err, errors="surrogate_or_strict"))

    commit = {"command": "commit replace",
              "prompt": CLI_PROMPT_RE,
              "answer": "yes"}
    run_commands(module, commit, check_rc=False)

    # NOTE(review): reports changed=False even after the commit replace was
    # sent -- confirm whether that is intentional.
    message = "all configs cleared"
    return module.exit_json(changed=False,
                            stdout=message,
                            stdout_lines=str(message).split(r"\n"))
项目:iosxr-ansible    作者:ios-xr    | 项目源码 | 文件源码
def main():
    """Entry point: return the output of "show running-config <component>".

    Module parameters:
        provider:  connection details (required).
        component: text appended to the show command; defaults to "all".

    Exits with ``changed=False``, the command response in ``stdout`` and its
    string form split on the literal backslash-n sequence in
    ``stdout_lines``.
    """
    arg_spec = dict(provider=dict(required=True),
                    component=dict(required=False, default="all"))
    arg_spec.update(iosxr_argument_spec)
    module = AnsibleModule(argument_spec=arg_spec)

    show_cmd = "show running-config " + module.params["component"]
    output = run_commands(module, show_cmd)

    return module.exit_json(changed=False,
                            stdout=output,
                            stdout_lines=str(output).split(r"\n"))
项目:linchpin    作者:CentOS-PaaS-SIG    | 项目源码 | 文件源码
def main():
    """Entry point: provision or cancel jobs through ``BkrFactory``.

    Module parameters:
        whiteboard:     optional string forwarded to ``provision``.
        recipesets:     required list; each entry supplies ``count`` and,
                        for cancellation, ``ids``.
        job_group:      optional string; forwarded to ``provision`` only
                        when non-empty.
        state:          "present" (default) provisions, "absent" cancels.
        cancel_message: message passed to ``cancel_jobs``.

    For "present", ``provision`` is called ``count`` times per recipeset and
    the module exits with the collected job ids, each with its first two
    characters removed (presumably a type prefix -- confirm against
    BkrFactory). For "absent", ``cancel_jobs`` is called for each recipeset.
    Both paths report ``success=True`` and ``changed=True``.
    """
    module = AnsibleModule(argument_spec={
        'whiteboard': {'required': False, 'type': 'str'},
        'recipesets': {'required': True, 'type': 'list'},
        'job_group': {'default': '', 'type': 'str'},
        'state': {'default': 'present', 'choices': ['present', 'absent']},
        'cancel_message': {'default': 'Job canceled by Ansible'}
    })
    params = module.params
    provisioning = params['state'] == 'present'
    factory = BkrFactory()

    job_ids = []
    for recipeset in params['recipesets']:
        for _ in range(recipeset['count']):
            if provisioning:
                extra_params = {}
                if params['job_group']:
                    extra_params['job_group'] = params['job_group']
                # Make provision
                job_ids.extend(factory.provision(debug=True, wait=True,
                                                 recipesets=[recipeset],
                                                 whiteboard=params['whiteboard'],
                                                 **extra_params))
            else:
                factory.cancel_jobs(recipeset['ids'],
                                    params['cancel_message'])

    if provisioning:
        parsed_ids = [job_id[2:] for job_id in job_ids]
        # Reaching this point means every provision call returned, so report
        # success; this path previously reported success=False.
        module.exit_json(success=True, ids=parsed_ids, changed=True)
    else:
        module.exit_json(success=True, changed=True)
项目:linchpin    作者:CentOS-PaaS-SIG    | 项目源码 | 文件源码
def main():
    """Entry point: report system statuses obtained from ``BeakerTargets``.

    Module parameters:
        ids:            list.
        skip_no_system: bool, defaults to False.
        max_attempts:   int, defaults to 60.

    All parameters are handed to ``BeakerTargets``. On success the module
    exits with the statuses under ``hosts``; any ``Exception`` is reported
    through ``fail_json`` with its string form as the message.
    """
    arg_spec = {
        'ids': {'type': 'list'},
        'skip_no_system': {'type': 'bool', 'default': False},
        'max_attempts': {'type': 'int', 'default': 60},
    }
    mod = AnsibleModule(argument_spec=arg_spec)
    targets = BeakerTargets(mod.params)
    try:
        statuses = targets.get_system_statuses()
        mod.exit_json(hosts=statuses, changed=True, success=True)
    except Exception as err:
        mod.fail_json(msg=str(err))


# import module snippets
项目:linchpin    作者:CentOS-PaaS-SIG    | 项目源码 | 文件源码
def main():
    """Entry point: run ``Dummy().execute`` and relay its output.

    Module parameters:
        name:  string.
        count: int, defaults to 1.
        state: "present" or "absent".

    When the execute output has a truthy ``host`` or any non-None
    ``changed`` entry, the module exits with ``changed=True`` merged with
    that output (so the output's own keys take precedence). Otherwise it
    exits with only ``changed=False``. Any ``Exception`` is reported through
    ``fail_json``.
    """
    module = AnsibleModule(
        argument_spec={
            'name': {'type': 'str'},
            'count': {'default': 1, 'type': 'int'},
            'state': {'choices': ['present', 'absent']},
        },
    )

    try:
        outcome = Dummy().execute(module)

        host_entry = outcome.get('host')
        changed_entry = outcome.get('changed')
        if not host_entry and changed_entry is None:
            payload = {'changed': False}
        else:
            payload = {'changed': True}
            payload.update(outcome)

        module.exit_json(**payload)
    except Exception as exc:
        module.fail_json(msg=str(exc))


# ---- Import Ansible Utilities (Ansible Framework) -------------------#
项目:linchpin    作者:CentOS-PaaS-SIG    | 项目源码 | 文件源码
def main():
    """Entry point: validate a data file against a schema file.

    Module parameters:
        data (alias ``topology``): path to the data file (required).
        schema:                    path to the schema file (required).
        data_format:               optional; "json", "yaml" or "yml".

    Both paths have ``~`` expanded and are passed to ``check_file_paths``
    and ``validate_values`` before ``JSONSchema(...).validate()`` runs. A
    truthy status exits with ``isvalid=True`` and the validator output; a
    falsy status fails with a dict holding the two paths and the output.
    """
    module = AnsibleModule(
        argument_spec=dict(
            data=dict(required=True, aliases=['topology']),
            schema=dict(required=True),
            data_format=dict(required=False,
                             choices=['json', 'yaml', 'yml']),
        ),
        required_one_of=[],
        supports_check_mode=True,
    )

    data_path = os.path.expanduser(module.params['data'])
    schema_path = os.path.expanduser(module.params['schema'])
    check_file_paths(module, data_path, schema_path)
    validate_values(module, data_path)

    status, out = JSONSchema(data_path, schema_path).validate()

    if not status:
        failure = {"path": data_path,
                   "schema": schema_path,
                   "output": out}
        module.fail_json(msg=failure)
    else:
        module.exit_json(isvalid=True, out=out)
项目:pinder    作者:dotwaffle    | 项目源码 | 文件源码
def main():
    """Entry point: return the canned ``mocked_data`` as peering info.

    Module parameters:
        accepted_requests: required list; read but not otherwise used.

    Exits with ``peering_info`` set to ``mocked_data`` and ``msg`` set to
    its string form.
    """
    arg_spec = {
        'accepted_requests': {'type': 'list', 'required': True},
    }
    module = AnsibleModule(argument_spec=arg_spec, supports_check_mode=True)

    # Looked up for parity with the declared interface; the reply below does
    # not depend on it.
    _accepted = module.params['accepted_requests']

    module.exit_json(peering_info=mocked_data, msg=str(mocked_data))
项目:pinder    作者:dotwaffle    | 项目源码 | 文件源码
def main():
    """Entry point: fetch the requests that are in the given state.

    Module parameters:
        state: required string; the only accepted value is "in-progress".

    Issues a GET on the ``api.requests`` resource of the service at
    http://127.0.0.1:8000 with ``state=<state>`` as the query, and exits
    with the ``results`` entry of the JSON reply under ``sessions`` and its
    string form under ``msg``.
    """
    arg_spec = {
        'state': {'type': 'str', 'required': True, 'choices': ['in-progress']},
    }
    module = AnsibleModule(argument_spec=arg_spec, supports_check_mode=True)
    wanted_state = module.params['state']

    client = hammock.Hammock('http://127.0.0.1:8000')

    query = 'state={}'.format(wanted_state)
    reply = client.api.requests.GET(params=query)
    sessions = reply.json()['results']
    module.exit_json(sessions=sessions, msg=str(sessions))
项目:ansible-satellite-6-2-installer    作者:rhtconsulting    | 项目源码 | 文件源码
def main():
    """Entry point: enable or disable repositories via subscription-manager.

    Module parameters:
        repos: required list of repository names.
        state: "present" (default) enables the repos, "absent" disables
               them.
        only:  "yes" or "no" (default "no"). With state "present" and only
               "yes", every repository is disabled first so that just the
               listed ones end up enabled.

    Fails with the command output as the message when that output contains
    "Error"; otherwise exits with ``changed=True``, ``skipped=False`` and the
    output under ``meta['result']``.
    """
    argument_spec = {
        "repos": {"required": True, "type": "list"},
        "state": {
            "default": "present",
            "choices": ['present', 'absent'],
            "type": 'str'
        },
        "only": {"default": 'no', "required": False, "type": "str", "choices": ['yes', 'no']},

    }
    module = AnsibleModule(argument_spec=argument_spec)
    repos = module.params['repos']
    state = module.params['state']
    only = module.params['only']

    if state == 'present':
        if only == 'yes':
            # The command has no conversion specifiers, so it is run as a
            # plain string; the former "% repos" operand contributed nothing.
            # The exit status is not inspected, as before.
            os.system("subscription-manager repos --disable='*'")
        flag = '--enable='
    else:
        flag = '--disable='

    # NOTE(review): repo names are interpolated into a shell command line
    # unquoted; consider quoting them or passing an argument list if the
    # names can come from an untrusted source.
    repo_args = ' '.join([flag + repo for repo in repos])
    result = os.popen("subscription-manager repos %s" % repo_args).read()
    if 'Error' in result:
        module.fail_json(msg=result)

    module.exit_json(changed=True, skipped=False, meta={'result': result})