Python geopy.geocoders 模块,Nominatim() 实例源码

我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用geopy.geocoders.Nominatim()

项目:django-happenings    作者:natgeosociety    | 项目源码 | 文件源码
def geocode_address(self):
        """
        Fill ``self.lat_long`` with a GeoJSON-style point for this object's address.

        Nothing is done when ``one_line_address()`` returns a falsy value.
        A ``GeopyError`` or ``AttributeError`` raised while looking the address
        up or storing the result is ignored, so ``lat_long`` keeps its
        previous value in that case.
        """
        from geopy.geocoders import Nominatim
        from geopy.exc import GeopyError

        query = self.one_line_address()
        if not query:
            return

        service = Nominatim()
        try:
            match = service.geocode(query)
            # Coordinates are stored longitude first, then latitude.
            point = {
                "type": "Point",
                "coordinates": [match.longitude, match.latitude],
            }
            self.lat_long = point
        except (GeopyError, AttributeError):
            # Best effort only: a failed lookup must not break the caller.
            pass
项目:cOSMos    作者:astrosat    | 项目源码 | 文件源码
def coords_for(name):
    """
    Look ``name`` up with Nominatim and return its bounding box(es).

    Returns a single ``BoundingBox(south, west, north, east)`` for 'Polygon'
    and 'Point' results and a lazy generator of such boxes for a
    'MultiPolygon'. Any other geometry type falls through and gives ``None``.

    Raises:
        AttributeError: when reading the lookup result fails with a
            ``KeyError`` or ``AttributeError``.
    """
    result = Nominatim().geocode(name, geometry='geojson')

    try:
        outline = shape(result.raw['geojson'])
        kind = outline.geom_type

        # Coordinates have to be flipped in order to work in overpass
        if kind == 'Polygon':
            min_x, min_y, max_x, max_y = outline.bounds
            return BoundingBox(min_y, min_x, max_y, max_x)

        if kind == 'MultiPolygon':
            # Each part's bounds are unpacked as (west, south, east, north).
            return (BoundingBox(s, w, n, e)
                    for w, s, e, n in (part.bounds for part in outline))

        if kind == 'Point':
            # The raw 'boundingbox' entry is read as south, north, west, east.
            south, north, west, east = map(float, result.raw['boundingbox'])
            return BoundingBox(south, west, north, east)

    except (KeyError, AttributeError):
        raise AttributeError(
            'No bounding box available for this location name.')
项目:LoveIsInTheAir-Stats-Plotter    作者:sahilsareen    | 项目源码 | 文件源码
def __init__(self, config_file=DEFAULT_CONFIG_FILE):
        """
        Read the configuration, initialise state and configure logging.

        Args:
            config_file: value handed to ``ConfigReader``.

        Raises:
            KeyError: if ``config.logging_level`` is not one of the level
                names listed in the table below.
        """
        self.config = ConfigReader(config_file)
        self.states = {}
        self.geo_locator = Nominatim()
        self.tweet_count = 0
        self.city_cache_appender = CacheAppender(self.config.cache_file_path)

        # Map the configured level name onto the matching logging constant.
        level_by_name = {
            'DEBUG': logging.DEBUG,
            'INFO': logging.INFO,
            'WARN': logging.WARNING,
            'ERROR': logging.ERROR,
            'FATAL': logging.FATAL,
            'CRITICAL': logging.CRITICAL,
        }
        chosen_level = level_by_name[self.config.logging_level]
        logging.basicConfig(
            format="[%(levelname)s] %(name)s: %(message)s",
            level=chosen_level,
        )

        # The logger is named after the concrete class.
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.info("Analysing city names using config in %s" % config_file)
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def location(name, street, city, latitude=None, longitude=None):
    """
    Attach a location to the group called ``name`` and persist it.

    When both ``latitude`` and ``longitude`` are falsy, the street and city
    are geocoded through Nominatim. If the lookup finds nothing, both
    coordinates fall back to 0.0.

    Returns:
        The updated group, or ``False`` when ``get_sql`` finds no group.
    """
    group = get_sql(name)
    if group is None:
        return False

    if not latitude and not longitude:
        geo = Nominatim()
        coord = geo.geocode(street + ' ' + city, timeout=10)
        if coord:
            latitude = coord.latitude
            longitude = coord.longitude
        else:
            # Fix: the fallback used to assign longitude twice, leaving
            # latitude as None.
            latitude = 0.0
            longitude = 0.0

    group.location = LocationModel(street, city, latitude, longitude)
    SQL.session.add(group)
    SQL.session.commit()
    return group
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def location(name, street, city, latitude=None, longitude=None):
    """
    Attach a location to the node called ``name`` and persist it.

    When both ``latitude`` and ``longitude`` are falsy, the street and city
    are geocoded through Nominatim. If the lookup finds nothing, both
    coordinates fall back to 0.0.

    Returns:
        The updated node, or ``False`` when ``get_sql`` finds no node.
    """
    node = get_sql(name)
    if node is None:
        return False

    needs_lookup = not (latitude or longitude)
    if needs_lookup:
        match = Nominatim().geocode(street + ' ' + city, timeout=10)
        if match:
            latitude, longitude = match.latitude, match.longitude
        else:
            latitude, longitude = 0.0, 0.0

    node.location = LocationModel(street, city, latitude, longitude)
    SQL.session.add(node)
    SQL.session.commit()
    return node
项目:python    作者:micronicstraining    | 项目源码 | 文件源码
def main():
    """
    Reverse-geocode the coordinates of every record in ``FILE_NAME``.

    Each row is read with ``csv.reader`` and the address found for its
    latitude/longitude columns is printed. A failed lookup is reported and
    the loop moves on to the next row. Only step 1 is given as solution.

    Returns:
        Always 0.
    """
    with open(FILE_NAME, newline='') as source:
        locator = Nominatim()
        for row in csv.reader(source, delimiter=DELIMITER):
            # Only the two coordinate columns are used; the other named
            # fields just pin their position counted from the end of the row.
            (_policy_id, *_, latitude, longitude,
             _line, _construction, _granularity) = row
            try:
                print(locator.reverse(latitude + ', ' + longitude))
            except Exception as error:
                print(error)
                print("Unable to get reverse coordinates")

    return 0
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
async def mapsearch(cmd, message, args):
    """
    Reply with an embed linking to a Google Maps search for the given words.

    The words in ``args`` are joined into a search phrase. When Nominatim
    finds the place, the link is centred on its coordinates; otherwise a
    plain "broad search" link is sent. Empty input produces an error embed.

    Declared ``async`` because the body awaits ``message.channel.send``;
    ``await`` inside a plain ``def`` is a syntax error.
    """
    if args:
        search = ' '.join(args)
        search_url = '+'.join(args)
        if search:
            # NOTE(review): geocode() looks like a synchronous call made
            # inside a coroutine; confirm blocking here is acceptable.
            location = Nominatim().geocode(search)
            response = discord.Embed(color=0xdd4e40)
            if location:
                lat = location.latitude
                lon = location.longitude
                maps_url = f'https://www.google.rs/maps/search/{search_url}/@{lat},{lon},11z?hl=en'
                response.set_author(name=f'{location}', icon_url=map_icon, url=maps_url)
            else:
                maps_url = f'https://www.google.rs/maps/search/{search_url}'
                response.set_author(name=f'Broad Search: {search.title()}', icon_url=map_icon, url=maps_url)
        else:
            response = discord.Embed(color=0xBE1931, title='? No location inputted.')
    else:
        response = discord.Embed(color=0xBE1931, title='? Nothing inputted.')
    await message.channel.send(embed=response)
项目:perdiem-django    作者:RevolutionTech    | 项目源码 | 文件源码
def get(self, request, *args, **kwargs):
        """
        Return the latitude/longitude of the ``address`` query parameter.

        Responses:
            400 with the form errors when the request does not validate.
            503 when the geocoder times out.
            404 when the geocoder has no match for the address.
            200 with ``latitude`` and ``longitude`` rounded to 4 decimals.
        """
        # Validate request
        form = CoordinatesFromAddressForm(request.GET)
        if not form.is_valid():
            return Response(form.errors, status=status.HTTP_400_BAD_REQUEST)
        address = form.cleaned_data['address']

        # Return lat/lon for address
        geolocator = Nominatim()
        try:
            location = geolocator.geocode(address)
        except GeocoderTimedOut:
            return Response(
                "Geocoder service currently unavailable. Please try again later.",
                status=status.HTTP_503_SERVICE_UNAVAILABLE
            )
        if location is None:
            # geocode() gives None when nothing matches; without this guard
            # the attribute access below raised and produced a server error.
            return Response(
                "No coordinates could be found for the given address.",
                status=status.HTTP_404_NOT_FOUND
            )
        return Response({
            'latitude': float("{0:.4f}".format(location.latitude)),
            'longitude': float("{0:.4f}".format(location.longitude)),
        })
项目:sketal    作者:vk-brain    | 项目源码 | 文件源码
def __init__(self, *commands, token=None, prefixes=None, strict=False):
        """Answers with a weather in user's city or on specified address.

        Raises:
            ValueError: when no ``token`` is supplied.
        """

        if not token:
            raise ValueError("Token is not specified! Get it from: https://darksky.net")

        super().__init__(*commands, prefixes=prefixes, strict=strict)

        self.token = token

        # Text shown for each forecast icon key.
        self.icons = dict([
            ("clear-day", "??"),
            ("clear-night", "??"),
            ("cloudy", "??"),
            ("fog", "??"),
            ("partly-cloudy-day", "??"),
            ("partly-cloudy-night", "??"),
            ("rain", "??"),
            ("sleet", "?? ??"),
            ("snow", "??"),
            ("wind", "??"),
            ("error", "??"),
        ])

        # One instance of every geocoder backend, kept in this order.
        self.geocoders = [backend() for backend in (Photon, Yandex, Nominatim)]

        self.coords_cache = {}
        self.weather_cache = {}
        half_day = 12 * 60 * 60
        self.weather_clear = time.time() + half_day

        self.api_lim = 95
        full_day = 24 * 60 * 60
        self.api_lim_clear = time.time() + full_day
        self.api_lim_count = 0
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def coordinates(street, city):
    """
    Geocode ``street`` and ``city`` and emit the result.

    Emits a 'coordinates' event carrying ``(latitude, longitude)`` on
    success, or an 'error' event on the '/general' namespace when nothing
    was found.
    """
    hit = Nominatim().geocode(street + ' ' + city)
    if not hit:
        emit('error', "Coordinates not found", namespace='/general')
        return
    emit('coordinates', (hit.latitude, hit.longitude))
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def coordinates(street, city):
    """
    Geocode ``street`` and ``city`` and emit the result.

    Emits a 'coordinates' event carrying ``(latitude, longitude)`` on
    success, or a 'warning' event on the '/general' namespace when nothing
    was found.

    Returns:
        Always ``True``.
    """
    found = Nominatim().geocode(street + ' ' + city)
    if not found:
        emit("warning", "Coordinated no found", namespace='/general')
    else:
        emit('coordinates', (found.latitude, found.longitude))
    return True
项目:IntroPython2016    作者:UWPCE-PythonCert    | 项目源码 | 文件源码
def main():
    """
    Run the interactive crime heat map application until the user stops.

    Each run asks whether the user wants to enter a location, fetches the
    data through ``apiResults``, then hands it to ``mapMaker`` and
    ``output``. Afterwards the user may start another run.

    Changes from the earlier version: an invalid answer to the "run again"
    question now only re-prompts instead of also printing the goodbye
    message, reruns loop instead of recursing into ``main()``, and the
    no-op bare ``SystemExit`` expression is gone.
    """
    geolocator = Nominatim()

    while True:
        print("This app will generate a crime heat map based on your location")

        # Keep asking until the answer is y or n.
        while True:
            introText = input("Would you like to enter a location (y/n) ")
            if introText.lower() in ("y", "n"):
                locationInfo = validLocation(geolocator, introText)
                break

        raw_data = apiResults(locationInfo)
        mapMaker(raw_data, locationInfo)
        output(raw_data, locationInfo)

        # Keep asking until the answer is y or n.
        while True:
            againText = input("Would you like to run the application again (y/n) ").lower()
            if againText in ("y", "n"):
                break

        if againText != "y":
            print("Thanks! See you next time")
            return
项目:georef    作者:rukayaj    | 项目源码 | 文件源码
def _geolocate_nominatim(self, string):
        """
        Look ``string`` up through Nominatim and record the match, if any.

        A match is recorded with ``add_potential_georeference`` under the
        'OpenStreetMap' profile only when the result and its first element
        are both truthy.
        """
        geocoder = Nominatim(timeout=30)
        match = geocoder.geocode(query=string, exactly_one=True)
        if not match or not match[0]:
            return
        self.add_potential_georeference(long=match.longitude,
                                        lat=match.latitude,
                                        profile_name='OpenStreetMap',
                                        locality_name=match.address)
项目:pinder    作者:dhharris    | 项目源码 | 文件源码
def onChangeLoc(self, event):
        '''
        Change location

        Prompts for a location name until one can be geocoded, then stores
        the name and its coordinates and pushes them to the session.

        Cancelling the dialog now leaves the current location untouched and
        returns. Previously a cancel fed ``None`` to the geocoder and the
        dialog was shown again, so the user could never back out.
        '''
        # One geocoder serves every attempt.
        geolocator = Nominatim()
        while True:
            dlg = wx.TextEntryDialog(self.frame, 'Please enter a location', 'Current location: ' + self.location)
            accepted = dlg.ShowModal() == wx.ID_OK
            loc = str(dlg.GetValue()) if accepted else None
            dlg.Destroy()

            if loc is None:
                # Dialog cancelled or closed by some other button.
                return

            # Look up location given
            try:
                l = geolocator.geocode(loc, exactly_one=True)
                self.latlon = (l.latitude, l.longitude)
                self.location = loc
                break
            except Exception as e:
                # Covers lookup errors as well as "no result"; ask again.
                print('Error setting location\n' + str(e))
        self.session.update_location(self.latlon[0], self.latlon[1])
项目:errorgeopy    作者:alpha-beta-soup    | 项目源码 | 文件源码
def _get_default_pool_members():
    """Return the default geocoder pool: a ``Nominatim`` and a ``GoogleV3`` instance, in that order."""
    from geopy.geocoders import Nominatim, GoogleV3
    default_classes = (Nominatim, GoogleV3)
    return tuple(geocoder_class() for geocoder_class in default_classes)
项目:web    作者:pyjobs    | 项目源码 | 文件源码
def __init__(self):
        """Create the Nominatim geocoder, with ``timeout=5`` and a country bias of 'fr'."""
        geocoder_options = {'timeout': 5, 'country_bias': 'fr'}
        self._geocoder = geocoders.Nominatim(**geocoder_options)
项目:Hanhan_Play_With_Social_Media    作者:hanhanwu    | 项目源码 | 文件源码
def calculate_distance(merchant_loc, user_loc):
  """
  Return the distance in miles between a merchant and a user location.

  ``merchant_loc`` is a comma-separated pair whose two parts are parsed with
  ``Decimal``. ``user_loc`` is free text that is geocoded through Nominatim.

  Returns ``None`` when the user location cannot be geocoded.
  """
  geolocator = Nominatim()

  merchant_lat_lng = [Decimal(part) for part in merchant_loc.split(',')]
  merchant_point = (merchant_lat_lng[0], merchant_lat_lng[1])

  user_location = geolocator.geocode(user_loc)
  # Identity test is the idiomatic (and __eq__-proof) check for None.
  if user_location is None:
    return None
  user_point = (user_location.latitude, user_location.longitude)

  return vincenty(merchant_point, user_point).miles
项目:Hanhan_Play_With_Social_Media    作者:hanhanwu    | 项目源码 | 文件源码
def get_ll(postal_code):
  """
  Geocode ``postal_code`` and return it as a ``"latitude, longitude"`` string.

  Returns ``None`` when ``postal_code`` is ``None`` or the lookup finds
  nothing.
  """
  # Identity tests are the idiomatic (and __eq__-proof) checks for None.
  if postal_code is None:
    return None
  geolocator = Nominatim()
  location = geolocator.geocode(postal_code)   # it seems that cannot write abbreviation here
  if location is None:
    return None
  return str(location.latitude) + ', ' + str(location.longitude)
项目:AutoTinder    作者:nnormandin    | 项目源码 | 文件源码
def change_location(session, location_name):
    """
    Geocode ``location_name`` and move ``session`` to those coordinates.

    Progress is printed before and after the update.

    Raises:
        ValueError: when the geocoder finds no match for ``location_name``.
            Previously this surfaced as an ``AttributeError`` on ``None``.
    """
    geolocator = Nominatim()
    location = geolocator.geocode(location_name)
    if location is None:
        raise ValueError(
            'could not find a location matching {0!r}'.format(location_name))
    print('** changing location to {0}'.format(location.address))
    session.update_location(location.latitude, location.longitude)
    print('** location changed')


# make yourself undiscoverable
项目:kotori    作者:daq-tools    | 项目源码 | 文件源码
def geohash(latitude, longitude):
    """Return ``Geohash.encode`` of the coordinates after converting both to ``float``."""
    lat_value = float(latitude)
    lon_value = float(longitude)
    return Geohash.encode(lat_value, lon_value)


# Cache responses from Nominatim for 1 month
项目:CAM2RetrieveData    作者:PurdueCAM2Project    | 项目源码 | 文件源码
def __init__(self, source, key):
        """
        Pick the geocoding backend and reset the address fields.

        Args:
            source: 'Google' selects ``GoogleV3`` (using ``key`` as its API
                key); 'Nominatim' selects ``Nominatim``. For any other value
                no ``geolocator`` attribute is set.
            key: API key, only passed on for the 'Google' source.
        """
        if source == 'Google':
            self.geolocator = GoogleV3(api_key=key)
        elif source == 'Nominatim':
            self.geolocator = Nominatim()

        # Every address field starts out as an empty string.
        self.latitude = self.longitude = ""
        self.city = self.state = self.country = ""

        # Re-import sys and set the default encoding to utf8.
        reload(sys)
        sys.setdefaultencoding('utf8')
项目:air-bot    作者:saurabh-deochake    | 项目源码 | 文件源码
def get_latlon(self, city):
        """
        Geocode ``city`` and return ``(latitude, longitude)``.

        Returns ``(-1, -1)`` when the lookup fails for any reason, including
        the case where no location is found.
        """
        try:
            geolocator = Nominatim()
            location = geolocator.geocode(city)
            latitude, longitude = location.latitude, location.longitude
            return latitude, longitude
        except Exception:
            # Narrowed from a bare except so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            return -1, -1
项目:greenhouse_keeper    作者:VictorRodriguez    | 项目源码 | 文件源码
def getaddress():
    """Reverse-geocode the ``lat``/``lng`` names from the enclosing scope and return the address found."""
    from geopy.geocoders import Nominatim
    service = Nominatim()
    # Nominatim is queried with a single "lat,lng" string.
    coordinates = ",".join((str(lat), str(lng)))
    place = service.reverse(coordinates)
    return place.address
项目:perdiem-django    作者:RevolutionTech    | 项目源码 | 文件源码
def dispatch(self, request, *args, **kwargs):
        """
        Read the filter and sort parameters from the query string, then dispatch.

        Sets ``active_genre``, ``distance``, ``location``, ``lat``, ``lon``
        and ``order_by`` on the view. When a location was supplied it is
        geocoded into ``location_coordinates``; a geocoder timeout is
        recorded in ``geocoder_failed`` instead of being raised.
        """
        params = request.GET

        # Filtering
        self.active_genre = params.get('genre', 'All Genres')
        self.distance = params.get('distance')
        self.location = params.get('location')
        self.lat = params.get('lat')
        self.lon = params.get('lon')

        # Sorting: unknown or missing slugs fall back to 'recent'.
        requested_sort = params.get('sort')
        sort_slug = requested_sort if requested_sort in self.ORDER_BY_NAME else 'recent'
        self.order_by = {'slug': sort_slug, 'name': self.ORDER_BY_NAME[sort_slug]}

        # Geolocate if location
        self.location_coordinates = None
        self.geocoder_failed = False
        if self.location:
            service = Nominatim()
            try:
                self.location_coordinates = service.geocode(self.location)
            except GeocoderTimedOut:
                self.geocoder_failed = True

        return super(ArtistListView, self).dispatch(request, *args, **kwargs)
项目:rvo    作者:noqqe    | 项目源码 | 文件源码
def validate_location(ctx, param, value):
        """
        Resolve a location option value through Nominatim.

        Returns:
            ``None`` unchanged when no value was given, otherwise the
            geocoding result for ``value``.

        Raises:
            click.BadParameter: when the geocoder finds nothing for ``value``.
        """
        if value is None:
            return value

        match = Nominatim().geocode(value)
        if match is not None:
            return match

        raise click.BadParameter('Location \"%s\" could not be found' % value)
项目:MagiCircles    作者:MagiCircles    | 项目源码 | 文件源码
def latlong(opt):
    """
    Refresh user coordinates and regenerate the map page template.

    ``opt['reload']`` makes every user with a location be geocoded again;
    otherwise only users flagged ``location_changed`` are processed.
    ``opt['retry']`` is passed through to ``getLatLong``. The template is
    then rebuilt from all users that have a latitude and written to
    ``templates/pages/map.html`` of the current site.

    Changes from the earlier version: locals no longer shadow the builtins
    ``map`` and ``reload``, the bare ``except`` is narrowed to ``Exception``,
    the redundant ``f.close()`` after the ``with`` block is gone, and the
    print call is written so it parses on Python 2 and 3.
    """
    force_reload, retry = opt['reload'], opt['retry']
    pending = models.UserPreferences.objects.filter(location__isnull=False).exclude(location__exact='')
    if not force_reload:
        pending = pending.filter(location_changed__exact=True)
    geolocator = Nominatim()
    for user in pending:
        getLatLong(geolocator, user, retry)

    located = models.UserPreferences.objects.filter(latitude__isnull=False).select_related('user')

    mapcache = '{# this file is generated, do not edit it #}{% extends "base.html" %}{% load i18n %}{% load l10n %}{% block title %}{% trans "Map" %}{% endblock %}{% block content %}<div id="map"></div>{% endblock %}{% block afterjs %}{% localize off %}<script>var center=new google.maps.LatLng({% if center %}{{ center.latitude }},{{ center.longitude }}{% else %}30,0{% endif %});var zoom={% if zoom %}{{ zoom }}{% else %}2{% endif %};var addresses = ['

    for u in located:
        try:
            # The concatenation stays inside the try so that a user whose
            # data cannot be formatted or appended is skipped, not fatal.
            mapcache += '{open}"username": "{username}","avatar": "{avatar}","location": "{location}","icon": "{icon}","latlong": new google.maps.LatLng({latitude},{longitude}){close},'.format(
                open='{',
                username=escape(u.user.username),
                avatar=escape(models.avatar(u.user)),
                location=escape(u.location),
                icon=escape(u.favorite_character1_image if u.favorite_character1_image else SITE_STATIC_URL + 'static/img/default_map_icon.png'),
                latitude=u.latitude,
                longitude=u.longitude,
                close='}',
            )
        except Exception:
            print('One user not added in map')

    mapcache += '];</script><script src="' + SITE_STATIC_URL + 'static/js/map.js"></script>{% endlocalize %}{% endblock %}'

    # The with block closes the file; no explicit close() is needed.
    with open(django_settings.BASE_DIR + '/' + django_settings.SITE + '/templates/pages/map.html', 'w') as f:
        f.write(mapcache.encode('UTF-8'))
项目:apex-sigma-core    作者:lu-ci    | 项目源码 | 文件源码
async def weather(cmd, message, args):
    """
    Reply with the current Dark Sky weather for the searched location.

    The location text and unit come from ``get_unit_and_search(args)``. The
    text is geocoded with Nominatim and the forecast for those coordinates
    is fetched over HTTP. An error embed is sent instead when the API key,
    the input or the location is missing.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are syntax errors inside a plain ``def``.
    """
    if 'secret_key' not in cmd.cfg:
        response = discord.Embed(color=0xBE1931, title='? The API Key is missing.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing inputted.')
    else:
        secret_key = cmd.cfg['secret_key']
        search, unit = get_unit_and_search(args)
        if not search:
            response = discord.Embed(color=0xBE1931, title='? No location inputted.')
        else:
            # NOTE(review): geocode() looks like a synchronous call made
            # inside a coroutine; confirm blocking here is acceptable.
            location = Nominatim().geocode(search)
            if not location:
                response = discord.Embed(color=0x696969, title='?? Location not found.')
            else:
                lat = location.latitude
                lon = location.longitude
                req_url = f'https://api.darksky.net/forecast/{secret_key}/{lat},{lon}?units={unit}'
                async with aiohttp.ClientSession() as session:
                    # The HTTP reply has its own name so that it is not
                    # rebound to the parsed JSON inside its context manager.
                    async with session.get(req_url) as reply:
                        payload = await reply.read()
                        data = json.loads(payload)
                curr = data['currently']
                icon = curr['icon']
                forecast = data['daily']['summary']
                dis, deg = get_dis_and_deg(unit, forecast)
                forecast_title = f'{icons[icon]["icon"]} {curr["summary"]}'
                response = discord.Embed(color=icons[icon]['color'], title=forecast_title)
                response.description = f'Location: {location}'
                response.add_field(name='?? Forecast', value=forecast, inline=False)
                info_title = '?? Temperature'
                info_text = f'Temperature: {curr["temperature"]}{deg}'
                info_text += f'\nFeels Like: {curr["apparentTemperature"]}{deg}'
                info_text += f'\nDew Point: {curr["dewPoint"]}{deg}'
                response.add_field(name=info_title, value=info_text, inline=True)
                wind_title = '?? Wind'
                wind_text = f'Speed: {curr["windSpeed"]} {dis}/H'
                wind_text += f'\nGust: {curr["windGust"]} {dis}/H'
                wind_text += f'\nBearing: {curr["windBearing"]}°'
                response.add_field(name=wind_title, value=wind_text, inline=True)
                other_title = '?? Other'
                other_text = f'Humidity: {curr["humidity"]*100}%'
                other_text += f'\nPressure: {curr["pressure"]}mbar'
                if 'visibility' in curr:
                    other_text += f'\nVisibility: {curr["visibility"]} {dis}'
                else:
                    other_text += '\nVisibility: Unknown'
                response.add_field(name=other_title, value=other_text, inline=True)
    await message.channel.send(embed=response)
项目:apex-sigma-plugins    作者:lu-ci    | 项目源码 | 文件源码
async def weather(cmd, message, args):
    """
    Reply with the current Dark Sky weather for the searched location.

    The location text and unit come from ``get_unit_and_search(args)``. The
    text is geocoded with Nominatim and the forecast for those coordinates
    is fetched over HTTP. An error embed is sent instead when the API key,
    the input or the location is missing.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are syntax errors inside a plain ``def``.
    """
    if 'secret_key' not in cmd.cfg:
        response = discord.Embed(color=0xBE1931, title='? The API Key is missing.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing inputted.')
    else:
        secret_key = cmd.cfg['secret_key']
        search, unit = get_unit_and_search(args)
        if not search:
            response = discord.Embed(color=0xBE1931, title='? No location inputted.')
        else:
            # NOTE(review): geocode() looks like a synchronous call made
            # inside a coroutine; confirm blocking here is acceptable.
            location = Nominatim().geocode(search)
            if not location:
                response = discord.Embed(color=0x696969, title='?? Location not found.')
            else:
                lat = location.latitude
                lon = location.longitude
                req_url = f'https://api.darksky.net/forecast/{secret_key}/{lat},{lon}?units={unit}'
                async with aiohttp.ClientSession() as session:
                    # The HTTP reply has its own name so that it is not
                    # rebound to the parsed JSON inside its context manager.
                    async with session.get(req_url) as reply:
                        payload = await reply.read()
                        data = json.loads(payload)
                curr = data['currently']
                icon = curr['icon']
                forecast = data['daily']['summary']
                dis, deg = get_dis_and_deg(unit, forecast)
                forecast_title = f'{icons[icon]["icon"]} {curr["summary"]}'
                response = discord.Embed(color=icons[icon]['color'], title=forecast_title)
                response.description = f'Location: {location}'
                response.add_field(name='?? Forecast', value=forecast, inline=False)
                info_title = '?? Temperature'
                info_text = f'Temperature: {curr["temperature"]}{deg}'
                info_text += f'\nFeels Like: {curr["apparentTemperature"]}{deg}'
                info_text += f'\nDew Point: {curr["dewPoint"]}{deg}'
                response.add_field(name=info_title, value=info_text, inline=True)
                wind_title = '?? Wind'
                wind_text = f'Speed: {curr["windSpeed"]} {dis}/H'
                wind_text += f'\nGust: {curr["windGust"]} {dis}/H'
                wind_text += f'\nBearing: {curr["windBearing"]}°'
                response.add_field(name=wind_title, value=wind_text, inline=True)
                other_title = '?? Other'
                other_text = f'Humidity: {curr["humidity"]*100}%'
                other_text += f'\nPressure: {curr["pressure"]}mbar'
                if 'visibility' in curr:
                    other_text += f'\nVisibility: {curr["visibility"]} {dis}'
                else:
                    other_text += '\nVisibility: Unknown'
                response.add_field(name=other_title, value=other_text, inline=True)
    await message.channel.send(embed=response)
项目:nsfw    作者:vied12    | 项目源码 | 文件源码
def get_air_quality(request):
    """
    Fill the conversation context with air quality data for a location.

    The location text is the first 'location' entity of the request. It is
    geocoded and, if nothing is found, tried again with ', Germany'
    appended. On success the closest station, the subscription state of the
    session and the most recent alert of the last two days are written to
    the context.

    Fix: the retry used to format the variable that had just been
    overwritten with the failed lookup result, so it searched for
    "None, Germany". The original text is now kept in its own variable.

    Returns:
        The updated ``request['context']``.
    """
    geolocator = Nominatim()
    context = request['context']
    entities = request['entities']
    query = first_entity_value(entities, 'location')

    # Drop every key this action may set so stale values never leak through.
    for context_key in (
        'missingLocation',
        'outOfGermany',
        'subscribed',
        'notSubscribed',
        'location',
        'station',
        'stationId',
        'lastAlertValue',
        'lastAlertDate',
        'kind',
        'clean',
        'notFound',
    ):
        context.pop(context_key, None)

    if not query:
        context['missingLocation'] = True
        return context

    loc = geolocator.geocode(query, language='en')
    if not loc:
        # Second attempt, built from the original text.
        loc = geolocator.geocode('{}, Germany'.format(query), language='en')
    if not loc:
        context['notFound'] = True
        return context

    closest_station = get_closest_station(loc.latitude, loc.longitude)
    # is subscribed ?
    if is_subscribed(request['session_id'], closest_station):
        context['subscribed'] = True
    else:
        context['notSubscribed'] = True
    # oldest alert we want
    max_date = datetime.datetime.now() - datetime.timedelta(days=2)
    last_alert = Alert.objects.filter(station=closest_station, report__date__gte=max_date).last()
    context['location'] = loc.address
    context['station'] = closest_station.name
    context['stationId'] = closest_station.pk
    cache.set(request['session_id'], closest_station.pk)
    if last_alert:
        context['lastAlertValue'] = last_alert.value
        context['lastAlertDate'] = last_alert.report.date.strftime('%x')
        context['kind'] = last_alert.report.kind
    else:
        context['clean'] = True
    return context


# Setup Actions