我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用dateutil.rrule.rrulestr()。
def occurrences(self): import datetime from dateutil import rrule datelist = [] if self.rrule: rr = rrule.rrulestr(self.rrule, dtstart=self.start_date) start = datetime.datetime.combine(self.start_date, datetime.datetime.min.time()) end = start + datetime.timedelta(days=365) datelist = rr.between(start, end, inc=True) else: if self.start_date >= datetime.date.today(): datelist.append(datetime.datetime.combine(self.start_date, datetime.time(0, 0, 0))) if not self.start_time: return datelist return [datetime.datetime.combine(x.date(), self.start_time) for x in datelist]
def testRRuleAll(self): from dateutil.rrule import rrule from dateutil.rrule import rruleset from dateutil.rrule import rrulestr from dateutil.rrule import YEARLY, MONTHLY, WEEKLY, DAILY from dateutil.rrule import HOURLY, MINUTELY, SECONDLY from dateutil.rrule import MO, TU, WE, TH, FR, SA, SU rr_all = (rrule, rruleset, rrulestr, YEARLY, MONTHLY, WEEKLY, DAILY, HOURLY, MINUTELY, SECONDLY, MO, TU, WE, TH, FR, SA, SU) for var in rr_all: self.assertIsNot(var, None) # In the public interface but not in all from dateutil.rrule import weekday self.assertIsNot(weekday, None)
def rrulestr_to_text(rrule_str, dtstart=None): rrstr = rrule.rrulestr(rrule_str, dtstart=dtstart) if hasattr(rrstr, '_freq'): return rrule_to_text(rrule.rrulestr(rrule_str, dtstart=dtstart)) elif hasattr(rrstr, '_rrule'): output = [rrule_to_text(r) for r in rrstr._rrule] if hasattr(rrstr, '_rdate') and rrstr._rdate: output.extend([' ', _('and also'), ' ']) output.extend(_list(map(lambda d: datetime.datetime.strftime(d, '%B %d, %Y'), rrstr._rdate))) if hasattr(rrstr, '_exdate') and rrstr._exdate: output.extend([' ', _('except'), ' ']) output.extend(_list(map(lambda d: datetime.datetime.strftime(d, DATE_FORMAT), rrstr._exdate))) return "".join(output)
def rrulestr_to_schedule(rrule_str, dtstart=None): rrstr = rrule.rrulestr(rrule_str, dtstart=dtstart) if hasattr(rrstr, '_freq'): return [rrule_to_schedule(rrule.rrulestr(rrule_str, dtstart=dtstart), dtstart)] elif hasattr(rrstr, '_rrule'): output = [rrule_to_schedule(r, dtstart) for r in rrstr._rrule] exceptions = [] extras = [] if hasattr(rrstr, '_rdate') and rrstr._rdate: extras.extend(_list(map(lambda d: datetime.datetime.strftime(d, '%a %x'), rrstr._rdate))) output = [x._replace(extras="".join(extras)) for x in output] if hasattr(rrstr, '_exdate') and rrstr._exdate: exceptions.extend(_list(map(lambda d: datetime.datetime.strftime(d, DATE_FORMAT), rrstr._exdate))) output = [x._replace(exceptions="".join(exceptions)) for x in output] return output
def Poll (self, within_seconds=None): # print "poll within", within_seconds results = [ ] candidates = self.master.openaps.things.get('schedules', [ ]) now = datetime.datetime.now( ) # print "polling schedules", len(candidates), now.isoformat( ), 'for', self.MaxTasksAhead, 'MaxTasksAhead' for configured in candidates: # print "SCHEDULE", configured.item.fields # spec = recurrent.parse(configured.item.fields['rrule'], now=self.since) spec = configured.item.fields['rrule'] rr = rrule.rrulestr(spec, dtstart=self.since) # print configured.item.fields['rrule'], spec upcoming = rr.after(now) # print "next", upcoming.isoformat( ) # XXX: bug in making: need to fill out all events before within_seconds as well. # if (upcoming - now).total_seconds( ) <= within_seconds: for upcoming in iter_triggers(upcoming, rr, within_seconds): # print "ARM THING", configured.path # print "ATTEMPT ARM", configured.item.name, configured.path, spec # self.enqueue(upcoming, configured) trigger = Armable(upcoming, configured) # exists = self.schedules[(upcoming, configured.item.name)] results.append(trigger) return results pass
def __call__(self, *args, **kwargs): # Pop out custom keyword arguments added during scheduling and # previous run so as not to pollute the task function's # namespace. scheduled_task_id = kwargs.pop('scheduled_task_id') rrule_string = kwargs.pop('rrule_string') _first_eta = kwargs.pop('first_eta') _eta = kwargs.pop('eta') _until = kwargs.pop('until') first_eta = TaskScheduler.strptime(_first_eta) eta = TaskScheduler.strptime(_eta) until = TaskScheduler.strptime(_until) scheduled_task = ScheduledTask.objects.get(pk=scheduled_task_id) # If the task been manually set as ScheduledTask.STATUS_CANCELLED, # stop the execution. if scheduled_task.status == ScheduledTask.STATUS_CANCELLED: return scheduled_task.save_status(ScheduledTask.STATUS_RUNNING) ScheduledTaskRunLog.objects.create(task_id=self.request.id, scheduled_task=scheduled_task) # If a CancelSchedule exception is raied by the function, # cancel the schedule and exit. try: result = super(RepeatTask, self).__call__(*args, **kwargs) except CancelSchedule: TaskScheduler.cancel(scheduled_task_id=scheduled_task.id) return else: scheduled_task.save_status(ScheduledTask.STATUS_SUCCESS) # If rrule string is not specified, assume it to be a one time # task. if not rrule_string: return result # Preserve the start and end of rrule cycle. rrule_ = rrulestr(rrule_string).replace(dtstart=first_eta, until=until) next_eta = TaskScheduler.calculate_next_eta(rrule_=rrule_, current_eta=eta) # If rrule does not return an ETA, assume it to be the end of # schedule and exit. if not next_eta: return result # Add custom keyword arguments again for the next run. kwargs.update({ 'scheduled_task_id': scheduled_task.id, 'rrule_string': rrule_string, 'first_eta': _first_eta, 'eta': TaskScheduler.strftime(next_eta), 'until': TaskScheduler.strftime(until), }) self.apply_async(eta=next_eta, args=args, kwargs=kwargs) return result