Python os.environ 模块,get() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用os.environ.get()

项目:docker-mongodb    作者:washingtonpost    | 项目源码 | 文件源码
def configure_options(self):
        if not environ.get('CLUSTER_NAME'):
            self.logger.fatal('CLUSTER_NAME environment variable is required')
            sys.exit(1)

        if not environ.get('MONGODB_DEVICE_NAME'):
            self.logger.fatal('MONGODB_DEVICE_NAME environment variable is required')
            sys.exit(1)

        self.cluster_name = environ['CLUSTER_NAME']
        self.device_name = environ['MONGODB_DEVICE_NAME']
        self.snapshot_and_exit = environ.get('MONGODB_SNAPSHOT_AND_EXIT', 'false').lower() == 'true'
        self.minutely_snapshots = int(environ.get('MONGODB_MINUTELY_SNAPSHOTS', '360'))
        self.hourly_snapshots = int(environ.get('MONGODB_HOURLY_SNAPSHOTS', '24'))
        self.daily_snapshots = int(environ.get('MONGODB_DAILY_SNAPSHOTS', '30'))
        self.instance_id = self.get_instance_id()
        self.statsd = DogStatsd(host='172.17.0.1', port=8125)
        self.snapshot_frequency = self.get_snapshot_frequency()
项目:benzoin    作者:mpilar    | 项目源码 | 文件源码
def wait_for_snapshot_validation(event, context):
    """
    Gets the snapshots involved in the step function and checks for success
    of the MongoDB validator script.
    """
    session = boto3.session.Session(region_name=event.get("region"))

    ec2 = session.resource("ec2")
    snapshot_ids = [vol["snapshot-id"] for vol in event.get("backup-volumes")]
    for sid in snapshot_ids:
        tags = ec2.Snapshot(sid).tags
        if not tags:
            break
        for tag in tags:
            if tag.get("Key") == VALIDATION_TAG_KEY:
                if tag.get("Success") == "false":
                    raise Exception("Validation failed!")
                else:
                    return event
    raise NotReady("Snapshot not validated yet.")
项目:Gnome-Authenticator    作者:bil-elmoussaoui    | 项目源码 | 文件源码
def read_database(self):
        """
            Read .yml database files provided by 2factorauth guys!
        """
        db_dir = path.join(env.get("DATA_DIR"), "applications") + "/data/*.yml"
        logging.debug("Database folder is {0}".format(db_dir))
        db_files = glob(db_dir)
        logging.debug("Reading database files started")
        for db_file in db_files:
            logging.debug("Reading database file {0}".format(db_file))
            with open(db_file, 'r') as data:
                try:
                    websites = yaml.load(data)["websites"]
                    for app in websites:
                        if self.is_valid_app(app):
                            self.db.append(app)
                except yaml.YAMLError as error:
                    logging.error("Error loading yml file {0} : {1}".format(
                        db_file, str(error)))
                except TypeError:
                    logging.error("Not a valid yml file {0}".format(db_file))
        logging.debug("Reading database files finished")
项目:Gnome-Authenticator    作者:bil-elmoussaoui    | 项目源码 | 文件源码
def get_icon(image, size):
    """
        Generate a GdkPixbuf image
        :param image: icon name or image path
        :return: GdkPixbux Image
    """
    directory = path.join(env.get("DATA_DIR"), "applications", "images") + "/"
    theme = Gtk.IconTheme.get_default()
    if theme.has_icon(path.splitext(image)[0]):
        icon = theme.load_icon(path.splitext(image)[0], size, 0)
    elif path.exists(directory + image):
        icon = GdkPixbuf.Pixbuf.new_from_file(directory + image)
    elif path.exists(image):
        icon = GdkPixbuf.Pixbuf.new_from_file(image)
    else:
        icon = theme.load_icon("image-missing", size, 0)
    if icon.get_width() != size or icon.get_height() != size:
        icon = icon.scale_simple(size, size, GdkPixbuf.InterpType.BILINEAR)
    return icon
项目:embeddings    作者:vzhong    | 项目源码 | 文件源码
def download_file(url, local_filename):
        """
        Downloads a file from an url to a local file.

        Args:
            url (str): url to download from.
            local_filename (str): local file to download to.

        Returns:
            str: file name of the downloaded file.

        """
        r = requests.get(url, stream=True)
        if path.dirname(local_filename) and not path.isdir(path.dirname(local_filename)):
            raise Exception(local_filename)
            makedirs(path.dirname(local_filename))
        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
        return local_filename
项目:django-dev-protector    作者:ElusiveSpirit    | 项目源码 | 文件源码
def process_request(self, request):
        url = match(r'^/django_dev_protector/$', request.path)
        if url and request.method == 'POST':
            import json
            data = json.loads(request.body.decode('utf-8'))
            if data['key'] == settings.SECRET_KEY:
                from .setup import save_status
                environ[PROTECT_STATUS_VARIABLE] = str(data['status'])
                save_status(data['status'])
                return redirect('/')

        if environ.get(PROTECT_STATUS_VARIABLE) == 'True':
            from django.shortcuts import render
            return render(request, TEMPLATE_NAME, {
                'redirect_url': REDIRECT_URL
            })
项目:hardmob_promo    作者:leandrotoledo    | 项目源码 | 文件源码
def main():
    bot = Bot(environ.get('TOKEN'))
    chat_id = environ.get('CHAT_ID')

    post.updatedb()

    new_posts = post.get_posts(sent_only=False)

    if new_posts:
        for p in new_posts:
            try:
                bot.sendMessage(chat_id=chat_id, text=p.text(), parse_mode='HTML')
                post.mark_as_sent(p.uid)
            except BadRequest:
                logging.info('Bad post formatting: %d' % p.uid)

        logger.info('%d new post(s) have been sent!' % len(new_posts))
    else:
        logger.info('No new posts at this time!')
项目:hardmob_promo    作者:leandrotoledo    | 项目源码 | 文件源码
def fetch():
        url = environ.get('URL')
        html = requests.get(url).content
        soup = BeautifulSoup(html, 'html.parser')

        posts = list()

        for link in soup.select('#threads a.title'):
            post = dict()

            post['title'] = link.text
            post['href'] = link.get('href')
            post['uid'] = int(post['href'].replace(url, '')[:6])  # TODO

            posts.append(post)

        return posts
项目:django-iot    作者:aschn    | 项目源码 | 文件源码
def get_attributes(device_pk):
    """Get dict of numerical attributes"""
    # make request
    selector = 'id:%s' % Device.objects.get(pk=device_pk).manufacturer_id
    url = BASE_URL + selector
    data = make_request(url)[0]

    # assemble result
    result = {
        'brightness': data['brightness'],
        'hue': data['color']['hue'],
        'saturation': data['color']['saturation'],
        'kelvin': data['color']['kelvin'],
    }

    # return
    return result
项目:cfdilib    作者:Vauxoo    | 项目源码 | 文件源码
def test_008_s3(self):
        """Cache on amazon is working properly"""
        if not environ.get('TRAVIS') == 'true':
            # This set of tests can be run just locally with properly set
            # amazon credentials, this will synchronize the xsd files
            # (and others) it does not make sense in travis.
            url = self.test_url_plain
            url_on_s3 = self.s3_url_plain
            new_url = tools.cache_s3(url, self.test_plain)
            self.assertEqual(new_url, url_on_s3,
                             'The url on amazon was not setted properly '
                             'got %s' % new_url)
            new_url = tools.cache_s3(url_on_s3, self.test_plain)
            self.assertEqual(new_url, url_on_s3,
                             'The url on amazon wired not return what I expected'
                             ' properly got %s' % new_url)

            check_s3 = tools.check_s3('NOTEXISTSORNOTPROPERACL', 'url/no/exists')
            self.assertFalse(check_s3, 'checking a non existing bucket fails')
            check_s3 = tools.check_s3('s3.vauxoo.com', 'url/no/exists')
            self.assertFalse(check_s3, 'checking a non existing element fails')
项目:http-observatory    作者:mozilla    | 项目源码 | 文件源码
def __conf(section, param, type=None):
    try:
        if type == str or type is None:
            return _config_parser.get(section, param)
        elif type == int:
            return _config_parser.getint(section, param)
        elif type == bool:
            return _config_parser.getboolean(section, param)
        elif type == float:
            return _config_parser.getfloat(section, param)
        else:
            return None
    except (KeyError, configparser.NoSectionError):
        return None
    except:
        print('Error with key {0} in section {1}'.format(param, section))
        sys.exit(1)
项目:roslaunch_to_dot    作者:bponsler    | 项目源码 | 文件源码
def __parseArg(self, arg):
        '''Parse the given arg element to get (and resolve) its name
        and value.

        * arg -- the arg element

        '''
        name = self.__getAttribute(arg, self.NameAttribute)

        # Grab the default and standard value
        default = arg.attrib.get(self.DefaultAttribute, None)
        value = arg.attrib.get(self.ValueAttribute, default)

        # Any of these attributes may have substitution arguments
        # that need to be resolved
        name = self.__resolveText(name)

        # Only resolve the value if it is defined
        if value is not None:
            value = self.__resolveText(value)

        return name, value

    ##### ROS launch substitution argument related functions
项目:roslaunch_to_dot    作者:bponsler    | 项目源码 | 文件源码
def __onEnvSubstitutionArg(self, env):
        '''Handle the ROS launch 'env' or 'optenv' substitution argument which
        aims to substitute the value of an environment variable inside of
        some text.

        * package -- the package to find

        '''
        # Determine if a default value was supplied
        parts = env.split(" ")
        if len(parts) == 1:
            #### No default value was supplied
            if parts[0] not in environ:
                raise Exception(
                    "Could not find environment variable: '%s'" % env)
            return environ[env]
        else:
            #### A default value was supplied
            env, default = parts
            return environ.get(env, default)
项目:yt    作者:yt-project    | 项目源码 | 文件源码
def _environ_cols_linux(fp):  # pragma: no cover

    # import os
    # if fp is None:
    #     try:
    #         fp = os.open(os.ctermid(), os.O_RDONLY)
    #     except:
    #         pass
    try:
        from termios import TIOCGWINSZ
        from fcntl import ioctl
        from array import array
    except ImportError:
        return None
    else:
        try:
            return array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[1]
        except:
            try:
                from os.environ import get
            except ImportError:
                return None
            else:
                return int(get('COLUMNS', 1)) - 1
项目:orangecloud-client    作者:antechrestos    | 项目源码 | 文件源码
def __init__(self, client_id, client_secret, redirect_uri):
        proxies = dict(http=environ.get('HTTP_PROXY', ''), https=environ.get('HTTPS_PROXY', ''))
        # some certificates such as netatmo are invalid
        super(ApiManager, self).__init__(
            ServiceInformation(
                '%s/oauth/v2/authorize' % URL_API,
                '%s/oauth/v2/token' % URL_API,
                client_id=client_id,
                client_secret=client_secret,
                scopes=ApiManager.SCOPES,
                skip_ssl_verifications=False),
            proxies)
        self.folders = Folders(self)
        self.freespace = Freespace(self)
        self.files = Files(self)
        self.redirect_uri = redirect_uri
项目:cuckoodroid-2.0    作者:idanr1986    | 项目源码 | 文件源码
def _extract(self, filename, password):
        archive_path = _prepare_archive_at_path(filename)
        if not archive_path:
            return None
        # Extraction.
        extract_path = environ.get("TEMP", "/tmp")
        with ZipFile(archive_path, "r") as archive:
            try:
                archive.extractall(path=extract_path, pwd=password)
            except BadZipfile:
                raise Exception("Invalid Zip file")
            # Try to extract it again, but with a default password
            except RuntimeError:
                try:
                    archive.extractall(path=extract_path, pwd="infected")
                except RuntimeError as err:
                    raise Exception("Unable to extract Zip file: %s" % err)
            finally:
                self._extract_nested_archives(archive, extract_path, password)
        return archive.namelist()
项目:experiments-viewer    作者:mozilla    | 项目源码 | 文件源码
def get_database_connection():
    global logger

    s3 = boto3.resource('s3')
    metasrcs = ujson.load(
        s3.Object('net-mozaws-prod-us-west-2-pipeline-metadata',
                  'sources.json').get()['Body'])
    creds = ujson.load(
        s3.Object(
            'net-mozaws-prod-us-west-2-pipeline-metadata',
            '%s/write/credentials.json'
            % metasrcs[DB]['metadata_prefix']
        ).get()['Body'])
    conn = psycopg2.connect(connection_factory=LoggingConnection,
                            host=creds['host'], port=creds.get('port', 5432),
                            user=creds['username'], password=creds['password'],
                            dbname=creds['db_name'])
    conn.initialize(logger)
    return conn
项目:python-trustpilot    作者:trustpilot    | 项目源码 | 文件源码
def setup(self, api_host=None, api_key=None, api_secret=None,
              access_token=None, token_issuer_path=None,
              token_issuer_host=None, **kwargs):

        self.api_host = api_host or environ.get('TRUSTPILOT_API_HOST', 'https://api.trustpilot.com')
        self.token_issuer_host = token_issuer_host or self.api_host
        self.access_token = access_token
        self.token_issuer_path = token_issuer_path or environ.get(
            'TRUSTPILOT_API_TOKEN_ISSUER_PATH', "oauth/system-users/token")
        self.hooks = dict()

        if not self.api_host.startswith("http"):
            raise requests.URLRequired(
                "'{}' is not a valid api_host url".format(api_host))

        try:
            self.api_key=api_key or environ['TRUSTPILOT_API_KEY']
            self.api_secret=api_secret or environ.get('TRUSTPILOT_API_SECRET', '')
            self.access_token=access_token
            self.hooks['response'] = self._post_request_callback
        except KeyError as e:
            logging.debug("Not auth setup, missing env-var or setup for {}".format(e))

        return self
项目:labgrid    作者:labgrid-project    | 项目源码 | 文件源码
def set_resource(self, groupname, resourcename, resource, details=None):
        session = self.sessions.get(details.caller)
        if session is None:
            return
        assert isinstance(session, ExporterSession)

        groupname = str(groupname)
        resourcename = str(resourcename)
        # TODO check if acquired
        print(details)
        pprint(resource)
        action, resource_path = session.set_resource(groupname, resourcename, resource)
        if action is Action.ADD:
            self._add_default_place(groupname)
        yield from self._update_acquired_places(action, resource_path)
        self.save_later()
项目:postsai    作者:postsai    | 项目源码 | 文件源码
def connect(self):
        """connects to the database"""

        self.conn = mdb.connect(
            host    = self.config["db"]["host"],
            user    = self.config["db"]["user"],
            passwd  = self.config["db"]["password"],
            db      = self.config["db"]["database"],
            port    = self.config["db"].get("port", 3306),
            use_unicode = True,
            charset = "utf8")

        # checks whether this is a ViewVC database instead of a Bonsai database
        cursor = self.conn.cursor()
        cursor.execute("show tables like 'commits'")
        self.is_viewvc_database = (cursor.rowcount == 1)
        cursor.execute("SET SESSION innodb_lock_wait_timeout = 500")
        cursor.close()
        self.conn.begin()
项目:postsai    作者:postsai    | 项目源码 | 文件源码
def extra_data_for_key_tables(self, cursor, column, row, value):
        """provides additional data that should be stored in lookup tables"""

        extra_column = ""
        extra_data = ""
        data = [value]
        if column == "description":
            extra_column = ", hash"
            extra_data = ", %s"
            data.append(len(value))
        elif column == "repository":
            extra_column = ", base_url, repository_url, file_url, commit_url, tracker_url, icon_url"
            extra_data = ", %s, %s, %s, %s, %s, %s"
            data.extend(self.call_setup_repository(row, self.guess_repository_urls(row)))
        elif column == "hash":
            extra_column = ", authorid, committerid, co_when"
            extra_data = ", %s, %s, %s"
            self.fill_id_cache(cursor, "who", row, row["author"])
            self.fill_id_cache(cursor, "who", row, row["committer"])
            data.extend((self.cache.get("who", row["author"]),
                         self.cache.get("who", row["committer"]),
                         row["co_when"]))

        return data, extra_column, extra_data
项目:SFT    作者:xumi1993    | 项目源码 | 文件源码
def __getattr__(self, attr):
        """Return a terminal capability, like bold.

        For example, you can say ``term.bold`` to get the string that turns on
        bold formatting and ``term.normal`` to get the string that turns it off
        again. Or you can take a shortcut: ``term.bold('hi')`` bolds its
        argument and sets everything to normal afterward. You can even combine
        things: ``term.bold_underline_red_on_bright_green('yowzers!')``.

        For a parametrized capability like ``cup``, pass the parameters too:
        ``some_term.cup(line, column)``.

        ``man terminfo`` for a complete list of capabilities.

        Return values are always Unicode.

        """
        resolution = (self._resolve_formatter(attr) if self.does_styling
                      else NullCallableString())
        setattr(self, attr, resolution)  # Cache capability codes.
        return resolution
项目:SFT    作者:xumi1993    | 项目源码 | 文件源码
def _height_and_width(self):
        """Return a tuple of (terminal height, terminal width).

        Start by trying TIOCGWINSZ (Terminal I/O-Control: Get Window Size),
        falling back to environment variables (LINES, COLUMNS), and returning
        (None, None) if those are unavailable or invalid.

        """
        # tigetnum('lines') and tigetnum('cols') update only if we call
        # setupterm() again.
        for descriptor in self._init_descriptor, sys.__stdout__:
            try:
                return struct.unpack(
                        'hhhh', ioctl(descriptor, TIOCGWINSZ, '\000' * 8))[0:2]
            except IOError:
                # when the output stream or init descriptor is not a tty, such
                # as when when stdout is piped to another program, fe. tee(1),
                # these ioctls will raise IOError
                pass
        try:
            return int(environ.get('LINES')), int(environ.get('COLUMNS'))
        except TypeError:
            return None, None
项目:SFT    作者:xumi1993    | 项目源码 | 文件源码
def __getattr__(self, attr):
        """Return a terminal capability, like bold.

        For example, you can say ``term.bold`` to get the string that turns on
        bold formatting and ``term.normal`` to get the string that turns it off
        again. Or you can take a shortcut: ``term.bold('hi')`` bolds its
        argument and sets everything to normal afterward. You can even combine
        things: ``term.bold_underline_red_on_bright_green('yowzers!')``.

        For a parametrized capability like ``cup``, pass the parameters too:
        ``some_term.cup(line, column)``.

        ``man terminfo`` for a complete list of capabilities.

        Return values are always Unicode.

        """
        resolution = (self._resolve_formatter(attr) if self.does_styling
                      else NullCallableString())
        setattr(self, attr, resolution)  # Cache capability codes.
        return resolution
项目:SFT    作者:xumi1993    | 项目源码 | 文件源码
def _height_and_width(self):
        """Return a tuple of (terminal height, terminal width).

        Start by trying TIOCGWINSZ (Terminal I/O-Control: Get Window Size),
        falling back to environment variables (LINES, COLUMNS), and returning
        (None, None) if those are unavailable or invalid.

        """
        # tigetnum('lines') and tigetnum('cols') update only if we call
        # setupterm() again.
        for descriptor in self._init_descriptor, sys.__stdout__:
            try:
                return struct.unpack(
                        'hhhh', ioctl(descriptor, TIOCGWINSZ, '\000' * 8))[0:2]
            except IOError:
                # when the output stream or init descriptor is not a tty, such
                # as when when stdout is piped to another program, fe. tee(1),
                # these ioctls will raise IOError
                pass
        try:
            return int(environ.get('LINES')), int(environ.get('COLUMNS'))
        except TypeError:
            return None, None
项目:crisscross    作者:bhaveshAn    | 项目源码 | 文件源码
def _get_uid(self):
        old_lang = environ.get('LANG')
        environ['LANG'] = 'C'

        ioreg_process = Popen(["ioreg", "-l"], stdout=PIPE)
        grep_process = Popen(["grep", "IOPlatformSerialNumber"],
            stdin=ioreg_process.stdout, stdout=PIPE)
        ioreg_process.stdout.close()
        output = grep_process.communicate()[0]

        if old_lang is None:
            environ.pop('LANG')
        else:
            environ['LANG'] = old_lang

        if output:
            return output.split()[3][1:-1]
        else:
            return None
项目:crisscross    作者:bhaveshAn    | 项目源码 | 文件源码
def _get_platform(self):

        if self._platform_android is None:
            # ANDROID_ARGUMENT and ANDROID_PRIVATE are 2 environment variables
            # from python-for-android project
            self._platform_android = 'ANDROID_ARGUMENT' in environ

        if self._platform_ios is None:
            self._platform_ios = (environ.get('KIVY_BUILD', '') == 'ios')

        # On android, _sys_platform return 'linux2', so prefer to check the
        # import of Android module than trying to rely on _sys_platform.
        if self._platform_android is True:
            return 'android'
        elif self._platform_ios is True:
            return 'ios'
        elif _sys_platform in ('win32', 'cygwin'):
            return 'win'
        elif _sys_platform == 'darwin':
            return 'macosx'
        elif _sys_platform[:5] == 'linux':
            return 'linux'
        return 'unknown'
项目:senic.cryptoyaml    作者:getsenic    | 项目源码 | 文件源码
def get_key(key=None, keyfile=None):
    """ returns a key given either its value, a path to it on the filesystem
    or as last resort it checks the environment variable CRYPTOYAML_SECRET
    """
    if key is None:
        if keyfile is None:
            key = environ.get('CRYPTOYAML_SECRET')
            if key is None:
                raise MissingKeyException(
                    '''You must either provide a key value,'''
                    ''' a path to a key or its value via the environment variable '''
                    ''' CRYPTOYAML_SECRET'''
                )
            else:
                key = key.encode('utf-8')
        else:
            key = open(keyfile, 'rb').read()
    return key
项目:electrumx    作者:kyuupichan    | 项目源码 | 文件源码
def main():
    '''Send the RPC command to the server and print the result.'''
    parser = argparse.ArgumentParser('Send electrumx an RPC command')
    parser.add_argument('-p', '--port', metavar='port_num', type=int,
                        help='RPC port number')
    parser.add_argument('command', nargs=1, default=[],
                        help='command to send')
    parser.add_argument('param', nargs='*', default=[],
                        help='params to send')
    args = parser.parse_args()

    port = args.port
    if port is None:
        port = int(environ.get('RPC_PORT', 8000))

    # Get the RPC request.
    method = args.command[0]
    params = args.param
    if method in ('log', 'disconnect'):
        params = [params]

    rpc_send_and_wait(port, method, params)
项目:fabric8-analytics-worker    作者:fabric8-analytics    | 项目源码 | 文件源码
def _make_postgres_string(password):
        """Create postgres connection string.

        It's parametrized, so it's possible to
        create either quoted or unquoted version of connection string.
        Note that it's outside of class since there is no simple way how to call it inside the class
        without class initialization.
        :param password: password which will be embedded into Postgres connection string
        :return: fully working postgres connection string
        """
        connection = 'postgresql://{user}:{password}@{pgbouncer_host}:{pgbouncer_port}' \
                     '/{database}?sslmode=disable'. \
            format(user=environ.get('POSTGRESQL_USER'),
                   password=password,
                   pgbouncer_host=environ.get('PGBOUNCER_SERVICE_HOST', 'coreapi-pgbouncer'),
                   pgbouncer_port=environ.get('PGBOUNCER_SERVICE_PORT', '5432'),
                   database=environ.get('POSTGRESQL_DATABASE'))
        return connection
项目:dwinelle-tools    作者:dkess    | 项目源码 | 文件源码
def get(self, number):
        '''Gets an entry, or return None if the entry does not exist.'''
        # Do a binary search of the interval tree
        bmin = 0
        bmax = len(self.intervals) - 1
        while True:
            if bmax < bmin:
                return None
            index = (bmax + bmin) // 2
            on = self.intervals[index]
            if number < on[0][0]:
                bmax = index - 1
            elif number >= on[0][1]:
                bmin = index + 1
            else:
                return on[1]
项目:dwinelle-tools    作者:dkess    | 项目源码 | 文件源码
def get_logentry_at_time(*args):
    '''
    If one argument is provided, get the LogEntry corresponding to the given
    unix time (in milliseconds).  If two arguments are provided, get the 
    LogEntry at clipid (first argument) and time in that clip in milliseconds
    (second argument).
    '''

    if len(args) == 1:
        unixtime = args[0]
    else:
        unixtime = timestamp_from_video(*args)

    for logentry in get_logentries():
        if logentry.starttime <= unixtime < logentry.endtime:
            return logentry
项目:git-fame    作者:casperdcl    | 项目源码 | 文件源码
def _environ_cols_linux(fp):  # pragma: no cover

  # import os
  # if fp is None:
  #   try:
  #     fp = os.open(os.ctermid(), os.O_RDONLY)
  #   except:
  #     pass
  try:
    from termios import TIOCGWINSZ
    from fcntl import ioctl
    from array import array
  except ImportError:
    return None
  else:
    try:
      return array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[1]
    except:
      try:
        from os.environ import get
      except ImportError:
        return None
      else:
        return int(get('COLUMNS', 1)) - 1
项目:ramp-workflow    作者:paris-saclay-cds    | 项目源码 | 文件源码
def get_data_home(ramp_kits_home=None):
    """Return the path of the ramp-kits data dir.

    This folder is used to fetch the up-to-date ramp-kits

    By default the data dir is set to a folder named 'ramp-kits'
    in the user home folder.

    Alternatively, it can be set by the 'RAMP-KITS' environment
    variable or programmatically by giving an explicit folder path. The
    '~' symbol is expanded to the user home folder.

    If the folder does not already exist, it is automatically created.

    """
    if ramp_kits_home is None:
        ramp_kits_home = environ.get('RAMP-KITS',
                                     join('~', 'ramp-kits'))
        ramp_kits_home = expanduser(ramp_kits_home)
    if not exists(ramp_kits_home):
        makedirs(ramp_kits_home)

    return ramp_kits_home
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def __getattr__(self, attr):
        """Return a terminal capability, like bold.

        For example, you can say ``term.bold`` to get the string that turns on
        bold formatting and ``term.normal`` to get the string that turns it off
        again. Or you can take a shortcut: ``term.bold('hi')`` bolds its
        argument and sets everything to normal afterward. You can even combine
        things: ``term.bold_underline_red_on_bright_green('yowzers!')``.

        For a parametrized capability like ``cup``, pass the parameters too:
        ``some_term.cup(line, column)``.

        ``man terminfo`` for a complete list of capabilities.

        Return values are always Unicode.

        """
        resolution = (self._resolve_formatter(attr) if self.does_styling
                      else NullCallableString())
        setattr(self, attr, resolution)  # Cache capability codes.
        return resolution
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def _height_and_width(self):
        """Return a tuple of (terminal height, terminal width).

        Start by trying TIOCGWINSZ (Terminal I/O-Control: Get Window Size),
        falling back to environment variables (LINES, COLUMNS), and returning
        (None, None) if those are unavailable or invalid.

        """
        # tigetnum('lines') and tigetnum('cols') update only if we call
        # setupterm() again.
        for descriptor in self._init_descriptor, sys.__stdout__:
            try:
                return struct.unpack(
                        'hhhh', ioctl(descriptor, TIOCGWINSZ, '\000' * 8))[0:2]
            except IOError:
                # when the output stream or init descriptor is not a tty, such
                # as when when stdout is piped to another program, fe. tee(1),
                # these ioctls will raise IOError
                pass
        try:
            return int(environ.get('LINES')), int(environ.get('COLUMNS'))
        except TypeError:
            return None, None
项目:dynamodb-py    作者:gusibi    | 项目源码 | 文件源码
def getDynamoDBConnection(self, config=None, endpoint=None, port=None,
                              local=False, use_instance_metadata=False):
        if not config:
            config = {'region_name': 'us-west-2'}
        params = {
            'region_name': config.get('region_name', 'cn-north-1')
        }
        if local:
            endpoint_url = 'http://{endpoint}:{port}'.format(endpoint=endpoint,
                                                             port=port)
            params['endpoint_url'] = endpoint_url
            db = boto3.resource('dynamodb', **params)
        else:
            if not config or not isinstance(config, dict):
                raise ParameterException("Invalid config")
            params.update(config)
            db = boto3.resource('dynamodb', **params)
        return db
项目:dynamodb-py    作者:gusibi    | 项目源码 | 文件源码
def update_item_by_set():
    Test.create(realname='gs01', score=100, order_score=99.99, date_created=now)
    item = Test.get(realname='gs01', score=100)
    item.update(Test.order_score.set(80))
    print 'set'
    assert item.order_score == 80
    item.update(Test.order_score.set(78.7, attr_label='os'))
    print 'set with attr_label'
    assert item.order_score == 78.7
    item.update(Test.order_score.set(78.7, if_not_exists='order_score'))
    print 'set with if_not_exists'
    assert item.order_score == 78.7
    item.update(Test.order_score.set(10, if_not_exists='ids[0]'))
    assert item.order_score == 10
    print 'ids', item.ids, type(item.ids)
    item.update(ids=[12])
    print 'ids', item.ids, type(item.ids)
    item.update(Test.ids.set([100], list_append=('ids', -1)))
    print 'set with list_append'
    assert item.ids[-1] == 100
    item.update(Test.order_score.set(78.7, attr_label='os'),
                doc={'a': 'bbb'})
    print 'set with attr_label and upate_field'
    assert item.doc['a'] == 'bbb'
项目:scibot    作者:SciCrunch    | 项目源码 | 文件源码
def rrid_resolver_xml(exact, found_rrids):
    print('\t' + exact)
    resolver_uri = 'https://scicrunch.org/resolver/%s.xml' % exact
    r = requests.get(resolver_uri)
    status_code = r.status_code
    xml = r.content
    print(status_code)
    found_rrids[exact] = status_code
    return xml, status_code, resolver_uri
项目:benzoin    作者:mpilar    | 项目源码 | 文件源码
def initialize_snapshot_tags(event, context):
    """
    Event comes in with snapshot info, make sure we"re working with a snapshot
    we"re supposed to and wait for it to be done snapshotting.
    """
    session = boto3.session.Session(region_name=event.get("region"))

    ec2 = session.resource("ec2")

    snapshot_ids = [vol["snapshot-id"] for vol in event.get("backup-volumes")]
    for sid in snapshot_ids:
        if ec2.Snapshot(sid).state != "completed":
            raise NotReady("Snapshot not ready")

    return event
项目:benzoin    作者:mpilar    | 项目源码 | 文件源码
def terminate_instances(event, context):
    """ Terminate instance identified in the event """
    session = boto3.session.Session(region_name=event.get("region"))

    ec2 = session.resource("ec2")
    ec2.Instance(event["instance-id"]).terminate()
    return event
项目:benzoin    作者:mpilar    | 项目源码 | 文件源码
def are_snapshots_tagged(region, snapshot_ids):
    """Check the current snapshot tags to prevent multiple snapshots."""
    session = boto3.session.Session(region_name=region)
    ec2 = session.resource('ec2')

    for sid in snapshot_ids:
        snapshot = ec2.Snapshot(sid)
        current_tags = snapshot.tags
        if not current_tags:
            continue
        for tag in current_tags:
            if tag.get("Key") == VALIDATION_TAG_KEY:
                return True
    return False
项目:MercrediFiction    作者:Meewan    | 项目源码 | 文件源码
def index_page():
    if request.args.get('epub'):
        return create_epub()
    else:
        return create_page()
项目:MercrediFiction    作者:Meewan    | 项目源码 | 文件源码
def create_page():
    page = int(request.args.get("page", "0"))
    limit = int(request.args.get("limit", "10"))
    offset = page * limit

    toots, count, count_all = get_toots(offset, limit)

    accounts = Account.query.order_by(Account.username)
    instances = Instance.query.order_by(Instance.domain)
    blacklist_status = True if request.args.get('blacklisted', None) else False

    if request.args.get('blacklisted') != 'ignore':
        accounts = accounts.filter(Account.blacklisted == blacklist_status)
        instances = instances.filter(Instance.blacklisted == blacklist_status)

    pagination = {'page': page + 1,
                  'limit': limit}
    pagination['next'] = "/?page=%s&" % (page + 1)
    pagination['previous'] = "/?page=%s&" % (page - 1)
    for key, value in request.args.iteritems():
        if not key == "page":
            pagination['next'] += "&%s=%s" % (key, value)
            pagination['previous'] += "&%s=%s" % (key, value)

    if count < limit:
        pagination.pop('next')
    if page == 0:
        pagination.pop('previous')

    pagination['page_count'] = int(count_all / limit) + 1

    return render_template('index.html',
                           toots=toots,
                           accounts=accounts,
                           instances=instances,
                           pagination=pagination)
项目:boss    作者:kabirbaidhya    | 项目源码 | 文件源码
def is_ci():
    ''' Check if boss is running in a Continuous Integration (CI) environment. '''

    return bool(
        env.get('BOSS_RUNNING') == 'true' and (
            (env.get('CI') == 'true') or
            (env.get('CONTINUOUS_INTEGRATION') == 'true')
        )
    )
项目:boss    作者:kabirbaidhya    | 项目源码 | 文件源码
def is_travis():
    ''' Check if boss is running under Travis CI. '''
    return is_ci() and env.get('TRAVIS') == 'true'
项目:boss    作者:kabirbaidhya    | 项目源码 | 文件源码
def get_ci_link(config):
    ''' Get CI build link for the current build deployment. '''
    if is_travis():
        base_url = config['ci']['base_url'].rstrip('/')

        return ci.TRAVIS_BUILD_URL.format(
            base_url=base_url,
            repo_slug=env.get('TRAVIS_REPO_SLUG'),
            build_id=env.get('TRAVIS_BUILD_ID')
        )

    # Other CI providers aren't supported at the moment.
    # TODO: Add support for more providers.

    return None
项目:pycoal    作者:capstone-coal    | 项目源码 | 文件源码
def test_intersect_proximity():
    ec = environment.EnvironmentalCorrelation()
    ec.intersect_proximity(mining_filename, vector_filename, proximity, test_filename)
    expected = spectral.open_image(correlated_filename)
    actual = spectral.open_image(test_filename)
    assert numpy.array_equal(expected.asarray(), actual.asarray())
    assert actual.metadata.get('description') == 'COAL '+pycoal.version+' environmental correlation image.'
    assert expected.metadata.get('class names') == actual.metadata.get('class names')
    assert expected.metadata.get('map info') == actual.metadata.get('map info')