Python datetime 模块,date() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用datetime.date()

项目:yeelight-controller    作者:kevinxw    | 项目源码 | 文件源码
def __compile_policy_value(self, str_value, value_dict):
        """Substitute placeholders in a policy value and evaluate the result.

        A value without a ``$`` is returned untouched. Otherwise, for every
        key in ``value_dict`` the text ``self.__get_compiled_key(key)`` is
        replaced by ``str()`` of the matching value. ``datetime``/``date``
        values are first passed through ``self.__get_localtime`` and turned
        into a timestamp with ``time.mktime``. The final expression is
        evaluated, truncated with ``int()`` and handed to
        ``self.__get_localtime``.

        :param str_value: raw policy value, possibly containing placeholders
        :param value_dict: mapping of placeholder key -> replacement value
        :return: ``str_value`` itself when it contains no ``$``; otherwise the
            result of ``self.__get_localtime`` for the evaluated number
        """
        value = str_value
        if '$' not in value:
            return value
        self.__logger.debug("Compiling value %s", value)
        for key, val in value_dict.items():
            self.__logger.debug("Searching for key %s", key)
            # isinstance also accepts subclasses of datetime/date; the former
            # ``id(type) and ...`` guard was always true and has been dropped.
            if isinstance(val, (datetime, date)):
                self.__logger.debug("Value in dictionary: %s -> %s", key, val)
                val = time.mktime(self.__get_localtime(val).timetuple())
                self.__logger.debug("Timestamp converted: %s -> %s", key, val)
            value = value.replace(self.__get_compiled_key(key), str(val))
            self.__logger.debug("Value after convertsion: %s", value)
        # NOTE(review): eval() runs arbitrary Python. This is only safe while
        # policy strings come from a trusted source -- confirm, or replace it
        # with a restricted arithmetic evaluator.
        int_value = int(eval(value))
        compiled_value = self.__get_localtime(int_value)
        self.__logger.debug("Compiled value: %s", compiled_value)
        return compiled_value
项目:facebook-message-analysis    作者:szheng17    | 项目源码 | 文件源码
def get_date_to_message_statistic(self, message_statistic):
        """
        Maps each date between the date of the first message and the date of
        the last message, inclusive, to the sum of the values of a message
        statistic over all messages from that date.

        The range is taken from ``self.messages[0]`` and ``self.messages[-1]``,
        so a message dated outside that range raises ``KeyError``
        (presumably the messages are kept in chronological order -- confirm).

        Args:
            message_statistic: A function mapping a Message object to an int or
                a float.

        Returns:
            date_to_message_statistic: A dict mapping a date object between the
                date of the first message and the date of the last message to
                the sum of the values of message_statistic over all messages in
                self.messages from that date. Empty when there are no messages.
        """
        # Without messages there is no first/last date to span.
        if not self.messages:
            return {}
        start_date = self.messages[0].timestamp.date()
        end_date = self.messages[-1].timestamp.date()
        # Pre-seed every day with 0 so days without messages still appear.
        date_to_message_statistic = {
            dt.date(): 0
            for dt in rrule(DAILY, dtstart=start_date, until=end_date)
        }
        for message in self.messages:
            date_to_message_statistic[message.timestamp.date()] += message_statistic(message)
        return date_to_message_statistic
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def _coerce_datetime(maybe_dt):
    """Return ``maybe_dt`` as a ``datetime.datetime``.

    Accepted inputs:
      * ``datetime.datetime`` -- returned unchanged;
      * ``datetime.date`` -- converted to midnight of that day in UTC;
      * a 3-item tuple/list ``(year, month, day)`` -- same conversion.

    Raises ``TypeError`` for anything else.
    """
    # Checked first: datetime.datetime is itself a subclass of datetime.date.
    if isinstance(maybe_dt, datetime.datetime):
        return maybe_dt

    if isinstance(maybe_dt, datetime.date):
        year, month, day = maybe_dt.year, maybe_dt.month, maybe_dt.day
    elif isinstance(maybe_dt, (tuple, list)) and len(maybe_dt) == 3:
        year, month, day = maybe_dt
    else:
        raise TypeError('Cannot coerce %s into a datetime.datetime'
                        % type(maybe_dt).__name__)

    return datetime.datetime(year=year, month=month, day=day, tzinfo=pytz.utc)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def get_last_trading_day_of_month(self, dt, env):
        """Return the date of the last trading day in ``dt``'s month.

        The first calendar day of the following month is computed and
        ``env.previous_trading_day`` is asked for the trading day before it.
        The month of ``dt`` is stored in ``self.month`` and the result in
        ``self.last_day``.
        """
        self.month = dt.month

        # December rolls forward into January of the next year; every other
        # month simply advances by one within the same year.
        next_year, next_month = (
            (dt.year + 1, 1) if dt.month == 12 else (dt.year, dt.month + 1)
        )
        first_of_next_month = dt.replace(year=next_year, month=next_month, day=1)

        self.last_day = env.previous_trading_day(first_of_next_month).date()
        return self.last_day


# Stateful rules
项目:jobs    作者:josiahcarlson    | 项目源码 | 文件源码
def _to_ts(val):
    """Convert ``val`` to a number of seconds relative to ``EPOCH``.

    * ints and floats are returned as they are;
    * ``datetime``/``date`` objects yield ``(val - EPOCH).total_seconds()``;
    * strings matching ``TS_RE`` are returned as ``float``;
    * strings matching ``DT_RE`` are split into pieces, turned into a
      ``datetime`` and converted like one.

    Raises ``Exception`` when none of the above applies.
    """
    if isinstance(val, (int, float)):
        return val
    if isinstance(val, (datetime, date)):
        return (val - EPOCH).total_seconds()

    ts_match = TS_RE.match(val)
    if ts_match:
        return float(ts_match.group(0))

    dt_match = DT_RE.match(val)
    if not dt_match:
        raise Exception("Value %r is not a timestamp, datetime, or date"%(val,))

    # Keep only the pieces that were actually present. Their validity is left
    # to the datetime constructor.
    pieces = [piece for piece in dt_match.groups() if piece]
    if len(pieces) == 7:
        # The 7th piece is the fractional second: cut or zero-pad it to
        # exactly six digits so it reads as microseconds.
        pieces[6] = pieces[6][:6].ljust(6, '0')
    parsed = datetime(*[int(piece) for piece in pieces])
    return (parsed - EPOCH).total_seconds()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def itermonthdates(self, year, month):
        """
        Return an iterator for one month. The iterator will yield datetime.date
        values and will always iterate through complete weeks, so it will yield
        dates outside the specified month.

        If the month reaches ``datetime.date.max`` the iteration simply ends
        there instead of raising ``OverflowError``.
        """
        current = datetime.date(year, month, 1)
        # Go back to the beginning of the week
        days = (current.weekday() - self.firstweekday) % 7
        current -= datetime.timedelta(days=days)
        oneday = datetime.timedelta(days=1)
        while True:
            yield current
            try:
                current += oneday
            except OverflowError:
                # Stepping past datetime.date.max: nothing more to yield.
                break
            # Stop once we are past the month and back on a week boundary.
            if current.month != month and current.weekday() == self.firstweekday:
                break
项目:stregsystemet    作者:f-klubben    | 项目源码 | 文件源码
def test_order_execute_single_no_remaining(self, fulfill):
        """Ordering 1 unit of a product with quantity 1 and one existing sale
        must raise NoMoreInventoryError and must not call ``fulfill``.

        ``fulfill`` is presumably a mock injected by a patch decorator that
        is not visible here -- confirm.
        """
        self.product.sale_set.create(
            price=100,
            member=self.member
        )
        self.product.start_date = datetime.date(year=2017, month=1, day=1)
        self.product.quantity = 1
        order = Order(self.member, self.room)

        item = OrderItem(self.product, order, 1)
        order.items.add(item)

        with self.assertRaises(NoMoreInventoryError):
            order.execute()

        # ``was_not_called()`` is not a mock assertion: a mock creates that
        # attribute on the fly and the call always succeeds.
        # ``assert_not_called()`` actually verifies it.
        fulfill.assert_not_called()
项目:stregsystemet    作者:f-klubben    | 项目源码 | 文件源码
def test_order_execute_multi_some_remaining(self, fulfill):
        """Ordering 2 units of a product with quantity 2 and one existing sale
        must raise NoMoreInventoryError and must not call ``fulfill``.

        ``fulfill`` is presumably a mock injected by a patch decorator that
        is not visible here -- confirm.
        """
        self.product.sale_set.create(
            price=100,
            member=self.member
        )
        self.product.start_date = datetime.date(year=2017, month=1, day=1)
        self.product.quantity = 2
        order = Order(self.member, self.room)

        item = OrderItem(self.product, order, 2)
        order.items.add(item)

        with self.assertRaises(NoMoreInventoryError):
            order.execute()

        # ``was_not_called()`` is not a mock assertion: a mock creates that
        # attribute on the fly and the call always succeeds.
        # ``assert_not_called()`` actually verifies it.
        fulfill.assert_not_called()
项目:msdnhash    作者:mauricew    | 项目源码 | 文件源码
def index(request):
    """Render the ``msdn/index.html`` landing page.

    The template context holds the ten most recently posted files, the ten
    product families whose newest file was posted most recently, all product
    groups ordered by name, the total file count, and a flag that keeps the
    FCU banner visible until 2017-11-10.
    """
    # Families ranked by the posted date of their most recent file.
    families_by_latest_file = ProductFamily.objects.annotate(
        last_posted_date=Max('file__posted_date')
    ).order_by('-last_posted_date')

    banner_deadline = datetime.date(2017, 11, 10)

    return render(request, 'msdn/index.html', {
        'show_fcu_banner': datetime.date.today() < banner_deadline,
        'latest_files': File.objects.order_by("-posted_date")[:10],
        'latest_updated_families': families_by_latest_file[:10],
        'groups': ProductGroup.objects.order_by("name"),
        'total_count': File.objects.count(),
    })
项目:tts-bug-bounty-dashboard    作者:18F    | 项目源码 | 文件源码
def test_activity_sets_sla_triaged_at():
    """sla_triaged_at is set by the first triaging activity and then kept."""
    report = new_report()
    report.save()
    assert report.sla_triaged_at is None

    # A plain comment must leave sla_triaged_at untouched.
    commented_at = now()
    report.activities.create(id=1, type='activity-comment',
                             created_at=commented_at)
    assert report.sla_triaged_at is None

    # A "not applicable" activity sets it.
    triaged_at = commented_at + datetime.timedelta(hours=3)
    report.activities.create(id=2, type='activity-bug-not-applicable',
                             created_at=triaged_at)
    assert report.sla_triaged_at == triaged_at

    # A later activity that would also set the date must not overwrite it.
    resolved_at = triaged_at + datetime.timedelta(hours=3)
    report.activities.create(id=3, type='activity-bug-resolved',
                             created_at=resolved_at)
    assert report.sla_triaged_at == triaged_at
项目:HandDetection    作者:YunqiuXu    | 项目源码 | 文件源码
def load_all_url_files(_dir, file_name_prefix):
    """Collect the URLs stored in every ``<prefix>*.txt`` file of a directory.

    Each non-blank line contributes its first tab-separated field, stripped
    of surrounding whitespace. Progress is printed per file.

    :param _dir: directory to scan (not recursive)
    :param file_name_prefix: file names must start with this prefix and end
        in ``.txt``
    :return: list of URLs in the order they were read
    """
    url_list = []
    pattern = file_name_prefix + '*.txt'

    for file_name in os.listdir(_dir):
        if not fnmatch.fnmatch(file_name, pattern):
            continue
        file_name = osp.join(_dir, file_name)
        # ``with`` closes the file even if reading fails part-way.
        with open(file_name, 'r') as fp_urls:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print('load URLs from file: ' + file_name)

            loaded = 0
            for line in fp_urls:
                line = line.strip()
                if line:
                    url_list.append(line.split('\t')[0].strip())
                    loaded += 1
            print(str(loaded) + ' URLs loaded')

    return url_list
########### End of Functions to Load downloaded urls ###########

############## Functions to get date/time strings ############
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def _check_annotations(value):
    """
    Recursively check that value is either of a "simple" type (number, string,
    date/time) or is a (possibly nested) dict, list or numpy array containing
    only simple types.

    "Simple" means an instance of ``ALLOWED_ANNOTATION_TYPES``. For NumPy
    arrays the array's dtype is checked, not the individual elements.

    Returns None. Raises ValueError on the first value that is not allowed.
    """
    if isinstance(value, np.ndarray):
        if not issubclass(value.dtype.type, ALLOWED_ANNOTATION_TYPES):
            # Trailing space added: the two literals are concatenated and
            # previously rendered as "...dtype <type>are not allowed".
            raise ValueError("Invalid annotation. NumPy arrays with dtype %s "
                             "are not allowed" % value.dtype.type)
    elif isinstance(value, dict):
        # Only the values are validated; keys are not inspected.
        for element in value.values():
            _check_annotations(element)
    elif isinstance(value, (list, tuple)):
        for element in value:
            _check_annotations(element)
    elif not isinstance(value, ALLOWED_ANNOTATION_TYPES):
        # Same missing-space fix as above ("are notallowed").
        raise ValueError("Invalid annotation. Annotations of type %s are not "
                         "allowed" % type(value))
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def _check_annotations(value):
    """
    Recursively check that value is either of a "simple" type (number, string,
    date/time) or is a (possibly nested) dict, list or numpy array containing
    only simple types.

    "Simple" means an instance of ``ALLOWED_ANNOTATION_TYPES``. For NumPy
    arrays the array's dtype is checked, not the individual elements.

    Returns None. Raises ValueError on the first value that is not allowed.
    """
    if isinstance(value, np.ndarray):
        if not issubclass(value.dtype.type, ALLOWED_ANNOTATION_TYPES):
            # Trailing space added: the two literals are concatenated and
            # previously rendered as "...dtype <type>are not allowed".
            raise ValueError("Invalid annotation. NumPy arrays with dtype %s "
                             "are not allowed" % value.dtype.type)
    elif isinstance(value, dict):
        # Only the values are validated; keys are not inspected.
        for element in value.values():
            _check_annotations(element)
    elif isinstance(value, (list, tuple)):
        for element in value:
            _check_annotations(element)
    elif not isinstance(value, ALLOWED_ANNOTATION_TYPES):
        # Same missing-space fix as above ("are notallowed").
        raise ValueError("Invalid annotation. Annotations of type %s are not "
                         "allowed" % type(value))
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def __init__(self, value):
        """
        Build the date from one of the supported representations:

            - integer_type: absolute days from epoch (1970, 1, 1). Can be negative.
            - datetime.date: built-in date (datetime.datetime is accepted too)
            - string_type: a date string of the form "yyyy-mm-dd"

        Any other type raises TypeError.
        """
        if isinstance(value, six.integer_types):
            self.days_from_epoch = value
            return
        if isinstance(value, (datetime.date, datetime.datetime)):
            self._from_timetuple(value.timetuple())
            return
        if isinstance(value, six.string_types):
            self._from_datestring(value)
            return
        raise TypeError('Date arguments must be a whole number, datetime.date, or string')
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def test_date():
    """Check the x-axis labels rendered for a basic DateLine chart."""
    points = [
        (date(2013, 1, 2), 300),
        (date(2013, 1, 12), 412),
        (date(2013, 2, 2), 823),
        (date(2013, 2, 22), 672)
    ]
    expected_labels = [
        '2013-01-12',
        '2013-01-24',
        '2013-02-04',
        '2013-02-16']

    date_chart = DateLine(truncate_label=1000)
    date_chart.add('dates', points)

    q = date_chart.render_pyquery()

    # Only the part of each label before the first space is compared.
    rendered = [t.split(' ')[0] for t in q(".axis.x text").map(texts)]
    assert rendered == expected_labels
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def test_date_xrange():
    """Check the x-axis labels of a DateLine chart with an explicit xrange."""
    points = [
        (date(2013, 1, 2), 300),
        (date(2013, 1, 12), 412),
        (date(2013, 2, 2), 823),
        (date(2013, 2, 22), 672)
    ]
    expected_labels = [
        '2013-01-01',
        '2013-01-12',
        '2013-01-24',
        '2013-02-04',
        '2013-02-16',
        '2013-02-27']

    datey = DateLine(truncate_label=1000)
    datey.add('dates', points)
    datey.xrange = (date(2013, 1, 1), date(2013, 3, 1))

    q = datey.render_pyquery()

    # Only the part of each label before the first space is compared.
    rendered = [t.split(' ')[0] for t in q(".axis.x text").map(texts)]
    assert rendered == expected_labels
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def test_date_labels():
    """Check that explicit x_labels are the labels rendered on the x axis."""
    points = [
        (date(2013, 1, 2), 300),
        (date(2013, 1, 12), 412),
        (date(2013, 2, 2), 823),
        (date(2013, 2, 22), 672)
    ]
    expected_labels = [
        '2013-01-01',
        '2013-02-01',
        '2013-03-01']

    datey = DateLine(truncate_label=1000)
    datey.add('dates', points)
    datey.x_labels = [
        date(2013, 1, 1),
        date(2013, 2, 1),
        date(2013, 3, 1)
    ]

    q = datey.render_pyquery()

    # Only the part of each label before the first space is compared.
    rendered = [t.split(' ')[0] for t in q(".axis.x text").map(texts)]
    assert rendered == expected_labels
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return a callable that renders a date bind value as a string.

        The returned function maps ``None`` to ``None``, formats
        ``datetime.date`` instances with ``self._storage_format`` (given the
        keys ``year``, ``month`` and ``day``) and raises ``TypeError`` for
        any other input. ``dialect`` is not used.
        """
        date_cls = datetime.date
        template = self._storage_format

        def process(value):
            if value is None:
                return None
            if not isinstance(value, date_cls):
                raise TypeError("SQLite Date type only accepts Python "
                                "date objects as input.")
            fields = {
                'year': value.year,
                'month': value.month,
                'day': value.day,
            }
            return template % fields
        return process
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def result_processor(self, dialect, coltype):
        """Return a callable that converts a result value to a date.

        The returned function turns ``datetime.datetime`` values into their
        date part, parses strings with ``self._reg`` (missing groups count as
        0) and passes every other value through unchanged. A string that does
        not match raises ``ValueError``. ``dialect`` and ``coltype`` are not
        used.
        """
        def process(value):
            if isinstance(value, datetime.datetime):
                return value.date()
            if not isinstance(value, util.string_types):
                return value

            match = self._reg.match(value)
            if not match:
                raise ValueError(
                    "could not parse %r as a date value" % (value, ))
            parts = [int(group or 0) for group in match.groups()]
            return datetime.date(*parts)
        return process
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _expression_adaptations(self):
        """Map an operator and right-hand type to the resulting type.

        Returns ``{operator: {right_hand_type: result_type}}`` for
        ``operators.add`` and ``operators.sub``.
        """
        own_type = self.__class__

        when_adding = {
            Integer: own_type,
            Interval: DateTime,
            Time: DateTime,
        }
        when_subtracting = {
            # date - integer = date
            Integer: own_type,
            # date - date = integer.
            Date: Integer,
            Interval: DateTime,
            # date - datetime = interval; not in the PG docs, but it works.
            DateTime: Interval,
        }
        return {
            operators.add: when_adding,
            operators.sub: when_subtracting,
        }
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def test():
    """Run every group of database-engine tests, in order.

    Results of earlier groups (``persons``, ``orders``, ``northwind``) are
    passed on to the later groups that need them. At the end every local
    variable of this function is copied into the module globals.
    """
    # Test simple statements in SQL.
    persons = test_basic_sql()
    # Test various ways to select rows.
    test_row_selection(persons)
    # Test the four different types of joins in SQL.
    orders = test_all_joins(persons)
    # Test unstructured ways of joining tables together.
    test_table_addition(persons, orders)
    # Test creation and manipulation of databases.
    test_database_support()
    # Load and run some test on the sample Northwind database.
    northwind = test_northwind()
    # Test different date operations that can be performed.
    test_date_functionality()
    # Test various functions that operate on specified column.
    test_column_functions()
    # The Northwind-based group only runs when test_northwind() returned a
    # truthy value.
    if northwind:
        # Test ability to select columns with function processing.
        test_generic_column_functions(persons, northwind)
    # Test Database2 instances that support transactions.
    nw2 = test_transactional_database()
    # Allow for interaction at the end of the test: the locals (persons,
    # orders, northwind, nw2) become module globals under the same names, so
    # the local names are part of the behaviour.
    globals().update(locals())
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def test_date_functionality():
    """Exercise a date column, then the same column altered to datetime."""
    # Create an orderz table to test the date type.
    orderz = Table(('OrderId', int), ('ProductName', str), ('OrderDate', date))
    initial_rows = (
        (1, 'Geitost', date(2008, 11, 11)),
        (2, 'Camembert Pierrot', date(2008, 11, 9)),
        (3, 'Mozzarella di Giovanni', date(2008, 11, 11)),
        (4, 'Mascarpone Fabioloi', date(2008, 10, 29)),
    )
    for order_id, product_name, ordered_on in initial_rows:
        orderz.insert(order_id, product_name, ordered_on)

    # Query the table for a specific date.
    orderz.where(ROW.OrderDate == date(2008, 11, 11)).print()

    # Switch the column to datetime and give every order a time of day.
    orderz.alter_column('OrderDate', datetime)
    timestamps = (
        (1, datetime(2008, 11, 11, 13, 23, 44)),
        (2, datetime(2008, 11, 9, 15, 45, 21)),
        (3, datetime(2008, 11, 11, 11, 12, 1)),
        (4, datetime(2008, 10, 29, 14, 56, 59)),
    )
    for order_id, ordered_at in timestamps:
        orderz.where(ROW.OrderId == order_id).update(OrderDate=ordered_at)

    # Query the table with a datetime object this time.
    orderz.where(ROW.OrderDate == datetime(2008, 11, 11)).print()
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def add_one_month(t):
    """Return a `datetime.date` or `datetime.datetime` (as given) that is
    one month later.

    Note that the resultant day of the month might change if the following
    month has fewer days:

        >>> add_one_month(datetime.date(2010, 1, 31))
        datetime.date(2010, 2, 28)

    Any time-of-day fields of a `datetime.datetime` are preserved. Raises
    `ValueError` when the result would fall after year 9999.
    """
    import calendar
    # December rolls over into January of the next year.
    if t.month == 12:
        year, month = t.year + 1, 1
    else:
        year, month = t.year, t.month + 1
    # Clamp the day to the length of the target month (e.g. Jan 31 -> Feb 28).
    last_day = calendar.monthrange(year, month)[1]
    return t.replace(year=year, month=month, day=min(t.day, last_day))
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def subtract_one_month(t):
    """Return a `datetime.date` or `datetime.datetime` (as given) that is
    one month earlier.

    Note that the resultant day of the month might change if the preceding
    month has fewer days:

        >>> subtract_one_month(datetime.date(2010, 3, 31))
        datetime.date(2010, 2, 28)

    Any time-of-day fields of a `datetime.datetime` are preserved. Raises
    `ValueError` when the result would fall before year 1.
    """
    import calendar
    # January rolls back into December of the previous year.
    if t.month == 1:
        year, month = t.year - 1, 12
    else:
        year, month = t.year, t.month - 1
    # Clamp the day to the length of the target month (e.g. Mar 31 -> Feb 28).
    last_day = calendar.monthrange(year, month)[1]
    return t.replace(year=year, month=month, day=min(t.day, last_day))
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def importData(fileName):
  """
    Main entry point - opens import data Excel sheet (assumed to be first
    sheet) and processes each record, one at a time.

    Each data row becomes a dict keyed by the header-row text. Date cells are
    stored as ISO date strings, string cells are stripped, everything else is
    kept as read. Every dict is passed to handleAccountContact().

    Uses the module globals importSheet, importRow and headers; headers is
    appended to, not reset. This is Python 2 code (print statement,
    basestring).
  """
  global importSheet, importRow, headers
  book = xlrd.open_workbook(fileName)
  importSheet = book.sheet_by_index(0)

  print "importing from %s" % importSheet.name

  # get cell value types from first-non header data row (2nd), since headers are all of type text
  # NOTE(review): this reads row 1, so a sheet with only a header row would
  # presumably fail here - confirm inputs always have at least one data row.
  for colnum in range(0, importSheet.ncols):
    headers.append((importSheet.cell(0, colnum).value, importSheet.cell(1, colnum).ctype))

  configErrorReporting(headers)

  # importRow is the global loop variable, so the current row index is
  # visible outside this function while a record is being handled.
  for importRow in range(1, importSheet.nrows):
    record = {}
    for colnum in range(0, importSheet.ncols):
      if headers[colnum][1] == xlrd.XL_CELL_DATE:
        dateTuple = xlrd.xldate_as_tuple(importSheet.cell(rowx=importRow, colx=colnum).value, book.datemode)
        date = datetime.date(dateTuple[0], dateTuple[1], dateTuple[2])
        # required format for xsd:Date type, for web service call
        record[headers[colnum][0]] = date.isoformat()
      else:
        value = importSheet.cell_value(rowx=importRow, colx=colnum)
        if isinstance(value, basestring):
          record[headers[colnum][0]] = value.strip()
        else:
          record[headers[colnum][0]] = value
      #print "%s: %s" % (type(record[headers[colnum]]), record[headers[colnum]])

    # The return value is rebound to record but not used again in this loop.
    record = handleAccountContact(record)

  book.unload_sheet(importSheet.name)
项目:oriole-service    作者:zhouxiaoxiang    | 项目源码 | 文件源码
def _o(self, obj):
        """ Translate object to json.

        Dict in python is not json, so don't be confused.
        When return object from rpc, should always use _o.

        Conversion rules, tried in this order:
          * None                     -> None
          * Decimal                  -> str(obj)
          * datetime                 -> "YYYY-mm-dd HH:MM:SS"
          * date                     -> "YYYY-mm-dd"
          * list / set / tuple       -> self._ol(obj)
          * dict                     -> self._od(obj)
          * int / str / bool / float -> returned as is
          * anything else            -> self._oo(obj)
        """

        # Identity test: ``obj == None`` would call a custom __eq__, which
        # may raise or return a non-boolean result.
        if obj is None:
            return obj
        elif isinstance(obj, Decimal):
            return str(obj)
        # datetime is tested before date because it is a subclass of date.
        elif isinstance(obj, datetime):
            return obj.strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(obj, date):
            return obj.strftime("%Y-%m-%d")
        elif isinstance(obj, (list, set, tuple)):
            return self._ol(obj)
        elif isinstance(obj, dict):
            return self._od(obj)
        elif isinstance(obj, (int, str, bool, float)):
            return obj
        else:
            return self._oo(obj)
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def itermonthdates(self, year, month):
        """
        Return an iterator for one month. The iterator will yield datetime.date
        values and will always iterate through complete weeks, so it will yield
        dates outside the specified month.

        If the month reaches ``datetime.date.max`` the iteration simply ends
        there instead of raising ``OverflowError``.
        """
        current = datetime.date(year, month, 1)
        # Go back to the beginning of the week
        days = (current.weekday() - self.firstweekday) % 7
        current -= datetime.timedelta(days=days)
        oneday = datetime.timedelta(days=1)
        while True:
            yield current
            try:
                current += oneday
            except OverflowError:
                # Stepping past datetime.date.max: nothing more to yield.
                break
            # Stop once we are past the month and back on a week boundary.
            if current.month != month and current.weekday() == self.firstweekday:
                break
项目:scrapy_projects    作者:morefreeze    | 项目源码 | 文件源码
def _datetime(d):
    """Coerce ``d`` to a ``datetime.datetime``.

    ``datetime.datetime`` values are returned unchanged and ``datetime.date``
    values become midnight of that day. Anything else is parsed as a string,
    trying these layouts in order:

        %Y-%m-%d %H:%M:%S
        %Y-%m-%d %H:%M:%S.%f
        %Y-%m-%d
        %Y-%m-%d %H:%M

    The ``ValueError`` of the last layout propagates when none of them fits.
    """
    if isinstance(d, datetime.datetime):
        return d
    if isinstance(d, datetime.date):
        return datetime.datetime(d.year, d.month, d.day)

    parse = datetime.datetime.strptime
    for layout in ('%Y-%m-%d %H:%M:%S', "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d"):
        try:
            return parse(d, layout)
        except ValueError:
            continue
    # Last resort; a failure here is not caught.
    return parse(d, "%Y-%m-%d %H:%M")
项目:gloss    作者:openhealthcare    | 项目源码 | 文件源码
def test_only_update_with_existent_fields(self):
        """An update message carrying only first_name changes only that field."""
        update = PatientUpdateMessage(first_name="Mary")
        container = MessageContainer(
            message_type=PatientUpdateMessage,
            messages=[update],
            hospital_number="50092915",
            issuing_source="uclh"
        )
        UclhPatientUpdateSubscription().notify(container)
        patient = self.session.query(Patient).one()

        # Only first_name carries the new value; the other fields must still
        # hold the values asserted below.
        expected_fields = (
            ("first_name", "Mary"),
            ("surname", "Smith"),
            ("middle_name", None),
            ("title", "Ms"),
            ("date_of_birth", date(1983, 12, 12)),
        )
        for field, expected in expected_fields:
            self.assertEqual(expected, getattr(patient, field))
项目:onefl-deduper    作者:ufbmi    | 项目源码 | 文件源码
def format_date_as_string(val, fmt='%m-%d-%Y'):
    """
    Render a date value as a string.

    :param val: date/datetime object, or a string written in ``fmt``
    :param fmt: the format of ``val`` when it is a string
    :rtype str:
    :return for strings, the parsed date rendered with FORMAT_DATABASE_DATE,
        or '' when parsing fails

    NOTE(review): date/datetime objects are rendered with ``fmt`` and not
    with FORMAT_DATABASE_DATE, so the output format depends on the input
    type -- confirm this is intended.
    """
    if isinstance(val, date):
        return val.strftime(fmt)

    parsed = format_date(val, fmt)
    return parsed.strftime(FORMAT_DATABASE_DATE) if parsed else ''
项目:onefl-deduper    作者:ufbmi    | 项目源码 | 文件源码
def format_date(val, fmt='%m-%d-%Y'):
    """
    Parse a date string into a datetime object.

    :param val: the input string for date
    :param fmt: the input format for the date
    :return the parsed datetime, or None when parsing fails (the failure is
        logged as a warning, not raised)
    """
    try:
        return datetime.strptime(val, fmt)
    except Exception as exc:
        log.warning("Problem formatting date: {} - {} due: {}"
                    .format(val, fmt, exc))
    return None
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_format_updated_projects(self):
        """Test format updated projects works."""
        # A project with an old ``updated`` date; the assertions below expect
        # only the freshly updated project in the result.
        ProjectFactory.create(updated=date(2014, 11, 24))
        fresh = ProjectFactory.create()
        fresh.name = 'NewNewNew'
        ProjectRepository(db).update(fresh)
        update_projects_week()
        today = datetime.utcnow().strftime('%Y-%m-%d')

        results = format_update_projects()

        assert len(results) == 1, results
        entry = results[0]
        owner = fresh.owner
        assert entry['day'] == today, entry['day']
        assert entry['id'] == fresh.id
        assert entry['short_name'] == fresh.short_name
        assert entry['p_name'] == fresh.name
        assert entry['email_addr'] == owner.email_addr
        assert entry['owner_id'] == owner.id
        assert entry['u_name'] == owner.name
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def default(self, o):
        """Return a serializable stand-in for ``o``.

        Subclasses may override this to support more types and should fall
        back to the base implementation, which raises :exc:`TypeError`. For
        example, arbitrary iterators could be supported like this::

            def default(self, o):
                try:
                    iterable = iter(o)
                except TypeError:
                    pass
                else:
                    return list(iterable)
                return JSONEncoder.default(self, o)

        Handled here: ``date`` objects (via ``http_date``), ``uuid.UUID``
        (as ``str``) and objects exposing ``__html__`` (the text it returns).
        """
        if isinstance(o, date):
            encoded = http_date(o.timetuple())
        elif isinstance(o, uuid.UUID):
            encoded = str(o)
        elif hasattr(o, '__html__'):
            encoded = text_type(o.__html__())
        else:
            encoded = _json.JSONEncoder.default(self, o)
        return encoded
项目:facebook-message-analysis    作者:szheng17    | 项目源码 | 文件源码
def get_month_to_message_statistic(self, message_statistic):
        """
        Maps each month between the month of the first message and the month of
        the last message, inclusive, to the sum of the values of a message
        statistic over all messages from that month.

        The range is taken from ``self.messages[0]`` and ``self.messages[-1]``,
        so a message dated outside that range raises ``KeyError``
        (presumably the messages are kept in chronological order -- confirm).

        Args:
            message_statistic: A function mapping a Message object to an int or
                a float.

        Returns:
            month_to_message_statistic: A dict mapping a string formatted with
                self.MONTH_FORMAT, representing a month between the month of
                the first message and the month of the last message, to the
                sum of the values of message_statistic over all messages in
                self.messages from that month. Empty when there are no
                messages.
        """
        # Without messages there is no first/last month to span.
        if not self.messages:
            return {}
        start_dt = self.messages[0].timestamp
        end_dt = self.messages[-1].timestamp
        # Snap both ends to the first of their month so the monthly
        # recurrence steps on month boundaries.
        start_date_month_start = datetime(start_dt.year, start_dt.month, 1)
        end_date_month_start = datetime(end_dt.year, end_dt.month, 1)
        dt_month_range = rrule(MONTHLY, dtstart=start_date_month_start,
                               until=end_date_month_start)
        # Pre-seed every month with 0 so months without messages still appear.
        month_to_message_statistic = {
            dt.date().strftime(self.MONTH_FORMAT): 0 for dt in dt_month_range
        }
        for message in self.messages:
            month_str = message.timestamp.strftime(self.MONTH_FORMAT)
            month_to_message_statistic[month_str] += message_statistic(message)
        return month_to_message_statistic
项目:facebook-message-analysis    作者:szheng17    | 项目源码 | 文件源码
def get_time_interval_to_message_statistic(self, time_interval,
                                               message_statistic):
        """
        Dispatch to the per-interval aggregation method.

        The method called is named
        'get_' + time_interval.replace(' ', '_') + '_to_message_statistic'
        and receives message_statistic as its only argument.

        Args:
            time_interval: A member of self.TIME_INTERVALS.
            message_statistic: A function mapping a Message object to an int or
                a float.

        Returns:
            Whatever the selected method returns: a dict mapping each value of
            the time interval to the summed statistic.

        Raises:
            ValueError: If time_interval is not in self.TIME_INTERVALS.
        """
        if time_interval not in self.TIME_INTERVALS:
            raise ValueError('time_interval must be in {}'.format(self.TIME_INTERVALS))

        method_name = 'get_{}_to_message_statistic'.format(
            time_interval.replace(' ', '_'))
        return getattr(self, method_name)(message_statistic)
项目:calendary    作者:DavidHickman    | 项目源码 | 文件源码
def test_calendary_weekday_returns_ordinal(self):
        """The third Tuesday of July 2016 is 2016-07-19."""
        expected = ('Tuesday', datetime.date(month=7, day=19, year=2016))
        found = Calendary(2016).weekday('Tuesday', month=7, ordinal=3)
        assert found == expected
项目:calendary    作者:DavidHickman    | 项目源码 | 文件源码
def test_calendary_weekday_returns_right_day_for_year(self):
        """The first Monday of 1947 is 1947-01-06."""
        expected = ('Monday', datetime.date(month=1, day=6, year=1947))
        found = Calendary(1947).weekday('Monday', ordinal=1)
        assert found == expected
项目:calendary    作者:DavidHickman    | 项目源码 | 文件源码
def test_weekday_with_multiple_inputs_returns_multiple_days(self):
        """Asking for weekdays (0, 1) returns both Mondays and Tuesdays."""
        found = Calendary(2016).weekday((0, 1), month=7)

        # Each result is a (weekday name, date) pair; collect the dates per name.
        dates_by_name = {'Monday': [], 'Tuesday': []}
        for pair in found:
            if pair[0] in dates_by_name:
                dates_by_name[pair[0]].append(pair[1])

        assert datetime.date(year=2016, month=7, day=18) in dates_by_name['Monday']
        assert datetime.date(year=2016, month=7, day=19) in dates_by_name['Tuesday']
项目:nanobot    作者:bgporter    | 项目源码 | 文件源码
def Log(self, eventType, dataList):
      '''
         Append one entry to the log file. Each entry looks like:
         timestamp\tevent\tdata1\tdata2 <etc>\n
         where:
         timestamp = integer seconds since the UNIX epoch
         event = string identifying the event
         data1..n = individual data fields (strings), as appropriate for
                    each event type.
         The log file name from the settings is passed through
         datetime.strftime(), so format codes in it are expanded against the
         current date/time (e.g. to get one log file per month).
      '''
      now = int(time())
      # With no explicit log file name configured, fall back to a
      # year-and-month pattern and store it in the settings.
      configured = self.settings.logFilePath
      if configured:
         pattern = configured
      else:
         pattern = self.settings.logFilePath = "%Y-%m.txt"
      path = datetime.fromtimestamp(now).strftime(self.GetPath(pattern))
      with open(path, "a+t") as logFile:
         logFile.write("{0}\t{1}\t".format(now, eventType))
         logFile.write("\t".join(dataList))
         logFile.write("\n")
项目:Modeling_Preparation    作者:Yangruipis    | 项目源码 | 文件源码
def is_due_date(date):
    """Return True when ``date`` is a Friday falling on the 15th-21st.

    ``weekday() == 4`` is Friday, and a Friday between the 15th and the 21st
    is the third Friday of its month.
    """
    return 15 <= date.day <= 21 and date.weekday() == 4
项目:Modeling_Preparation    作者:Yangruipis    | 项目源码 | 文件源码
def gen_due_date(year, month):
    """Return the first date accepted by ``is_due_date`` after the 1st.

    The 31 days following ``year``-``month``-01 are scanned in order. Returns
    None when none of them qualifies.
    """
    first_of_month = datetime.date(year, month, 1)
    for offset in range(1, 32):
        candidate = first_of_month + datetime.timedelta(offset)
        if is_due_date(candidate):
            return candidate
    return None
项目:Modeling_Preparation    作者:Yangruipis    | 项目源码 | 文件源码
def get_due_date(date):
    """Return the next due date on or after ``date``.

    This is the due date of ``date``'s own month (from ``gen_due_date``)
    unless ``date`` lies after it; then it is the due date of the following
    month.

    :param date: a ``datetime.date``
    :return: whatever ``gen_due_date`` returns for the selected month
    """
    due_date_this_month = gen_due_date(date.year, date.month)
    if date <= due_date_this_month:
        return due_date_this_month
    if date.month == 12:
        # The month after December is January of the NEXT year. This used to
        # ask for ``date.year - 1`` and so returned a date in the past.
        return gen_due_date(date.year + 1, 1)
    return gen_due_date(date.year, date.month + 1)
项目:Telebackup    作者:LonamiWebs    | 项目源码 | 文件源码
def get_previous_and_next_day(db, message_date):
        """Return the saved days just before and just after ``message_date``.

        Two messages are looked up with ``db.query_message``: one dated
        before ``message_date`` and one dated on or after the following day.
        Each is reduced to its date with ``Exporter.get_message_date``.
        """
        earlier_message = db.query_message(
            "where date < '{}' order by id desc".format(message_date))

        day_after = message_date + timedelta(days=1)
        later_message = db.query_message(
            "where date >= '{}' order by id asc".format(day_after))

        return (Exporter.get_message_date(earlier_message),
                Exporter.get_message_date(later_message))
项目:Telebackup    作者:LonamiWebs    | 项目源码 | 文件源码
def get_message_date(message):
        """Return the calendar date of ``message.date``, without the time.

        Returns None when ``message`` is falsy.
        """
        if not message:
            return None
        stamp = message.date
        return date(year=stamp.year, month=stamp.month, day=stamp.day)

    #endregion
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def _build_date(date, kwargs):
    """
    Builds the date argument for event rules.

    Exactly one of ``date`` and ``kwargs`` must be supplied: an explicit
    ``date`` is returned as it is, otherwise ``kwargs`` are passed to
    ``datetime.date``. Raises ValueError when both or neither are given.
    """
    if date is not None:
        if kwargs:
            raise ValueError('Cannot pass kwargs and a date')
        return date

    if not kwargs:
        raise ValueError('Must pass a date or kwargs')
    return datetime.date(**kwargs)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def _get_open(self, dt, env):
        """
        Return the cached open for ``dt``'s day, shifted one minute earlier.

        The value is refreshed from ``env.get_open_and_close(dt)`` whenever
        nothing is cached or the cached value belongs to another calendar day.
        """
        cached = self._dt
        if cached is None or cached.date() != dt.date():
            market_open = env.get_open_and_close(dt)[0]
            self._dt = market_open - datetime.timedelta(minutes=1)

        return self._dt
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def _get_close(self, dt, env):
        """
        Return the cached close for ``dt``'s day.

        The value is refreshed from ``env.get_open_and_close(dt)`` whenever
        nothing is cached or the cached value belongs to another calendar day.
        """
        cached = self._dt
        if cached is None or cached.date() != dt.date():
            market_close = env.get_open_and_close(dt)[1]
            self._dt = market_close

        return self._dt
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def should_trigger(self, dt, env):
        """Return True unless ``dt``'s date is listed in ``env.early_closes``."""
        trading_date = dt.date()
        return not (trading_date in env.early_closes)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def get_first_trading_day_of_week(self, dt, env):
        """Return the date of the first trading day in ``dt``'s week.

        Starting at ``dt``, the method steps back one trading day at a time
        with ``env.previous_trading_day``. It stops as soon as the earlier
        day's weekday number is no longer smaller than the current one, which
        means the step crossed into the previous week.
        """
        current = dt
        while True:
            earlier = env.previous_trading_day(current)
            if earlier.date().weekday() >= current.date().weekday():
                return current.date()
            current = earlier
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def __init__(self, n):
        """Store the offset ``n`` for this rule.

        ``n`` must satisfy ``0 <= n < MAX_WEEK_RANGE``; otherwise the error
        built by ``_out_of_range_error(MAX_WEEK_RANGE)`` is raised.
        """
        if not 0 <= n < MAX_WEEK_RANGE:
            raise _out_of_range_error(MAX_WEEK_RANGE)
        # The offset is kept negated.
        self.td_delta = -n
        # No date has been computed yet.
        self.date = None