Python os 模块,getlogin() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用os.getlogin()

项目:Forensic-Tools    作者:MonroCoury    | 项目源码 | 文件源码
def get_firefox_db(db_file):
    '''Return the full path of firefox sqlite databases, platform independent'''
    success = False
    plat_dict = {"Windows 7" : r"C:\Users\%s\AppData\Roaming\Mozilla\Firefox\Profiles" % os.getlogin(),
                 "Windows XP" : r"C:\Documents and Settings\%s\Application Data\Mozilla\Firefox\Profiles" % os.getlogin(),
                 "Linux" : r"/home/%s/.mozilla/firefox/" % os.getlogin(),
                 "Darwin" : r"/Users/%s/Library/Application Support/Firefox/Profiles" % os.getlogin()}
    if platform.system() == "Windows":
        string = plat_dict[platform.system() + " " + platform.release()]
    else:
        string = plat_dict[platform.system()]
    for item in os.listdir(string):
        if os.path.isdir(os.path.join(string, item)) and "default" in item:
            if os.path.isfile(os.path.join(string, item, db_file)):
                success = True
                return os.path.join(string, item, db_file)
    if not success:
        sys.exit("Couldn't find the database file in the default location! Try providing a different location using the -b option...")
项目:PyFRAP    作者:alexblaessle    | 项目源码 | 文件源码
def getPermDetails(self):

        """Returns the permission details used to change permissions.
        """

        if platform.system() not in ["Windows"]:
            import pwd

            #Grab user ID and group ID of actual user
            uid=pwd.getpwnam(os.getlogin())[2]
            gid=pwd.getpwnam(os.getlogin())[3]

            #Mode for files (everyone can read/write/execute. This is somewhat an overkill, but 0666 seems somehow not to work.)
            mode=0777

            return uid,gid,mode

        return  0,0,0
项目:repo2docker    作者:jupyter    | 项目源码 | 文件源码
def test_volume_abspath():
    """
    Validate that you can bind mount a volume onto an absolute dir & write to it
    """
    ts = str(time.time())
    with tempfile.TemporaryDirectory() as tmpdir:
        username = os.getlogin()
        subprocess.check_call([
            'repo2docker',
            '-v', '{}:/home/{}'.format(tmpdir, username),
            '--user-id', str(os.geteuid()),
            '--user-name', username,
            tmpdir,
            '--',
            '/bin/bash',
            '-c', 'echo -n {} > ts'.format(ts)
        ])

        with open(os.path.join(tmpdir, 'ts')) as f:
            assert f.read() == ts
项目:repo2docker    作者:jupyter    | 项目源码 | 文件源码
def test_volume_relpath():
    """
    Validate that you can bind mount a volume onto an relative path & write to it
    """
    curdir = os.getcwd()
    try:
        ts = str(time.time())
        with tempfile.TemporaryDirectory() as tmpdir:
            os.chdir(tmpdir)
            subprocess.check_call([
                'repo2docker',
                '-v', '.:.',
                '--user-id', str(os.geteuid()),
                '--user-name', os.getlogin(),
                tmpdir,
                '--',
                '/bin/bash',
                '-c', 'echo -n {} > ts'.format(ts)
            ])

            with open(os.path.join(tmpdir, 'ts')) as f:
                assert f.read() == ts
    finally:
        os.chdir(curdir)
项目:jn_tester    作者:erickseolin    | 项目源码 | 文件源码
def __load_username(self):
        """Loads the username that is executing the tests."""
        self.__username = getpass.getuser()
        # Bellow is a platform specific... please, comment the bellow line if need it
        _user_login = 'adessowiki'
        # The try bellow is a bug correction for jupyter notebooks
        try:
            _user_login = os.getlogin()
        except OSError as err:
            warnings.warn("OSError... {0}".format(err))
        # Let's get the user folder to see if its running inside his own folder
        # We are forcing this anyway... if os.getlogin gives error
        if _user_login == 'adessowiki':
            _user_folder = os.getcwd()
            _matched = re.match(r'.*(\/{0}\/).*'.format(self.__username), _user_folder)
            if not _matched:
                raise Exception('You can\'t submit function from another user')
项目:EmPyre    作者:EmpireProject    | 项目源码 | 文件源码
def get_sysinfo():

    # listener | username | high_integrity | hostname | internal_ip | os_details | process_id | py_version
    username = os.getlogin()

    uid = os.popen('id -u').read().strip()
    highIntegrity = "True" if (uid == "0") else False

    osDetails = os.uname()
    hostname = osDetails[1]

    x = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    x.connect(("10.0.0.0", 80))
    internalIP = x.getsockname()[0]
    x.close()

    osDetails = ",".join(osDetails)
    processID = os.getpid()
    pyVersion = '.'.join([str(x) for x in sys.version_info])

    return "%s|%s|%s|%s|%s|%s|%s|%s" % (server, username, highIntegrity, hostname, internalIP, osDetails, processID, pyVersion)


# generate a randomized sessionID
项目:xcute    作者:viert    | 项目源码 | 文件源码
def __init__(self, options={}):
        cmd.Cmd.__init__(self)
        self.conductor = Conductor(options["projects"],
                                   host=options["conductor_host"],
                                   port=options["conductor_port"],
                                   cache_dir=options["cache_dir"],
                                   print_func=export_print)
        self.ssh_threads = options["ssh_threads"]
        self.user = options.get("user") or os.getlogin()
        self.progressbar = options.get("progressbar") or self.DEFAULT_OPTIONS["progressgbar"]
        self.ping_count = options.get("ping_count") or self.DEFAULT_OPTIONS["ping_count"]
        self.finished = False
        self.one_command_mode = False
        self.alias_scripts = {}
        self.default_remote_dir = options.get("default_remote_dir") or "/tmp"
        if "mode" in options:
            if not options["mode"] in self.MODES:
                error("invalid mode '%s'. use 'parallel', 'collapse' or 'serial" % options["mode"])
                self.mode = self.DEFAULT_MODE
            else:
                self.mode = options["mode"]
        else:
            self.mode = self.DEFAULT_MODE
项目:Dragon-Catcher    作者:Fred-Barclay    | 项目源码 | 文件源码
def not_owned():
    '''Are there files in /home/$USER that are not owned by $USER?'''
    user = os.getlogin() #Unlike os.getuid(), this gives the actual (non-root, usually) user.
    home_var = '~'+user
    home_dir = os.path.expanduser(home_var)

    cmd = ['find', home_dir , '!', '-user', user, '-type', 'f']

    out = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0]
    out = out.decode('utf-8').split()

    if len(out) == 0:
        print('There are no files in your home directory that are owned by another user.')
    elif len(out) != 0:
        print('The following files in your home directory are owned by another user.  \x1b[6;36;43m [Advice] \x1b[0m')
        for file in out:
            print(file)
        print('You may want to investigate why you do not own these files.\n')
    else:
        print('not_owned.py has failed to run properly. Please contact the author with all relevant information.')

    return(out)
项目:pastas    作者:pastas    | 项目源码 | 文件源码
def get_file_info(self):
        """Method to get the file information, mainly used for saving files.

        Returns
        -------
        file_info: dict
            dictionary with file information.

        """
        file_info = dict()
        file_info["date_created"] = pd.Timestamp.now()
        file_info["date_modified"] = pd.Timestamp.now()
        file_info["pastas_version"] = __version__
        try:
            file_info["owner"] = os.getlogin()
        except:
            file_info["owner"] = "Unknown"

        return file_info
项目:openpyn-nordvpn    作者:jotyGill    | 项目源码 | 文件源码
def running_with_sudo():
    if verify_running_as_root():
        try:
            logged_in_user = os.getlogin()
            if logged_in_user_is_root(logged_in_user):
                return False    # when logged in as 'root' user notifications will work.
            else:
                return True     # 'sudo' is used notification won't work.
        except FileNotFoundError:
            print("os.getlogin(), returned FileNotFoundError, \
assuming 'openpyn' is running with 'SUDO'")
            return True
        except OSError:
            print("os.getlogin(), returned error, assuming \
'openpyn' is running with 'SUDO'")
            return True

    return False    # regular user without 'sudo'
项目:sardana    作者:sardana-org    | 项目源码 | 文件源码
def printHeader(self, t):
        s = self.stream
        time_str = time.ctime(t)
        s.write("<html><head><title>Pool test</title>\n"
                "<meta http-equiv='refresh' content='2'>\n"
                "<link rel='stylesheet' href='%s' type='text/css'/>\n"
                "</head>\n<body>"
                "<p><table class='info'>"
                "<caption class='test'>Environment Summary</caption>"
                "<tr><td>Start time:</td><td>%s</td></tr>"
                "<tr><td>User name:</td><td>%s</td></tr>"
                "<tr><td>uname:</td><td>%s</td></tr>"
                "<tr><td>python:</td><td>%s</td></tr>"
                "<tr><td>PyTango:</td><td>%s (%s)</td></tr>"
                "</table></p>"
                "<p><table class='test'>"
                "<caption class='test'>Test Summary</caption>\n"
                "<tr><th>Test</th><th width=100>Status</th><th width=100>Result</th></tr>\n" %
                (self.css, time_str, os.getlogin(), str(os.uname()),
                 sys.version, PyTango.Release.version, PyTango.Release.version_description))

        s.flush()
项目:ansible-connection-machinectl    作者:BaxterStockman    | 项目源码 | 文件源码
def fetch_file(self, in_path, out_path):
        super(Connection, self).put_file(in_path, out_path)
        display.vvv(u'FETCH {0} TO {1}'.format(in_path, out_path), host=self.machine)

        in_path = self._prefix_login_path(in_path)

        returncode, stdout, stderr = self._run_command('copy-from', args=[in_path, out_path], machine=self.machine)

        if returncode != 0:
            raise AnsibleError('failed to transfer file {0} from {1}:\n{2}\n{3}'.format(out_path, in_path, stdout, stderr))

        # TODO might not be necessary?
        # Reset file permissions to current user after transferring from
        # container
        try:
            if self.remote_uid is not None:
                os.chown(out_path, os.geteuid(), os.getegid() or -1)
        except OSError:
            raise AnsibleError('failed to change ownership on file {0} to user {1}'.format(out_path, os.getlogin()))
项目:ipbb    作者:ipbus    | 项目源码 | 文件源码
def ensuresudo():
    import getpass
    lPrompt = '> '

    # , logfile = sys.stdout)
    p = pexpect.spawn('sudo -p "{0}" whoami'.format(lPrompt))
    lIndex = p.expect([pexpect.EOF, lPrompt])

    # I have sudo powers, therefore I return
    while lIndex != 0:
        lPwd = getpass.getpass(
            'Please insert password for user {0}: '.format(os.getlogin()))
        p.sendline(lPwd)
        lIndex = p.expect([pexpect.EOF, lPrompt])
        if lIndex == 0:
            break

    return p.exitstatus
# ------------------------------------------------------------------------------


# ------------------------------------------------------------------------------
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def _GetUsername(self):
    '''Attempt to find the username in a cross-platform fashion.'''
    try:
      return os.getenv('USER') or \
             os.getenv('LOGNAME') or \
             os.getenv('USERNAME') or \
             os.getlogin() or \
             'nobody'
    except (IOError, OSError), e:
      return 'nobody'
项目:hardened-centos7-kickstart    作者:fcaviggia    | 项目源码 | 文件源码
def get_user():
    try:
        user = os.getlogin()
    except:
        user = ''
        pass
    return user

# Returns Hostname
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getlogin():
    try:
        return os.getlogin()
    except:
        return getpass.getuser()
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def setUp(self):
        if not socks:
            raise nose.SkipTest('socks module unavailable')
        if not subprocess:
            raise nose.SkipTest('subprocess module unavailable')

        # start a short-lived miniserver so we can get a likely port
        # for the proxy
        self.httpd, self.proxyport = miniserver.start_server(
            miniserver.ThisDirHandler)
        self.httpd.shutdown()
        self.httpd, self.port = miniserver.start_server(
            miniserver.ThisDirHandler)

        self.pidfile = tempfile.mktemp()
        self.logfile = tempfile.mktemp()
        fd, self.conffile = tempfile.mkstemp()
        f = os.fdopen(fd, 'w')
        our_cfg = tinyproxy_cfg % {'user': os.getlogin(),
                                   'pidfile': self.pidfile,
                                   'port': self.proxyport,
                                   'logfile': self.logfile}
        f.write(our_cfg)
        f.close()
        try:
            # TODO use subprocess.check_call when 2.4 is dropped
            ret = subprocess.call(['tinyproxy', '-c', self.conffile])
            self.assertEqual(0, ret)
        except OSError, e:
            if e.errno == errno.ENOENT:
                raise nose.SkipTest('tinyproxy not available')
            raise
项目:munkisuit    作者:velotraveler    | 项目源码 | 文件源码
def __init__(self, debug=False, prefix=myprefix):
        self.username = os.getlogin()
        if len(set("[]*?").intersection(set(prefix))) > 0:
            raise PseudoCrontabException("prefix must not contain any of the shell wildcard matching characters '*', '[', ']', or '?'")
        self.prefix = prefix
        self.debug = debug
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def Test(app):

    print("Hello " + os.getlogin() + "  - (Real user even after sudo / su)")
    print("Options:")
    print(app.options)
    print("Arguments:")
    print(app.arguments)
    print("System Configuration:")
    print(app.configuration)
    if not app.data is None:
        print("Data:")
        print(app.data.dump())
        # iterate the configuration keys
        s = ""
        for key in app.data:
            s = s + " " + str(app.data[key])
        print(s.strip())
    print("")
    # injected logger
    app.logger.warning("hello from the injected loggger")
    # Using explicitely the root logger always logs to the console
    logging.debug("This is an info of the root logger")
    # Logging from myapp.lib
    myapp_cli_logger = logging.getLogger('myapp.cli')
    myapp_cli_logger.info("This is an info from myapp.cli")  # Not recorded
    myapp_cli_logger.warning("This is a warning from myapp.cli")  # -> sample.log
    myapp_cli_logger.error("This is an error from myapp.cli")  # -> sample.log
    myapp_cli_logger.critical("This is a critical from myapp.cli")  # -> sample.log
    print(call(["echo", ["hi", "$x", "a"]], shell = True))
    print(call(["./test.sh", "QXS"], shell = True))
    print(call(["./test.sh", "QXS"], shell = False))
    print(1/0)
项目:blackgate    作者:soasme    | 项目源码 | 文件源码
def read_default_config():
    """Read config from several default paths."""
    config = (
        read_yaml_config(os.path.join(os.getcwd(), 'blackgate.yml')) or \
        read_yaml_config(os.path.join(os.getlogin(), '.blackgate.yml')) or \
        read_yaml_config(os.path.join('usr', 'local', 'etc', 'blackgate', 'blackgate.yml')) or \
        read_yaml_config(os.path.join('etc', 'blackgate', 'blackgate.yml'))
    )
    return config
项目:alfred-alarmcom    作者:schwark    | 项目源码 | 文件源码
def save_db(creds, password=''):
  creds['password'] = password and hexlify(encrypt(password, creds['password'])) or creds['password']
  creds['username'] = password and hexlify(encrypt(password, creds['username'])) or creds['username']
  keychain = Keychain()
  account = getlogin()
  password = "%s%s%s" % (creds['username'], JOIN_STRING, creds['password'])
  return keychain.set_generic_password(KEYCHAIN, account, password, SERVICE_NAME)
项目:alfred-alarmcom    作者:schwark    | 项目源码 | 文件源码
def load_db(password=''):
  creds = {}
  keychain = Keychain()
  account = getlogin()
  payload = keychain.get_generic_password(KEYCHAIN, account, SERVICE_NAME)
  if payload and 'password' in payload:
    parts = payload['password'].split(JOIN_STRING)
    if(len(parts) > 1):
      creds['password'] = password and decrypt(password, unhexlify(parts[1])) or parts[1]
      creds['username'] = password and decrypt(password, unhexlify(parts[0])) or parts[0]
  return creds
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def v2_playbook_on_play_start(self, play):
        results = {}
        results['le_jobid'] = self.le_jobid
        results['started_by'] = os.getlogin()
        if play.name:
            results['play'] = play.name
        results['hosts'] = play.hosts
        self.emit_formatted(results)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_getlogin(self):
        user_name = os.getlogin()
        self.assertNotEqual(len(user_name), 0)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def setUp(self):
        if not socks:
            raise nose.SkipTest('socks module unavailable')
        if not subprocess:
            raise nose.SkipTest('subprocess module unavailable')

        # start a short-lived miniserver so we can get a likely port
        # for the proxy
        self.httpd, self.proxyport = miniserver.start_server(
            miniserver.ThisDirHandler)
        self.httpd.shutdown()
        self.httpd, self.port = miniserver.start_server(
            miniserver.ThisDirHandler)

        self.pidfile = tempfile.mktemp()
        self.logfile = tempfile.mktemp()
        fd, self.conffile = tempfile.mkstemp()
        f = os.fdopen(fd, 'w')
        our_cfg = tinyproxy_cfg % {'user': os.getlogin(),
                                   'pidfile': self.pidfile,
                                   'port': self.proxyport,
                                   'logfile': self.logfile}
        f.write(our_cfg)
        f.close()
        try:
            # TODO use subprocess.check_call when 2.4 is dropped
            ret = subprocess.call(['tinyproxy', '-c', self.conffile])
            self.assertEqual(0, ret)
        except OSError, e:
            if e.errno == errno.ENOENT:
                raise nose.SkipTest('tinyproxy not available')
            raise
项目:Pigrow    作者:Pragmatismo    | 项目源码 | 文件源码
def set_local_options(self):
        MainApp.OS =  platform.system()
        if MainApp.OS == "Linux":
            computer_username = os.getlogin()
            MainApp.localfiles_path = "/home/" + computer_username + "/frompigrow/"
        else:
            localpath = os.getcwd()
            localpath += '/frompigrow/'
            MainApp.localfiles_path = localpath
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def getlogin():
    try:
        return os.getlogin()
    except:
        return getpass.getuser()
项目:of    作者:OptimalBPM    | 项目源码 | 文件源码
def get_current_login():
    if os.name == "nt":
        return os.getlogin()
    else:
        return pwd.getpwuid(os.geteuid()).pw_name
项目:of    作者:OptimalBPM    | 项目源码 | 文件源码
def write_to_log(_data, _category=EC_NOTIFICATION, _severity=SEV_INFO, _process_id=None, _user_id=None,
                 _occurred_when=None, _address=None, _node_id=None, _uid=None, _pid=None):
    """
    Writes a message to the log using the current facility
    :param _data: The error message
    :param _category: The event category (defaults to CN_NOTIFICATION)
    :param _severity: The severity of the error (defaults to SEV_INFO)
    :param _process_id: The current process id (defaults to the current pid)
    :param _user_id: The Id of the user (defaults to the current username)
    :param _occurred_when: The time of occurrance (defaults to the current time)
    :param _address: The peer address
    :param _node_id: An Id for reference (like a node id)
    :param _uid: The system uid
    :param _pid: The system pid
    """
    global callback, severity
    if _severity < severity:
        return _data

    _occurred_when = _occurred_when if _occurred_when is not None else str(datetime.datetime.utcnow())
    _pid = _pid if _pid is not None else os.getpid()
    _uid = _uid if _uid is not None else os.getlogin()

    if callback is not None:
        callback(_data, _category, _severity, _process_id, _user_id, _occurred_when, _address, _node_id, _uid, _pid)
    else:
        print("Logging callback not set, print message:\n" + make_textual_log_message(_data, _category,
                                                                                      _severity, _process_id, _user_id,
                                                                                      _occurred_when, _address, _node_id,
                                                                                      _uid, _pid))

    return _data
项目:of    作者:OptimalBPM    | 项目源码 | 文件源码
def make_textual_log_message(_data, _category=None, _severity=None, _process_id=None, _user_id=None,
                             _occurred_when=None, _address=None,_node_id=None, _uid=None, _pid=None):
    """
    Build a nice textual error message based on available information for dialogs or event logs.

    :param _data: The message text or data
    :param _category: The kind of error
    :param _severity: The severity of the error
    :param _process_id: The current process id
    :param _user_id: The Id of the user
    :param _occurred_when: The time of occurrance
    :param _address: The peer address
    :param _node_id: An Id for reference (like a node id)
    :param _uid: The system uid
    :param _pid: The system pid

    :return: An error message

    """

    _result = "Process Id: " + (str(_process_id) if _process_id is not None else "Not available")
    _result += ", Adress: " + (str(_address) if _address is not None else str("None"))
    _result += (" - An error occurred:\n" if _severity > 2 else " - Message:\n") + str(_data)
    _result += "\nEvent category: " + category_to_identifier(_category,
                                                             "invalid event category:") if _category is not None else ""
    _result += "\nSeverity: " + severity_to_identifier(_severity,
                                                       "invalid severity level:") if _severity is not None else ""
    _result += "\nUser Id: " + str(_user_id) if _user_id is not None else ""
    _result += "\nOccurred when: " + str(_occurred_when) if _occurred_when is not None else ""
    _result += "\nEntity Id: " + str(_node_id) if _node_id is not None else ""
    _result += "\nSystem uid: " + (str(_uid) if _uid is not None else str(os.getlogin()))
    _result += "\nSystem pid: " + (str(_pid) if _pid is not None else str(os.getpid()))

    return _result
项目:janus    作者:mikelovell    | 项目源码 | 文件源码
def from_local_shell():
        username = os.getlogin()
        groups = []
        for group in grp.getgrall():
            if username in group.gr_mem:
                groups.append(group.gr_name)
        return JanusContext(username, groups, 'shell')
项目:TwitterCompetitionBot    作者:BenAllenUK    | 项目源码 | 文件源码
def _GetUsername(self):
        '''Attempt to find the username in a cross-platform fashion.'''
        try:
            return os.getenv('USER') or \
                   os.getenv('LOGNAME') or \
                   os.getenv('USERNAME') or \
                   os.getlogin() or \
                   'nobody'
        except (AttributeError, IOError, OSError), e:
            return 'nobody'
项目:ecodash    作者:Servir-Mekong    | 项目源码 | 文件源码
def setUp(self):
        if not socks:
            raise nose.SkipTest('socks module unavailable')
        if not subprocess:
            raise nose.SkipTest('subprocess module unavailable')

        # start a short-lived miniserver so we can get a likely port
        # for the proxy
        self.httpd, self.proxyport = miniserver.start_server(
            miniserver.ThisDirHandler)
        self.httpd.shutdown()
        self.httpd, self.port = miniserver.start_server(
            miniserver.ThisDirHandler)

        self.pidfile = tempfile.mktemp()
        self.logfile = tempfile.mktemp()
        fd, self.conffile = tempfile.mkstemp()
        f = os.fdopen(fd, 'w')
        our_cfg = tinyproxy_cfg % {'user': os.getlogin(),
                                   'pidfile': self.pidfile,
                                   'port': self.proxyport,
                                   'logfile': self.logfile}
        f.write(our_cfg)
        f.close()
        try:
            # TODO use subprocess.check_call when 2.4 is dropped
            ret = subprocess.call(['tinyproxy', '-c', self.conffile])
            self.assertEqual(0, ret)
        except OSError, e:
            if e.errno == errno.ENOENT:
                raise nose.SkipTest('tinyproxy not available')
            raise
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_getlogin(self):
        user_name = os.getlogin()
        self.assertNotEqual(len(user_name), 0)
项目:gif-disco    作者:futurice    | 项目源码 | 文件源码
def _GetUsername(self):
    '''Attempt to find the username in a cross-platform fashion.'''
    try:
      return os.getenv('USER') or \
             os.getenv('LOGNAME') or \
             os.getenv('USERNAME') or \
             os.getlogin() or \
             'nobody'
    except (AttributeError, IOError, OSError), e:
      return 'nobody'
项目:repo2docker    作者:jupyter    | 项目源码 | 文件源码
def test_user():
    """
    Validate user id and name setting
    """
    ts = str(time.time())
    # FIXME: Use arbitrary login here, We need it now since we wanna put things to volume.
    username = os.getlogin()
    userid = str(os.geteuid())
    with tempfile.TemporaryDirectory() as tmpdir:
        subprocess.check_call([
            'repo2docker',
            '-v', '{}:/home/{}'.format(tmpdir, username),
            '--user-id', userid,
            '--user-name', username,
            tmpdir,
            '--',
            '/bin/bash',
            '-c', 'id -u > id && pwd > pwd && whoami > name && echo -n $USER > env_user'.format(ts)
        ])

        with open(os.path.join(tmpdir, 'id')) as f:
            assert f.read().strip() == userid
        with open(os.path.join(tmpdir, 'pwd')) as f:
            assert f.read().strip() == '/home/{}'.format(username)
        with open(os.path.join(tmpdir, 'name')) as f:
            assert f.read().strip() == username
        with open(os.path.join(tmpdir, 'name')) as f:
            assert f.read().strip() == username
项目:audio-tagging-toolkit    作者:hipstas    | 项目源码 | 文件源码
def tags_to_csv(csv_filename,tag_pairs,class_num=0,class_label=''):
    with open(csv_filename, 'w') as csv_fo:
        csv_writer = csv.writer(csv_fo)
        for start, end in tag_pairs:
            if class_label=='':
                csv_writer.writerow([start,class_num,end])
            else:
                csv_writer.writerow([start,class_num,end,class_label])
        if len(tag_pairs)>0:
            csv_fo.write('\n\n## RandomTags.py run by '+str(os.getlogin()))
            csv_fo.write('\n## '+strftime("%Y-%m-%d %H:%M:%S", gmtime())+' GMT\n')
项目:slurm-pipeline    作者:acorg    | 项目源码 | 文件源码
def testSQueueNotFound(self, subprocessMock):
        """
        When an attempt to run squeue fails due to an OSError, an SQueueError
        must be raised.
        """
        subprocessMock.side_effect = OSError('No such file or directory')
        error = (
            "^Encountered OSError \(No such file or directory\) when running "
            "'squeue -u %s'$" % getlogin())
        assertRaisesRegex(self, SQueueError, error,  SQueue)
项目:slurm-pipeline    作者:acorg    | 项目源码 | 文件源码
def testAllColumnsMissing(self, subprocessMock):
        """
        When the squeue output does not contain any of the right column
        headings an SQueueError must be raised.
        """
        subprocessMock.return_value = 'Bad header\n'
        error = (
            '^Required columns \(JOBID, NODELIST\(REASON\), ST, TIME\) not '
            "found in 'squeue -u %s' output$" % getlogin())
        assertRaisesRegex(self, SQueueError, error,  SQueue)
项目:slurm-pipeline    作者:acorg    | 项目源码 | 文件源码
def testSomeColumnsMissing(self, subprocessMock):
        """
        When the squeue output contains only some of the right column headings
        an SQueueError must be raised, indicating which columns were not found.
        """
        subprocessMock.return_value = 'JOBID ST\n'
        error = (
            '^Required columns \(NODELIST\(REASON\), TIME\) not found in '
            "'squeue -u %s' output$" % getlogin())
        assertRaisesRegex(self, SQueueError, error,  SQueue)
项目:slurm-pipeline    作者:acorg    | 项目源码 | 文件源码
def testRepeatColumnName(self, subprocessMock):
        """
        When the squeue output contains a repeated column heading (among the
        headings we're interested in) an SQueueError must be raised.
        """
        subprocessMock.return_value = 'JOBID ST JOBID\n'
        error = (
            "^Multiple columns with title 'JOBID' found in 'squeue -u %s' "
            "output$" % getlogin())
        assertRaisesRegex(self, SQueueError, error,  SQueue)
项目:slurm-pipeline    作者:acorg    | 项目源码 | 文件源码
def testSQueueCalledAsExpectedWhenNoArgsPassed(self, subprocessMock):
        """
        When no squeue arguments are passed, it must be called as expected
        (with -u LOGIN_NAME).
        """
        subprocessMock.return_value = 'JOBID ST TIME NODELIST(REASON)\n'
        SQueue()
        subprocessMock.assert_called_once_with(['squeue', '-u', getlogin()],
                                               universal_newlines=True)
项目:StockRecommendSystem    作者:doncat99    | 项目源码 | 文件源码
def _GetUsername(self):
        """Attempt to find the username in a cross-platform fashion."""
        try:
            return os.getenv('USER') or \
                   os.getenv('LOGNAME') or \
                   os.getenv('USERNAME') or \
                   os.getlogin() or \
                   'nobody'
        except (AttributeError, IOError, OSError):
            return 'nobody'
项目:ClockworkVMs    作者:csd-dev-tools    | 项目源码 | 文件源码
def checkBuildUser(self):
        '''
        Checks if the build user has UID of 0

        @author: Roy Nielsen, Eric Ball
        @return: Tuple containing the current user's login name and UID
        '''
        # This method is called before ramdisk creation, so it does not use the
        # try/except block that most methods do
        print "Starting checkBuildUser..."

        CURRENT_USER = os.getlogin()

        RUNNING_ID = str(os.geteuid())
        print "UID: " + RUNNING_ID

        if RUNNING_ID != "0":
            print " "
            print "****************************************"
            print "***** Current logged in user: " + CURRENT_USER
            print "***** Please run with SUDO "
            print "****************************************"
            print " "
            exit(1)
        else:
            print "***** Current logged in user: " + CURRENT_USER

        print "checkBuildUser Finished..."
        return CURRENT_USER, RUNNING_ID
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def _GetUsername(self):
    '''Attempt to find the username in a cross-platform fashion.'''
    try:
      return os.getenv('USER') or \
             os.getenv('LOGNAME') or \
             os.getenv('USERNAME') or \
             os.getlogin() or \
             'nobody'
    except (AttributeError, IOError, OSError), e:
      return 'nobody'
项目:kubeque    作者:broadinstitute    | 项目源码 | 文件源码
def _make_cluster_name(image, cpu_request, mem_limit, unique_name):
    import hashlib
    import os
    if unique_name:
        return 'l-' + _random_string(20)
    return "c-" + hashlib.md5("{}-{}-{}-{}-{}".format(image, cpu_request, mem_limit, kubeque.__version__, os.getlogin()).encode("utf8")).hexdigest()[:20]
项目:SurfaceWaterTool    作者:Servir-Mekong    | 项目源码 | 文件源码
def setUp(self):
        if not socks:
            raise nose.SkipTest('socks module unavailable')
        if not subprocess:
            raise nose.SkipTest('subprocess module unavailable')

        # start a short-lived miniserver so we can get a likely port
        # for the proxy
        self.httpd, self.proxyport = miniserver.start_server(
            miniserver.ThisDirHandler)
        self.httpd.shutdown()
        self.httpd, self.port = miniserver.start_server(
            miniserver.ThisDirHandler)

        self.pidfile = tempfile.mktemp()
        self.logfile = tempfile.mktemp()
        fd, self.conffile = tempfile.mkstemp()
        f = os.fdopen(fd, 'w')
        our_cfg = tinyproxy_cfg % {'user': os.getlogin(),
                                   'pidfile': self.pidfile,
                                   'port': self.proxyport,
                                   'logfile': self.logfile}
        f.write(our_cfg)
        f.close()
        try:
            # TODO use subprocess.check_call when 2.4 is dropped
            ret = subprocess.call(['tinyproxy', '-c', self.conffile])
            self.assertEqual(0, ret)
        except OSError, e:
            if e.errno == errno.ENOENT:
                raise nose.SkipTest('tinyproxy not available')
            raise
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_getlogin(self):
        user_name = os.getlogin()
        self.assertNotEqual(len(user_name), 0)