Python tempfile 模块,gettempdir() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.gettempdir()

项目:senf    作者:quodlibet    | 项目源码 | 文件源码
def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """
    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
        text (bool): if the file should be opened in text mode
    Returns:
        Tuple[`int`, `fsnative`]:
            A tuple containing the file descriptor and the file path
    Raises:
        EnvironmentError

    Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative`
    path.
    """

    suffix = fsnative() if suffix is None else path2fsn(suffix)
    prefix = gettempprefix() if prefix is None else path2fsn(prefix)
    dir = gettempdir() if dir is None else path2fsn(dir)

    return tempfile.mkstemp(suffix, prefix, dir, text)
项目:senf    作者:quodlibet    | 项目源码 | 文件源码
def mkdtemp(suffix=None, prefix=None, dir=None):
    """
    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
    Returns:
        `fsnative`: A path to a directory
    Raises:
        EnvironmentError

    Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative` path.
    """

    suffix = fsnative() if suffix is None else path2fsn(suffix)
    prefix = gettempprefix() if prefix is None else path2fsn(prefix)
    dir = gettempdir() if dir is None else path2fsn(dir)

    return tempfile.mkdtemp(suffix, prefix, dir)
项目:py2rb    作者:naitoh    | 项目源码 | 文件源码
def runTest(self):
        """The actual test goes here."""
        if os.system(
            "ruby --help > %s" %
            os.path.join(
                tempfile.gettempdir(),
                tempfile.gettempprefix()
                )
            ):
            self.stop()
            raise RuntimeError("""Can't find the "ruby" command.""")
        self.reportProgres()
        if not os.path.exists("py2rb/builtins/module.rb"):
            self.stop()
            raise RuntimeError("""Can't find the "py2rb/builtins/module.rb" command.""")
        if not os.path.exists("py2rb/builtins/using.rb"):
            self.stop()
            raise RuntimeError("""Can't find the "py2rb/builtins/using.rb" command.""")
        if not os.path.exists("py2rb/builtins/require.rb"):
            self.stop()
            raise RuntimeError("""Can't find the "py2rb/builtins/require.rb" command.""")
        self.reportProgres()
项目:csvtotable    作者:vividvilla    | 项目源码 | 文件源码
def serve(content):
    """Write content to a temp file and serve it in browser"""
    temp_folder = tempfile.gettempdir()
    temp_file_name = tempfile.gettempprefix() + str(uuid.uuid4()) + ".html"
    # Generate a file path with a random name in temporary dir
    temp_file_path = os.path.join(temp_folder, temp_file_name)

    # save content to temp file
    save(temp_file_path, content)

    # Open templfile in a browser
    webbrowser.open("file://{}".format(temp_file_path))

    # Block the thread while content is served
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # cleanup the temp file
        os.remove(temp_file_path)
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file

    """
    filename = url.split("/")[-1]

    if (not filename.endswith('.alfredworkflow') and
            not filename.endswith('.alfred3workflow')):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)

    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as output:
        output.write(response.content)

    return local_path
项目:table-compositor    作者:InvestmentSystems    | 项目源码 | 文件源码
def load_names_data():
    fp = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(fp):
        r = requests.get(URL_NAMES)
        with open(fp, 'wb') as f:
            f.write(r.content)

    post = collections.OrderedDict()
    with zipfile.ZipFile(fp) as zf:
        # get ZipInfo instances
        for zi in sorted(zf.infolist(), key=lambda zi: zi.filename):
            fn = zi.filename
            if fn.startswith('yob'):
                year = int(fn[3:7])
                df = pd.read_csv(
                    zf.open(zi),
                    header=None,
                    names=('name', 'gender', 'count'))
                df['year'] = year
                post[year] = df

        df = pd.concat(post.values())
        df.set_index('name', inplace=True, drop=True)
        return df
项目:table-compositor    作者:InvestmentSystems    | 项目源码 | 文件源码
def render_xlsx(cls):
        '''
        Render the df to a xlsx file.
        '''

        # load data
        df = sample_names_data()
        # build presentation model
        pm = tc.build_presentation_model(df=df, output_format='xlsx')

        # render to xlsx
        tempdir = tempfile.gettempdir()
        fp = os.path.join(tempdir, 'example1.xlsx')
        layout = [pm]
        print('Writing to ' + fp)
        xlsxw.XLSXWriter.to_xlsx(layout, output_fp=fp)
# end_XLSXExample1


# start_XLSXExample2
项目:table-compositor    作者:InvestmentSystems    | 项目源码 | 文件源码
def load_names_data():
    fp = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(fp):
        r = requests.get(URL_NAMES)
        with open(fp, 'wb') as f:
            f.write(r.content)

    post = collections.OrderedDict()
    with zipfile.ZipFile(fp) as zf:
        # get ZipInfo instances
        for zi in sorted(zf.infolist(), key=lambda zi: zi.filename):
            fn = zi.filename
            if fn.startswith('yob'):
                year = int(fn[3:7])
                df = pd.read_csv(
                    zf.open(zi),
                    header=None,
                    names=('name', 'gender', 'count'))
                df['year'] = year
                post[year] = df

        df = pd.concat(post.values())
        df.set_index('name', inplace=True, drop=True)
        return df
项目:table-compositor    作者:InvestmentSystems    | 项目源码 | 文件源码
def render_html(cls):

        # load data
        df = load_names_data()
        df = df[:100]

        # build presentation model
        pm = tc.build_presentation_model(df=df, output_format='html')

        # render to xlsx
        tempdir = tempfile.gettempdir()
        fp = os.path.join(tempdir, 'example_1.html')
        layout = [pm]
        print('Writing to ' + fp)
        html = htmlw.HTMLWriter.to_html(layout, border=1)
        output_fp = os.path.join(
            tempfile.gettempdir(),
            'example1.html')
        with open(output_fp, 'w') as f:
            f.write(html)
# end_HTMLExample1

# start_HTMLExample2
项目:rain    作者:scizzorz    | 项目源码 | 文件源码
def compile_so(libs):
  # I don't know how else to find these .so files other than just asking clang
  # to make a .so file out of all of them

  clang = os.getenv('CLANG', 'clang')

  tempdir = tempfile.gettempdir()
  libname = '.'.join(sorted(libs))
  target = join(tempdir, 'lib' + libname + '.so')

  if not os.path.exists(target):
    libs = ['-l' + lib for lib in libs]
    flags = ['-shared']

    cmd = [clang, '-o', target] + flags + libs
    subprocess.check_call(cmd)

  return target
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _find_grail_rc(self):
        import glob
        import pwd
        import socket
        import tempfile
        tempdir = os.path.join(tempfile.gettempdir(),
                               ".grail-unix")
        user = pwd.getpwuid(os.getuid())[0]
        filename = os.path.join(tempdir, user + "-*")
        maybes = glob.glob(filename)
        if not maybes:
            return None
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        for fn in maybes:
            # need to PING each one until we find one that's live
            try:
                s.connect(fn)
            except socket.error:
                # no good; attempt to clean it out, but don't fail:
                try:
                    os.unlink(fn)
                except IOError:
                    pass
            else:
                return s
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def cmdargs_for_style(self, formatstyle, filename=None):
        # type: (Style, Optional[str]) -> List[str]
        assert isinstance(formatstyle, Style)
        configdata = bytestr(self.styletext(formatstyle))
        sha = shahex(configdata)
        cfg = os.path.join(tempfile.gettempdir(), 'whatstyle_uncrustify_%s.cfg' % sha)
        if not self.tempfile_exists(cfg):
            writebinary(cfg, configdata)
            self.add_tempfile(cfg)
        cmdargs = ['-c', cfg]
        # The filename extension might be ambiguous so we choose from the languages
        # registered in identify_language.
        if self.languages:
            lang = self.languages[0]
            cmdargs.extend(['-l', lang])
        return cmdargs
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def cmdargs_for_style(self, formatstyle, filename=None):
        # type: (Style, Optional[str]) -> List[str]
        assert isinstance(formatstyle, Style)
        configdata = bytestr(self.styletext(formatstyle))
        sha = shahex(configdata)
        cfg = os.path.join(tempfile.gettempdir(),
                           'whatstyle_rustfmt_%s/%s' % (sha, self.configfilename))
        try:
            dirpath = os.path.dirname(cfg)
            os.makedirs(dirpath)
            self.add_tempfile(dirpath)
        except OSError as exc:
            if exc.errno != errno.EEXIST:
                raise
        if not self.tempfile_exists(cfg):
            writebinary(cfg, configdata)
            self.add_tempfile(cfg)
        cmdargs = ['--config-path', cfg]
        return cmdargs
项目:mbed-cli    作者:ARMmbed    | 项目源码 | 文件源码
def fromrepo(cls, path=None):
        repo = cls()
        if path is None:
            path = Repo.findparent(getcwd())
            if path is None:
                error(
                    "Could not find mbed program in current path \"%s\".\n"
                    "You can fix this by calling \"mbed new .\" or \"mbed config root .\" in the root of your program." % getcwd())

        repo.path = os.path.abspath(path)
        repo.name = os.path.basename(repo.path)

        cache_cfg = Global().get_cfg('CACHE', '')
        if cache_repositories and cache_cfg and cache_cfg != 'none' and cache_cfg != 'off' and cache_cfg != 'disabled':
            loc = cache_cfg if (cache_cfg and cache_cfg != 'on' and cache_cfg != 'enabled') else None
            repo.cache = loc or os.path.join(tempfile.gettempdir(), 'mbed-repo-cache')

        repo.sync()

        if repo.scm is None:
            warning(
                "Program \"%s\" in \"%s\" does not use source control management.\n"
                "To fix this you should use \"mbed new .\" in the root of your program." % (repo.name, repo.path))

        return repo
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def __init__(self, max_age):
      """Constructor.

      Args:
        max_age: Cache expiration in seconds.
      """
      self._max_age = max_age
      self._file = os.path.join(tempfile.gettempdir(), FILENAME)
      f = LockedFile(self._file, 'a+', 'r')
      try:
        f.open_and_lock()
        if f.is_locked():
          _read_or_initialize_cache(f)
        # If we can not obtain the lock, other process or thread must
        # have initialized the file.
      except Exception as e:
        logging.warning(e, exc_info=True)
      finally:
        f.unlock_and_close()
项目:sketch-components    作者:ibhubs    | 项目源码 | 文件源码
def compare_component_output(self, input_path, expected_output_path):
        rendering_engine = self.get_rendering_engine()
        temp_dir = tempfile.gettempdir()
        output_dir = os.path.join(temp_dir, str(uuid.uuid4()))
        process_sketch_archive(zip_path=input_path, compress_zip=False,
                               output_path=output_dir, engine=rendering_engine)
        self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        shutil.rmtree(output_dir)
        storage.clear()
        output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
        process_sketch_archive(zip_path=input_path, compress_zip=True,
                               output_path=output_zip, engine=rendering_engine)
        z = zipfile.ZipFile(output_zip)
        z.extractall(output_dir)
        self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        shutil.rmtree(output_dir)
        os.remove(output_zip)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def create_local_temp_dir(name, directory=None):
    """
    Create a directory for storing temporary files needed for testing neo

    If directory is None or not specified, automatically create the directory
    in {tempdir}/files_for_testing_neo on linux/unix/mac or
    {tempdir}\files_for_testing_neo on windows, where {tempdir} is the system
    temporary directory returned by tempfile.gettempdir().
    """
    if directory is None:
        directory = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')

    if not os.path.exists(directory):
        os.mkdir(directory)
    directory = os.path.join(directory, name)
    if not os.path.exists(directory):
        os.mkdir(directory)
    return directory
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def create_local_temp_dir(name, directory=None):
    """
    Create a directory for storing temporary files needed for testing neo

    If directory is None or not specified, automatically create the directory
    in {tempdir}/files_for_testing_neo on linux/unix/mac or
    {tempdir}\files_for_testing_neo on windows, where {tempdir} is the system
    temporary directory returned by tempfile.gettempdir().
    """
    if directory is None:
        directory = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')

    if not os.path.exists(directory):
        os.mkdir(directory)
    directory = os.path.join(directory, name)
    if not os.path.exists(directory):
        os.mkdir(directory)
    return directory
项目:outis    作者:SySS-Research    | 项目源码 | 文件源码
def activate_debug(module):
    """
    activates the given module for debugging
    :param module: the module name to activate
    :return: None
    """

    module = str(module)
    if module not in AVAILABLE_DEBUG_MODULES:
        print_error("debug module '{}' is unknown, cannot activate it".format(module))
        return
    if module in ACTIVATED_DEBUG_MODULES:
        print_error("debug module '{}' is already active".format(module))
        return

    # if this is the first activation set LOGFILE and print it
    if not ACTIVATED_DEBUG_MODULES:
        import os
        global LOGFILE
        LOGFILE = str(os.path.join(tempfile.gettempdir(), "outis.log"))
        print_message("DEBUGGING is active, writing to debug file " + str(LOGFILE))

    # add module to ACTIVATED_DEBUG_MODULES
    ACTIVATED_DEBUG_MODULES.append(module)
项目:waybackscraper    作者:abrenaut    | 项目源码 | 文件源码
def main():
    args = parse_args()

    logging.basicConfig(level=(logging.WARN if args.quiet else logging.INFO))

    # Don't allow more than 10 concurrent requests to the wayback machine
    concurrency = min(args.concurrency, 10)

    # Scrape results are stored in a temporary folder if no folder specified
    target_folder = args.target_folder if args.target_folder else tempfile.gettempdir()
    logger.info('Writing scrape results in the folder {target_folder}'.format(target_folder=target_folder))

    # Parse the period entered by the user (throws an exception if the dates are not correctly formatted)
    from_date = datetime.strptime(args.from_date, CLI_DATE_FORMAT)
    to_date = datetime.strptime(args.to_date, CLI_DATE_FORMAT)

    # The scraper downloads the elements matching the given xpath expression in the target folder
    scraper = Scraper(target_folder, args.xpath)

    # Launch the scraping using the scraper previously instantiated
    scrape_archives(args.website_url, scraper.scrape, from_date, to_date, args.user_agent, timedelta(days=args.delta),
                    concurrency)
项目:pyrsss    作者:butala    | 项目源码 | 文件源码
def update_sideshow_file(fname,
                         server_fname,
                         server=SIDESHOW_SERVER,
                         temp_path=gettempdir()):
    """
    Update the JPL side show file stored locally at *fname*. The
    remote file is accessed via FTP on *server* at *server_fname*. The
    path *temp_path* is used to store intermediate files. Return
    *fname*.
    """
    dest_fname = replace_path(temp_path, server_fname)
    logger.info('opening connection to {}'.format(server))
    with closing(FTP(server)) as ftp, open(dest_fname, 'w') as fid:
        logger.info('logging in')
        ftp.login()
        logger.info('writing to {}'.format(dest_fname))
        ftp.retrbinary('RETR ' + server_fname, fid.write)
    logger.info('uncompressing file to {}'.format(fname))
    with GzipFile(dest_fname) as gzip_fid, open(fname, 'w') as fid:
        fid.write(gzip_fid.read())
    return fname
项目:kernda    作者:maxpoint    | 项目源码 | 文件源码
def kernel():
    """Create a ipykernel conda environment separate from this test
    environment where jupyter console is installed.

    The environments must be separate otherwise we cannot easily check
    if kernel start is activating the environment or if it was already
    active when the test suite started.
    """
    # unique name for the kernel and environment
    name = str(uuid4())
    env_path = '{}/kernel-env-{name}'.format(gettempdir(), name=name)
    pexpect.run('/bin/bash -c "conda create -y -p {env_path} ipykernel && \
                source activate {env_path} && \
                python -m ipykernel install --user \
                --name {name}"'.format(env_path=env_path, name=name))
    # query jupyter for the user data directory in a separate command to
    # make parsing easier
    stdout = pexpect.run('jupyter --data-dir')
    user_path = stdout.decode('utf-8').strip()
    # the kernel spec resides in the jupyter user data path
    spec_path = os.path.join(user_path, 'kernels', name)
    yield Kernel(name, os.path.join(spec_path, 'kernel.json'), env_path)
    shutil.rmtree(env_path)
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file

    """
    filename = url.split("/")[-1]

    if (not filename.endswith('.alfredworkflow') and
            not filename.endswith('.alfred3workflow')):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)

    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as output:
        output.write(response.content)

    return local_path
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def _get_storage_path(cls):
        try:
            return cls._storage_path
        except AttributeError:
            storage_path = getattr(settings, "SESSION_FILE_PATH", None)
            if not storage_path:
                storage_path = tempfile.gettempdir()

            # Make sure the storage path is valid.
            if not os.path.isdir(storage_path):
                raise ImproperlyConfigured(
                    "The session storage path %r doesn't exist. Please set your"
                    " SESSION_FILE_PATH setting to an existing directory in which"
                    " Django can store session data." % storage_path)

            cls._storage_path = storage_path
            return storage_path
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def _get_storage_path(cls):
        try:
            return cls._storage_path
        except AttributeError:
            storage_path = getattr(settings, "SESSION_FILE_PATH", None)
            if not storage_path:
                storage_path = tempfile.gettempdir()

            # Make sure the storage path is valid.
            if not os.path.isdir(storage_path):
                raise ImproperlyConfigured(
                    "The session storage path %r doesn't exist. Please set your"
                    " SESSION_FILE_PATH setting to an existing directory in which"
                    " Django can store session data." % storage_path)

            cls._storage_path = storage_path
            return storage_path
项目:gui_tool    作者:UAVCAN    | 项目源码 | 文件源码
def _should_continue():
    min_check_interval = 3600 * 24

    update_timestamp_file = os.path.join(tempfile.gettempdir(), 'uavcan_gui_tool', 'update_check_timestamp')

    try:
        with open(update_timestamp_file, 'r') as f:
            update_check_timestamp = float(f.read().strip())
    except Exception:
        logger.debug('Update timestamp file could not be read', exc_info=True)
        update_check_timestamp = 0

    if (time.time() - update_check_timestamp) < min_check_interval:
        return False

    try:
        os.makedirs(os.path.dirname(update_timestamp_file))
    except Exception:
        pass            # Nobody cares.

    with open(update_timestamp_file, 'w') as f:
        f.write(str(time.time()))

    return True
项目:django-mako    作者:ahmedaljazzar    | 项目源码 | 文件源码
def setUp(self):
        self.tmp_dir = tempfile.gettempdir()
        self.template_name = 'good_template.html'
        self.bad_string = '<% name="Jazzar" My name is ${name}.'
        self.template_string = '<% name="Jazzar" %> My name is ${name}.'

        tmp_template = os.path.join(self.tmp_dir, self.template_name)
        with open(tmp_template, 'w') as f:
            f.write(self.template_string)

        parameters = {
            'NAME': 'mako',
            'DIRS': [self.tmp_dir],
            'APP_DIRS': False,
            'OPTIONS': {}
        }

        self.mako_backend = MakoBackend(parameters)
项目:linchpin    作者:CentOS-PaaS-SIG    | 项目源码 | 文件源码
def __init__(self):

        self.DUMMY_FILE = '{0}/dummy.hosts'.format(tempfile.gettempdir())
        pass

#    def generate_hostname(self, minlen=5, maxlen=20):
#        """
#        Generate a random hostname between minlen and maxlen, appending
#        '.example.net' (eg. oskpijsrss7.example.net)
#
#        :param minlen: minimum length of the hostname (default: 5)
#
#        :param maxlen: maximum length of the hostname (default: 20)
#        """
#
#        len = random.randint(minlen, maxlen)
#
#        host_str = ''.join(
#            random.choice(string.ascii_lowercase) for _ in range(len -1))
#
#        host_str = '{0}{1}.example.net'.format(
#                host_str, random.choice(string.digits))
#
#        return host_str
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def __init__(self, max_age):
      """Constructor.

      Args:
        max_age: Cache expiration in seconds.
      """
      self._max_age = max_age
      self._file = os.path.join(tempfile.gettempdir(), FILENAME)
      f = LockedFile(self._file, 'a+', 'r')
      try:
        f.open_and_lock()
        if f.is_locked():
          _read_or_initialize_cache(f)
        # If we can not obtain the lock, other process or thread must
        # have initialized the file.
      except Exception as e:
        LOGGER.warning(e, exc_info=True)
      finally:
        f.unlock_and_close()
项目:GoToMeetingTools    作者:plongitudes    | 项目源码 | 文件源码
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file

    """
    filename = url.split("/")[-1]

    if (not filename.endswith('.alfredworkflow') and
            not filename.endswith('.alfred3workflow')):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)

    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as output:
        output.write(response.content)

    return local_path
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def cmdDiffFolder( self, folder, head=False ):
        self.debugLog( 'cmdDiffFolder( %r )' % (folder,) )
        abs_folder = self.pathForSvn( folder )

        if head:
            old_rev = self.svn_rev_head

        else:
            old_rev = self.svn_rev_base

        diff_text = self.client().diff(
            tempfile.gettempdir(),
            abs_folder, old_rev,
            abs_folder, self.svn_rev_working,
            recurse=True,
            relative_to_dir=str( self.projectPath() ),
            use_git_diff_format=True
            )

        return diff_text
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def _get_storage_path(cls):
        try:
            return cls._storage_path
        except AttributeError:
            storage_path = getattr(settings, "SESSION_FILE_PATH", None)
            if not storage_path:
                storage_path = tempfile.gettempdir()

            # Make sure the storage path is valid.
            if not os.path.isdir(storage_path):
                raise ImproperlyConfigured(
                    "The session storage path %r doesn't exist. Please set your"
                    " SESSION_FILE_PATH setting to an existing directory in which"
                    " Django can store session data." % storage_path)

            cls._storage_path = storage_path
            return storage_path
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file

    """
    filename = url.split('/')[-1]

    if (not filename.endswith('.alfredworkflow') and
            not filename.endswith('.alfred3workflow')):
        raise ValueError('attachment not a workflow: {0}'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)

    wf().logger.debug(
        'downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as output:
        output.write(response.content)

    return local_path
项目:lowrank-highwaynetwork    作者:Avmb    | 项目源码 | 文件源码
def __init__(self):
        self._target_size = 10
        logging.info("loading minst data")
        path = os.path.join(tempfile.gettempdir(), "mnist.pkl.gz")
        if not os.path.exists(path):
            logging.info("downloading minst data")
            urllib.urlretrieve (MNIST_URL, path)
        self._train_set, self._valid_set, self._test_set = cPickle.load(gzip.open(path, 'rb'))

        # Moving validation examples to training set, leaving 1000
        train_set_x = np.vstack((self._train_set[0], self._valid_set[0][:-1000]))
        train_set_y = np.hstack((self._train_set[1], self._valid_set[1][:-1000]))
        valid_set_x = self._valid_set[0][-1000:]
        valid_set_y = self._valid_set[1][-1000:]
        self._train_set = (train_set_x, train_set_y)
        self._valid_set = (valid_set_x, valid_set_y)

        logging.info("[mnist small validation] training data size: %d" % len(self._train_set[0]))
        logging.info("[mnist small validation] valid data size: %d" % len(self._valid_set[0]))
        logging.info("[mnist small validation] test data size: %d" % len(self._test_set[0]))
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def ssl_authenticator():
    try:
        import pupy_credentials
        keystr=pupy_credentials.SSL_BIND_KEY
        certstr=pupy_credentials.SSL_BIND_CERT
    except:
        keystr=DEFAULT_SSL_BIND_KEY
        certstr=DEFAULT_SSL_BIND_CERT
    key_path=None
    cert_path=None
    if os.path.isfile("pupy.conf"):
        config = configparser.ConfigParser()
        config.read("pupy.conf")
        key_path=config.get("pupyd","keyfile").replace("\\",os.sep).replace("/",os.sep)
        cert_path=config.get("pupyd","certfile").replace("\\",os.sep).replace("/",os.sep)
    else:
        tmpdir=tempfile.gettempdir()
        cert_path=os.path.join(tmpdir, ''.join(random.choice(string.lowercase+string.digits) for _ in range(random.randint(5,8))))
        key_path=os.path.join(tmpdir,''.join(random.choice(string.lowercase+string.digits) for _ in range(random.randint(5,8))))
        with open(cert_path,'wb') as f:
            f.write(certstr.strip())
        with open(key_path,'wb') as f:
            f.write(keystr.strip())
    return SSLAuthenticator(key_path, cert_path, ciphers="SHA256+AES256:SHA1+AES256:@STRENGTH")
项目:alfred-workflows    作者:arthurhammer    | 项目源码 | 文件源码
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file

    """
    filename = url.split("/")[-1]

    if (not url.endswith('.alfredworkflow') or
            not filename.endswith('.alfredworkflow')):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)

    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as output:
        output.write(response.content)

    return local_path
项目:MIT-Thesis    作者:alec-heif    | 项目源码 | 文件源码
def test_profiler(self):
        self.do_computation()

        profilers = self.sc.profiler_collector.profilers
        self.assertEqual(1, len(profilers))
        id, profiler, _ = profilers[0]
        stats = profiler.stats()
        self.assertTrue(stats is not None)
        width, stat_list = stats.get_print_list([])
        func_names = [func_name for fname, n, func_name in stat_list]
        self.assertTrue("heavy_foo" in func_names)

        old_stdout = sys.stdout
        sys.stdout = io = StringIO()
        self.sc.show_profiles()
        self.assertTrue("heavy_foo" in io.getvalue())
        sys.stdout = old_stdout

        d = tempfile.gettempdir()
        self.sc.dump_profiles(d)
        self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
项目:MIT-Thesis    作者:alec-heif    | 项目源码 | 文件源码
def _get_pandoc_version(pandoc_path):
    new_env = os.environ.copy()
    if 'HOME' not in os.environ:
        new_env['HOME'] = tempfile.gettempdir()
    p = subprocess.Popen(
        [pandoc_path, '--version'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        env=new_env)
    comm = p.communicate()
    out_lines = comm[0].decode().splitlines(False)
    if p.returncode != 0 or len(out_lines) == 0:
        raise RuntimeError("Couldn't call pandoc to get version information. Output from "
                           "pandoc:\n%s" % str(comm))

    version_pattern = re.compile(r"^\d+(\.\d+){1,}$")
    for tok in out_lines[0].split():
        if version_pattern.match(tok):
            version = tok
            break
    return version
项目:baselines    作者:openai    | 项目源码 | 文件源码
def configure(dir=None, format_strs=None):
    if dir is None:
        dir = os.getenv('OPENAI_LOGDIR')
    if dir is None:
        dir = osp.join(tempfile.gettempdir(),
            datetime.datetime.now().strftime("openai-%Y-%m-%d-%H-%M-%S-%f"))
    assert isinstance(dir, str)
    os.makedirs(dir, exist_ok=True)

    if format_strs is None:
        strs = os.getenv('OPENAI_LOG_FORMAT')
        format_strs = strs.split(',') if strs else LOG_OUTPUT_FORMATS
    output_formats = [make_output_format(f, dir) for f in format_strs]

    Logger.CURRENT = Logger(dir=dir, output_formats=output_formats)
    log('Logging to %s'%dir)
项目:blocksver    作者:ndrchvzz    | 项目源码 | 文件源码
def main():
    cachePath = os.path.join(tempfile.gettempdir(), CACHEFILE)
    cache = loadCache(cachePath, BASE64)
    chainInfo = rpcRetrieve('getblockchaininfo')
    bestHash = chainInfo['bestblockhash']
    height = int(chainInfo['blocks'])
    bip9forks = chainInfo['bip9_softforks']
    print(formatWelcome(cache, WINDOW, bestHash, height,
                        F(chainInfo['difficulty']), bip9forks, THRESHOLD))
    if cache.height == 0:
        print('Please wait while retrieving latest block versions and caching them...\n')
    if len(cache.hashes) < 1 or cache.hashes[0] != bestHash:
        retrieveBlock = lambda h: rpcRetrieve('getblock', h)
        cache = updateCache(cache, WINDOW, HASHES_SIZE, bestHash, height, retrieveBlock)
        saveCache(cache, cachePath, BASE64)
    print(formatAllData(cache, bip9forks, THRESHOLD, WINDOW))
项目:aws_emojipacks    作者:Surgo    | 项目源码 | 文件源码
def download_and_save_icons(dist_dir):
    saved_icons = dict()

    temp_dir = tempfile.gettempdir()
    temp_path = download_icons(AWS_ICON_DOWNLOAD_URL, os.path.join(temp_dir, 'icons.zip'))

    for image, friendly_name in get_images_from_archive_file(temp_path):
        emoji_icon = emojinize_image(image)
        dist_path = os.path.join(
            dist_dir,
            '{name}.{ext}'.format(name=friendly_name, ext='png'),
        )
        emoji_icon.save(dist_path, 'PNG', optimize=True)
        logging.info("Save emoji: %s", dist_path)
        saved_icons[friendly_name] = dist_path
    return saved_icons
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def _get_storage_path(cls):
        try:
            return cls._storage_path
        except AttributeError:
            storage_path = getattr(settings, "SESSION_FILE_PATH", None)
            if not storage_path:
                storage_path = tempfile.gettempdir()

            # Make sure the storage path is valid.
            if not os.path.isdir(storage_path):
                raise ImproperlyConfigured(
                    "The session storage path %r doesn't exist. Please set your"
                    " SESSION_FILE_PATH setting to an existing directory in which"
                    " Django can store session data." % storage_path)

            cls._storage_path = storage_path
            return storage_path
项目:alfred-bear    作者:chrisbro    | 项目源码 | 文件源码
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file

    """
    filename = url.split("/")[-1]

    if (not filename.endswith('.alfredworkflow') and
            not filename.endswith('.alfred3workflow')):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)

    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as output:
        output.write(response.content)

    return local_path
项目:alfred-confluence    作者:skleinei    | 项目源码 | 文件源码
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file

    """
    filename = url.split("/")[-1]

    if (not url.endswith('.alfredworkflow') or
            not filename.endswith('.alfredworkflow')):
        raise ValueError('Attachment `{0}` not a workflow'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)

    wf().logger.debug(
        'Downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as output:
        output.write(response.content)

    return local_path
项目:alfred_reviewboard    作者:lydian    | 项目源码 | 文件源码
def download_workflow(url):
    """Download workflow at ``url`` to a local temporary file.

    :param url: URL to .alfredworkflow file in GitHub repo
    :returns: path to downloaded file

    """
    filename = url.split('/')[-1]

    if (not filename.endswith('.alfredworkflow') and
            not filename.endswith('.alfred3workflow')):
        raise ValueError('attachment not a workflow: {0}'.format(filename))

    local_path = os.path.join(tempfile.gettempdir(), filename)

    wf().logger.debug(
        'downloading updated workflow from `%s` to `%s` ...', url, local_path)

    response = web.get(url)

    with open(local_path, 'wb') as output:
        output.write(response.content)

    return local_path
项目:registrant    作者:AlexArcPy    | 项目源码 | 文件源码
def prepare_test(test_type):
    """prepare the geodatabase data for running tests on"""
    cfg = TEST_CONFIG[test_type]
    test_type = cfg['name']
    out_report_folder = os.path.join(tempfile.gettempdir(), test_type)
    if not os.path.exists(out_report_folder):
        os.mkdir(out_report_folder)

    if arcpy_found:
        xml_schema = cfg['xml_schema']
        in_gdb = arcpy.CreateFileGDB_management(out_report_folder, test_type).getOutput(0)
        arcpy.ImportXMLWorkspaceDocument_management(in_gdb, xml_schema, "SCHEMA_ONLY")
    else:
        in_gdb = cfg['ogr_geodatabase']

    json_results = cfg['json_results']

    return (in_gdb, out_report_folder, json_results)


#adding the project folder to support running test files individually and from the IDE
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def zip_folder(folder_path):
    """
    Zip the entire folder and return a file to the zip. Use this inside
    a "with" statement to cleanup the zipfile after it is used.

    :param folder_path:
    :return: Name of the zipfile
    """

    filename = os.path.join(
        tempfile.gettempdir(), "data-" + uuid.uuid4().hex)

    zipfile_name = make_zip(filename, folder_path)
    try:
        yield zipfile_name
    finally:
        if os.path.exists(zipfile_name):
            os.remove(zipfile_name)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _find_grail_rc(self):
        import glob
        import pwd
        import socket
        import tempfile
        tempdir = os.path.join(tempfile.gettempdir(),
                               ".grail-unix")
        user = pwd.getpwuid(os.getuid())[0]
        filename = os.path.join(tempdir, user + "-*")
        maybes = glob.glob(filename)
        if not maybes:
            return None
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        for fn in maybes:
            # need to PING each one until we find one that's live
            try:
                s.connect(fn)
            except socket.error:
                # no good; attempt to clean it out, but don't fail:
                try:
                    os.unlink(fn)
                except IOError:
                    pass
            else:
                return s
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def rtask_save_proc(task=None):
    import os
    import tempfile
    # save data in /tmp/tickdata
    with open(os.path.join(os.sep, tempfile.gettempdir(), 'tickdata'), 'w') as fd:
        while True:
            i, n = yield task.receive()
            if n is None:
                break
            fd.write('%s: %s\n' % (i, n))
    raise StopIteration(0)


# This task runs on client. It gets trend messages from remote task that
# computes moving window average.
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def rtask_save_proc(task=None):
    import os
    import tempfile
    # save data in /tmp/tickdata
    with open(os.path.join(os.sep, tempfile.gettempdir(), 'tickdata'), 'w') as fd:
        while True:
            i, n = yield task.receive()
            if n is None:
                break
            fd.write('%s: %s\n' % (i, n))
    raise StopIteration(0)


# This task runs on client. It gets trend messages from remote task that
# computes moving window average.