Python google.appengine.api.app_identity 模块,get_application_id() 实例源码

我们从 Python 开源项目中提取了以下 21 个代码示例,用于说明如何使用 google.appengine.api.app_identity.get_application_id()。

项目:montage    作者:storyful    | 项目源码 | 文件源码
def __init__(self, default_timeout=None):
        """
            Set up the manager's cache handle, version tag and timeout.

            :param default_timeout: timeout stored on the instance; when it
                is ``None`` the value of ``settings.MEMOISE_CACHE_TIMEOUT``
                is used instead.
        """
        self.cache = caches['default']

        # The application id lookup fails with AttributeError on local dev;
        # fall back to an empty version string in that case.
        try:
            app_version = get_application_id()
        except AttributeError:
            app_version = ''
        self.version = app_version

        if default_timeout is None:
            default_timeout = settings.MEMOISE_CACHE_TIMEOUT
        self.default_timeout = default_timeout
项目:montage    作者:storyful    | 项目源码 | 文件源码
def get_settings_name():
    """
        Pick the Django settings module for the current environment.

        When the SERVER_SOFTWARE environment variable starts with
        'Google App Engine', the module is looked up by (lower-cased)
        application id; an id that is not in the map yields None. In every
        other environment the local settings module is returned.
    """
    server_software = os.getenv('SERVER_SOFTWARE', '')
    on_appengine = server_software.startswith('Google App Engine')

    # application_id -> settings file
    settings_by_app_id = {
        'greenday-project-v02-dev': 'greenday_core.settings.staging',
        'greenday-project-v02-potato': 'greenday_core.settings.potato',
        'greenday-project': 'greenday_core.settings.prod',
    }

    # An AttributeError from the lookup is treated as "no application id".
    try:
        current_app_id = get_application_id().lower()
    except AttributeError:
        current_app_id = ''

    if on_appengine:
        settings_module = settings_by_app_id.get(current_app_id)
    else:
        settings_module = 'greenday_core.settings.local'

    logging.info("Using settings: {0}".format(settings_module))
    return settings_module
项目:appbackendapi    作者:codesdk    | 项目源码 | 文件源码
def get(self):
        """Fetches the Cloud Storage bucket listing for this app's project.

        Requests an access token for the cloud-platform scope, calls the
        Cloud Storage JSON API bucket URL for the current application id and
        writes the decoded response back as indented JSON.

        Raises:
            Exception: If the API call returns a status code other than 200.
        """
        auth_token, _ = app_identity.get_access_token(
            'https://www.googleapis.com/auth/cloud-platform')
        # Do not log the bearer token itself: anyone able to read the logs
        # could reuse it. Logging the identity is enough for diagnostics.
        logging.info(
            'Using an access token to represent identity %s',
            app_identity.get_service_account_name())

        response = urlfetch.fetch(
            'https://www.googleapis.com/storage/v1/b?project={}'.format(
                app_identity.get_application_id()),
            method=urlfetch.GET,
            headers={
                'Authorization': 'Bearer {}'.format(auth_token)
            }
        )

        if response.status_code != 200:
            raise Exception(
                'Call failed. Status code {}. Body {}'.format(
                    response.status_code, response.content))

        # Round-trip through json so the output is pretty-printed.
        result = json.loads(response.content)
        self.response.headers['Content-Type'] = 'application/json'
        self.response.write(json.dumps(result, indent=2))
项目:appbackendapi    作者:codesdk    | 项目源码 | 文件源码
def post(self):
        """Handle the submitted form: email a confirmation link to the user.

        An invalid 'email_address' value re-renders the form through
        ``self.get()``. Otherwise a confirmation URL is created, mailed to
        the address, and a plain-text acknowledgement is written back.
        """
        user_address = self.request.get('email_address')

        if not mail.is_email_valid(user_address):
            # Show the form again.
            self.get()
            return

        confirmation_url = create_new_user_confirmation(user_address)
        # The sender is the app's own <app id>@appspot.gserviceaccount.com.
        sender_address = (
            'Example.com Support <{}@appspot.gserviceaccount.com>'.format(
                app_identity.get_application_id()))
        subject = 'Confirm your registration'
        body = """Thank you for creating an account!
Please confirm your email address by clicking on the link below:

{}
""".format(confirmation_url)
        mail.send_mail(sender_address, user_address, subject, body)
# [END send-confirm-email]
        self.response.content_type = 'text/plain'
        acknowledgement = 'An email has been sent to {}.'.format(user_address)
        self.response.write(acknowledgement)
项目:google-auth-library-python    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def get_project_id():
    """Return the project ID of the current App Engine application.

    Returns:
        str: The project ID, as reported by
            ``app_identity.get_application_id()``.

    Raises:
        EnvironmentError: If the App Engine APIs are unavailable, i.e. the
            module-level ``app_identity`` is ``None``.
    """
    # pylint: disable=missing-raises-doc
    # Pylint rightfully thinks EnvironmentError is OSError, but doesn't
    # realize it's a valid alias.
    if app_identity is not None:
        return app_identity.get_application_id()
    raise EnvironmentError(
        'The App Engine APIs are not available.')
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def get_validated_user_info():
  """Returns a valid (user email, user info), or raises Unauthorized or Forbidden.

  Raises:
    Unauthorized: if no OAuth user is found (or an unauthenticated request is
      being simulated on dev_appserver).
    Forbidden: if no user info is found for the OAuth user's email.
  """
  user_email = get_oauth_id()

  # Allow clients to simulate an unauthenticated request (for testing)
  # because we haven't found another way to create an unauthenticated request
  # when using dev_appserver. When client tests are checking to ensure that an
  # unauthenticated request gets rejected, they helpfully add this header.
  # The `application_id` check ensures this feature only works in dev_appserver.
  if request.headers.get('unauthenticated') and app_identity.get_application_id() == 'None':
    user_email = None
  if user_email is None:
    raise Unauthorized('No OAuth user found.')

  user_info = lookup_user_info(user_email)
  if user_info:
    enforce_ip_whitelisted(request.remote_addr, get_whitelisted_ips(user_info))
    enforce_appid_whitelisted(request.headers.get('X-Appengine-Inbound-Appid'),
                              get_whitelisted_appids(user_info))
    logging.info('User %r ALLOWED', user_email)
    return (user_email, user_info)

  # Pass the argument to logging instead of pre-formatting with `%`, matching
  # the ALLOWED message above and deferring formatting to the logger.
  logging.info('User %r NOT ALLOWED', user_email)
  raise Forbidden()
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def auth_required(role_whitelist):
  """A decorator that keeps the function from being called without auth.

  role_whitelist can be a string or list of strings specifying one or
  more roles that are allowed to call the function.

  The returned wrapper keeps the decorated function's name and docstring.
  For requests that are not self requests it requires HTTPS (except for the
  'None', 'testbed-test' and 'testapp' application ids) and then calls
  check_auth with the role whitelist.

  Raises (from the wrapped call):
    Unauthorized: if a non-self request is not HTTPS on a non-exempt app id.
  """
  # Function-scope import keeps this block self-contained.
  import functools

  assert role_whitelist, "Can't call `auth_required` with empty role_whitelist."

  # isinstance (rather than an exact type comparison) also accepts list
  # subclasses without wrapping them in another list.
  if not isinstance(role_whitelist, list):
    role_whitelist = [role_whitelist]

  def auth_required_wrapper(func):
    # Preserve func's __name__/__doc__ so frameworks and logs that rely on
    # the function's identity still see the original handler.
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
      appid = app_identity.get_application_id()
      # Only enforce HTTPS and auth for external requests; requests made for data generation
      # are allowed through (when enabled).
      if not _is_self_request():
        if request.scheme.lower() != 'https' and appid not in ('None', 'testbed-test', 'testapp'):
          raise Unauthorized('HTTPS is required for %r' % appid)
        check_auth(role_whitelist)
      return func(*args, **kwargs)
    return wrapped
  return auth_required_wrapper
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def get_validated_user_info():
  """Returns a valid (user email, user info), or raises Unauthorized or Forbidden.

  Raises:
    Unauthorized: if no OAuth user is found (or an unauthenticated request is
      being simulated on dev_appserver).
    Forbidden: if no user info is found for the OAuth user's email.
  """
  user_email = get_oauth_id()

  # Allow clients to simulate an unauthenticated request (for testing)
  # because we haven't found another way to create an unauthenticated request
  # when using dev_appserver. When client tests are checking to ensure that an
  # unauthenticated request gets rejected, they helpfully add this header.
  # The `application_id` check ensures this feature only works in dev_appserver.
  if request.headers.get('unauthenticated') and app_identity.get_application_id() == 'None':
    user_email = None
  if user_email is None:
    raise Unauthorized('No OAuth user found.')

  user_info = lookup_user_info(user_email)
  if user_info:
    enforce_ip_whitelisted(request.remote_addr, get_whitelisted_ips(user_info))
    enforce_appid_whitelisted(request.headers.get('X-Appengine-Inbound-Appid'),
                              get_whitelisted_appids(user_info))
    logging.info('User %r ALLOWED', user_email)
    return (user_email, user_info)

  # Pass the argument to logging instead of pre-formatting with `%`, matching
  # the ALLOWED message above and deferring formatting to the logger.
  logging.info('User %r NOT ALLOWED', user_email)
  raise Forbidden()
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def export_tables(database, tables, directory):
    """Defer one CSV export per table and report where the files will go.

    The destination bucket is the default GCS bucket when the application id
    is 'None'; otherwise it is derived from the application id and database.
    Every table name is validated before any export is deferred.

    Returns:
      A dict with a single 'destination' key holding the gs:// URL.

    Raises:
      BadRequest: for an unknown database or a table name that does not match
        _TABLE_PATTERN.
    """
    app_id = app_identity.get_application_id()

    # Determine what GCS bucket to write to based on the environment and database.
    if app_id == 'None':
      bucket_name = app_identity.get_default_gcs_bucket_name()
    else:
      if database == 'rdr':
        bucket_name = '%s-rdr-export' % app_id
      elif database == 'cdm' or database == 'voc':
        bucket_name = '%s-cdm' % app_id
      else:
        raise BadRequest("Invalid database: %s" % database)

    # First pass: reject the whole request if any table name is invalid.
    for candidate in tables:
      if _TABLE_PATTERN.match(candidate):
        continue
      raise BadRequest("Invalid table name: %s" % candidate)

    # Second pass: queue the exports.
    for table_name in tables:
      deferred.defer(TableExporter._export_csv, bucket_name, database, directory, table_name)

    destination = 'gs://%s/%s' % (bucket_name, directory)
    return {'destination': destination}
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def get_project_id():
    """Return the application id reported by the App Identity API."""
    project_id = app_identity.get_application_id()
    return project_id
项目:gcp-census    作者:ocadotechnology    | 项目源码 | 文件源码
def get_project_id():
        """Return the project id to use.

        The BQ_STORAGE_PROJECT_ID environment variable wins when it is set
        (even to an empty string); only when it is absent is the App Identity
        API asked for the current application id.
        """
        project_id = os.getenv('BQ_STORAGE_PROJECT_ID')
        if project_id is None:
            # Resolve the fallback lazily so the App Identity API is not
            # called (and cannot fail) when the override is configured.
            project_id = app_identity.get_application_id()
        return project_id
项目:appbackendapi    作者:codesdk    | 项目源码 | 文件源码
def post(self):
        """Send the example mail for the posted thread id and acknowledge it.

        Reads 'thread_id' from the POST data, passes it together with the
        app's <app id>@appspot.gserviceaccount.com address to
        send_example_mail, and writes a plain-text confirmation.
        """
        # Parenthesised so this debug print parses on Python 2 and Python 3.
        print(repr(self.request.POST))
        # Named thread_id so the builtin `id` is not shadowed.
        thread_id = self.request.POST['thread_id']
        send_example_mail('{}@appspot.gserviceaccount.com'.format(
            app_identity.get_application_id()), thread_id)
        self.response.content_type = 'text/plain'
        self.response.write(
            'Sent an email to Albert with Reference header set to {}.'
            .format(thread_id))
项目:appbackendapi    作者:codesdk    | 项目源码 | 文件源码
def get(self):
        """Call send_approved_mail with the app's sender address and confirm.

        The sender address is <app id>@appspot.gserviceaccount.com; the
        response is a plain-text confirmation.
        """
        application_id = app_identity.get_application_id()
        sender_address = '{}@appspot.gserviceaccount.com'.format(
            application_id)
        send_approved_mail(sender_address)
        self.response.content_type = 'text/plain'
        self.response.write('Sent an email message to Albert.')
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def ListActions(self, error=None):
    """Renders the 'list_actions.html' page of the datastore admin.

    Collects the known kinds (falling back to datastore stats when none are
    found or the lookup fails), the operations and backups, and passes them
    to the template.

    Args:
      error: optional error value handed to the template as 'error'.
    """
    use_stats_kinds = False
    kinds = []
    more_kinds = False
    try:
      kinds, more_kinds = self.GetKinds()
      if not kinds:
        use_stats_kinds = True
        logging.warning('Found no kinds. Using datastore stats instead.')
    # `except X as e` works on Python 2.6+ and Python 3; the old
    # `except X, e` form is a syntax error on Python 3.
    except datastore_errors.Error as e:
      logging.exception(e)
      use_stats_kinds = True

    last_stats_update, kind_stats = _GetDatastoreStats(
        kinds, use_stats_kinds=use_stats_kinds)

    template_params = {
        'run_as_a_service': self.request.get('run_as_a_service'),
        'datastore_admin_home': utils.GenerateHomeUrl(None),
        # Only offered when a service was requested and it is not already
        # running as a service.
        'offer_service': (self.request.get('service') and not
                          self.request.get('run_as_a_service')),
        'kind_stats': kind_stats,
        'more_kinds': more_kinds,
        'last_stats_update': last_stats_update,
        'app_id': self.request.get('app_id'),
        'hosting_app_id': app_identity.get_application_id(),
        'has_namespace': self.request.get('namespace', None) is not None,
        'namespace': self.request.get('namespace'),
        'action_list': sorted(ENTITY_ACTIONS.keys()),
        'backup_action_list': sorted(BACKUP_ACTIONS.keys()),
        'pending_backup_action_list': sorted(PENDING_BACKUP_ACTIONS.keys()),
        'error': error,
        'completed_operations': self.GetOperations(active=False),
        'active_operations': self.GetOperations(active=True),
        'pending_backups': self.GetPendingBackups(),
        'backups': self.GetBackups(),
        'map_reduce_path': config.MAPREDUCE_PATH + '/detail'
    }
    utils.RenderToResponse(self, 'list_actions.html', template_params)
项目:endpoints-python    作者:cloudendpoints    | 项目源码 | 文件源码
def get_app_hostname():
  """Return the hostname of a running Endpoints service.

  The result is built from the hostname prefix and the application id:
    1) None when not running on App Engine, or when running on localhost;
    2) "<prefix><app_id>.appspot.com" for a plain application id;
    3) "<prefix><name>.googleplex.com" for an id of the form
       "google.com:<name>";
    4) "<prefix><name>.appspot.com" for any other "<domain>:<name>" id.

  Returns:
    A string representing the hostname of the service, or None.
  """
  serving_on_prod = is_running_on_app_engine() and not is_running_on_localhost()
  if not serving_on_prod:
    return None

  app_id = app_identity.get_application_id()
  prefix = get_hostname_prefix()

  # Defaults cover the plain "app_id" case; a domain-scoped id overrides them.
  api_name, suffix = app_id, 'appspot.com'
  if ':' in app_id:
    parts = app_id.split(':')
    api_name = parts[1]
    if parts[0] == 'google.com':
      suffix = 'googleplex.com'

  return '{0}{1}.{2}'.format(prefix, api_name, suffix)
项目:google-auth-library-python    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def test_default():
    """google.auth.default() yields App Engine credentials and the app id."""
    default_result = google.auth.default()
    credentials, project_id = default_result

    assert isinstance(credentials, app_engine.Credentials)
    expected_project_id = app_identity.get_application_id()
    assert project_id == expected_project_id
项目:edmunds    作者:LowieHuyghe    | 项目源码 | 文件源码
def app_id(self):
        """
        Return the Google App Engine app id, looking it up once and
        remembering it on the instance as ``_app_id``
        :return:    The app id
        :rtype:     str
        :raises RuntimeError: when ``is_gae()`` reports a non-GAE environment
        """

        if not self.is_gae():
            raise RuntimeError('Not running in Google App Engine environment while fetching app_id.')

        # Serve the remembered value when a previous call already stored it.
        if hasattr(self, '_app_id'):
            return self._app_id

        # Imported lazily: the SDK is only needed on the first lookup.
        from google.appengine.api import app_identity
        self._app_id = app_identity.get_application_id()
        return self._app_id
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def _setup_data(self):
    """Load the data files this object draws on and index them.

    Populates the zip-code-to-state map, the name/city/street lists, the
    answer specs (keyed by question code) and the measurement specs together
    with their qualifier map.
    """
    self._zip_code_to_state = {}
    with open('app_data/zipcodes.txt') as zipcode_file:
      for zipcode, state in csv.reader(zipcode_file):
        self._zip_code_to_state[zipcode] = state

    self._first_names = self._read_all_lines('first_names.txt')
    self._middle_names = self._read_all_lines('middle_names.txt')
    self._last_names = self._read_all_lines('last_names.txt')
    self._city_names = self._read_all_lines('city_names.txt')
    self._street_names = self._read_all_lines('street_names.txt')
    measurement_specs = self._read_json('measurement_specs.json')

    running_on_dev_appserver = app_identity.get_application_id() == 'None'
    if running_on_dev_appserver:
      # Read CSV from a local file when running dev_appserver.
      answer_specs = self._read_csv_from_file(_ANSWER_SPECS_FILE)
    else:
      # Read from GCS when running in Cloud environments.
      answer_specs = self._read_csv_from_gcs(_ANSWER_SPECS_BUCKET, _ANSWER_SPECS_FILE)

    # Save all the answer specs for questions that don't have special handling already.
    specs_by_question = {}
    for spec in answer_specs:
      if spec['question_code'] not in _QUESTION_CODES:
        specs_by_question[spec['question_code']] = spec
    self._answer_specs = specs_by_question

    # Serves as the denominator when deciding whether to answer a question.
    self._answer_specs_max_participants = max(
        int(spec['num_participants']) for spec in answer_specs)

    def code_concept(measurement):
      # Concept identifying a measurement by its own code.
      return Concept(measurement['code']['system'], measurement['code']['code'])

    # Concepts that appear as a qualifier of some measurement.
    qualifier_concepts = {Concept(qualifier['system'], qualifier['code'])
                          for measurement in measurement_specs
                          for qualifier in measurement['qualifiers']}

    measurement_map = {}
    for measurement in measurement_specs:
      measurement_map[code_concept(measurement)] = measurement

    # Top-level measurement specs exclude those used as qualifiers.
    self._measurement_specs = [measurement for measurement in measurement_specs
                               if code_concept(measurement) not in qualifier_concepts]
    self._qualifier_map = {}
    for qualifier_concept in qualifier_concepts:
      self._qualifier_map[qualifier_concept] = measurement_map[qualifier_concept]
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def doSetUp(self):
    """Prepare the Flask test client, the OAuth patch and the auth user.

    Runs the parent set-up, switches the app into testing mode, patches
    'app_util.get_oauth_id', registers the test user as config admin for the
    current application id and selects that user as the authenticated one.
    """
    super(FlaskTestBase, self).doSetUp()

    # http://flask.pocoo.org/docs/0.12/testing/
    flask_app = main.app
    flask_app.config['TESTING'] = True
    self._app = flask_app.test_client()

    # Started patchers are collected in self._patchers.
    self._patchers = []
    oauth_patcher = mock.patch('app_util.get_oauth_id')
    self._mock_get_oauth_id = oauth_patcher.start()
    self._patchers.append(oauth_patcher)

    current_app_id = app_identity.get_application_id()
    config_api.CONFIG_ADMIN_MAP[current_app_id] = self._AUTH_USER

    self.set_auth_user(self._AUTH_USER)
    self._consent_questionnaire_id = None
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def send_failure_alert(job_name, message, log_exc_info=False, extra_recipients=None):
  """Sends an alert email for a failed job.

  Args:
    job_name: name of the failed job; used in the subject line.
    message: body of the alert; also written to the error log.
    log_exc_info: passed to logging.error as exc_info.
    extra_recipients: optional additional recipients appended to the
      configured recipient list.
  """
  subject = '%s failed in %s' % (job_name, app_identity.get_application_id())
  # This sender needs to be authorized per-environment in Email Authorized Senders,
  # see https://cloud.google.com/appengine/docs/standard/python/mail/.
  sender = config.getSetting(config.INTERNAL_STATUS_MAIL_SENDER)
  # Work on a copy: extending the list returned by the config module in place
  # could leak extra recipients into later calls if that list is shared
  # (NOTE(review): getSettingList's caching behaviour is not visible here).
  to_list = list(config.getSettingList(config.INTERNAL_STATUS_MAIL_RECIPIENTS))
  if extra_recipients is not None:
    to_list.extend(extra_recipients)
  logging.error(
      '%s: %s (email will be sent from %r to %r)',
      subject, message, sender, to_list, exc_info=log_exc_info)
  mail.send_mail(sender, to_list, subject, message)
项目:endpoints-python    作者:cloudendpoints    | 项目源码 | 文件源码
def api_server(api_services, **kwargs):
  """Create an api_server.

  The primary function of this method is to set up the WSGIApplication
  instance for the service handlers described by the services passed in.
  Additionally, it registers each API in ApiConfigRegistry for later use
  in the BackendService.getApiConfigs() (API config enumeration service).
  It also configures service control.

  Service control is skipped, and the bare dispatcher returned, when the
  ENDPOINTS_SERVICE_NAME environment variable is unset or empty, or when
  running on the local devserver.

  Args:
    api_services: List of protorpc.remote.Service classes implementing the API
      or a list of _ApiDecorator instances that decorate the service classes
      for an API.
    **kwargs: Passed through to protorpc.wsgi.service.service_handlers except:
      protocols - ProtoRPC protocols are not supported, and are disallowed.

  Returns:
    A new WSGIApplication that serves the API backend and config registry.

  Raises:
    TypeError: if protocols are configured (this feature is not supported).
  """
  # Disallow protocol configuration for now, Lily is json-only.
  if 'protocols' in kwargs:
    raise TypeError("__init__() got an unexpected keyword argument 'protocols'")

  # Construct the api serving app
  apis_app = _ApiServer(api_services, **kwargs)
  dispatcher = endpoints_dispatcher.EndpointsDispatcherMiddleware(apis_app)

  # Determine the service name
  service_name = os.environ.get('ENDPOINTS_SERVICE_NAME')
  if not service_name:
    # Logger.warning is the supported spelling; Logger.warn is a deprecated
    # alias.
    _logger.warning('Did not specify the ENDPOINTS_SERVICE_NAME environment'
                    ' variable so service control is disabled.  Please specify'
                    ' the name of service in ENDPOINTS_SERVICE_NAME to enable'
                    ' it.')
    return dispatcher

  # If we're using a local server, just return the dispatcher now to bypass
  # control client.
  if control_wsgi.running_on_devserver():
    _logger.warning('Running on local devserver, so service control is disabled.')
    return dispatcher

  # The DEFAULT 'config' should be tuned so that it's always OK for python
  # App Engine workloads.  The config can be adjusted, but that's probably
  # unnecessary on App Engine.
  controller = control_client.Loaders.DEFAULT.load(service_name)

  # Start the GAE background thread that powers the control client's cache.
  control_client.use_gae_thread()
  controller.start()

  return control_wsgi.add_all(
      dispatcher,
      app_identity.get_application_id(),
      controller)