Python iso8601 模块,parse_date() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用iso8601.parse_date()

项目:ml-rest    作者:apinf    | 项目源码 | 文件源码
def cast_primitive_value(spec, value):
    format = spec.get('format')
    type = spec.get('type')
    if type == 'boolean':
        return (force_text(value).lower() in ('1', 'yes', 'true'))
    if type == 'integer' or format in ('integer', 'long'):
        return int(value)
    if type == 'number' or format in ('float', 'double'):
        return float(value)
    if format == 'byte':  # base64 encoded characters
        return base64.b64decode(value)
    if format == 'binary':  # any sequence of octets
        return force_bytes(value)
    if format == 'date':  # ISO8601 date
        return iso8601.parse_date(value).date()
    if format == 'dateTime':  # ISO8601 datetime
        return iso8601.parse_date(value)
    if type == 'string':
        return force_text(value)
    return value
项目:lepo    作者:akx    | 项目源码 | 文件源码
def cast_primitive_value(spec, value):
    format = spec.get('format')
    type = spec.get('type')
    if type == 'boolean':
        return (force_text(value).lower() in ('1', 'yes', 'true'))
    if type == 'integer' or format in ('integer', 'long'):
        return int(value)
    if type == 'number' or format in ('float', 'double'):
        return float(value)
    if format == 'byte':  # base64 encoded characters
        return base64.b64decode(value)
    if format == 'binary':  # any sequence of octets
        return force_bytes(value)
    if format == 'date':  # ISO8601 date
        return iso8601.parse_date(value).date()
    if format == 'dateTime':  # ISO8601 datetime
        return iso8601.parse_date(value)
    if type == 'string':
        return force_text(value)
    return value
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def _process_event(self, repo, event):
        """Process potentially new event for repository

        :param repo: Repository related to event
        :type repo: ``repocribro.models.Repository``
        :param event: GitHub event data
        :type event: dict
        :return: If the event was new or already registered before
        :rtype: bool
        """
        last = pytz.utc.localize(repo.last_event)
        if iso8601.parse_date(event['created_at']) <= last:
            return False
        hook_type = self.event2webhook.get(event['type'], 'uknown')
        for event_processor in self.hooks.get(hook_type, []):
            try:
                event_processor(db=self.db, repo=repo,
                                payload=event['payload'],
                                actor=event['actor'])
                print('Processed {} from {} event for {}'.format(
                    event['type'], event['created_at'], repo.full_name
                ))
            except HTTPException:
                print('Error while processing #{}'.format(event['id']))
        return True
项目:jsonschema2db    作者:better    | 项目源码 | 文件源码
def _is_valid_type(self, t, value):
        try:
            if t == 'number':
                float(value)
            elif t == 'integer':
                int(value)
            elif t == 'boolean':
                assert type(value) == bool
            elif t == 'timestamp':
                iso8601.parse_date(value)
            elif t == 'date':
                iso8601.parse_date(value + 'T00:00:00Z')
            elif t == 'string':
                # Allow coercing ints/floats, but nothing else
                assert type(value) in [str, int, float]
        except:
            return False
        return True
项目:fuel-ccp-tests    作者:openstack    | 项目源码 | 文件源码
def linear_check(self):
        """Action to wait until all checks are done
        Each check has own timeout equals 300 secs"""
        creation_dates = []
        for item in self.__linear_order:
            resource_repr = getattr(item, '__resource_repr')
            helpers.wait(
                lambda: self._linear_check(item), timeout=300, interval=2,
                timeout_msg="{} creation timeout reached".format(resource_repr)
            )
            k8s_obj = self.get_k8s_object(resource_repr)
            creation_date = iso8601.parse_date(
                k8s_obj.metadata.creation_timestamp)
            creation_dates.append(creation_date)
            if len(creation_dates) > 1:
                assert creation_dates[-2] <= creation_dates[-1], (
                    "The order of linear objects is broken!")
        LOG.info("Linear check passed!")
项目:graphene-mongo    作者:joaovitorsilvestre    | 项目源码 | 文件源码
def parse_operators(args):
    """ Avoid problem that mongoengine for some reason the operators 
        gte, gt, lt, lte doesn't work with dates in isoformat 
    """

    args = {k: v for k, v in args.items() if k not in ['skip', 'limit']}
    for k, v in args.items():
        try:
            is_data = iso8601.parse_date(v)
        except:
            is_data = False

        if is_data:
            args[k] = is_data if is_data else v

    return args
项目:etc    作者:sublee    | 项目源码 | 文件源码
def make_node(cls, data):
        try:
            key = data['key']
        except KeyError:
            key, kwargs = u'/', {}
        else:
            kwargs = {'modified_index': int(data['modifiedIndex']),
                      'created_index': int(data['createdIndex'])}
        ttl = data.get('ttl')
        if ttl is not None:
            expiration = iso8601.parse_date(data['expiration'])
            kwargs.update(ttl=ttl, expiration=expiration)
        if 'value' in data:
            node_cls = Value
            args = (data['value'],)
        elif data.get('dir', False):
            node_cls = Directory
            args = ([cls.make_node(n) for n in data.get('nodes', ())],)
        else:
            node_cls, args = Node, ()
        return node_cls(key, *args, **kwargs)
项目:series-tiempo-ar-api    作者:datosgobar    | 项目源码 | 文件源码
def run(self, query, args):
        self.start = args.get(constants.PARAM_START_DATE)
        self.end = args.get(constants.PARAM_END_DATE)

        self.validate_start_end_dates()
        if self.errors:
            return

        start_date, end_date = None, None
        if self.start:
            start_date = str(iso8601.parse_date(self.start).date())
        if self.end:
            end_date = iso8601.parse_date(self.end).date()
            if '-' not in self.end:  # Solo especifica año
                end_date = datetime.date(end_date.year, 12, 31)
            if self.end.count('-') == 1:  # Especifica año y mes
                # Obtengo el último día del mes, monthrange devuelve
                # tupla (month, last_day)
                days = monthrange(end_date.year, end_date.month)[1]
                end_date = datetime.date(end_date.year, end_date.month, days)

        query.add_filter(start_date, end_date)
项目:series-tiempo-ar-api    作者:datosgobar    | 项目源码 | 文件源码
def validate_date(self, _date, param):
        """Valida y parsea la fecha pasada.

        Args:
            _date (str): date string, ISO 8601
            param (str): Parámetro siendo parseado

        Returns:
            date con la fecha parseada

        Raises:
            ValueError: si el formato no es válido
        """

        try:
            parsed_date = iso8601.parse_date(_date)
        except iso8601.ParseError:
            self._append_error(strings.INVALID_DATE.format(param, _date))
            raise ValueError
        return parsed_date
项目:series-tiempo-ar-api    作者:datosgobar    | 项目源码 | 文件源码
def test_add_collapse(self):
        """Testea que luego de agregar un collapse default, los
        resultados sean anuales, es decir cada uno a un año de
        diferencia con su anterior"""
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        self.query.add_collapse()
        data = self.query.run()
        prev_timestamp = None
        for row in data:
            timestamp = row[0]
            parsed_timestamp = iso8601.parse_date(timestamp)
            if not prev_timestamp:
                prev_timestamp = parsed_timestamp
                continue
            delta = relativedelta(parsed_timestamp, prev_timestamp)
            self.assertTrue(delta.years == 1, timestamp)
            prev_timestamp = parsed_timestamp
项目:series-tiempo-ar-api    作者:datosgobar    | 项目源码 | 文件源码
def test_query_fills_nulls(self):
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        self.query.add_series(self.delayed_series,
                              self.rep_mode,
                              self.series_periodicity)

        query = ESQuery(index=settings.TEST_INDEX)
        query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        query.sort('asc')
        delayed_first_date = iso8601.parse_date(query.run()[0][0])
        data = self.query.run()

        delayed_series_index = 1  # Primera serie agregada
        for row in data:
            current_date = iso8601.parse_date(row[0])
            if current_date < delayed_first_date:
                self.assertEqual(row[delayed_series_index], None)
            else:
                break
项目:series-tiempo-ar-api    作者:datosgobar    | 项目源码 | 文件源码
def test_query_fills_nulls_second_series(self):
        self.query.add_series(self.delayed_series,
                              self.rep_mode,
                              self.series_periodicity)
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)

        query = ESQuery(index=settings.TEST_INDEX)
        query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        query.sort('asc')
        delayed_first_date = iso8601.parse_date(query.run()[0][0])
        data = self.query.run()

        delayed_series_index = 2  # Segunda serie agregada
        for row in data:
            current_date = iso8601.parse_date(row[0])
            if current_date < delayed_first_date:
                self.assertEqual(row[delayed_series_index], None)
            else:
                break
项目:series-tiempo-ar-api    作者:datosgobar    | 项目源码 | 文件源码
def test_end_of_period(self):
        query = ESQuery(index=settings.TEST_INDEX)
        query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        query.add_pagination(start=0, limit=1000)
        query.sort('asc')
        query.add_filter(start="1970")
        orig_data = query.run()

        self.query.add_series(self.single_series,
                              self.rep_mode,
                              self.series_periodicity,
                              'end_of_period')
        self.query.add_filter(start="1970")
        self.query.add_collapse('year')
        eop_data = self.query.run()

        for eop_row in eop_data:
            eop_value = eop_row[1]
            year = iso8601.parse_date(eop_row[0]).year
            for row in orig_data:
                row_date = iso8601.parse_date(row[0])
                if row_date.year == year and row_date.month == 12:
                    self.assertAlmostEqual(eop_value, row[1], 5)  # EOP trae pérdida de precisión
                    break
项目:series-tiempo-ar-api    作者:datosgobar    | 项目源码 | 文件源码
def test_query_fills_nulls(self):
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        self.query.add_series(self.delayed_series,
                              self.rep_mode,
                              self.series_periodicity)

        query = ESQuery(index=settings.TEST_INDEX)
        query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        query.sort('asc')
        delayed_first_date = iso8601.parse_date(query.run()[0][0])
        self.query.sort('asc')
        data = self.query.run()

        delayed_series_index = 1  # Primera serie agregada
        for row in data:
            current_date = iso8601.parse_date(row[0])
            if current_date < delayed_first_date:
                self.assertEqual(row[delayed_series_index], None)
            else:
                break
项目:series-tiempo-ar-api    作者:datosgobar    | 项目源码 | 文件源码
def test_query_fills_nulls_second_series(self):
        self.query.add_series(self.delayed_series,
                              self.rep_mode,
                              self.series_periodicity)
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)

        query = ESQuery(index=settings.TEST_INDEX)
        query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        query.sort('asc')
        delayed_first_date = iso8601.parse_date(query.run()[0][0])
        self.query.sort('asc')
        data = self.query.run()

        delayed_series_index = 2  # Segunda serie agregada
        for row in data:
            current_date = iso8601.parse_date(row[0])
            if current_date < delayed_first_date:
                self.assertEqual(row[delayed_series_index], None)
            else:
                break
项目:series-tiempo-ar-api    作者:datosgobar    | 项目源码 | 文件源码
def test_index_continuity(self):
        self.query.add_series(self.delayed_series,
                              self.rep_mode,
                              self.series_periodicity)
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        self.query.add_filter(start="1910", end="1920")
        self.query.sort('asc')

        query = ESQuery(index=settings.TEST_INDEX)
        query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        query.add_filter(start="1921")  # Garantiza datos vacíos entre 1920-1921
        query.add_pagination(start=0, limit=1000)
        query.sort('asc')

        data = self.query.run()
        current_date = iso8601.parse_date(data[0][0])
        for row in data[1:]:
            row_date = iso8601.parse_date(row[0])
            self.assertEqual(current_date + relativedelta(months=1), row_date)
            current_date = row_date
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def get_times(request):
    """Gets start and endtime from request

    As we use no timezone in NAV, remove it from parsed timestamps
    :param request: django.http.HttpRequest
    """
    starttime = request.GET.get('starttime')
    endtime = request.GET.get('endtime')
    try:
        if starttime:
            starttime = iso8601.parse_date(starttime).replace(tzinfo=None)
        if endtime:
            endtime = iso8601.parse_date(endtime).replace(tzinfo=None)
    except iso8601.ParseError:
        raise Iso8601ParseError

    return starttime, endtime
项目:trees-api    作者:codeforberlin    | 项目源码 | 文件源码
def get_timestamp(filename):

    class GMLHandler(xml.sax.ContentHandler):

        timestamp = None

        def startElement(self, name, attrs):
            if name == "wfs:FeatureCollection":
                self.timestamp = attrs['timeStamp']

    handler = GMLHandler()
    parser = xml.sax.make_parser()
    parser.setContentHandler(handler)
    parser.parse(filename)

    timestamp = iso8601.parse_date(handler.timestamp, default_timezone=None)
    return pytz.timezone(settings.TIME_ZONE).localize(timestamp)
项目:django-daiquiri    作者:aipescience    | 项目源码 | 文件源码
def _test_get_job_destruction(self, username):
        '''
        GET /{jobs}/{job-id}/destruction returns the destruction instant for
        {job-id} as [std:iso8601].
        '''
        for job in self.jobs:
            url = reverse(self.url_names['destruction'], kwargs={'pk': job.pk})
            response = self.client.get(url)

            if username == 'user':
                self.assertEqual(response.status_code, 200)

                if job.destruction_time:
                    destruction_time = iso8601.parse_date(response.content.decode())
                    self.assertEqual(destruction_time, job.destruction_time)
                else:
                    self.assertEqual(response.content.decode(), '')
            else:
                self.assertEqual(response.status_code, 404)
项目:django-daiquiri    作者:aipescience    | 项目源码 | 文件源码
def _test_post_job_destruction(self, username):
        '''
        POST /{jobs}/{job-id}/destruction with DESTRUCTION={std:iso8601}
        (application/x-www-form-urlencoded) sets the destruction instant for
        {job-id} and redirects to /{jobs}/{job-id} as 303.
        '''
        destruction_time = '2016-01-01T00:00:00'

        for job in self.jobs:
            url = reverse(self.url_names['destruction'], kwargs={'pk': job.pk})
            response = self.client.post(url, urlencode({'DESTRUCTION': destruction_time}), content_type='application/x-www-form-urlencoded')

            if username == 'user':
                redirect_url = 'http://testserver' + reverse(self.url_names['detail'], kwargs={'pk': job.pk})
                self.assertRedirects(response, redirect_url, status_code=303)
                self.assertEqual(
                    self.jobs.get(pk=job.pk).destruction_time,
                    iso8601.parse_date('2016-01-01T00:00:00')
                )
            else:
                self.assertEqual(response.status_code, 404)
项目:django-daiquiri    作者:aipescience    | 项目源码 | 文件源码
def filter_queryset(self, request, queryset, view):
        query_dict = make_query_dict_upper_case(request.GET)

        # apply only for list
        if view.action == 'list_jobs':
            phases = query_dict.getlist('PHASE')

            if phases:
                queryset = queryset.filter(phase__in=phases)
            else:
                queryset = queryset.exclude(phase__exact=Job.PHASE_ARCHIVED)

            after = query_dict.get('AFTER')
            if after:
                queryset = queryset.filter(creation_time__gt=iso8601.parse_date(after))

            last = query_dict.get('LAST')
            if last:
                queryset = queryset.filter(start_time__isnull=False) \
                    .order_by('-start_time')[:int(last)]

        return queryset
项目:aw-core    作者:ActivityWatch    | 项目源码 | 文件源码
def _timestamp_parse(ts: ConvertableTimestamp) -> datetime:
    """
    Takes something representing a timestamp and
    returns a timestamp in the representation we want.
    """
    if isinstance(ts, str):
        ts = iso8601.parse_date(ts)
    # Set resolution to milliseconds instead of microseconds
    # (Fixes incompability with software based on unix time, for example mongodb)
    ts = ts.replace(microsecond=int(ts.microsecond / 1000) * 1000)
    # Add timezone if not set
    if not ts.tzinfo:
        # Needed? All timestamps should be iso8601 so ought to always contain timezone.
        # Yes, because it is optional in iso8601
        logger.warning("timestamp without timezone found, using UTC: {}".format(ts))
        ts = ts.replace(tzinfo=timezone.utc)
    return ts
项目:openprocurement.auctions.dgf    作者:openprocurement    | 项目源码 | 文件源码
def test_create_auction_auctionPeriod(self):
        data = self.initial_data.copy()
        #tenderPeriod = data.pop('tenderPeriod')
        #data['auctionPeriod'] = {'startDate': tenderPeriod['endDate']}
        response = self.app.post_json('/auctions', {'data': data})
        self.assertEqual(response.status, '201 Created')
        self.assertEqual(response.content_type, 'application/json')
        auction = response.json['data']
        self.assertIn('tenderPeriod', auction)
        self.assertIn('auctionPeriod', auction)
        self.assertNotIn('startDate', auction['auctionPeriod'])
        self.assertEqual(parse_date(data['auctionPeriod']['startDate']).date(), parse_date(auction['auctionPeriod']['shouldStartAfter'], TZ).date())
        if SANDBOX_MODE:
            auction_startDate = parse_date(data['auctionPeriod']['startDate'], None)
            if not auction_startDate.tzinfo:
                auction_startDate = TZ.localize(auction_startDate)
            tender_endDate = parse_date(auction['tenderPeriod']['endDate'], None)
            if not tender_endDate.tzinfo:
                tender_endDate = TZ.localize(tender_endDate)
            self.assertLessEqual((auction_startDate - tender_endDate).total_seconds(), 70)
        else:
            self.assertEqual(parse_date(auction['tenderPeriod']['endDate']).date(), parse_date(data['auctionPeriod']['startDate'], TZ).date() - timedelta(days=1))
            self.assertEqual(parse_date(auction['tenderPeriod']['endDate']).time(), time(20, 0))
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_instance_get_all_by_filters_changes_since(self):
        i1 = self.create_instance_with_args(updated_at=
                                            '2013-12-05T15:03:25.000000')
        i2 = self.create_instance_with_args(updated_at=
                                            '2013-12-05T15:03:26.000000')
        changes_since = iso8601.parse_date('2013-12-05T15:03:25.000000')
        result = db.instance_get_all_by_filters(self.ctxt,
                                                {'changes-since':
                                                 changes_since})
        self._assertEqualListsOfInstances([i1, i2], result)

        changes_since = iso8601.parse_date('2013-12-05T15:03:26.000000')
        result = db.instance_get_all_by_filters(self.ctxt,
                                                {'changes-since':
                                                 changes_since})
        self._assertEqualListsOfInstances([i2], result)

        db.instance_destroy(self.ctxt, i1['uuid'])
        filters = {}
        filters['changes-since'] = changes_since
        filters['marker'] = i1['uuid']
        result = db.instance_get_all_by_filters(self.ctxt,
                                                filters)
        self._assertEqualListsOfInstances([i2], result)
项目:deb-subunit    作者:openstack    | 项目源码 | 文件源码
def test_parse_no_timezone():
    """issue 4 - Handle datetime string without timezone

    This tests what happens when you parse a date with no timezone. While not
    strictly correct this is quite common. I'll assume UTC for the time zone
    in this case.
    """
    d = iso8601.parse_date("2007-01-01T08:00:00")
    assert d.year == 2007
    assert d.month == 1
    assert d.day == 1
    assert d.hour == 8
    assert d.minute == 0
    assert d.second == 0
    assert d.microsecond == 0
    assert d.tzinfo == iso8601.UTC
项目:log-tick    作者:xxv    | 项目源码 | 文件源码
def load_logs(self, start, end, log_filter):
        try:
            response = urllib.request.urlopen(self.get_url(start, end, log_filter))
        except urllib.error.URLError as err:
            print("Error reading from network: %s" % err)
            return []
        body = response.read().decode('utf-8')
        events = []
        for line in body.split('\n'):
            match = re.match(r'.*?(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}'
                             r'.\d{6}[-+]\d{2}:\d{2}).*path="([^"]+)".*', line)
            if match:
                path = self.filter_path(match.groups()[1])
                when = iso8601.parse_date(match.groups()[0])
                events.append((when, path))

        return events
项目:mirandum    作者:google    | 项目源码 | 文件源码
def as_dict(self):
        details = json.loads(self.details)
        name = "Anonymous Donor"
        if 'donorName' in details and details['donorName']:
            name = details['donorName']
        datetime = iso8601.parse_date(details['createdOn'])    
        info = {
            # general 
            'name': name,
            'comment': details.get('message', "") or '',
            'donation_amount': float(details['donationAmount']),
            'currency': 'USD',
            # Display-friendly
            'amount': "$%.2f" % details['donationAmount'],
            'timestamp': datetime,
        }
        return info
项目:mirandum    作者:google    | 项目源码 | 文件源码
def as_dict(self):
        details = json.loads(self.details)
        name = "Anonymous"
        amount = " ".join([str(details['amount']), details['currencyCode']])
        if 'user' in details:
            name = details['user']['displayName']
        elif 'username' in details:
            name = details['username']
        timestamp = iso8601.parse_date(details['date'])
        info = {
            'name': name,
            'amount': amount,
            'comment': details['note'],
            'donation_amount': float(details['amount']),
            'currency': details['currencyCode'],
            'timestamp': timestamp,
        }
        return info
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def parse_datetime(date):
    """
    Validates date is in iso8601 format. Returns parsed datetime in UTC as as
    native datetime (tzinfo=None).
    """
    if not isinstance(date, basestring):
        raise Invalid('date is not a string')
    try:
        return iso8601.parse_date(date).astimezone(iso8601.UTC).replace(
            tzinfo=None)
    except:
        raise Invalid('date is not in iso8601 format')
项目:dj-paypal    作者:HearthSim    | 项目源码 | 文件源码
def test_sync_all_active_plans():
    all_plans = get_fixture("rest.billingplan.all.active.json")
    models.BillingPlan.objects.sync_data(all_plans["plans"])

    assert models.BillingPlan.objects.count() == len(all_plans["plans"])

    for plan in all_plans["plans"]:
        plan_obj = get_fixture(
            "GET/v1/payments/billing-plans/{id}.json".format(id=plan["id"])
        )
        plan = models.BillingPlan.objects.get(id=plan_obj["id"])
        assert plan.id == plan_obj["id"]
        assert plan.state == getattr(enums.BillingPlanState, plan_obj["state"])
        assert plan.type == getattr(enums.BillingPlanType, plan_obj["type"])
        assert plan.name == plan_obj["name"]
        assert plan.description == plan_obj["description"]
        assert plan.merchant_preferences == plan_obj["merchant_preferences"]
        assert plan.create_time == parse_date(plan_obj["create_time"])
        assert plan.update_time == parse_date(plan_obj["update_time"])

        for definition in plan_obj["payment_definitions"]:
            pd = models.PaymentDefinition.objects.get(id=definition["id"])
            assert pd.id == definition["id"]
            assert pd.name == definition["name"]
            assert pd.type == getattr(enums.PaymentDefinitionType, definition["type"])
            assert plan.payment_definitions.filter(id=pd.id).count() == 1
项目:dj-paypal    作者:HearthSim    | 项目源码 | 文件源码
def test_sync_executed_billing_agreement():
    ba = get_fixture("rest.billingagreement.execute.json")
    inst, created = models.BillingAgreement.get_or_update_from_api_data(ba, always_sync=True)
    assert created
    assert inst.id == ba["id"]
    assert inst.last_payment_date == parse_date("2017-08-24T11:47:17Z")
    assert inst.calculate_end_of_period() == parse_date("2017-09-24T11:47:17Z")
项目:infi.clickhouse_orm    作者:Infinidat    | 项目源码 | 文件源码
def to_python(self, value, timezone_in_use):
        if isinstance(value, datetime.datetime):
            return value.astimezone(pytz.utc) if value.tzinfo else value.replace(tzinfo=pytz.utc)
        if isinstance(value, datetime.date):
            return datetime.datetime(value.year, value.month, value.day, tzinfo=pytz.utc)
        if isinstance(value, int):
            return datetime.datetime.utcfromtimestamp(value).replace(tzinfo=pytz.utc)
        if isinstance(value, string_types):
            if value == '0000-00-00 00:00:00':
                return self.class_default
            if len(value) == 10:
                try:
                    value = int(value)
                    return datetime.datetime.utcfromtimestamp(value).replace(tzinfo=pytz.utc)
                except ValueError:
                    pass
            try:
                # left the date naive in case of no tzinfo set
                dt = iso8601.parse_date(value, default_timezone=None)
            except iso8601.ParseError as e:
                raise ValueError(text_type(e))

            # convert naive to aware
            if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
                dt = timezone_in_use.localize(dt)
            return dt.astimezone(pytz.utc)
        raise ValueError('Invalid value for %s - %r' % (self.__class__.__name__, value))
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def parse_isotime(timestr, default=None):
    """This duplicates oslo timeutils parse_isotime but with a
    @register.filter annotation and a silent fallback on error.
    """
    try:
        return iso8601.parse_date(timestr)
    except (iso8601.ParseError, TypeError):
        return default or ''
项目:fbarc    作者:justinlittman    | 项目源码 | 文件源码
def load_keys(args):
    """
    Get the Facebook API keys. Order of precedence is command line,
    environment, config file.
    """
    config = {}
    input_app_id = None
    input_app_secret = None
    input_short_access_token = None
    if args.config:
        config = load_config(args)
        if not config:
            input_app_id, input_app_secret, input_short_access_token = input_keys(args)
            if not input_short_access_token:
                save_config(args, input_app_id, input_app_secret)

    app_id = args.app_id or os.environ.get('APP_ID') or config.get('app_id') or input_app_id
    app_secret = args.app_secret or os.environ.get('APP_SECRET') or config.get('app_secret') or input_app_secret
    short_access_token = args.access_token or os.environ.get('ACCESS_TOKEN') or input_short_access_token
    long_access_token = config.get('access_token')
    expires_at = None
    if 'expires_at' in config:
        expires_at = iso8601.parse_date(config['expires_at'])

    if not (app_id and app_secret):
        sys.exit('App id and secret are required.')
    return app_id, app_secret, short_access_token, long_access_token, expires_at
项目:onedrive-e    作者:tobecontinued    | 项目源码 | 文件源码
def str_to_datetime(s):
    """
    :param str s:
    :return datetime.datetime:
    """
    return parse_date(s)
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def parse_isotime(timestr):
    """Parse time from ISO 8601 format"""
    try:
        return iso8601.parse_date(timestr)
    except iso8601.ParseError as e:
        raise ValueError(e.message)
    except TypeError as e:
        raise ValueError(e.message)
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def datetime_u(s):
    fmt = "%Y-%m-%dT%H:%M:%S"
    try:
        return _strptime(s, fmt)
    except ValueError:
        try:
            # strip zulu timezone suffix or utc offset
            if s[-1] == "Z" or (s[-3] == ":" and s[-6] in (' ', '-', '+')):
                try:
                    import iso8601
                    return iso8601.parse_date(s)
                except ImportError:
                    pass

                try:
                    import isodate
                    return isodate.parse_datetime(s)
                except ImportError:
                    pass

                try:
                    import dateutil.parser
                    return dateutil.parser.parse(s)
                except ImportError:
                    pass

                warnings.warn('removing unsupported "Z" suffix or UTC offset. Install `iso8601`, `isodate` or `python-dateutil` package to support it', RuntimeWarning)
                s = s[:-1] if s[-1] == "Z" else s[:-6]
            # parse microseconds
            try:
                return _strptime(s, fmt + ".%f")
            except:
                return _strptime(s, fmt)
        except ValueError:
            # strip microseconds (not supported in this platform)
            if "." in s:
                warnings.warn('removing unsuppported microseconds', RuntimeWarning)
                s = s[:s.index(".")]
            return _strptime(s, fmt)
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def parse_isotime(timestr):
    """Parse time from ISO 8601 format."""
    try:
        return iso8601.parse_date(timestr)
    except iso8601.ParseError as e:
        raise ValueError(six.text_type(e))
    except TypeError as e:
        raise ValueError(six.text_type(e))
项目:coscup-line-bot    作者:ncuoolab    | 项目源码 | 文件源码
def parse_wit_datime(dt):
    value = dt['value']
    return iso8601.parse_date(value)
项目:ckan-timeseries    作者:namgk    | 项目源码 | 文件源码
def timestamp_from_string(str):
    print(str)
    if (str.startswith( 'last ', 0, 5 )):
        # sample queries: 1m; 1m,2s; 1d,2h,3m,4s
        query = Timeseries_query(str.split('last ')[1])
        diff = datetime.timedelta(seconds=query.s, minutes=query.m, hours=query.h, days=query.d)
        return utcnow() - diff

    return iso8601.parse_date(str)
项目:django-wordpress-api    作者:swappsco    | 项目源码 | 文件源码
def get_context_data(self, **kwargs):
        api_kwargs = self.get_wp_api_kwargs(**kwargs)
        page = api_kwargs.get('page_number', 1)
        search = api_kwargs.get('search', '')
        blogs = WPApiConnector().get_posts(**api_kwargs)
        tags = WPApiConnector().get_tags(lang=self.blog_language)
        categories = WPApiConnector().get_categories(lang=self.blog_language)

        if 'server_error' in blogs or\
           'server_error' in tags:
            messages.add_message(self.request, messages.ERROR,
                                 blogs['server_error'])
            raise Http404
        if not blogs['body']:
            raise Http404
        for blog in blogs['body']:
            if blog['excerpt'] is not None:
                position = blog['excerpt'].find(
                    'Continue reading')
                if position != -1:
                    blog['excerpt'] = blog['excerpt'][:position]
            blog['slug'] = str(blog['slug'])
            blog['bdate'] = iso8601.parse_date(blog['date']).date()
        context = {
            'blogs': blogs['body'],
            'tags': tags,
            'categories': categories,
            'search': search,
            'total_posts': int(blogs['headers']['X-WP-Total']),
            'total_pages': int(blogs['headers']['X-WP-TotalPages']),
            'current_page': page,
            'previous_page': page - 1,
            'next_page': page + 1,
        }
        return context
项目:django-wordpress-api    作者:swappsco    | 项目源码 | 文件源码
def get_context_data(self, **kwargs):
        api_kwargs = self.get_wp_api_kwargs(**kwargs)
        page = api_kwargs.get('page_number', 1)
        search = api_kwargs.get('search', '')
        if not isinstance(page, int):
            page = 1
        blogs = WPApiConnector().get_posts(**api_kwargs)
        tags = WPApiConnector().get_tags(lang=self.blog_language)
        categories = WPApiConnector().get_categories(lang=self.blog_language)

        if 'server_error' in blogs or\
           'server_error' in tags:
            raise Http404
        if not blogs['body']:
            raise Http404
        for blog in blogs['body']:
            if blog['excerpt'] is not None:
                position = blog['excerpt'].find(
                    'Continue reading')
                if position != -1:
                    blog['excerpt'] = blog['excerpt'][:position]
            blog['slug'] = str(blog['slug'])
            blog['bdate'] = iso8601.parse_date(blog['date']).date()
        context = {
            'blogs': blogs['body'],
            'tags': tags,
            'categories': categories,
            'search': search,
            'total_posts': int(blogs['headers']['X-WP-Total']),
            'total_pages': int(blogs['headers']['X-WP-TotalPages']),
            'current_page': page,
            'previous_page': page - 1,
            'next_page': page + 1,
        }
        return context
项目:scriptabit    作者:DC23    | 项目源码 | 文件源码
def parse_date_utc(date, milliseconds=True):
    """Parses dates from ISO8601 or Epoch formats to a standard datetime object.

    This is particularly useful since Habitica returns dates in two
    formats::

        - iso8601 encoded strings
        - Long integer Epoch times

    Args:
        date (str): A date string in either iso8601 or Epoch format.
        milliseconds (bool): If True, then epoch times are treated as
            millisecond values, otherwise they are evaluated as seconds.

    Returns:
        datetime: The parsed date time in UTC.
    """

    parsed_date = None
    try:
        parsed_date = iso8601.parse_date(date)
    except iso8601.ParseError:
        value = int(date)
        # utcfromtimestamp expects values in seconds
        if milliseconds:
            value /= 1000
        parsed_date = datetime.utcfromtimestamp(value)

    return parsed_date.replace(tzinfo=pytz.utc)
项目:aw-server    作者:ActivityWatch    | 项目源码 | 文件源码
def get(self, bucket_id):
        args = request.args
        limit = int(args["limit"]) if "limit" in args else 100
        start = iso8601.parse_date(args["start"]) if "start" in args else None
        end = iso8601.parse_date(args["end"]) if "end" in args else None

        events = app.api.get_events(bucket_id, limit=limit, start=start, end=end)
        return events, 200

    # TODO: How to tell expect that it could be a list of events? Until then we can't use validate.
项目:aw-server    作者:ActivityWatch    | 项目源码 | 文件源码
def get(self, viewname):
        args = request.args
        start = iso8601.parse_date(args["start"]) if "start" in args else None
        end = iso8601.parse_date(args["end"]) if "end" in args else None

        result = app.api.query_view(viewname, start, end)
        return result, 200
项目:Habitica-todo    作者:eringiglio    | 项目源码 | 文件源码
def parse_date_utc(date, milliseconds=True):
    """Parses dates from ISO8601 or Epoch formats to a standard datetime object.

    This is particularly useful since Habitica returns dates in two
    formats::

        - iso8601 encoded strings
        - Long integer Epoch times

    Args:
        date (str): A date string in either iso8601 or Epoch format.
        milliseconds (bool): If True, then epoch times are treated as
            millisecond values, otherwise they are evaluated as seconds.

    Returns:
        datetime: The parsed date time in UTC.
    """

    parsed_date = None
    try:
        parsed_date = iso8601.parse_date(date)
    except iso8601.ParseError:
        value = int(date)
        # utcfromtimestamp expects values in seconds
        if milliseconds:
            value /= 1000
        parsed_date = datetime.utcfromtimestamp(value)

    return parsed_date.replace(tzinfo=pytz.utc)
项目:openregistry.api    作者:openprocurement    | 项目源码 | 文件源码
def to_native(self, value, context=None):
        if isinstance(value, datetime):
            return value
        try:
            date = parse_date(value, None)
            if not date.tzinfo:
                date = TZ.localize(date)
            return date
        except ParseError:
            raise ConversionError(self.messages['parse'].format(value))
        except OverflowError as e:
            raise ConversionError(e.message)
项目:vaultier    作者:Movile    | 项目源码 | 文件源码
def get(self, request):
        return Response({
            "datetime": iso8601.parse_date(datetime.utcnow().isoformat())
        })
项目:nirum-python    作者:spoqa    | 项目源码 | 文件源码
def deserialize_primitive(cls, data):
    if cls is datetime.datetime:
        try:
            d = parse_date(data)
        except iso8601.ParseError:
            raise ValueError("'{}' is not a datetime.".format(data))
    elif cls is datetime.date:
        try:
            d = parse_date(data).date()
        except iso8601.ParseError:
            raise ValueError("'{}' is not a date.".format(data))
    elif cls in {int, float, uuid.UUID, bool}:
        d = cls(data)
    elif cls is numbers.Integral:
        d = data
    elif cls is decimal.Decimal:
        try:
            d = cls(data)
        except decimal.InvalidOperation:
            raise ValueError("'{}' is not a decimal.".format(data))
    elif cls is text_type:
        if not isinstance(data, text_type):
            raise ValueError("'{}' is not a string.".format(data))
        d = cls(data)
    else:
        raise TypeError(
            "'{0}' is not a primitive type.".format(typing._type_repr(cls))
        )
    return d
项目:python-gnocchiclient    作者:gnocchixyz    | 项目源码 | 文件源码
def _convert_dates(cls, data):
        # NOTE(sileht): browse to aggregates measures dict tree and convert
        # date when we found timeseries, dict can looks like
        # {"aggregated": ...}, {"metric_id": {"agg": ...}} or
        # {"resource_id": {"metric_name": {"agg": ...}}}
        for key in data:
            if isinstance(data[key], list):
                data[key] = [(iso8601.parse_date(ts), g, value)
                             for ts, g, value in data[key]]
            elif isinstance(data[key], dict):
                cls._convert_dates(data[key])
            else:
                raise RuntimeError("Unexpected aggregates API output %s" %
                                   data[key])