Python pprint 模块,pformat() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用pprint.pformat()

项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def pre_prepare_second_call(**kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    Per the hook notes, this runs before an issued prepare, but after the
    first pre_prepare() hook; returning False from here would skip the
    preparation entirely.  This implementation returns nothing.
    """
    # Indented rendering, limited to two levels of nesting
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_prep()\n{kwargs}'.format(kwargs=details))


# Below is an example of how you can tag 1 function to 2 different
# kinds of hooks:
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def take_action(self, parsed_args):
        """Fetch one session and return its column headers and values.

        Looks the session up by ``parsed_args.session_id`` and raises
        ``exceptions.ApiClientException`` when the lookup result is falsy.
        Returns a ``(headers, values)`` pair of equally long tuples; the
        session's jobs are rendered with ``pprint.pformat``.
        """
        session = self.app.client.sessions.get(parsed_args.session_id)
        if not session:
            raise exceptions.ApiClientException('Session not found')

        headers = ('Session ID', 'Description', 'Status', 'Jobs')
        values = (
            session.get('session_id'),
            session.get('description'),
            session.get('status'),
            pprint.pformat(session.get('jobs')),
        )
        return headers, values
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def take_action(self, parsed_args):
        """Return ``(column, data)`` tuples describing a single session.

        The session is looked up by ``parsed_args.session_id``; a falsy
        lookup result raises ``exceptions.ApiClientException``.  The jobs
        entry is pretty-printed into a string.
        """
        session = self.app.client.sessions.get(parsed_args.session_id)

        if not session:
            raise exceptions.ApiClientException('Session not found')

        # Keep every header next to the value it labels, then split the
        # pairs into the two parallel tuples that are returned.
        rows = (
            ('Session ID', session.get('session_id')),
            ('Description', session.get('description')),
            ('Status', session.get('status')),
            ('Jobs', pprint.pformat(session.get('jobs'))),
        )
        column, data = zip(*rows)
        return column, data
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _cached_roles(search):
    """
    Return the role names found in the cached minion pillar data.

    The cached values from the master pillar are trusted because a downed
    minion would be absent from any dynamic query.  Downed minions outside
    of the search criteria are not a concern here.

    ``search`` is passed to MasterPillarUtil as a "compound" target.  The
    return value is the keys view of a role -> [minions] mapping.
    """
    pillar_util = salt.utils.master.MasterPillarUtil(search, "compound",
                                                     use_cached_grains=True,
                                                     grains_fallback=False,
                                                     opts=__opts__)

    minion_pillars = pillar_util.get_minion_pillar()
    minions_by_role = {}
    for minion in minion_pillars:
        pillar = minion_pillars[minion]
        if 'roles' not in pillar:
            continue
        for role in pillar['roles']:
            minions_by_role.setdefault(role, []).append(minion)

    log.debug(pprint.pformat(minions_by_role))
    return minions_by_role.keys()
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _run(cmd):
    """
    Run ``cmd`` through the shell and return its result.

    NOTE: Taken from osd.py module.

    Returns a ``(returncode, stdout, stderr)`` tuple where both outputs
    have trailing whitespace stripped.
    """
    log.info(cmd)
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    # communicate() drains both pipes while waiting for the child.  Calling
    # wait() first can deadlock once the child fills a pipe buffer, and the
    # previous code read stdout twice so stderr was never returned.
    _stdout, _stderr = proc.communicate()
    _stdout = _stdout.rstrip()
    _stderr = _stderr.rstrip()
    log.debug("return code: {}".format(proc.returncode))
    log.debug(pprint.pformat(_stdout))
    log.debug(pprint.pformat(_stderr))
    return proc.returncode, _stdout, _stderr
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def __init__(self, _id, **kwargs):
        """
        Initialize settings, connect to Ceph cluster

        ``_id`` is stored as ``self.osd_id`` and used to build the default
        'filename' setting.  Any keyword argument overrides the default
        setting of the same name.

        Raises RuntimeError when connecting to the cluster fails.
        """
        self.osd_id = _id
        self.settings = {
            'conf': "/etc/ceph/ceph.conf",
            # Use the OSD id that was passed in; the builtin ``id`` function
            # was formatted into the path here by mistake.
            'filename': '/var/run/ceph/osd.{}-weight'.format(_id),
            'timeout': 60,
            'keyring': '/etc/ceph/ceph.client.admin.keyring',
            'client': 'client.admin',
            'delay': 6
        }
        self.settings.update(kwargs)
        log.debug("settings: {}".format(pprint.pformat(self.settings)))
        self.cluster = rados.Rados(conffile=self.settings['conf'],
                                   conf=dict(keyring=self.settings['keyring']),
                                   name=self.settings['client'])
        try:
            self.cluster.connect()
        except Exception as error:
            raise RuntimeError("connection error: {}".format(error))
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def readlink(device, follow=True):
    """
    Return the short name for a symlink device

    Runs the ``readlink`` command on ``device`` through the shell, adding
    ``-f`` when ``follow`` is true, and returns the command's stdout with
    trailing whitespace stripped.
    """
    option = '-f' if follow else ''
    cmd = "readlink {} {}".format(option, device)
    log.info(cmd)
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    # communicate() reads both pipes while waiting; wait() followed by
    # read() can block forever if the child fills a pipe buffer.
    stdout, stderr = proc.communicate()
    result = stdout.rstrip()
    log.debug(pprint.pformat(result))
    log.debug(pprint.pformat(stderr))
    return result


# pylint: disable=too-many-instance-attributes
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def __init__(self, device, **kwargs):
        """
        Set attributes for an OSD

        ``device`` is resolved through readlink() before being stored.  Every
        other attribute is filled in by one of this object's own setter
        methods, and the resulting attribute dict is logged at debug level.

        ``kwargs`` is accepted but not used in this method.

        NOTE(review): the setters are called in a fixed sequence and may
        depend on attributes assigned earlier -- confirm before reordering.
        """
        self.device = readlink(device)
        # top-level identifier
        self.tli = self._set_tli()
        self.capacity = self.set_capacity()
        self.size = self.set_bytes()
        self.small = self._set_small()
        self.disk_format = self.set_format()
        self.journal = self.set_journal()
        self.journal_size = self.set_journal_size()
        self.wal_size = self.set_wal_size()
        self.wal = self.set_wal()
        self.db_size = self.set_db_size()
        # pylint: disable=invalid-name
        self.db = self.set_db()
        # default for encryption can be retrieved from the global pillar
        self.encryption = self.set_encryption()
        self.types = self.set_types()
        # vars(self) exposes every attribute assigned above
        log.debug("OSD config: \n{}".format(pprint.pformat(vars(self))))
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def set_journal(self, default=False):
        """
        Return the journal device, if defined

        For a V1 configuration the journal is looked up in the converted
        'data+journals' pillar structure; for a V2 configuration it is read
        from the 'journal' key of this device's entry in ``self.tli``.
        ``default`` is returned when no journal is found.
        """
        if self._config_version() == OSDConfig.V1:
            journals = self._convert_data_journals(__pillar__['storage']['data+journals'])
            log.debug("struct: \n{}".format(pprint.pformat(journals)))
            if self.device in journals:
                return journals[self.device]
            log.info("No journal specified for {}".format(self.device))

        if self._config_version() == OSDConfig.V2:
            has_journal = (self.device in self.tli and
                           'journal' in self.tli[self.device])
            if has_journal:
                return self.tli[self.device]['journal']
            log.info("No journal specified for {}".format(self.device))

        return default

    # pylint: disable=no-self-use
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _fsck(device, _partition):
    """
    Check filesystem on partition

    Runs ``xfs_admin -u`` against the partition and returns True when the
    command exits with status 0.  Devices whose name contains 'nvme' get a
    'p' between the device name and the partition number.

    Note: xfs_repair returns immediately on success, but takes 3m39s to fail
    on some broken filesystems.  Not good for automation.
    """
    prefix = 'p' if 'nvme' in device else ''
    cmd = "/usr/sbin/xfs_admin -u {}{}{}".format(device, prefix, _partition)
    log.info(cmd)
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    # communicate() drains both pipes while waiting; wait() followed by
    # read() can deadlock once the child fills a pipe buffer.
    stdout, stderr = proc.communicate()
    log.debug(pprint.pformat(stdout))
    log.debug(pprint.pformat(stderr))
    log.debug("xfs_admin: {}".format(proc.returncode))
    return proc.returncode == 0
项目:wurst    作者:IndEcol    | 项目源码 | 文件源码
def link_internal(data, fields=('name', 'product', 'location', 'unit')):
    """Link internal exchanges by ``fields``.

    Every exchange without a truthy ``input`` gets an ``input`` value of
    ``(database, code)``, taken from the dataset whose reference product
    has the same values for ``fields``.

    Raises ``ValueError`` for an unlinked exchange of type ``biosphere`` and
    ``KeyError`` when no dataset matches an exchange.  Returns ``data``,
    whose exchanges are modified in place.
    """
    # Result is not used here; the call is kept as in the original
    # (NOTE(review): possibly for validation -- confirm).
    get_input_databases(data)

    def key_for(exchange):
        return tuple(exchange[field] for field in fields)

    producers = {}
    for dataset in data:
        producers[key_for(reference_product(dataset))] = (dataset['database'], dataset['code'])

    for dataset in data:
        for exchange in dataset['exchanges']:
            if exchange.get('input'):
                # Already linked
                continue

            if exchange['type'] == 'biosphere':
                raise ValueError("Unlinked biosphere exchange:\n{}".format(pformat(exchange)))

            try:
                exchange['input'] = producers[key_for(exchange)]
            except KeyError:
                raise KeyError("Can't find linking activity for exchange:\n{}".format(pformat(exchange)))
    return data
项目:zinc    作者:PressLabs    | 项目源码 | 文件源码
def meld(got, expected):
    """Open the ``meld`` diff tool on two values when they differ.

    Does nothing when ``got == expected``.  Otherwise both values are
    pretty-printed into files named 'got' and 'expected' inside a directory
    named after the calling function, and ``meld`` is run on the two files.
    """
    if got == expected:
        return

    import inspect
    import os
    import subprocess
    from os import path
    from pprint import pformat

    # Entry 1 of the outer frames is the caller; field 3 is its function name
    frames = inspect.getouterframes(inspect.currentframe(), 2)
    out_dir = frames[1][3]
    os.makedirs(out_dir, exist_ok=True)

    got_fn = path.join(out_dir, 'got')
    expected_fn = path.join(out_dir, 'expected')
    with open(got_fn, 'w') as got_f, open(expected_fn, 'w') as expected_f:
        got_f.write(pformat(got))
        expected_f.write(pformat(expected))

    subprocess.run(['meld', got_fn, expected_fn])
项目:calm    作者:cygwin    | 项目源码 | 文件源码
def compare_with_expected_file(test, dirpath, results, basename=None):
    """Compare pretty-printed ``results`` with an expected-output file.

    The formatted results (width 120) are written to ``results`` in
    ``dirpath`` and compared, via ``test.assertMultiLineEqual``, with the
    right-stripped contents of ``expected``.  When ``basename`` is given the
    files are named ``<basename>.results`` and ``<basename>.expected``.
    """
    actual = pprint.pformat(results, width=120)

    prefix = basename + '.' if basename else ''
    results_path = os.path.join(dirpath, prefix + 'results')
    expected_path = os.path.join(dirpath, prefix + 'expected')

    # Keep a copy of what was produced, terminated by a newline
    with open(results_path, 'w') as out:
        out.write(actual + '\n')

    with open(expected_path) as src:
        expected = src.read().rstrip()

    test.assertMultiLineEqual(expected, actual)


#
# capture a directory tree as a dict 'tree', where each key is a directory path
# and the value is a sorted list of filenames
#
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _compare_eq_iterable(left, right, verbose=False):
    """Return explanation lines containing a full diff of two iterables.

    Without ``verbose`` only a hint to use ``-v`` is returned.  Otherwise
    the result is a header line followed by the stripped lines of a
    ``difflib.ndiff`` over both values' pretty-printed forms.
    """
    if not verbose:
        return [u('Use -v to get the full diff')]

    # imported lazily to keep pytest start-up fast
    import difflib

    try:
        left_lines = pprint.pformat(left).splitlines()
        right_lines = pprint.pformat(right).splitlines()
        explanation = [u('Full diff:')]
    except Exception:
        # hack: PrettyPrinter.pformat() in python 2 fails when formatting
        # items that can't be sorted().  See issue #718.  As a workaround
        # the diff is built from the sorted repr() of each contained item.
        left_lines = sorted(repr(item) for item in left)
        right_lines = sorted(repr(item) for item in right)
        explanation = [u('Full diff (fallback to calling repr on each item):')]

    for diff_line in difflib.ndiff(left_lines, right_lines):
        explanation.append(diff_line.strip())
    return explanation
项目:smc-python    作者:gabstopper    | 项目源码 | 文件源码
def on_open(self):
        """
        Once the connection is made, start the query off and
        start an event loop to wait for a signal to
        stop. Results are yielded within receive().

        The loop runs in a daemon thread stored on ``self.thread``.  It
        sends ``self.query.request`` as JSON and then waits until
        ``self.event`` is set.
        """
        def event_loop():
            logger.debug(pformat(self.query.request))
            self.send(json.dumps(self.query.request))
            while not self.event.is_set():
                self.event.wait(self.gettimeout())

            logger.debug('Event loop terminating.')

        self.thread = threading.Thread(
            target=event_loop)
        # Thread.setDaemon() is deprecated; the ``daemon`` attribute is the
        # supported way to mark the thread as a daemon.
        self.thread.daemon = True
        self.thread.start()
项目:rubiks-cube-NxNxN-solver    作者:dwalton76    | 项目源码 | 文件源码
def get_wings_on_edge(self, pos1, side1_name, side2_name):
        """Return the wings at ``pos1`` that sit on the given pair of sides.

        A wing ``(wing_pos1, wing_pos2)`` is kept when the sides of its two
        positions are named ``side1_name`` and ``side2_name``, in either
        order.  The result is a list of position pairs in the order that
        ``self.get_wings(pos1)`` yields them.
        """
        wanted = (side1_name, side2_name)
        kept = []

        for (wing_pos1, wing_pos2) in self.get_wings(pos1):
            first_side = self.get_side_for_index(wing_pos1)
            second_side = self.get_side_for_index(wing_pos2)

            as_given = (first_side.name == wanted[0] and second_side.name == wanted[1])
            swapped = (second_side.name == wanted[0] and first_side.name == wanted[1])
            if as_given or swapped:
                kept.append((wing_pos1, wing_pos2))

        return kept
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def __repr__(self):
        return '{} {}'.format(self.value, pprint.pformat(self.context))
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def context_repr(self):
        """Return the pretty-printed context, or '' when the context is falsy."""
        return pprint.pformat(self.context) if self.context else ''
项目:PythonForMayaSamples    作者:dgovil    | 项目源码 | 文件源码
def populate(self):
        """Rebuild the list widget from the contents of the library.

        The widget is cleared, the library is asked to search again, and one
        list item is added per library entry.  Each item is labelled with
        the entry's name, shows the pretty-printed info as its tooltip, and
        gets an icon when the info has a truthy 'screenshot' value.
        """
        # Start from an empty list so nothing stale is left behind
        self.listWidget.clear()

        # Refresh the library in case anything changed since the last scan
        self.library.find()

        for name, info in self.library.items():
            entry = QtWidgets.QListWidgetItem(name)
            entry.setToolTip(pprint.pformat(info))

            # Only entries that have a screenshot get an icon
            screenshot = info.get('screenshot')
            if screenshot:
                entry.setIcon(QtGui.QIcon(screenshot))

            self.listWidget.addItem(entry)

# This is a convenience function to display our UI
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def pre_prepare(**kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    Per the hook notes, this runs before an issued prepare, and returning
    False from here would skip the preparation entirely.  This
    implementation returns nothing.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_prep()\n{kwargs}'.format(kwargs=details))


# Below is an example of how you can define a function name that differs
# from the actual hook you want to tie it to
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def pre_and_pre_prepare_call(**kwargs):
    """
    A demo of how one function can be tagged as a hook twice.

    Logs the keyword arguments it was invoked with.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_and_post_prep()\n{kwargs}'.format(kwargs=details))
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def post_prepare(**kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    Per the hook notes, this runs after an issued prepare.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK post_prep()\n{kwargs}'.format(kwargs=details))
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def pre_stage(**kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    Per the hook notes, this runs before an issued stage, and returning
    False from here would skip the staging entirely.  This implementation
    returns nothing.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_stage()\n{kwargs}'.format(kwargs=details))
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def post_staged_segment(**kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    Per the hook notes, this runs after a segment has been staged for
    posting.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK post_staged_segment()\n{kwargs}'.format(kwargs=details))
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def post_staged_nzb(**kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    Per the hook notes, this runs after an NZB-File has been staged for
    saving.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK post_staged_nzb()\n{kwargs}'.format(kwargs=details))
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def post_stage(session, **kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    ``session`` is accepted but neither used nor logged.

    NOTE(review): the hook name suggests this runs after an issued stage,
    although the earlier notes said "prior to" -- confirm against the
    caller.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK post_stage()\n{kwargs}'.format(kwargs=details))
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def pre_upload(**kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    Per the hook notes, this runs before an issued upload, and returning
    False from here would skip the upload entirely.  This implementation
    returns nothing.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_upload()\n{kwargs}'.format(kwargs=details))
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def upload_article(**kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    Per the hook notes, this runs before an actual article upload, and
    returning False from here 'will' prevent the article from being
    uploaded.  This implementation returns nothing.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK upload_article()\n{kwargs}'.format(kwargs=details))
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def pre_verify(**kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    Per the hook notes, this runs before a verification check, and
    returning False from here would skip the verify entirely.  This
    implementation returns nothing.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_verify()\n{kwargs}'.format(kwargs=details))
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def post_verify(**kwargs):
    """
    this function gets called after running a verification check

    It logs the keyword arguments it was invoked with (pretty-printed,
    limited to two levels of nesting) and returns nothing.
    """
    # The label previously read 'pre_upload()', copied from another hook,
    # which made these log entries look like they came from that hook.
    logger.info(
        'DEBUG HOOK post_verify()\n{kwargs}'.format(
            kwargs=pformat(kwargs, indent=4, depth=2),
        )
    )
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def pre_clean(**kwargs):
    """
    Debug hook that logs the keyword arguments it was invoked with.

    Per the hook notes, this runs before a cleanup, and returning False
    from here would skip the clean-up entirely.  This implementation
    returns nothing.
    """
    details = pformat(kwargs, indent=4, depth=2)
    logger.info('DEBUG HOOK pre_clean()\n{kwargs}'.format(kwargs=details))
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def post_clean(**kwargs):
    """
    this function gets called after running a cleanup

    It logs the keyword arguments it was invoked with (pretty-printed,
    limited to two levels of nesting) and returns nothing.
    """
    # The label previously read 'pre_upload()', copied from another hook,
    # which made these log entries look like they came from that hook.
    logger.info(
        'DEBUG HOOK post_clean()\n{kwargs}'.format(
            kwargs=pformat(kwargs, indent=4, depth=2),
        )
    )
项目:quizbot-2017    作者:pycontw    | 项目源码 | 文件源码
def configure_linebot_app(app):
    """Register the LINE webhook route and a commit hook on ``app``.

    Two handlers are attached: an ``after_request`` callback that commits
    ``db`` and passes the response through, and a POST route at
    ``/api/line_webhook`` that hands the request body and its
    ``X-Line-Signature`` header to ``line_webhook_handler``.
    """

    @app.after_request
    def commit_database(response):
        db.commit()
        return response

    @app.route("/api/line_webhook", methods=["POST"])
    def line_webhook():
        signature = request.headers['X-Line-Signature']
        payload = request.get_data(as_text=True)
        logger.debug(f'Incoming message:\n{pformat(payload)}')
        try:
            line_webhook_handler.handle(payload, signature)
        except InvalidSignatureError:
            # The signature did not validate: answer 400
            logger.warning('Message with an invalid signature received')
            abort(400)
        except LineBotApiError as api_error:
            logger.error(f'{api_error}\nDetails:\n{pformat(api_error.error.details)}')
            abort(500)
        except Exception as error:
            # Anything else is logged and answered with 500
            logger.error(f'Uncaught error: {error}')
            abort(500)
        return "OK"
项目:loggingnight    作者:kdknigga    | 项目源码 | 文件源码
def do_lookup(icao_identifier, date):
    """Build the result dict of a LoggingNight lookup.

    Times are formatted as ``%H%M Zulu`` when the lookup reports Zulu time
    and as ``%I:%M %p`` otherwise.  When ``dev_mode`` is the string "true",
    pretty-printed ``airport_debug`` and ``usno_debug`` entries are added.
    """
    ln = LoggingNight(icao_identifier, date, try_cache=True)

    time_format = '%H%M Zulu' if ln.in_zulu else '%I:%M %p'

    # Entries that are present in both modes
    result = dict(
        airport=icao_identifier,
        name=ln.name,
        date=date.isoformat(),
        sunset=ln.sun_set.strftime(time_format),
        end_civil=ln.end_civil_twilight.strftime(time_format),
        one_hour=ln.hour_after_sunset.strftime(time_format)
        )

    if dev_mode == "true":
        result['airport_debug'] = pprint.pformat(ln.airport, indent=4)
        result['usno_debug'] = pprint.pformat(ln.usno, indent=4)

    return result
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def buildFacade(schema):
    """Create a facade class for ``schema`` plus source text describing it.

    Returns ``(cls, source)``.  ``cls`` is a new subclass of ``Type`` named
    after the schema and carrying ``name``, ``version`` and ``schema``
    attributes.  ``source`` is a class-definition string holding the same
    values, with the schema pretty-printed and indented by four spaces.
    """
    attributes = dict(name=schema.name,
                      version=schema.version,
                      schema=schema)
    cls = type(schema.name, (Type,), attributes)

    template = """
class {name}Facade(Type):
    name = '{name}'
    version = {version}
    schema = {schema}
    """
    indented_schema = textwrap.indent(pprint.pformat(schema), "    ")
    source = template.format(name=schema.name,
                             version=schema.version,
                             schema=indented_schema)
    return cls, source
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def _format_unequal_keys(dicts):
    return pformat([sorted(d.keys()) for d in dicts])
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def assertDictEqual(self, d1, d2, msg=None):
        """Fail with a line-by-line diff unless the two dicts are equal.

        Both arguments must be ``dict`` instances.  The failure message
        combines the two values' short reprs with an ``ndiff`` of their
        pretty-printed forms.
        """
        self.assertIsInstance(d1, dict, 'First argument is not a dictionary')
        self.assertIsInstance(d2, dict, 'Second argument is not a dictionary')

        if d1 != d2:
            summary = '%s != %s' % (safe_repr(d1, True), safe_repr(d2, True))
            diff_lines = difflib.ndiff(pprint.pformat(d1).splitlines(),
                                       pprint.pformat(d2).splitlines())
            diff = '\n' + '\n'.join(diff_lines)
            summary = self._truncateMessage(summary, diff)
            self.fail(self._formatMessage(msg, summary))
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def dump_option_dicts(self, header=None, commands=None, indent=""):
        """Announce the option dictionaries of the given commands.

        ``commands`` is an iterable of command names; when it is None, all
        commands in ``self.command_options`` are dumped in sorted order.
        ``header``, when given, is announced first and everything after it
        is indented by two more spaces.  ``indent`` is prefixed to every
        announced line.
        """
        from pprint import pformat

        if commands is None:             # dump all command option dicts
            # sorted() works on both Python 2 and 3; calling .sort() on the
            # result of dict.keys() fails on Python 3.
            commands = sorted(self.command_options.keys())

        if header is not None:
            self.announce(indent + header)
            indent = indent + "  "

        if not commands:
            self.announce(indent + "no commands known yet")
            return

        for cmd_name in commands:
            opt_dict = self.command_options.get(cmd_name)
            if opt_dict is None:
                self.announce(indent +
                              "no option dict for '%s' command" % cmd_name)
            else:
                self.announce(indent +
                              "option dict for '%s' command:" % cmd_name)
                out = pformat(opt_dict)
                for line in out.split('\n'):
                    self.announce(indent + "  " + line)

    # -- Config file finding/parsing methods ---------------------------
项目:twentybn-dl    作者:TwentyBN    | 项目源码 | 文件源码
def print_processed_results(self, counts, failures):
        """Print one count line per possible result, then any failures.

        ``counts`` is indexed by the entries of ``self.possible_results``;
        each line is labelled with the matching entry of
        ``self.result_descriptions``.  A truthy ``failures`` value is
        pretty-printed under a 'Failures:' heading.
        """
        labelled = zip(self.possible_results, self.result_descriptions)
        for result_key, description in labelled:
            print('{}: {}'.format(description, counts[result_key]))

        if not failures:
            return
        print('Failures:')
        print(pprint.pformat(failures))
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def take_action(self, parsed_args):
        """Return ``(column, data)`` tuples describing a single job.

        The job is looked up by ``parsed_args.job_id``; a falsy lookup
        result raises ``exceptions.ApiClientException``.  The job actions
        are pretty-printed, and a missing session id or schedule field is
        shown as an empty string.
        """
        job = self.app.client.jobs.get(parsed_args.job_id)

        if not job:
            raise exceptions.ApiClientException('Job not found')

        # Keep every header next to the value it labels, then split the
        # pairs into the two parallel tuples that are returned.
        rows = (
            ('Job ID', job.get('job_id')),
            ('Client ID', job.get('client_id')),
            ('User ID', job.get('user_id')),
            ('Session ID', job.get('session_id', '')),
            ('Description', job.get('description')),
            ('Actions', pprint.pformat(job.get('job_actions'))),
            ('Start Date', job.get('job_schedule', {}).get('schedule_start_date', '')),
            ('End Date', job.get('job_schedule', {}).get('schedule_end_date', '')),
            ('Interval', job.get('job_schedule', {}).get('schedule_interval', '')),
        )
        column, data = zip(*rows)
        return column, data
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def take_action(self, parsed_args):
        """Fetch one backup and return its column headers and values.

        Looks the backup up by ``parsed_args.backup_uuid`` and raises
        ``exceptions.ApiClientException`` when the lookup result is falsy.
        The backup metadata is rendered with ``pprint.pformat``.
        """
        backup = self.app.client.backups.get(parsed_args.backup_uuid)
        if not backup:
            raise exceptions.ApiClientException('Backup not found')

        headers = ('Backup ID', 'Metadata')
        backup_id = backup.get('backup_uuid')
        metadata_text = pprint.pformat(backup.get('backup_metadata'))
        return headers, (backup_id, metadata_text)
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def take_action(self, parsed_args):
        """Fetch one job and return its column headers and values.

        Looks the job up by ``parsed_args.job_id`` and raises
        ``exceptions.ApiClientException`` when the lookup result is falsy.
        The job actions are rendered with ``pprint.pformat``; a missing
        session id or schedule field is reported as an empty string.
        """
        job = self.app.client.jobs.get(parsed_args.job_id)
        if not job:
            raise exceptions.ApiClientException('Job not found')

        headers = (
            'Job ID', 'Client ID', 'User ID', 'Session ID', 'Description',
            'Actions', 'Start Date', 'End Date', 'Interval',
        )

        job_id = job.get('job_id')
        client_id = job.get('client_id')
        user_id = job.get('user_id')
        session_id = job.get('session_id', '')
        description = job.get('description')
        actions_text = pprint.pformat(job.get('job_actions'))
        # An absent schedule behaves like an empty one
        schedule = job.get('job_schedule', {})
        values = (
            job_id,
            client_id,
            user_id,
            session_id,
            description,
            actions_text,
            schedule.get('schedule_start_date', ''),
            schedule.get('schedule_end_date', ''),
            schedule.get('schedule_interval', ''),
        )
        return headers, values
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def create_app(self):
        """Create a new Spinnaker application from this instance's settings.

        Stores the account list in ``self.appinfo``, renders the
        application template and passes the rendered data to
        ``wait_for_task``.  Returns None.

        NOTE(review): the earlier docs listed AssertionError for a failed
        creation; presumably raised by ``wait_for_task`` -- confirm.
        """
        self.appinfo['accounts'] = self.get_accounts()

        self.log.debug('Pipeline Config\n%s', pformat(self.pipeline_config))
        self.log.debug('App info:\n%s', pformat(self.appinfo))

        rendered = self.retrieve_template()
        wait_for_task(rendered)

        self.log.info("Successfully created %s application", self.appname)
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def retrieve_template(self):
        """Merge the instance links into the pipeline config and render the app template.

        Returns:
            The result of rendering ``infrastructure/app_data.json.j2`` with
            ``self.appinfo`` and ``self.pipeline_config``.
        """
        instance_links = self.retrieve_instance_links()
        self.log.debug('Links is \n%s', pformat(instance_links))
        self.pipeline_config['instance_links'].update(instance_links)

        rendered = get_template(
            template_file='infrastructure/app_data.json.j2',
            appinfo=self.appinfo,
            pipeline_config=self.pipeline_config,
        )
        self.log.debug('jsondata is %s', pformat(rendered))
        return rendered
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def create_elb(self):
        """Create or update the ELB from the rendered JSON data.

        The rendered data is passed to ``wait_for_task``; afterwards the
        listener policy, the backend policy and the attributes are applied
        using the same data.
        """
        elb_json = self.make_elb_json()
        LOG.debug('Block ELB JSON Data:\n%s', pformat(elb_json))

        wait_for_task(elb_json)

        # Follow-up steps, each of which takes the same rendered data
        self.add_listener_policy(elb_json)
        self.add_backend_policy(elb_json)
        self.configure_attributes(elb_json)
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def render_wrapper(self, region='us-east-1'):
        """Generate the base Pipeline wrapper.

        Renders ``pipeline/pipeline_wrapper.json.j2`` with the application
        settings and parses the result as JSON.  ``self.base``, when truthy,
        takes the place of the base from the pipeline settings.

        Args:
            region (str): AWS Region.

        Returns:
            The rendered Pipeline wrapper, decoded from JSON.
        """
        pipeline_settings = self.settings['pipeline']
        base = pipeline_settings['base']
        if self.base:
            base = self.base

        notifications = pipeline_settings['notifications']
        email = notifications['email']
        slack = notifications['slack']
        deploy_type = pipeline_settings['type']
        pipeline_id = self.compare_with_existing(region=region)

        app_data = {
            'appname': self.app_name,
            'base': base,
            'deploy_type': deploy_type,
            'environment': 'packaging',
            'region': region,
            'triggerjob': self.trigger_job,
            'email': email,
            'slack': slack,
        }
        data = {'app': app_data, 'id': pipeline_id}

        self.log.debug('Wrapper app data:\n%s', pformat(data))

        rendered = get_template(template_file='pipeline/pipeline_wrapper.json.j2', data=data)
        return json.loads(rendered)
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def render_wrapper(self, region='us-east-1'):
        """Generate the base Pipeline wrapper.

        The wrapper template is rendered from the pipeline settings (base,
        notification targets and deploy type), the application name, the
        trigger job and the id returned by ``compare_with_existing``.

        Args:
            region (str): AWS Region.

        Returns:
            The rendered Pipeline wrapper, decoded from JSON.
        """
        # The configured base is always read; self.base overrides it when truthy
        configured_base = self.settings['pipeline']['base']
        base = self.base if self.base else configured_base

        email = self.settings['pipeline']['notifications']['email']
        slack = self.settings['pipeline']['notifications']['slack']
        deploy_type = self.settings['pipeline']['type']
        pipeline_id = self.compare_with_existing(region=region)

        data = {
            'app': dict(
                appname=self.app_name,
                base=base,
                deploy_type=deploy_type,
                environment='packaging',
                region=region,
                triggerjob=self.trigger_job,
                email=email,
                slack=slack,
            ),
            'id': pipeline_id,
        }

        self.log.debug('Wrapper app data:\n%s', pformat(data))

        wrapper_json = get_template(template_file='pipeline/pipeline_wrapper.json.j2', data=data)
        return json.loads(wrapper_json)
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def render_wrapper(self, region='us-east-1'):
        """Generate the base Pipeline wrapper.

        Renders ``pipeline/pipeline_wrapper.json.j2`` and parses the result
        as JSON.  ``self.base`` is used as the base when truthy; otherwise
        the base from the pipeline settings is used.

        Args:
            region (str): AWS Region.

        Returns:
            The rendered Pipeline wrapper, decoded from JSON.
        """
        base = self.base or self.settings['pipeline']['base']

        notifications = self.settings['pipeline']['notifications']
        email = notifications['email']
        slack = notifications['slack']
        deploy_type = self.settings['pipeline']['type']
        pipeline_id = self.compare_with_existing(region=region)

        app_section = {
            'appname': self.app_name,
            'base': base,
            'deploy_type': deploy_type,
            'environment': 'packaging',
            'region': region,
            'triggerjob': self.trigger_job,
            'email': email,
            'slack': slack,
        }
        data = {'app': app_section, 'id': pipeline_id}

        self.log.debug('Wrapper app data:\n%s', pformat(data))

        rendered = get_template(template_file='pipeline/pipeline_wrapper.json.j2', data=data)
        return json.loads(rendered)
项目:gocd-dashboard    作者:datasift    | 项目源码 | 文件源码
def debug(*objects):
    """Log the pretty-printed arguments, one per line, at DEBUG level.

    The message goes to the logger of the current Flask application.
    """
    emit = flask.current_app.logger.debug
    emit('\n'.join(pprint.pformat(obj) for obj in objects))