Source code for pyveg.src.date_utils

"""
Useful functions for manipulating dates and date strings,
e.g. splitting a period into sub-periods.

When dealing with date strings, ALWAYS use the ISO format YYYY-MM-DD

"""

import dateparser
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import re


def get_num_n_day_slices(start_date, end_date, days_per_chunk):
    """
    Count how many whole chunks of days_per_chunk days fit into the period
    between start_date and end_date.

    Parameters
    ==========
    start_date: str, format YYYY-MM-DD
    end_date: str, format YYYY-MM-DD
    days_per_chunk: int, length of each chunk in days, must be positive

    Returns
    =======
    n: int, number of complete chunks available in the time period
       (a trailing remainder shorter than days_per_chunk is not counted)

    Raises
    ======
    RuntimeError: if days_per_chunk is not positive, if either date string
                  does not parse to a datetime, or if end_date is not at
                  least one full day after start_date.
    """
    # Reject a zero/negative chunk size up front: zero would otherwise surface
    # as a bare ZeroDivisionError and a negative size as a negative count.
    if days_per_chunk <= 0:
        raise RuntimeError("days_per_chunk must be a positive number of days")

    start = dateparser.parse(start_date)
    end = dateparser.parse(end_date)
    if not (isinstance(start, datetime) and isinstance(end, datetime)):
        raise RuntimeError("invalid time strings")

    n_days = (end - start).days
    if n_days <= 0:
        raise RuntimeError("end_date must be after start_date")

    return n_days // days_per_chunk
def slice_time_period_into_n(start_date, end_date, n):
    """
    Divide the full period between the start_date and end_date into n
    equal-length (to the nearest day) chunks.

    Parameters
    ==========
    start_date: str, format YYYY-MM-DD
    end_date: str, format YYYY-MM-DD
    n: int, number of chunks to produce, must be positive

    Returns
    =======
    output_list: list of n tuples of strings in format YYYY-MM-DD,
                 [(chunk0_start, chunk0_end), ...].  Every chunk spans
                 (total days // n) days except the last one, which always
                 finishes at end_date and so absorbs any remainder.  If n
                 is larger than the number of days in the period, the chunk
                 length is zero days and all chunks but the last are empty.

    Raises
    ======
    RuntimeError: if n is not positive, if either date string does not
                  parse to a datetime, or if end_date is not at least one
                  full day after start_date.
    """
    # n == 0 would otherwise be a bare ZeroDivisionError, and a negative n
    # would silently yield an empty list.
    if n <= 0:
        raise RuntimeError("n must be a positive number of chunks")

    start = dateparser.parse(start_date)
    end = dateparser.parse(end_date)
    if not (isinstance(start, datetime) and isinstance(end, datetime)):
        raise RuntimeError("invalid time strings")

    total_days = (end - start).days
    if total_days <= 0:
        raise RuntimeError("end_date must be after start_date")

    days_per_chunk = total_days // n
    output_list = []
    for i in range(n):
        chunk_start = start + timedelta(days=i * days_per_chunk)
        if i == n - 1:
            # the last chunk should finish exactly at end_date
            chunk_end = end
        else:
            chunk_end = start + timedelta(days=(i + 1) * days_per_chunk)
        output_list.append(
            (chunk_start.date().isoformat(), chunk_end.date().isoformat())
        )
    return output_list
def slice_time_period(start_date, end_date, period_length):
    """
    Slice a time period into chunks, whose length is determined by
    the period_length, which will be e.g. '30d' for 30 days, or '1m' for
    one month.

    Parameters
    ==========
    start_date: str, format YYYY-MM-DD
    end_date: str, format YYYY-MM-DD
    period_length: str, format '<integer><d|w|m|y>', e.g. 30d.  The integer
                   must be greater than zero.  Only the start of the string
                   is matched, so anything after the unit letter is ignored.

    Returns
    =======
    periods: list of lists of strings in format YYYY-MM-DD, each of which
             is the start and end of a sub-period.  Only complete
             sub-periods are returned: a trailing remainder shorter than
             period_length is dropped, and the list is empty if not even
             one sub-period fits before end_date.

    Raises
    ======
    RuntimeError: if period_length is not in the expected format, or if it
                  specifies a length of zero.
    ValueError: from datetime.fromisoformat, if a date string is not in
                ISO format.
    """
    periods = []
    start_datetime = datetime.fromisoformat(start_date)
    end_datetime = datetime.fromisoformat(end_date)

    # parse the period_length
    match = re.search(r"^([\d]+)([dwmy])", period_length)
    if not match:
        raise RuntimeError("Period length must be in format '<int><d|w|m|y>', e.g. 30d")
    num, units = match.groups()
    num = int(num)
    # A zero-length step never advances past end_date, so the loop below
    # would append sub-periods forever.
    if num == 0:
        raise RuntimeError(
            "Period length must be greater than zero, got '{}'".format(period_length)
        )

    # The step is the same on every iteration, so build it once.
    unit_keywords = {"d": "days", "w": "weeks", "m": "months", "y": "years"}
    step = relativedelta(**{unit_keywords[units]: num})

    previous_date = start_datetime
    while True:
        t = previous_date + step
        # break out of the loop if we go after the end of our time period
        if t > end_datetime:
            break
        # otherwise, add this sub-period to the list
        periods.append([previous_date.date().isoformat(), t.date().isoformat()])
        previous_date = t
    return periods
def find_mid_period(start_date, end_date):
    """
    Find the date half-way between two dates.

    The number of whole days between the two dates is halved with integer
    (floor) division, and that many days are added to the start date.

    Parameters
    ==========
    start_date: str, date in format YYYY-MM-DD
    end_date: str, date in format YYYY-MM-DD

    Returns
    =======
    mid_date: str, mid point of those dates, format YYYY-MM-DD
    """
    first = dateparser.parse(start_date)
    last = dateparser.parse(end_date)
    half_span = timedelta(days=(last - first).days // 2)
    midpoint = first + half_span
    # keep only the date part of the ISO timestamp
    return midpoint.isoformat().split("T")[0]
def get_date_strings_for_time_period(start_date, end_date, period_length):
    """
    Slice a time period into sub-periods with slice_time_period, then
    take the mid-date of each sub-period with find_mid_period.

    Parameters
    ==========
    start_date: str, format YYYY-MM-DD
    end_date: str, format YYYY-MM-DD
    period_length: str, format '<integer><d|w|m|y>', e.g. 30d

    Returns
    =======
    periods: list of strings in format YYYY-MM-DD, each of which is
             the mid-point of a sub-period
    """
    mid_dates = []
    for sub_start, sub_end in slice_time_period(start_date, end_date, period_length):
        mid_dates.append(find_mid_period(sub_start, sub_end))
    return mid_dates
def get_date_range_for_collection(date_range, coll_dict):
    """
    Clip the date range asked for by the user to the dates covered by a
    collection, i.e. return the later of the two start dates and the
    earlier of the two end dates.

    Parameters
    ==========
    date_range: list or tuple of strings, format YYYY-MM-DD
    coll_dict: dictionary, optionally containing min_date and max_date keys

    Returns
    =======
    tuple of strings, format YYYY-MM-DD.  If coll_dict lacks either the
    min_date or the max_date key, date_range is returned as it was given.
    """
    available_keys = coll_dict.keys()
    if "min_date" not in available_keys or "max_date" not in available_keys:
        # collection has no complete date limits, so nothing to clip against
        return date_range

    requested_start, requested_end = (
        dateparser.parse(date_range[0]),
        dateparser.parse(date_range[1]),
    )
    collection_min = dateparser.parse(coll_dict["min_date"])
    collection_max = dateparser.parse(coll_dict["max_date"])

    # Collection limit goes first so that it is the one picked on a tie.
    clipped_start = max(collection_min, requested_start)
    clipped_end = min(collection_max, requested_end)

    start_string = clipped_start.isoformat().split("T")[0]
    end_string = clipped_end.isoformat().split("T")[0]
    return (start_string, end_string)
def assign_dates_to_tasks(date_list, n_tasks):
    """
    For batch jobs, will want to split dates as evenly as possible
    over some number of tasks.  Dates are dealt out round-robin, so
    date j goes to task (j modulo number of tasks).

    Parameters
    ==========
    date_list: list of dates to be shared out
    n_tasks: int, maximum number of tasks to share the dates between

    Returns
    =======
    output_lists: list of lists, one per task.  There are
                  min(n_tasks, len(date_list)) lists, so no task is ever
                  given an empty list.  An empty date_list gives [].

    Raises
    ======
    RuntimeError: if there are dates to assign but n_tasks is not positive.
    """
    if len(date_list) == 0:
        return []
    # With no tasks to deal the dates out to, nothing could ever be assigned
    # (a naive round-robin loop would never terminate), so fail loudly.
    if n_tasks <= 0:
        raise RuntimeError("n_tasks must be positive when there are dates to assign")

    n_lists = min(n_tasks, len(date_list))
    output_lists = [[] for _ in range(n_lists)]
    for j, date in enumerate(date_list):
        output_lists[j % n_lists].append(date)
    return output_lists
def get_time_diff(date1, date2, units="years"):
    """
    Calculate the time difference (date1 - date2) between two dates,
    expressed in whole numbers of the requested units.

    Parameters
    ==========
    date1, date2: str, dates in format YYYY-MM-DD (datetime objects are
                  also accepted and used as they are)
    units: str, can be "years", "months", "days"

    Returns
    =======
    time_diff: int, total difference in times, in specified units, e.g.
               a gap of one year and two months is 14 in "months".
               Negative if date1 is before date2.  None (after printing a
               message) if units is not one of the recognised values.
    """
    if not isinstance(date1, datetime):
        date1 = dateparser.parse(date1)
    if not isinstance(date2, datetime):
        date2 = dateparser.parse(date2)

    if units == "days":
        # Plain subtraction gives the total day count; relativedelta's
        # .days is only the part left over after whole years and months.
        return (date1 - date2).days

    if units in ("years", "months"):
        diff = relativedelta(date1, date2)
        if units == "years":
            return diff.years
        # fold the whole years into the month count to get the total
        return diff.years * 12 + diff.months

    print("unknown units for time_diff: {}".format(units))
    return None