1
0
mirror of https://github.com/exaloop/codon.git synced 2025-06-03 15:03:52 +08:00
codon/stdlib/numpy/npdatetime.codon
A. R. Shajii b8c1eeed36
2025 updates (#619)
* 2025 updates

* Update ci.yml
2025-01-29 15:41:43 -05:00

2976 lines
96 KiB
Python

# Copyright (C) 2022-2025 Exaloop Inc. <https://exaloop.io>
import datetime
import math
# Sentinel stored in a value field to mean "Not a Time" (NaT); it is the
# minimum 64-bit signed integer.
_DATETIME_NAT: Static[int] = -9_223_372_036_854_775_808
# NOTE(review): presumably an upper bound on the length of an ISO 8601
# rendering; the meaning of the individual terms is not visible here — confirm
# against the formatting code before changing.
_DATETIME_MAX_ISO8601_STRLEN: Static[int] = (21 + 3 * 5 + 1 + 3 * 6 + 6 + 1)
# Numeric unit codes, ordered from coarsest to finest. Code 3 is deliberately
# unused: the parallel tables below hold a placeholder ("<invalid>" / zero
# rows) at that index, so the numbering must not be compacted.
_FR_ERROR: Static[int] = -1 # error or undetermined
_FR_Y: Static[int] = 0 # Years
_FR_M: Static[int] = 1 # Months
_FR_W: Static[int] = 2 # Weeks
_FR_D: Static[int] = 4 # Days
_FR_h: Static[int] = 5 # hours
_FR_m: Static[int] = 6 # minutes
_FR_s: Static[int] = 7 # seconds
_FR_ms: Static[int] = 8 # milliseconds
_FR_us: Static[int] = 9 # microseconds
_FR_ns: Static[int] = 10 # nanoseconds
_FR_ps: Static[int] = 11 # picoseconds
_FR_fs: Static[int] = 12 # femtoseconds
_FR_as: Static[int] = 13 # attoseconds
_FR_GENERIC: Static[int] = 14 # unbound units, can convert to anything
# Number of unit codes, counting the unused slot at index 3.
_DATETIME_NUMUNITS: Static[int] = (_FR_GENERIC + 1)
# Unit assumed when none is given.
_DATETIME_DEFAULTUNIT: Static[int] = _FR_GENERIC
@tuple
class _time_t:
    # Broken-down local time record. `_get_localtime` passes a pointer to an
    # instance of this type to `_C.seq_localtime`, so the field order and
    # widths below define the memory layout that call writes into; do not
    # reorder them.
    # NOTE(review): field meanings are inferred from their names (they look
    # like the C `struct tm` members) — confirm against the runtime.
    year: i16
    yday: i16
    sec: i8
    min: i8
    hour: i8
    mday: i8
    mon: i8
    wday: i8
    isdst: i8

    def __new__() -> _time_t:
        # Produce an all-zero record for the runtime to fill in.
        zero16 = i16(0)
        zero8 = i8(0)
        return (zero16, zero16, zero8, zero8, zero8, zero8, zero8, zero8,
                zero8)
# Signed integer division implemented directly with LLVM `sdiv`, i.e. the
# quotient is truncated toward zero. `T` is the integer type shared by both
# operands and the result. Callers that need a floored quotient (see
# `_extract_unit_64`) adjust the result themselves.
@pure
@llvm
def _cdiv(a: T, b: T, T: type) -> T:
    %c = sdiv {=T} %a, %b
    ret {=T} %c
# Signed integer remainder implemented directly with LLVM `srem`; the result
# takes the sign of the dividend `a`, so it can be negative. `T` is the integer
# type shared by both operands and the result.
@pure
@llvm
def _cmod(a: T, b: T, T: type) -> T:
    %c = srem {=T} %a, %b
    ret {=T} %c
def _extract_unit_64(d: int, unit: int):
    """Split ``d`` into a quotient and a non-negative remainder modulo ``unit``.

    Returns ``(quotient, remainder)``. The truncating helpers ``_cdiv`` and
    ``_cmod`` are used first; when the remainder comes out negative the pair
    is shifted so the remainder is non-negative and the quotient is one lower.
    """
    quotient, remainder = _cdiv(d, unit), _cmod(d, unit)
    if remainder < 0:
        remainder += unit
        quotient -= 1
    return quotient, remainder
def _extract_unit_32(d: i32, unit: i32):
    """32-bit variant of ``_extract_unit_64``.

    Returns ``(quotient, remainder)`` as ``i32`` values, with a negative
    remainder from the truncating division folded back into the non-negative
    range by borrowing one from the quotient.
    """
    quotient, remainder = _cdiv(d, unit), _cmod(d, unit)
    if remainder < i32(0):
        remainder += unit
        quotient -= i32(1)
    return quotient, remainder
# Unit abbreviation for each numeric unit code (index == code). Index 3 is a
# placeholder because no unit uses that code.
_datetime_strings = ("Y", "M", "W", "<invalid>", "D", "h", "m", "s", "ms",
                     "us", "ns", "ps", "fs", "as", "generic")
# Month lengths in days; row 0 is used for common years and row 1 for leap
# years (the row is selected with int(_is_leapyear(year))).
_days_per_month_table = ((31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31),
                         (31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31))
# Rows come in pairs, indexed by (unit_code - _FR_Y) * 2:
#   row 2k     -> how many of each finer unit make up one unit with code k
#   row 2k + 1 -> the unit codes those counts refer to
# Rows are zero-padded to four entries. The pair at index 14/15 is shared by
# seconds and every finer unit (see DatetimeMetaData.divisor_to_multiple).
_multiples_table = ((12, 52, 365, 0), (_FR_M, _FR_W, _FR_D, 0),
                    (4, 30, 720, 0), (_FR_W, _FR_D, _FR_h, 0),
                    (7, 168, 10080, 0), (_FR_D, _FR_h, _FR_m, 0), (0, 0, 0, 0),
                    (0, 0, 0, 0), (24, 1440, 86400, 0), (_FR_h, _FR_m, _FR_s,
                                                         0), (60, 3600, 0, 0),
                    (_FR_m, _FR_s, 0, 0), (60, 60000, 0, 0),
                    (_FR_s, _FR_ms, 0, 0), (1000, 1000000, 0, 0), (0, 0, 0, 0))
# Multiplier applied when stepping from unit code i to code i + 1; these are
# multiplied together by _get_datetime_units_factor.
_datetime_factors = (1, 1, 7, 1, 24, 60, 60, 1000, 1000, 1000, 1000, 1000,
                     1000, 1, 0)
def _is_leapyear(year: int):
    """Return True when ``year`` is a leap year.

    A year qualifies when it is divisible by 4 and is either not divisible by
    100 or is divisible by 400.
    """
    if year & 0x3 != 0:
        return False
    return _cmod(year, 100) != 0 or _cmod(year, 400) == 0
def _days_to_yearsdays(days: int):
    """Convert a day count into ``(day_of_year, year)``.

    Day 0 maps to the start of year 1970. The count is first rebased onto
    year 2000 so that whole 400-year cycles can be peeled off, and the
    remainder is then narrowed through 100-year, 4-year and 1-year spans.
    The returned day-of-year is zero-based.
    """
    days_per_400years: Static[int] = 400 * 365 + 100 - 4 + 1
    days_per_100years = 100 * 365 + 25 - 1
    days_per_4years = 4 * 365 + 1

    # 365 * 30 + 7 is the number of days in the 30 years before 2000.
    rebased = days - (365 * 30 + 7)
    cycles, rest = _extract_unit_64(rebased, days_per_400years)
    year = 400 * cycles

    if rest >= 366:
        rest -= 1
        year += 100 * (rest // days_per_100years)
        rest %= days_per_100years
        if rest >= 365:
            rest += 1
            year += 4 * (rest // days_per_4years)
            rest %= days_per_4years
            if rest >= 366:
                rest -= 1
                year += rest // 365
                rest %= 365
    return rest, year + 2000
def _days_to_month_number(days: int):
    """Return the 1-based month number that the given day count falls in.

    ``days`` uses the same origin as ``_days_to_yearsdays``.
    """
    day_of_year, year = _days_to_yearsdays(days)
    lengths = _days_per_month_table[int(_is_leapyear(year))]
    month = 0
    while month < 12:
        length = lengths[month]
        if day_of_year < length:
            return month + 1
        day_of_year -= length
        month += 1
    # should never be reached
    return 1
def _get_datetime_units_factor(bigbase: int, littlebase: int):
    """Return how many ``littlebase`` units make up one ``bigbase`` unit.

    Both arguments are numeric unit codes. The result is a ``u64`` product of
    the per-step entries in ``_datetime_factors``; it is 1 when ``bigbase`` is
    not coarser than ``littlebase``. ``u64(0)`` is returned as an overflow
    signal as soon as any of the top 8 bits of the running product is set.
    """
    overflow_mask = u64(0xff00000000000000)
    product = u64(1)
    for code in range(bigbase, littlebase):
        product *= u64(_datetime_factors[code])
        if product & overflow_mask:
            return u64(0)
    return product
def _uint64_euclidean_gcd(x: u64, y: u64):
    """Return the greatest common divisor of two ``u64`` values (Euclid)."""
    small = x
    large = y
    if small > large:
        small, large = large, small
    while small != large and large:
        small, large = large, small % large
    return small
def _get_time():
    """Return the result of the C library ``time`` function called with a null pointer."""
    # Local declaration of the external C function; it has no Codon body.
    @C
    def time(a: cobj) -> int:
        pass
    return time(cobj())
def _get_localtime(ts: int = -1):
    """Return a ``_time_t`` filled in by the runtime's ``seq_localtime``.

    ``ts`` is forwarded unchanged to ``_C.seq_localtime``.
    NOTE(review): the meaning of the default ``-1`` is defined by the runtime
    (presumably "current time") — confirm there.

    Raises:
        OSError: if the runtime call reports failure.
    """
    record = _time_t()
    succeeded = _C.seq_localtime(ts, __ptr__(record).as_byte())
    if succeeded:
        return record
    raise OSError("Failed to get local time")
def _unit_error(base: Static[str]):
    """Abort compilation with the standard "invalid datetime unit" message."""
    compile_error('Invalid datetime unit in metadata string: "[' + base + ']"')
def _validate_base(base: Static[str]):
    """Reject, at compile time, any ``base`` that is not a known unit string."""
    if not (base == "Y" or base == "M" or base == "W" or base == "D"
            or base == "h" or base == "m" or base == "s" or base == "ms"
            or base == "us" or base == "ns" or base == "ps" or base == "fs"
            or base == "as" or base == "generic"):
        _unit_error(base)
def _validate_num(num: Static[int]):
    """Reject, at compile time, a unit multiplier outside ``1..0x7fffffff``."""
    if num < 1 or num > 0x7fffffff:
        compile_error("Invalid datetime number (out of range 1..0x7fffffff)")
def _base_code(base: Static[str]):
    """Map a compile-time unit string to its numeric ``_FR_*`` code.

    Unknown strings abort compilation through ``_unit_error``.
    """
    if base == "Y":
        return _FR_Y
    if base == "M":
        return _FR_M
    if base == "W":
        return _FR_W
    if base == "D":
        return _FR_D
    if base == "h":
        return _FR_h
    if base == "m":
        return _FR_m
    if base == "s":
        return _FR_s
    if base == "ms":
        return _FR_ms
    if base == "us":
        return _FR_us
    if base == "ns":
        return _FR_ns
    if base == "ps":
        return _FR_ps
    if base == "fs":
        return _FR_fs
    if base == "as":
        return _FR_as
    if base == "generic":
        return _FR_GENERIC
    _unit_error(base)
def _base_code_nonstatic(base: str):
    """Map a run-time unit string to its numeric ``_FR_*`` code.

    Raises:
        ValueError: if ``base`` is not one of the known unit strings.
    """
    if base == "Y":
        return _FR_Y
    if base == "M":
        return _FR_M
    if base == "W":
        return _FR_W
    if base == "D":
        return _FR_D
    if base == "h":
        return _FR_h
    if base == "m":
        return _FR_m
    if base == "s":
        return _FR_s
    if base == "ms":
        return _FR_ms
    if base == "us":
        return _FR_us
    if base == "ns":
        return _FR_ns
    if base == "ps":
        return _FR_ps
    if base == "fs":
        return _FR_fs
    if base == "as":
        return _FR_as
    if base == "generic":
        return _FR_GENERIC
    raise ValueError(
        f"Invalid datetime unit in metadata string: \"[{base}]\"")
def _base_str(base: Static[str]):
    """Return the plural English name for a compile-time unit string.

    Unknown strings abort compilation through ``_unit_error``.
    """
    if base == "Y":
        return "years"
    if base == "M":
        return "months"
    if base == "W":
        return "weeks"
    if base == "D":
        return "days"
    if base == "h":
        return "hours"
    if base == "m":
        return "minutes"
    if base == "s":
        return "seconds"
    if base == "ms":
        return "milliseconds"
    if base == "us":
        return "microseconds"
    if base == "ns":
        return "nanoseconds"
    if base == "ps":
        return "picoseconds"
    if base == "fs":
        return "femtoseconds"
    if base == "as":
        return "attoseconds"
    if base == "generic":
        return "generic time units"
    _unit_error(base)
# Compile-time datetime metadata: a unit string plus an integer multiplier,
# both carried as static type parameters. Instances hold no run-time data;
# they exist so helper functions can hand a (base, num) pair back to callers.
class _Meta:
    base: Static[str]
    num: Static[int]
# Carrier for a compile-time integer: helper functions return an instance and
# callers read the value back through the static member `.n`.
class _StaticIntWrapper:
    n: Static[int]
# Carrier for a compile-time string: helper functions return an instance and
# callers read the value back through the static member `.s`.
class _StaticStrWrapper:
    s: Static[str]
def _num_digits(s: Static[str]):
    """Count the leading decimal digits of ``s`` at compile time.

    Returns a ``_StaticIntWrapper`` whose ``.n`` is the length of the digit
    prefix (0 for an empty string or one that does not start with a digit).
    """
    if s == "":
        return _StaticIntWrapper[0]()
    if (s[0] == "0" or s[0] == "1" or
        s[0] == "2" or s[0] == "3" or
        s[0] == "4" or s[0] == "5" or
        s[0] == "6" or s[0] == "7" or
        s[0] == "8" or s[0] == "9"):
        # One digit found; recurse on the remainder of the string.
        rest: Static[int] = _num_digits(s[1:]).n
        return _StaticIntWrapper[rest + 1]()
    return _StaticIntWrapper[0]()
def _parse_static_str(s: Static[str]):
    """Parse a string of decimal digits into an integer at compile time.

    Returns a ``_StaticIntWrapper`` whose ``.n`` is the parsed value; the
    empty string parses as 0. Any non-digit character aborts compilation.
    """
    if s == "":
        return _StaticIntWrapper[0]()
    else:
        # Value of everything but the last character, shifted one decimal place.
        rest: Static[int] = _parse_static_str(s[:-1]).n * 10
        if s[-1] == "0":
            return _StaticIntWrapper[rest + 0]()
        elif s[-1] == "1":
            return _StaticIntWrapper[rest + 1]()
        elif s[-1] == "2":
            return _StaticIntWrapper[rest + 2]()
        elif s[-1] == "3":
            return _StaticIntWrapper[rest + 3]()
        elif s[-1] == "4":
            return _StaticIntWrapper[rest + 4]()
        elif s[-1] == "5":
            return _StaticIntWrapper[rest + 5]()
        elif s[-1] == "6":
            return _StaticIntWrapper[rest + 6]()
        elif s[-1] == "7":
            return _StaticIntWrapper[rest + 7]()
        elif s[-1] == "8":
            return _StaticIntWrapper[rest + 8]()
        elif s[-1] == "9":
            return _StaticIntWrapper[rest + 9]()
        else:
            compile_error("invalid digit: " + s[-1])
def _parse_datetime_helper(s: Static[str], tlen: Static[int]):
    """Extract ``(base, num)`` from a dtype string such as ``"datetime64[10ms]"``.

    ``tlen`` is the length of the prefix up to and including the opening
    bracket. The optional digits that follow give the multiplier (1 when
    absent); the text between them and the closing bracket is the unit. The
    unit string is not validated here. Compilation is aborted when ``s`` does
    not end with ``]``.

    Returns a ``_Meta[base, num]`` instance.
    """
    if s[-1] != "]":
        compile_error("data type '" + s + "' not understood")
    num_digits: Static[int] = _num_digits(s[tlen:]).n
    num: Static[int] = _parse_static_str(s[tlen:tlen + num_digits]).n if num_digits > 0 else 1
    base: Static[str] = s[tlen + num_digits:-1]
    return _Meta[base, num]()
def _base_rank(base: Static[str]):
    """Return a compile-time fineness rank for a concrete unit string.

    Coarser units get smaller ranks ("Y" is 1, "as" is 13). The result is a
    ``_StaticIntWrapper``; ``.n`` is 0 for "generic" and for any string that
    is not a concrete unit.
    """
    if base == "Y":
        return _StaticIntWrapper[1]()
    if base == "M":
        return _StaticIntWrapper[2]()
    if base == "W":
        return _StaticIntWrapper[3]()
    if base == "D":
        return _StaticIntWrapper[4]()
    if base == "h":
        return _StaticIntWrapper[5]()
    if base == "m":
        return _StaticIntWrapper[6]()
    if base == "s":
        return _StaticIntWrapper[7]()
    if base == "ms":
        return _StaticIntWrapper[8]()
    if base == "us":
        return _StaticIntWrapper[9]()
    if base == "ns":
        return _StaticIntWrapper[10]()
    if base == "ps":
        return _StaticIntWrapper[11]()
    if base == "fs":
        return _StaticIntWrapper[12]()
    if base == "as":
        return _StaticIntWrapper[13]()
    return _StaticIntWrapper[0]()


def _min_base(base1: Static[str], base2: Static[str]):
    """Return, at compile time, the finer of two unit strings.

    The result is a ``_StaticStrWrapper`` whose ``.s`` is:
      * ``base1`` when both strings are equal,
      * the other string when one of them is "generic",
      * otherwise whichever unit comes later in the order
        Y, M, W, D, h, m, s, ms, us, ns, ps, fs, as.

    Compilation is aborted with "unknown units: ..." when the strings differ,
    neither is "generic", and at least one is not a concrete unit.
    """
    if base1 == base2:
        return _StaticStrWrapper[base1]()
    if base1 == "generic":
        return _StaticStrWrapper[base2]()
    if base2 == "generic":
        return _StaticStrWrapper[base1]()
    rank1: Static[int] = _base_rank(base1).n
    rank2: Static[int] = _base_rank(base2).n
    if rank1 == 0 or rank2 == 0:
        compile_error("unknown units: " + base1 + " and/or " + base2)
    if rank1 > rank2:
        return _StaticStrWrapper[base1]()
    return _StaticStrWrapper[base2]()
def _base_factor(base: Static[str]):
    """Return the compile-time multiplier from ``base`` to the next unit.

    "Next" is the unit given by ``_next_base``. The result is a
    ``_StaticIntWrapper``; unknown strings abort compilation.
    """
    if base == "Y":
        return _StaticIntWrapper[1]()
    elif base == "M":
        return _StaticIntWrapper[1]()
    elif base == "W":
        return _StaticIntWrapper[7]()
    elif base == "D":
        return _StaticIntWrapper[24]()
    elif base == "h":
        return _StaticIntWrapper[60]()
    elif base == "m":
        return _StaticIntWrapper[60]()
    elif base == "s":
        return _StaticIntWrapper[1000]()
    elif base == "ms":
        return _StaticIntWrapper[1000]()
    elif base == "us":
        return _StaticIntWrapper[1000]()
    elif base == "ns":
        return _StaticIntWrapper[1000]()
    elif base == "ps":
        return _StaticIntWrapper[1000]()
    elif base == "fs":
        return _StaticIntWrapper[1000]()
    elif base == "as":
        return _StaticIntWrapper[1]()
    elif base == "generic":
        return _StaticIntWrapper[0]()
    else:
        compile_error("[internal error] no factor for base: " + base)
def _next_base(base: Static[str]):
    """Return the unit string that follows ``base`` in coarse-to-fine order.

    The result is a ``_StaticStrWrapper``. "as" is followed by "generic" and
    "generic" by the placeholder "<invalid>"; unknown strings abort
    compilation.
    """
    if base == "Y":
        return _StaticStrWrapper["M"]()
    elif base == "M":
        return _StaticStrWrapper["W"]()
    elif base == "W":
        return _StaticStrWrapper["D"]()
    elif base == "D":
        return _StaticStrWrapper["h"]()
    elif base == "h":
        return _StaticStrWrapper["m"]()
    elif base == "m":
        return _StaticStrWrapper["s"]()
    elif base == "s":
        return _StaticStrWrapper["ms"]()
    elif base == "ms":
        return _StaticStrWrapper["us"]()
    elif base == "us":
        return _StaticStrWrapper["ns"]()
    elif base == "ns":
        return _StaticStrWrapper["ps"]()
    elif base == "ps":
        return _StaticStrWrapper["fs"]()
    elif base == "fs":
        return _StaticStrWrapper["as"]()
    elif base == "as":
        return _StaticStrWrapper["generic"]()
    elif base == "generic":
        return _StaticStrWrapper["<invalid>"]()
    else:
        compile_error("[internal error] no next base for: " + base)
def _get_units_factor(bigbase: Static[str],
                      littlebase: Static[str],
                      factor: Static[int] = 1):
    """Compile-time counterpart of ``_get_datetime_units_factor``.

    Walks from ``bigbase`` toward ``littlebase`` one unit at a time,
    accumulating the per-step multipliers in ``factor``. Returns a
    ``_StaticIntWrapper`` holding the product; it holds ``factor`` unchanged
    when ``bigbase`` is not strictly coarser than ``littlebase``. Compilation
    is aborted if any of the top 8 bits of the running product becomes set.
    """
    finer: Static[str] = _min_base(bigbase, littlebase).s
    if bigbase == littlebase or finer != littlebase:
        return _StaticIntWrapper[factor]()
    scaled: Static[int] = factor * _base_factor(bigbase).n
    if scaled & 0xff00000000000000 != 0:
        compile_error("Integer overflow getting a common metadata divisor")
    return _get_units_factor(_next_base(bigbase).s, littlebase, scaled)
def _static_gcd(x: Static[int], y: Static[int]):
    """Return gcd(x, y) at compile time, wrapped in a ``_StaticIntWrapper``."""
    if y != 0:
        return _static_gcd(y, x % y)
    return _StaticIntWrapper[x]()
def _meta_gcd(meta1: _Meta, meta2: _Meta, strict1: Static[int],
              strict2: Static[int]):
    """Compute, at compile time, metadata that both inputs convert to evenly.

    Args:
        meta1, meta2: the two ``_Meta`` descriptions to combine.
        strict1, strict2: when non-zero, combining a year- or month-based
            ``meta1`` / ``meta2`` with a unit other than years or months is a
            compile error instead of an approximate combination.

    Returns:
        A ``_Meta`` whose base is the finer of the two bases and whose
        multiplier is the gcd of the two multipliers once both are expressed
        in that base. If either input is generic, the other is returned as is.

    Compilation is aborted when the units are incompatible under the strict
    flags, or when the resulting multiplier falls outside ``1..0x7fffffff``.
    """
    def incompatible_units(base1: Static[str], base2: Static[str]):
        compile_error(
            "Cannot get a common metadata divisor for Numpy datetime metadata ["
            + base1 + "] and [" + base2 +
            "] because they have incompatible nonlinear " + "base time units.")

    def overflow():
        compile_error("Integer overflow getting a common metadata divisor")

    def make_meta(base: Static[str], num1: Static[int], num2: Static[int],
                  f1: Static[int], f2: Static[int]):
        # gcd of the two multipliers after scaling each into the common base.
        num: Static[int] = _static_gcd(num1 * f1, num2 * f2).n
        if num <= 0 or num > 0x7fffffff:
            overflow()
        return _Meta[base, num]()

    if meta1.base == "generic":
        return meta2
    if meta2.base == "generic":
        return meta1
    if meta1.base == meta2.base:
        return make_meta(meta1.base, meta1.num, meta2.num, 1, 1)

    # Scale factor for whichever side has the coarser base; the finer side
    # keeps a factor of 1.
    f_12: Static[int] = _get_units_factor(meta1.base, meta2.base).n
    f_21: Static[int] = _get_units_factor(meta2.base, meta1.base).n
    f1: Static[int] = f_12 if (
        _min_base(meta1.base, meta2.base).s == meta2.base
        and meta1.base != meta2.base) else 1
    f2: Static[int] = f_21 if (
        _min_base(meta1.base, meta2.base).s == meta1.base
        and meta1.base != meta2.base) else 1

    if meta1.base == "Y":
        if meta2.base == "M":
            # Years convert exactly to months: 12 per year.
            return make_meta("M", meta1.num * 12, meta2.num, f1, f2)
        elif strict1:
            incompatible_units(meta1.base, meta2.base)
        else:
            return make_meta(meta2.base, meta1.num, meta2.num, f1, f2)
    elif meta2.base == "Y":
        if meta1.base == "M":
            # Mirror of the year/month case above. make_meta takes five
            # arguments, so the two scale factors must be passed here as well.
            return make_meta("M", meta1.num, meta2.num * 12, f1, f2)
        elif strict2:
            incompatible_units(meta1.base, meta2.base)
        else:
            return make_meta(meta1.base, meta1.num, meta2.num, f1, f2)
    elif meta1.base == "M":
        if strict1:
            incompatible_units(meta1.base, meta2.base)
        else:
            return make_meta(meta2.base, meta1.num, meta2.num, f1, f2)
    elif meta2.base == "M":
        if strict2:
            incompatible_units(meta1.base, meta2.base)
        else:
            return make_meta(meta1.base, meta1.num, meta2.num, f1, f2)
    else:
        return make_meta(
            _min_base(meta1.base, meta2.base).s, meta1.num, meta2.num, f1, f2)
def _can_cast_units(src_unit: int,
                    dst_unit: int,
                    casting: Static[str] = 'same_kind'):
    """Report whether a cast between two unit codes is allowed.

    Args:
        src_unit, dst_unit: numeric ``_FR_*`` unit codes.
        casting: compile-time casting rule. 'unsafe' allows everything.
            'same_kind' and 'safe' allow a cast involving generic units only
            when the source is generic; otherwise 'same_kind' allows any cast
            and 'safe' requires the source to be no finer than the
            destination. Any other rule requires identical units.
    """
    if casting == 'unsafe':
        return True
    if casting == 'same_kind' or casting == 'safe':
        if src_unit == _FR_GENERIC or dst_unit == _FR_GENERIC:
            return src_unit == _FR_GENERIC
        if casting == 'same_kind':
            return True
        return src_unit <= dst_unit
    return src_unit == dst_unit
# Run-time datetime metadata: a numeric unit code (one of the _FR_* values)
# and an integer multiplier, both stored as 32-bit integers.
@tuple
class DatetimeMetaData:
    base: i32
    num: i32
# A duration: a 64-bit count of `num` x `base` units. The unit string and the
# multiplier are compile-time type parameters, so only `value` is stored.
# `value == _DATETIME_NAT` marks "Not a Time".
@tuple
class timedelta64:
    value: int
    base: Static[str]
    num: Static[int]
# A point in time: a 64-bit count of `num` x `base` units. The unit string and
# the multiplier are compile-time type parameters, so only `value` is stored.
# NOTE(review): the origin that `value` is counted from is not visible in this
# part of the file (the day helpers map day 0 to 1970) — confirm.
@tuple
class datetime64:
    value: int
    base: Static[str]
    num: Static[int]
# Broken-down date/time record with one field per calendar component.
# NOTE(review): `us`, `ps` and `as_` presumably hold the microsecond,
# picosecond and attosecond parts (`as_` avoiding the `as` keyword) — confirm
# against the conversion code that fills this struct.
class datetimestruct:
    year: int
    month: i32
    day: i32
    hour: i32
    min: i32
    sec: i32
    us: i32
    ps: i32
    as_: i32
def _parse_datetime_type(s: Static[str]):
    """Turn a dtype string into a default instance of the matching type.

    Accepted prefixes are ``datetime64[``, ``timedelta64[``, ``M8[`` (same as
    datetime64) and ``m8[`` (same as timedelta64). Anything else aborts
    compilation.
    """
    if s[:11] == "datetime64[":
        parsed = _parse_datetime_helper(s, 11)
        return datetime64[parsed.base, parsed.num]()
    if s[:12] == "timedelta64[":
        parsed = _parse_datetime_helper(s, 12)
        return timedelta64[parsed.base, parsed.num]()
    if s[:3] == "M8[":
        parsed = _parse_datetime_helper(s, 3)
        return datetime64[parsed.base, parsed.num]()
    if s[:3] == "m8[":
        parsed = _parse_datetime_helper(s, 3)
        return timedelta64[parsed.base, parsed.num]()
    compile_error("unknown datetime type: " + s)
def _promote(T1: type, T2: type):
    """Return a default instance of the common type of ``T1`` and ``T2``.

    The common metadata comes from ``_meta_gcd``; a timedelta64 operand is
    treated strictly there. The result is a datetime64 if either input is a
    datetime64, otherwise a timedelta64.
    """
    strict1: Static[int] = isinstance(T1, timedelta64)
    strict2: Static[int] = isinstance(T2, timedelta64)
    common = _meta_gcd(_Meta[T1.base, T1.num](), _Meta[T2.base, T2.num](),
                       strict1, strict2)
    if isinstance(T1, datetime64) or isinstance(T2, datetime64):
        return datetime64[common.base, common.num]()
    return timedelta64[common.base, common.num]()
def _coerce(d1, d2):
    """Cast both operands to their common unit and multiplier.

    Each operand keeps its own kind (datetime64 stays datetime64, anything
    else is cast as a timedelta64); only the metadata is unified. Returns the
    pair of converted values.
    """
    Common = type(_promote(type(d1), type(d2)))
    if isinstance(d1, datetime64):
        lhs = d1._cast(datetime64[Common.base, Common.num])
    else:
        lhs = d1._cast(timedelta64[Common.base, Common.num])
    if isinstance(d2, datetime64):
        rhs = d2._cast(datetime64[Common.base, Common.num])
    else:
        rhs = d2._cast(timedelta64[Common.base, Common.num])
    return lhs, rhs
@extend
class DatetimeMetaData:
    def __new__(base: int, num: int = 1) -> DatetimeMetaData:
        """Build metadata from a numeric unit code and a multiplier."""
        return (i32(base), i32(num))

    def __new__(base: Static[str], num: int = 1) -> DatetimeMetaData:
        """Build metadata from a compile-time unit string and a multiplier."""
        return DatetimeMetaData(base=_base_code(base), num=num)

    def __new__(dt: datetime64):
        """Build metadata describing the unit of a datetime64 value."""
        return DatetimeMetaData(base=_base_code(dt.base), num=dt.num)

    def __new__(td: timedelta64):
        """Build metadata describing the unit of a timedelta64 value."""
        return DatetimeMetaData(base=_base_code(td.base), num=td.num)

    def divisor_to_multiple(self, den: int, metastr: str):
        """Re-express "this unit divided by ``den``" as a multiple of a finer unit.

        For example, years divided by 4 become 3 months. The candidate finer
        units and the number of them per unit come from ``_multiples_table``;
        the first candidate whose count is divisible by ``den`` is used.

        Args:
            den: the divisor.
            metastr: original metadata text, quoted in the error message when
                non-empty.

        Returns:
            A new ``DatetimeMetaData`` in the finer unit.

        Raises:
            ValueError: if the unit is generic, or if no candidate finer unit
                is an exact multiple of ``den``.
        """
        unit = int(self.base)
        if unit == _FR_GENERIC:
            raise ValueError("Can't use 'den' divisor with generic units")

        if unit >= _FR_s:
            # The table has a single shared row for seconds and finer: the
            # next two finer units are x1000 and x1000000.
            ind = (_FR_s - _FR_Y) * 2
            totry = _multiples_table[ind]
            baseunit = (unit + 1, unit + 2, 0, 0)
        else:
            ind = (unit - _FR_Y) * 2
            totry = _multiples_table[ind]
            baseunit = _multiples_table[ind + 1]

        chosen = -1
        q = 0
        for i in range(4):
            multiple = totry[i]
            # Rows are zero-padded, and near the fine end of the scale the
            # computed target can run past attoseconds; neither is a real
            # candidate, so stop there rather than "matching" with q == 0.
            if multiple == 0 or baseunit[i] > _FR_as:
                break
            if multiple % den == 0:
                chosen = i
                q = multiple // den
                break

        if chosen < 0:
            if not metastr:
                msg = f"divisor ({den}) is not a multiple of a lower-unit in datetime metadata"
            else:
                msg = f"divisor ({den}) is not a multiple of a lower-unit in datetime metadata \"{metastr}\""
            raise ValueError(msg)
        return DatetimeMetaData(base=baseunit[chosen], num=int(self.num) * q)

    def conversion_factor(self, dst: DatetimeMetaData):
        """Return ``(num, denom)`` such that ``value * num / denom`` converts to ``dst``.

        The fraction is reduced to lowest terms. A generic source converts
        with factor ``(1, 1)``. Conversions that cross the year/month
        boundary use the average over a 400-year cycle
        (``97 + 400 * 365`` days per 400 years).

        Raises:
            ValueError: converting from specific units to generic units.
            OverflowError: the unit factor does not fit (see
                ``_get_datetime_units_factor``).
        """
        num = u64(1)
        denom = u64(1)
        if self.base == i32(_FR_GENERIC):
            return (1, 1)
        elif dst.base == i32(_FR_GENERIC):
            raise ValueError("Cannot convert from specific units to generic "
                             "units in NumPy datetimes or timedeltas")

        # Always compute coarse -> fine; invert the fraction afterwards if the
        # requested direction was fine -> coarse.
        swapped = False
        if self.base <= dst.base:
            src_base = int(self.base)
            dst_base = int(dst.base)
        else:
            src_base = int(dst.base)
            dst_base = int(self.base)
            swapped = True

        if src_base != dst_base:
            if src_base == _FR_Y:
                if dst_base == _FR_M:
                    num *= u64(12)
                elif dst_base == _FR_W:
                    num *= u64(97 + 400 * 365)
                    denom *= u64(400 * 7)
                else:
                    # Years -> days, then days -> destination unit.
                    num *= u64(97 + 400 * 365)
                    denom *= u64(400)
                    num *= _get_datetime_units_factor(_FR_D, dst_base)
            elif src_base == _FR_M:
                if dst_base == _FR_W:
                    num *= u64(97 + 400 * 365)
                    denom *= u64(400 * 12 * 7)
                else:
                    # Months -> days, then days -> destination unit.
                    num *= u64(97 + 400 * 365)
                    denom *= u64(400 * 12)
                    num *= _get_datetime_units_factor(_FR_D, dst_base)
            else:
                num *= _get_datetime_units_factor(src_base, dst_base)

        # A zero factor from _get_datetime_units_factor signals overflow.
        if not num:
            raise OverflowError(
                f"Integer overflow while computing the conversion factor between NumPy datetime units {_datetime_strings[src_base]} and {_datetime_strings[dst_base]}"
            )

        if swapped:
            num, denom = denom, num

        num *= u64(int(self.num))
        denom *= u64(int(dst.num))
        gcd = _uint64_euclidean_gcd(num, denom)
        return int(num // gcd), int(denom // gcd)
@extend
class timedelta64:
def __new__() -> timedelta64[base, num]:
    """Return a zero-length timedelta of this unit."""
    return (0,)
def __new__(value: int, base: Static[str],
            num: Static[int]) -> timedelta64[base, num]:
    """Build a timedelta of ``value`` ticks, each tick being ``num`` x ``base``.

    The unit and multiplier are validated at compile time.
    """
    _validate_base(base)
    _validate_num(num)
    return (value,)
def __new__(td: timedelta64, base: Static[str],
            num: Static[int]) -> timedelta64[base, num]:
    """Convert an existing timedelta to the unit ``num`` x ``base``."""
    _validate_base(base)
    _validate_num(num)
    target = timedelta64[base, num]
    return td._cast(target)
def __new__(s: str, base: Static[str],
            num: Static[int]) -> timedelta64[base, num]:
    """Parse a timedelta from text.

    The three letters "NaT" in any letter case give the NaT sentinel; any
    other text is parsed with ``int(s)`` as the tick count.
    """
    _validate_base(base)
    _validate_num(num)
    if len(s) == 3:
        first, second, third = s.ptr[0], s.ptr[1], s.ptr[2]
        is_n = first == byte(78) or first == byte(110)   # 'N' / 'n'
        is_a = second == byte(65) or second == byte(97)  # 'A' / 'a'
        is_t = third == byte(116) or third == byte(84)   # 't' / 'T'
        if is_n and is_a and is_t:
            return (_DATETIME_NAT,)
    return (int(s),)
def __new__(value: int, unit: Static[str]):
    """Build a timedelta from a tick count and a unit string such as "10ms"."""
    parsed = _parse_datetime_type("timedelta64[" + unit + "]")
    return timedelta64(value, parsed.base, parsed.num)
def __new__(td: timedelta64, unit: Static[str] = "generic"):
    """Convert an existing timedelta to the unit named by ``unit``."""
    parsed = _parse_datetime_type("timedelta64[" + unit + "]")
    return timedelta64(td, parsed.base, parsed.num)
def __new__(s: str, unit: Static[str] = "generic"):
    """Parse a timedelta from text, in the unit named by ``unit``."""
    parsed = _parse_datetime_type("timedelta64[" + unit + "]")
    return timedelta64(s, parsed.base, parsed.num)
def __new__(td: datetime.timedelta, unit: Static[str] = "us"):
    """Convert a ``datetime.timedelta`` to a timedelta64 in ``unit``.

    The source's ``_microseconds`` field is taken as a microsecond count and
    then cast to the requested unit.
    """
    target = _parse_datetime_type("timedelta64[" + unit + "]")
    in_microseconds = timedelta64(td._microseconds, "us", 1)
    return in_microseconds._cast(type(target))
def _cast(self, TD: type):
    """Convert this value to the timedelta64 type ``TD``.

    NaT stays NaT. Other values are scaled by the conversion factor between
    the two units; the subtraction of ``denom - 1`` before the truncating
    division makes negative values round toward negative infinity.

    Raises:
        TypeError: if the conversion factor's numerator is zero.
        (``conversion_factor`` may also raise ValueError / OverflowError.)
    """
    if not isinstance(TD, timedelta64):
        compile_error(
            "[internal error] can only cast timedelta64 to timedelta64")
    src_meta = DatetimeMetaData(self.base, self.num)
    dst_meta = DatetimeMetaData(TD.base, TD.num)
    numer, denom = src_meta.conversion_factor(dst_meta)
    if numer == 0:
        raise TypeError(
            f"cannot cast {self.__class__.__name__} to {TD.__name__}")
    if self._nat:
        return timedelta64(_DATETIME_NAT, TD.base, TD.num)
    scaled = self.value * numer
    if self.value < 0:
        scaled -= denom - 1
    return timedelta64(_cdiv(scaled, denom), TD.base, TD.num)
def _code():
    """Return the numeric ``_FR_*`` code of this type's unit."""
    code = _base_code(base)
    return code
@property
def _nat(self):
    """True when this value is the NaT ("Not a Time") sentinel."""
    return _DATETIME_NAT == self.value
def tolist(self):
    """Convert to a plain Python-level object.

    Months, units finer than microseconds and generic units return the raw
    integer ``value``. Every other unit returns a ``datetime.timedelta``
    built from ``num * value``, with a year counted as 365 days. NaT gets no
    special treatment here.
    """
    v = self.num * self.value
    if (base == "M" or base == "ns" or base == "ps" or base == "fs"
            or base == "as" or base == "generic"):
        return self.value
    if base == "Y":
        return datetime.timedelta(days=(365 * v))
    if base == "W":
        return datetime.timedelta(weeks=v)
    if base == "D":
        return datetime.timedelta(days=v)
    if base == "h":
        return datetime.timedelta(hours=v)
    if base == "m":
        return datetime.timedelta(minutes=v)
    if base == "s":
        return datetime.timedelta(seconds=v)
    if base == "ms":
        return datetime.timedelta(milliseconds=v)
    if base == "us":
        return datetime.timedelta(microseconds=v)
    _unit_error(base)
def __eq__(self, other: timedelta64):
    """Equality after unit coercion; NaT is never equal to anything."""
    if self._nat or other._nat:
        return False
    lhs, rhs = _coerce(self, other)
    return lhs.value == rhs.value
def __ne__(self, other: timedelta64):
    """Inequality after unit coercion; NaT is unequal to everything."""
    if self._nat or other._nat:
        return True
    lhs, rhs = _coerce(self, other)
    return lhs.value != rhs.value
def __lt__(self, other: timedelta64):
    """Less-than after unit coercion; False if either side is NaT."""
    if self._nat or other._nat:
        return False
    lhs, rhs = _coerce(self, other)
    return lhs.value < rhs.value
def __gt__(self, other: timedelta64):
    """Greater-than after unit coercion; False if either side is NaT."""
    if self._nat or other._nat:
        return False
    lhs, rhs = _coerce(self, other)
    return lhs.value > rhs.value
def __le__(self, other: timedelta64):
    """Less-or-equal after unit coercion; False if either side is NaT."""
    if self._nat or other._nat:
        return False
    lhs, rhs = _coerce(self, other)
    return lhs.value <= rhs.value
def __ge__(self, other: timedelta64):
    """Greater-or-equal after unit coercion; False if either side is NaT."""
    if self._nat or other._nat:
        return False
    lhs, rhs = _coerce(self, other)
    return lhs.value >= rhs.value
def __bool__(self):
    """True for any non-zero tick count (which includes NaT)."""
    return self.value != 0
def __pos__(self):
    """Unary plus: the value itself, unchanged."""
    unchanged = self
    return unchanged
def __neg__(self):
    """Unary minus; NaT is returned unchanged."""
    if not self._nat:
        return timedelta64(-self.value, base, num)
    return self
def __abs__(self):
    """Absolute value; NaT is returned unchanged."""
    if self._nat:
        return self
    magnitude = self.value
    if magnitude < 0:
        magnitude = -magnitude
    return timedelta64(magnitude, base, num)
def _sign(self):
    """Return a timedelta of -1, 0 or 1 ticks according to the sign of ``value``.

    NaT gets no special treatment: its stored value is negative, so it
    yields -1.
    """
    ticks = self.value
    if ticks > 0:
        sign = 1
    elif ticks < 0:
        sign = -1
    else:
        sign = 0
    return timedelta64(sign, base, num)
def __add__(self, other):
    """Fallback addition: accept any integer-like operand, reject the rest.

    Integer-like operands are converted with ``int(...)`` and re-dispatched;
    any other type is a compile-time error.
    """
    if not (isinstance(other, int) or
            isinstance(other, bool) or
            isinstance(other, byte) or
            isinstance(other, Int) or
            isinstance(other, UInt)):
        compile_error("cannot add timedelta64 and non-int type")
    return self + int(other)
def __radd__(self, other):
    """Fallback reflected addition for integer-like left operands.

    Any other left operand type is a compile-time error.
    """
    if not (isinstance(other, int) or
            isinstance(other, bool) or
            isinstance(other, byte) or
            isinstance(other, Int) or
            isinstance(other, UInt)):
        compile_error("cannot add timedelta64 and non-int type")
    return int(other) + self
def __add__(self, other: timedelta64):
    """Add two timedeltas; the result carries the coerced unit, NaT propagates."""
    lhs, rhs = _coerce(self, other)
    total = _DATETIME_NAT if (lhs._nat or rhs._nat) else lhs.value + rhs.value
    return timedelta64(total, lhs.base, lhs.num)
def __add__(self, other: datetime64):
    """Add this timedelta to a datetime64, yielding a datetime64 (NaT propagates)."""
    delta, stamp = _coerce(self, other)
    total = _DATETIME_NAT if (delta._nat or stamp._nat) else delta.value + stamp.value
    return datetime64(total, delta.base, delta.num)
def __add__(self, other: int):
    """Add an integer, interpreted as a count in this timedelta's own unit."""
    addend = timedelta64(other, base, num)
    return self + addend
def __radd__(self, other: int):
    """Reflected integer add: `other` is taken as a count in this unit."""
    addend = timedelta64(other, base, num)
    return self + addend
def __sub__(self, other):
    """Untyped fallback for `self - other`.

    Integer-like operands (int, bool, byte, Int, UInt) are converted with
    int() and the subtraction is dispatched again; every other operand type
    ends in compile_error.
    """
    if (isinstance(other, int) or
            isinstance(other, bool) or
            isinstance(other, byte) or
            isinstance(other, Int) or
            isinstance(other, UInt)):
        return self - int(other)
    else:
        compile_error("cannot subtract timedelta64 and non-int type")
def __rsub__(self, other):
    """Untyped fallback for `other - self`.

    Integer-like operands (int, bool, byte, Int, UInt) are converted with
    int() and the subtraction is dispatched again; every other operand type
    ends in compile_error.
    """
    if (isinstance(other, int) or
            isinstance(other, bool) or
            isinstance(other, byte) or
            isinstance(other, Int) or
            isinstance(other, UInt)):
        return int(other) - self
    else:
        compile_error("cannot subtract timedelta64 and non-int type")
def __sub__(self, other: timedelta64):
    """Subtract two timedeltas; the result carries the coerced unit, NaT propagates."""
    lhs, rhs = _coerce(self, other)
    diff = _DATETIME_NAT if (lhs._nat or rhs._nat) else lhs.value - rhs.value
    return timedelta64(diff, lhs.base, lhs.num)
def __sub__(self, other: int):
    """Subtract an integer, interpreted as a count in this timedelta's own unit."""
    subtrahend = timedelta64(other, base, num)
    return self - subtrahend
def __rsub__(self, other: int):
    """Reflected integer subtract: computes `other - self` as `(-self) + other`."""
    negated = -self
    return negated + other
def __mul__(self, other):
    """Untyped fallback for `self * other`.

    Integer-like operands (int, bool, byte, Int, UInt) are converted with
    int(), float-like operands (float, float32, float16) with float(), and
    the multiplication is dispatched again; anything else ends in
    compile_error.
    """
    if (isinstance(other, int) or
            isinstance(other, bool) or
            isinstance(other, byte) or
            isinstance(other, Int) or
            isinstance(other, UInt)):
        return self * int(other)
    elif (isinstance(other, float) or
          isinstance(other, float32) or
          isinstance(other, float16)):
        return self * float(other)
    else:
        compile_error("cannot multiply timedelta64 and non-int, non-float type")
def __rmul__(self, other):
    """Untyped fallback for `other * self`.

    Integer-like operands (int, bool, byte, Int, UInt) are converted with
    int(), float-like operands (float, float32, float16) with float(), and
    the multiplication is dispatched again; anything else ends in
    compile_error.
    """
    if (isinstance(other, int) or
            isinstance(other, bool) or
            isinstance(other, byte) or
            isinstance(other, Int) or
            isinstance(other, UInt)):
        return int(other) * self
    elif (isinstance(other, float) or
          isinstance(other, float32) or
          isinstance(other, float16)):
        return float(other) * self
    else:
        compile_error("cannot multiply timedelta64 and non-int, non-float type")
def __mul__(self, other: float):
    """Scale the timedelta by a float.

    Returns NaT when self is NaT or when the product is not finite; otherwise
    the product is converted with int() and wrapped in the same unit.
    """
    if self._nat:
        return timedelta64(_DATETIME_NAT, base, num)
    else:
        x = self.value * other
        if math.isfinite(x):
            # Pass `num` explicitly so the result keeps this timedelta's unit
            # multiplier, exactly like the NaT branches do.
            return timedelta64(int(x), base, num)
        else:
            return timedelta64(_DATETIME_NAT, base, num)
def __rmul__(self, other: float):
    """Reflected float scaling (`other * self`).

    Returns NaT when self is NaT or when the product is not finite; otherwise
    the product is converted with int() and wrapped in the same unit.
    """
    if self._nat:
        return timedelta64(_DATETIME_NAT, base, num)
    else:
        x = self.value * other
        if math.isfinite(x):
            # Pass `num` explicitly so the result keeps this timedelta's unit
            # multiplier, exactly like the NaT branches do.
            return timedelta64(int(x), base, num)
        else:
            return timedelta64(_DATETIME_NAT, base, num)
def __mul__(self, other: int):
    """Scale the stored count by an integer; NaT stays NaT."""
    product = _DATETIME_NAT if self._nat else self.value * other
    return timedelta64(product, base, num)
def __rmul__(self, other: int):
    """Reflected integer scaling (`other * self`); NaT stays NaT."""
    product = _DATETIME_NAT if self._nat else self.value * other
    return timedelta64(product, base, num)
def __floordiv__(self, other):
    """Untyped fallback for `self // other`.

    Integer-like operands (int, bool, byte, Int, UInt) are converted with
    int(), float-like operands (float, float32, float16) with float(), and
    the division is dispatched again; anything else ends in compile_error.
    """
    if (isinstance(other, int) or
            isinstance(other, bool) or
            isinstance(other, byte) or
            isinstance(other, Int) or
            isinstance(other, UInt)):
        return self // int(other)
    elif (isinstance(other, float) or
          isinstance(other, float32) or
          isinstance(other, float16)):
        return self // float(other)
    else:
        compile_error("cannot floordiv timedelta64 and non-int, non-float type")
def __floordiv__(self, other: timedelta64):
    """Floor-divide two timedeltas, returning a plain integer.

    Returns 0 when either operand is NaT or when the divisor is zero.
    """
    if self._nat or other._nat:
        return 0
    lhs, rhs = _coerce(self, other)
    dividend = lhs.value
    divisor = rhs.value
    if divisor == 0:
        return 0
    # _cdiv truncates toward zero; step down by one when the operands have
    # different signs and the division was inexact, to obtain the floor.
    quotient = _cdiv(dividend, divisor)
    signs_differ = (dividend > 0) != (divisor > 0)
    if signs_differ and quotient * divisor != dividend:
        quotient -= 1
    return quotient
def __floordiv__(self, other: float):
    """Divide by a float; NaT when self is NaT or `other` is zero or NaN."""
    invalid = self._nat or other == 0.0 or math.isnan(other)
    if invalid:
        return timedelta64(_DATETIME_NAT, base, num)
    ratio = float(self.value) / other
    # The float quotient is converted back with int().
    return timedelta64(int(ratio), base, num)
def __floordiv__(self, other: int):
    """Divide the count by an integer via _cdiv; NaT when self is NaT or other is 0."""
    invalid = self._nat or other == 0
    if invalid:
        return timedelta64(_DATETIME_NAT, base, num)
    quotient = _cdiv(self.value, other)
    return timedelta64(quotient, base, num)
def __truediv__(self, other):
    """Untyped fallback for `self / other`.

    Integer-like operands (int, bool, byte, Int, UInt) are converted with
    int(), float-like operands (float, float32, float16) with float(), and
    the division is dispatched again; anything else ends in compile_error.
    """
    if (isinstance(other, int) or
            isinstance(other, bool) or
            isinstance(other, byte) or
            isinstance(other, Int) or
            isinstance(other, UInt)):
        return self / int(other)
    elif (isinstance(other, float) or
          isinstance(other, float32) or
          isinstance(other, float16)):
        return self / float(other)
    else:
        compile_error("cannot truediv timedelta64 and non-int, non-float type")
def __truediv__(self, other: timedelta64):
    """Ratio of two timedeltas after coercion; math.nan if either is NaT."""
    lhs, rhs = _coerce(self, other)
    if lhs._nat or rhs._nat:
        return math.nan
    return lhs.value / rhs.value
def __truediv__(self, other: float):
    """Divide by a float; NaT when self is NaT or `other` is zero or NaN."""
    invalid = self._nat or other == 0.0 or math.isnan(other)
    if invalid:
        return timedelta64(_DATETIME_NAT, base, num)
    ratio = float(self.value) / other
    return timedelta64(int(ratio), base, num)
def __truediv__(self, other: int):
    """Divide by an integer through float arithmetic; NaT when self is NaT or other is 0."""
    invalid = self._nat or other == 0
    if invalid:
        return timedelta64(_DATETIME_NAT, base, num)
    ratio = float(self.value) / other
    return timedelta64(int(ratio), base, num)
def __mod__(self, other: timedelta64):
    """Remainder of two timedeltas, taking the sign of the divisor.

    Both operands are coerced first. The result is NaT when either operand
    is NaT or when the divisor is zero.
    """
    self, other = _coerce(self, other)
    if self._nat or other._nat:
        return timedelta64(_DATETIME_NAT, self.base, self.num)
    else:
        in1 = self.value
        in2 = other.value
        if in2 == 0:
            return timedelta64(_DATETIME_NAT, self.base, self.num)
        else:
            # _cmod is the truncating remainder; shift it by the divisor when
            # the operand signs differ so the result follows the divisor.
            rem = _cmod(in1, in2)
            if (in1 > 0) == (in2 > 0) or rem == 0:
                # Keep `self.num` so every branch returns the same unit.
                return timedelta64(rem, self.base, self.num)
            else:
                return timedelta64(rem + in2, self.base, self.num)
def __divmod__(self, other: timedelta64):
    """Return (floor quotient as int, remainder as timedelta64).

    Yields (0, NaT) when either operand is NaT or the divisor is zero.
    """
    lhs, rhs = _coerce(self, other)
    if lhs._nat or rhs._nat:
        return (0, timedelta64(_DATETIME_NAT, lhs.base, lhs.num))
    dividend = lhs.value
    divisor = rhs.value
    if divisor == 0:
        return (0, timedelta64(_DATETIME_NAT, lhs.base, lhs.num))
    quotient = _cdiv(dividend, divisor)
    remainder = _cmod(dividend, divisor)
    # Truncating results need adjusting only for mixed signs with a
    # non-zero remainder.
    if (dividend > 0) != (divisor > 0) and remainder != 0:
        return (quotient - 1,
                timedelta64(remainder + divisor, lhs.base, lhs.num))
    return (quotient, timedelta64(remainder, lhs.base, lhs.num))
def __repr__(self):
    """Return the constructor-style representation.

    The value shown is `value * num` (or NaT). The unit string is included
    for every concrete unit and omitted only for the unit-less "generic"
    base, which has nothing meaningful to display.
    """
    val = "NaT" if self._nat else str(self.value * self.num)
    if base == "generic":
        return f"numpy.timedelta64({val})"
    else:
        return f"numpy.timedelta64({val}, '{base}')"
def __str__(self):
    """Return "NaT", or `value * num` followed by the unit name from _base_str."""
    if self._nat:
        return "NaT"
    return f"{self.value * self.num} {_base_str(base)}"
@extend
class datetime64:
    # Extension methods for the datetime64 scalar. `value` is the raw integer
    # count and `base` / `num` are the unit and its multiplier; a value equal
    # to _DATETIME_NAT marks "not a time".
    # NOTE(review): overloads are deliberately kept in their original order,
    # in case overload selection depends on definition order -- confirm.

    def __new__() -> datetime64[base, num]:
        """Default constructor: raw value 0."""
        return (0, )

    def __new__(value: int,
                base: Static[str],
                num: Static[int]) -> datetime64[base, num]:
        """Wrap a raw integer count after validating the unit and multiplier."""
        _validate_base(base)
        _validate_num(num)
        return (value, )

    def __new__(value: datetime64,
                base: Static[str],
                num: Static[int]) -> datetime64[base, num]:
        """Convert another datetime64 to the requested unit via _cast."""
        _validate_base(base)
        _validate_num(num)
        return value._cast(datetime64[base, num])

    def __new__(s: str,
                base: Static[str],
                num: Static[int]) -> datetime64[base, num]:
        """Parse a date/time string and express it in the requested unit."""
        _validate_base(base)
        _validate_num(num)
        target_meta = DatetimeMetaData(base=base, num=num)
        parsed = datetimestruct(s)
        return (parsed.to_datetime64(target_meta), )

    def __new__(value: int, unit: Static[str]):
        """Wrap a raw integer count using a unit string such as one parsed by
        _parse_datetime_type."""
        DT = _parse_datetime_type("datetime64[" + unit + "]")
        return datetime64(value, DT.base, DT.num)

    def __new__(dt: datetime64, unit: Static[str]):
        """Convert another datetime64 using a unit string."""
        DT = _parse_datetime_type("datetime64[" + unit + "]")
        return datetime64(dt, DT.base, DT.num)

    def __new__(s: str, unit: Static[str] = "us"):
        """Parse a date/time string; the unit string defaults to "us"."""
        DT = _parse_datetime_type("datetime64[" + unit + "]")
        return datetime64(s, DT.base, DT.num)

    def __new__(dt: datetime.datetime, unit: Static[str] = "us"):
        """Build from a datetime.datetime; the unit string defaults to "us"."""
        DT = _parse_datetime_type("datetime64[" + unit + "]")
        target_meta = DatetimeMetaData(base=DT.base, num=DT.num)
        fields = datetimestruct(dt)
        return datetime64(fields.to_datetime64(target_meta), DT.base, DT.num)

    def __new__(dt: datetime.date, unit: Static[str] = "D"):
        """Build from a datetime.date (time of day zero); unit defaults to "D"."""
        DT = _parse_datetime_type("datetime64[" + unit + "]")
        target_meta = DatetimeMetaData(base=DT.base, num=DT.num)
        midnight = datetime.datetime(year=dt.year, month=dt.month, day=dt.day)
        fields = datetimestruct(midnight)
        return datetime64(fields.to_datetime64(target_meta), DT.base, DT.num)

    def _cast(self, DT: type):
        """Re-express this value in the unit of the datetime64 type `DT`.

        The value is expanded into a datetimestruct with this instance's
        metadata and collapsed again with the metadata of `DT`.
        """
        if not isinstance(DT, datetime64):
            compile_error(
                "[internal error] can only cast datetime64 to datetime64")
        source_meta = DatetimeMetaData(self.base, self.num)
        target_meta = DatetimeMetaData(DT.base, DT.num)
        fields = datetimestruct()
        fields.from_datetime64(source_meta, self.value)
        return datetime64(fields.to_datetime64(target_meta), DT.base, DT.num)

    def _code():
        """Return the numeric unit code for this type's base."""
        return _base_code(base)

    def tolist(self):
        """Return the raw integer for ns/ps/fs/as bases, else a datetime.datetime."""
        # Kept as a single if/else on `base` because the two branches return
        # different types.
        if (base == "ns" or base == "ps" or base == "fs" or base == "as"):
            return self.value
        else:
            fields = datetimestruct()
            fields.from_datetime64(DatetimeMetaData(base, num), self.value)
            return fields.datetime

    @property
    def _nat(self):
        """True when the stored value is the NaT sentinel."""
        return self.value == _DATETIME_NAT

    def __eq__(self, other: datetime64):
        """Equality on coerced values; False whenever a NaT is involved."""
        if self._nat or other._nat:
            return False
        lhs, rhs = _coerce(self, other)
        return lhs.value == rhs.value

    def __ne__(self, other: datetime64):
        """Inequality on coerced values; True whenever a NaT is involved."""
        if self._nat or other._nat:
            return True
        lhs, rhs = _coerce(self, other)
        return lhs.value != rhs.value

    def __lt__(self, other: datetime64):
        """Strict less-than on coerced values; False if either side is NaT."""
        if self._nat or other._nat:
            return False
        lhs, rhs = _coerce(self, other)
        return lhs.value < rhs.value

    def __gt__(self, other: datetime64):
        """Strict greater-than on coerced values; False if either side is NaT."""
        if self._nat or other._nat:
            return False
        lhs, rhs = _coerce(self, other)
        return lhs.value > rhs.value

    def __le__(self, other: datetime64):
        """Less-or-equal on coerced values; False if either side is NaT."""
        if self._nat or other._nat:
            return False
        lhs, rhs = _coerce(self, other)
        return lhs.value <= rhs.value

    def __ge__(self, other: datetime64):
        """Greater-or-equal on coerced values; False if either side is NaT."""
        if self._nat or other._nat:
            return False
        lhs, rhs = _coerce(self, other)
        return lhs.value >= rhs.value

    def __bool__(self):
        """Truthiness of the raw stored integer."""
        return bool(self.value)

    def __add__(self, other):
        """Untyped fallback for `self + other`: integer-like operands are
        converted with int() and re-dispatched, anything else ends in
        compile_error."""
        if (isinstance(other, int) or
                isinstance(other, bool) or
                isinstance(other, byte) or
                isinstance(other, Int) or
                isinstance(other, UInt)):
            return self + int(other)
        else:
            compile_error("cannot add datetime64 and non-int type")

    def __radd__(self, other):
        """Untyped fallback for `other + self`; see the untyped __add__."""
        if (isinstance(other, int) or
                isinstance(other, bool) or
                isinstance(other, byte) or
                isinstance(other, Int) or
                isinstance(other, UInt)):
            return int(other) + self
        else:
            compile_error("cannot add datetime64 and non-int type")

    def __add__(self, other: timedelta64):
        """Shift this datetime by a timedelta; NaT propagates."""
        stamp, delta = _coerce(self, other)
        total = _DATETIME_NAT if (stamp._nat or delta._nat) else stamp.value + delta.value
        return datetime64(total, stamp.base, stamp.num)

    def __add__(self, other: int):
        """Add an integer, interpreted as a count in this datetime's own unit."""
        step = timedelta64(other, base, num)
        return self + step

    def __radd__(self, other: int):
        """Reflected integer add; `other` is a count in this datetime's unit."""
        step = timedelta64(other, base, num)
        return self + step

    def __sub__(self, other):
        """Untyped fallback for `self - other`: integer-like operands are
        converted with int() and re-dispatched, anything else ends in
        compile_error."""
        if (isinstance(other, int) or
                isinstance(other, bool) or
                isinstance(other, byte) or
                isinstance(other, Int) or
                isinstance(other, UInt)):
            return self - int(other)
        else:
            compile_error("cannot subtract datetime64 and non-int type")

    def __sub__(self, other: timedelta64):
        """Shift this datetime back by a timedelta; NaT propagates."""
        stamp, delta = _coerce(self, other)
        diff = _DATETIME_NAT if (stamp._nat or delta._nat) else stamp.value - delta.value
        return datetime64(diff, stamp.base, stamp.num)

    def __sub__(self, other: datetime64):
        """Difference of two datetimes as a timedelta64; NaT propagates."""
        lhs, rhs = _coerce(self, other)
        diff = _DATETIME_NAT if (lhs._nat or rhs._nat) else lhs.value - rhs.value
        return timedelta64(diff, lhs.base, lhs.num)

    def __sub__(self, other: int):
        """Subtract an integer, interpreted as a count in this datetime's unit."""
        step = timedelta64(other, base, num)
        return self - step

    def __repr__(self):
        """Constructor-style representation built from __str__ and the base."""
        return f"numpy.datetime64('{self.__str__()}', '{base}')"

    def __str__(self):
        """Format the value at the precision of this type's own base unit."""
        fields = datetimestruct()
        fields.from_datetime64(DatetimeMetaData(base, num), self.value)
        return fields.to_str(_base_code(base))

    def _as_string(self, unit=None, timezone: str = "naive"):
        """Format the value with an optional unit and timezone mode.

        `unit` may be None (use this type's base), "auto" (let to_str pick a
        unit) or any unit name accepted by _base_code_nonstatic. `timezone`
        must be "naive", "UTC" or "local"; anything else raises ValueError.
        """
        unit_code = _base_code(base)
        if unit is not None:
            if not isinstance(unit, str):
                compile_error("unit must be a string or None")
            if unit == "auto":
                # _FR_ERROR asks to_str to choose the unit itself.
                unit_code = _FR_ERROR
            else:
                unit_code = _base_code_nonstatic(unit)

        use_local = timezone == "local"
        use_utc = timezone == "UTC"
        if not (use_local or use_utc or timezone == "naive"):
            raise ValueError(
                f"Unsupported timezone input string \"{timezone}\"")

        fields = datetimestruct()
        fields.from_datetime64(DatetimeMetaData(base, num), self.value)
        return fields.to_str(unit=unit_code, local=use_local, utc=use_utc,
                             tzoffset=-1)
@extend
class datetimestruct:
    # Extension methods for the broken-down date/time record with fields
    # year, month, day, hour, min, sec and the sub-second parts us
    # (microseconds within the second), ps (picoseconds within the
    # microsecond) and as_ (attoseconds within the picosecond).
    # A `year` equal to _DATETIME_NAT marks the whole record as NaT.

    def __init__(self):
        """Create a record with every field zeroed (see reset)."""
        self.reset()

    def __init__(self, s: str):
        """Create a record by parsing the date/time string `s`."""
        self.parse_iso8601(s)

    def __init__(self, dt: datetime.datetime):
        """Copy the fields of a datetime.datetime; ps and as_ are set to 0."""
        self.year = dt.year
        self.month = i32(dt.month)
        self.day = i32(dt.day)
        self.hour = i32(dt.hour)
        self.min = i32(dt.minute)
        self.sec = i32(dt.second)
        self.us = i32(dt.microsecond)
        self.ps = i32(0)
        self.as_ = i32(0)

    def reset(self):
        """Set every field to zero (note: month and day become 0, not 1)."""
        self.year = 0
        self.month = i32(0)
        self.day = i32(0)
        self.hour = i32(0)
        self.min = i32(0)
        self.sec = i32(0)
        self.us = i32(0)
        self.ps = i32(0)
        self.as_ = i32(0)

    def copy_to(self, other: datetimestruct):
        """Copy every field of this record into `other`."""
        other.year = self.year
        other.month = self.month
        other.day = self.day
        other.hour = self.hour
        other.min = self.min
        other.sec = self.sec
        other.us = self.us
        other.ps = self.ps
        other.as_ = self.as_

    @property
    def datetime(self):
        """The record as a datetime.datetime (ps and as_ are not carried over)."""
        return datetime.datetime(year=self.year,
                                 month=int(self.month),
                                 day=int(self.day),
                                 hour=int(self.hour),
                                 minute=int(self.min),
                                 second=int(self.sec),
                                 microsecond=int(self.us))

    @property
    def days(self):
        """Number of days between 1970-01-01 and this record's calendar date.

        Negative for dates before 1970.
        """
        year = self.year - 1970
        days = year * 365
        # Leap-day correction relative to 1970.
        if days >= 0:
            # 1968 is the closest leap year before 1970; the current year is
            # excluded, hence the +1.
            year += 1
            days += year // 4
            # Re-base on 1900 to remove the century years ...
            year += 68
            days -= year // 100
            # ... and on 1600 to add the 400-year exceptions back.
            year += 300
            days += year // 400
        else:
            # `year` is negative in this branch and the correction counts
            # whole periods toward zero, so truncating division (_cdiv) is
            # required: flooring division would be off by one for most years
            # (e.g. 1969-01-01 would come out as -366 instead of -365).
            year -= 2
            days += _cdiv(year, 4)
            year -= 28
            days -= _cdiv(year, 100)
            days += _cdiv(year, 400)
        month_lengths = _days_per_month_table[int(_is_leapyear(self.year))]
        month = int(self.month) - 1
        # Whole months elapsed in the current year.
        for i in range(month):
            days += month_lengths[i]
        # Days elapsed in the current month.
        days += int(self.day) - 1
        return days

    @property
    def minutes(self):
        """Number of minutes between 1970-01-01T00:00 and this record."""
        days = self.days * 24 * 60
        days += int(self.hour) * 60
        days += int(self.min)
        return days

    def set_days(self, days: int):
        """Set year, month and day from a day count relative to 1970-01-01."""
        days, year = _days_to_yearsdays(days)
        self.year = year
        month_lengths = _days_per_month_table[int(_is_leapyear(self.year))]
        # Walk the months, consuming their lengths until `days` fits.
        for i in range(12):
            if days < month_lengths[i]:
                self.month = i32(i + 1)
                self.day = i32(days + 1)
                return
            else:
                days -= month_lengths[i]

    def add_minutes(self, minutes: i32):
        """Add `minutes` (may be negative) and renormalize the fields.

        Day overflow is carried across at most one month boundary.
        """
        self.min += minutes
        h, m = _extract_unit_32(self.min, i32(60))
        self.min = m
        self.hour += h
        d, h = _extract_unit_32(self.hour, i32(24))
        self.hour = h
        self.day += d
        if self.day < i32(1):
            # Borrow from the previous month (and year, if needed).
            self.month -= i32(1)
            if self.month < i32(1):
                self.year -= 1
                self.month = i32(12)
            leap = int(_is_leapyear(self.year))
            self.day += i32(_days_per_month_table[leap][int(self.month) - 1])
        elif self.day > i32(28):
            # Only days above 28 can possibly exceed a month's length.
            leap = int(_is_leapyear(self.year))
            dpm = i32(_days_per_month_table[leap][int(self.month) - 1])
            if self.day > dpm:
                self.day -= dpm
                self.month += i32(1)
                if self.month > i32(12):
                    self.year += 1
                    self.month = i32(1)

    def to_datetime64(self, meta: DatetimeMetaData):
        """Collapse the record into an integer count in the unit of `meta`.

        Returns _DATETIME_NAT for a NaT record. Raises ValueError for the
        generic unit and for unknown unit codes. The count is rounded toward
        negative infinity for weeks and for unit multipliers greater than 1.
        """
        if self.year == _DATETIME_NAT:
            return _DATETIME_NAT
        base = meta.base
        if base == i32(_FR_GENERIC):
            raise ValueError("Cannot create a NumPy datetime other than NaT "
                             "with generic units")
        ret = 0
        if base == i32(_FR_Y):
            ret = self.year - 1970
        elif base == i32(_FR_M):
            ret = 12 * (self.year - 1970) + (int(self.month) - 1)
        else:
            days = self.days
            if base == i32(_FR_W):
                if days >= 0:
                    ret = days // 7
                else:
                    # Bias + truncating division == floor for negative
                    # values. Flooring division here would apply the
                    # rounding twice (e.g. day -2 would land in week -2).
                    ret = _cdiv(days - 6, 7)
            elif base == i32(_FR_D):
                ret = days
            elif base == i32(_FR_h):
                ret = days * 24 + int(self.hour)
            elif base == i32(_FR_m):
                ret = (days * 24 + int(self.hour)) * 60 + int(self.min)
            elif base == i32(_FR_s):
                ret = ((days * 24 + int(self.hour)) * 60 +
                       int(self.min)) * 60 + int(self.sec)
            elif base == i32(_FR_ms):
                ret = ((
                    (days * 24 + int(self.hour)) * 60 + int(self.min)) * 60 +
                       int(self.sec)) * 1000 + int(self.us) // 1000
            elif base == i32(_FR_us):
                ret = ((
                    (days * 24 + int(self.hour)) * 60 + int(self.min)) * 60 +
                       int(self.sec)) * 1000000 + int(self.us)
            elif base == i32(_FR_ns):
                ret = (((
                    (days * 24 + int(self.hour)) * 60 + int(self.min)) * 60 +
                        int(self.sec)) * 1000000 + int(self.us)) * 1000 + int(
                            self.ps) // 1000
            elif base == i32(_FR_ps):
                ret = (
                    (((days * 24 + int(self.hour)) * 60 + int(self.min)) * 60 +
                     int(self.sec)) * 1000000 + int(self.us)) * 1000000 + int(
                         self.ps)
            elif base == i32(_FR_fs):
                ret = ((((
                    (days * 24 + int(self.hour)) * 60 + int(self.min)) * 60 +
                         int(self.sec)) * 1000000 + int(self.us)) * 1000000 +
                       int(self.ps)) * 1000 + int(self.as_) // 1000
            elif base == i32(_FR_as):
                ret = ((((
                    (days * 24 + int(self.hour)) * 60 + int(self.min)) * 60 +
                         int(self.sec)) * 1000000 + int(self.us)) * 1000000 +
                       int(self.ps)) * 1000000 + int(self.as_)
            else:
                raise ValueError(
                    "NumPy datetime metadata with corrupt unit value")
        # Divide by the unit multiplier, rounding toward negative infinity.
        num = int(meta.num)
        if num > 1:
            if ret >= 0:
                ret //= num
            else:
                # Same bias + truncation idiom as for weeks above.
                ret = _cdiv(ret - num + 1, num)
        return ret

    def from_datetime64(self, meta: DatetimeMetaData, dt: int):
        """Fill the record from the integer count `dt` in the unit of `meta`.

        A `dt` equal to _DATETIME_NAT yields a NaT record. Raises ValueError
        for the generic unit and for unknown unit codes.
        """
        days = 0
        # Start from 1970-01-01T00:00:00.
        self.reset()
        self.year = 1970
        self.month = i32(1)
        self.day = i32(1)
        if dt == _DATETIME_NAT:
            self.year = _DATETIME_NAT
            return
        base = meta.base
        if base == i32(_FR_GENERIC):
            raise ValueError(
                "Cannot convert a NumPy datetime value other than NaT "
                "with generic units")
        # Scale by the unit multiplier so `dt` is a count of plain base units.
        dt *= int(meta.num)
        # Each branch peels off the largest component first; _extract_unit_64
        # returns (quotient, non-negative remainder).
        if base == i32(_FR_Y):
            self.year = 1970 + dt
        elif base == i32(_FR_M):
            year, dt = _extract_unit_64(dt, 12)
            self.year = 1970 + year
            self.month = i32(dt + 1)
        elif base == i32(_FR_W):
            self.set_days(dt * 7)
        elif base == i32(_FR_D):
            self.set_days(dt)
        elif base == i32(_FR_h):
            days, dt = _extract_unit_64(dt, 24)
            self.set_days(days)
            self.hour = i32(dt)
        elif base == i32(_FR_m):
            days, dt = _extract_unit_64(dt, 60 * 24)
            self.set_days(days)
            hour, dt = _extract_unit_64(dt, 60)
            self.hour = i32(hour)
            self.min = i32(dt)
        elif base == i32(_FR_s):
            days, dt = _extract_unit_64(dt, 60 * 60 * 24)
            self.set_days(days)
            hour, dt = _extract_unit_64(dt, 60 * 60)
            self.hour = i32(hour)
            min, dt = _extract_unit_64(dt, 60)
            self.min = i32(min)
            self.sec = i32(dt)
        elif base == i32(_FR_ms):
            days, dt = _extract_unit_64(dt, 1000 * 60 * 60 * 24)
            self.set_days(days)
            hour, dt = _extract_unit_64(dt, 1000 * 60 * 60)
            self.hour = i32(hour)
            min, dt = _extract_unit_64(dt, 1000 * 60)
            self.min = i32(min)
            sec, dt = _extract_unit_64(dt, 1000)
            self.sec = i32(sec)
            self.us = i32(dt * 1000)
        elif base == i32(_FR_us):
            days, dt = _extract_unit_64(dt, 1000 * 1000 * 60 * 60 * 24)
            self.set_days(days)
            hour, dt = _extract_unit_64(dt, 1000 * 1000 * 60 * 60)
            self.hour = i32(hour)
            min, dt = _extract_unit_64(dt, 1000 * 1000 * 60)
            self.min = i32(min)
            sec, dt = _extract_unit_64(dt, 1000 * 1000)
            self.sec = i32(sec)
            self.us = i32(dt)
        elif base == i32(_FR_ns):
            days, dt = _extract_unit_64(dt, 1000 * 1000 * 1000 * 60 * 60 * 24)
            self.set_days(days)
            hour, dt = _extract_unit_64(dt, 1000 * 1000 * 1000 * 60 * 60)
            self.hour = i32(hour)
            min, dt = _extract_unit_64(dt, 1000 * 1000 * 1000 * 60)
            self.min = i32(min)
            sec, dt = _extract_unit_64(dt, 1000 * 1000 * 1000)
            self.sec = i32(sec)
            us, dt = _extract_unit_64(dt, 1000)
            self.us = i32(us)
            self.ps = i32(dt * 1000)
        elif base == i32(_FR_ps):
            days, dt = _extract_unit_64(
                dt, 1000 * 1000 * 1000 * 1000 * 60 * 60 * 24)
            self.set_days(days)
            hour, dt = _extract_unit_64(dt,
                                        1000 * 1000 * 1000 * 1000 * 60 * 60)
            self.hour = i32(hour)
            min, dt = _extract_unit_64(dt, 1000 * 1000 * 1000 * 1000 * 60)
            self.min = i32(min)
            sec, dt = _extract_unit_64(dt, 1000 * 1000 * 1000 * 1000)
            self.sec = i32(sec)
            us, dt = _extract_unit_64(dt, 1000 * 1000)
            self.us = i32(us)
            self.ps = i32(dt)
        elif base == i32(_FR_fs):
            # No day extraction here: a negative hour is mapped onto
            # 1969-12-31 directly.
            hour, dt = _extract_unit_64(
                dt, 1000 * 1000 * 1000 * 1000 * 1000 * 60 * 60)
            self.hour = i32(hour)
            if self.hour < i32(0):
                self.year = 1969
                self.month = i32(12)
                self.day = i32(31)
                self.hour += i32(24)
            min, dt = _extract_unit_64(dt,
                                       1000 * 1000 * 1000 * 1000 * 1000 * 60)
            self.min = i32(min)
            sec, dt = _extract_unit_64(dt, 1000 * 1000 * 1000 * 1000 * 1000)
            self.sec = i32(sec)
            us, dt = _extract_unit_64(dt, 1000 * 1000 * 1000)
            self.us = i32(us)
            ps, dt = _extract_unit_64(dt, 1000)
            self.ps = i32(ps)
            self.as_ = i32(dt * 1000)
        elif base == i32(_FR_as):
            # No minute/hour extraction here: a negative second is mapped
            # onto 1969-12-31T23:59 directly.
            sec, dt = _extract_unit_64(dt,
                                       1000 * 1000 * 1000 * 1000 * 1000 * 1000)
            self.sec = i32(sec)
            if self.sec < i32(0):
                self.year = 1969
                self.month = i32(12)
                self.day = i32(31)
                self.hour = i32(23)
                self.min = i32(59)
                self.sec += i32(60)
            us, dt = _extract_unit_64(dt, 1000 * 1000 * 1000 * 1000)
            self.us = i32(us)
            ps, dt = _extract_unit_64(dt, 1000 * 1000)
            self.ps = i32(ps)
            self.as_ = i32(dt)
        else:
            raise ValueError(
                "NumPy datetime metadata is corrupted with invalid base unit")

    def utc_to_local(self, out: datetimestruct):
        """Write the local-time equivalent of this record into `out`.

        Seconds and sub-second fields are copied unchanged. Returns the
        applied offset (local minus this record) in minutes.
        """
        rawtime = 0
        localrawtime = 0
        year_correction = 0
        self.copy_to(out)
        # Seconds since 1970-01-01, ignoring the seconds field.
        rawtime = self.days * 24 * 60 * 60
        rawtime += int(self.hour) * 60 * 60
        rawtime += int(self.min) * 60
        tm = _get_localtime(rawtime)
        # Copy back everything except seconds; the month is shifted by one
        # and the year by 1900.
        out.min = i32(int(tm.min))
        out.hour = i32(int(tm.hour))
        out.day = i32(int(tm.mday))
        out.month = i32(int(tm.mon) + 1)
        out.year = int(tm.year) + 1900
        # Derive the offset by comparing both times in minutes.
        rawtime = _cdiv(rawtime, 60)
        localrawtime = out.days * 24 * 60
        localrawtime += int(out.hour) * 60
        localrawtime += int(out.min)
        timezone_offset = localrawtime - rawtime
        out.year += year_correction
        return timezone_offset

    def lossless_unit(self):
        """Return the coarsest unit code that represents the record exactly."""
        if _cmod(self.as_, i32(1000)):
            return _FR_as
        elif self.as_:
            return _FR_fs
        elif _cmod(self.ps, i32(1000)):
            return _FR_ps
        elif self.ps:
            return _FR_ns
        elif _cmod(self.us, i32(1000)):
            return _FR_us
        elif self.us:
            return _FR_ms
        elif self.sec:
            return _FR_s
        elif self.min:
            return _FR_m
        elif self.hour:
            return _FR_h
        elif self.day != i32(1):
            return _FR_D
        elif self.month != i32(1):
            return _FR_M
        else:
            return _FR_Y

    def parse_iso8601(self, s: str):
        """Parse an ISO 8601 style string into this record.

        Accepts "NaT" (any case) or an empty string, "today", "now", and
        `YYYY[-MM[-DD[(T| )hh[:mm[:ss[.fraction]]]]]]` with an optional
        trailing "Z" or "+hh[[:]mm]" / "-hh[[:]mm]" offset, which is applied
        to the parsed fields.

        Returns the unit code matching the precision found in the string.
        Raises ValueError on malformed input or out-of-range fields.
        """
        # Byte values used below: '0'=48, '+'=43, '-'=45, '.'=46, ':'=58,
        # ' '=32, 'T'=84, 'Z'=90.

        def tolower(b):
            return byte(int(_C.tolower(i32(int(b)))))

        def isspace(b):
            return bool(_C.isspace(i32(int(b))))

        def isdigit(b):
            return bool(_C.isdigit(i32(int(b))))

        def strcmp(str_: cobj, len_: int, vals):
            # Case-insensitive comparison against a tuple of lowercase codes.
            if len_ != staticlen(vals):
                return False
            for i in staticrange(staticlen(vals)):
                if tolower(str_[i]) != byte(vals[i]):
                    return False
            return True

        def parse_error(s: str, substr: cobj):
            raise ValueError(
                f"Error parsing datetime string \"{s}\" at position {substr - s.ptr}"
            )

        def parse_timezone(self, s: str, substr: cobj, sublen: int):
            # Note: parsing timezones is deprecated in NumPy
            if sublen == 0:
                return
            if substr[0] == byte(90):
                # 'Z': UTC designator, nothing to apply.
                if sublen == 1:
                    return
                else:
                    substr += 1
                    sublen -= 1
            elif substr[0] == byte(45) or substr[0] == byte(43):
                offset_neg = False
                offset_hour = i32(0)
                offset_minute = i32(0)
                if substr[0] == byte(45):
                    offset_neg = True
                substr += 1
                sublen -= 1
                # Hours offset
                if sublen >= 2 and isdigit(substr[0]) and isdigit(substr[1]):
                    offset_hour = i32(10 * (int(substr[0]) - 48) +
                                      (int(substr[1]) - 48))
                    substr += 2
                    sublen -= 2
                    if offset_hour >= i32(24):
                        raise ValueError(
                            f"Timezone hours offset out of range in datetime string \"{s}\""
                        )
                else:
                    parse_error(s, substr)
                if sublen > 0:
                    # Optional ':' separator.
                    if substr[0] == byte(58):
                        substr += 1
                        sublen -= 1
                    # Minutes offset (optional)
                    if sublen >= 2 and isdigit(substr[0]) and isdigit(
                            substr[1]):
                        offset_minute = i32(10 * (int(substr[0]) - 48) +
                                            (int(substr[1]) - 48))
                        substr += 2
                        sublen -= 2
                        if offset_minute >= i32(60):
                            raise ValueError(
                                f"Timezone minutes offset out of range in datetime string \"{s}\""
                            )
                    else:
                        parse_error(s, substr)
                if offset_neg:
                    offset_hour = -offset_hour
                    offset_minute = -offset_minute
                # Subtract the offset from the parsed fields.
                self.add_minutes(i32(-60) * offset_hour - offset_minute)
            # Only trailing whitespace may remain.
            while sublen > 0 and isspace(substr[0]):
                substr += 1
                sublen -= 1
            if sublen != 0:
                parse_error(s, substr)

        self.reset()
        self.month = i32(1)
        self.day = i32(1)
        bestunit = 0
        str_ = s.ptr
        len_ = len(s)
        if len_ <= 0 or strcmp(str_, len_, (110, 97, 116)):  # 'nat'
            self.year = _DATETIME_NAT
            bestunit = _FR_GENERIC
            return bestunit
        if strcmp(str_, len_, (116, 111, 100, 97, 121)):  # 'today'
            tm = _get_localtime(-1)
            self.year = int(tm.year) + 1900
            self.month = i32(int(tm.mon) + 1)
            self.day = i32(int(tm.mday))
            bestunit = _FR_D
            return bestunit
        if strcmp(str_, len_, (110, 111, 119)):  # 'now'
            rawtime = _get_time()
            meta = DatetimeMetaData(base=_FR_s)
            self.from_datetime64(meta, rawtime)
            bestunit = _FR_s
            return bestunit
        substr = str_
        sublen = len_
        # Skip leading whitespace.
        while sublen > 0 and isspace(substr[0]):
            substr += 1
            sublen -= 1
        # Optional sign. The length check avoids reading past the end of the
        # buffer when the input consists only of whitespace.
        if sublen > 0 and (substr[0] == byte(45) or substr[0] == byte(43)):
            substr += 1
            sublen -= 1
        if sublen == 0:
            parse_error(s, substr)
        # Parse year
        self.year = 0
        while sublen > 0 and isdigit(substr[0]):
            self.year = 10 * self.year + (int(substr[0]) - 48)
            substr += 1
            sublen -= 1
        # The sign is taken from the very first byte of the input.
        if str_[0] == byte(45):
            self.year = -self.year
        year_leap = _is_leapyear(self.year)
        if sublen == 0:
            bestunit = _FR_Y
            return bestunit
        elif substr[0] == byte(45):
            substr += 1
            sublen -= 1
        else:
            parse_error(s, substr)
        if sublen == 0:
            parse_error(s, substr)
        # Parse month
        if sublen >= 2 and isdigit(substr[0]) and isdigit(substr[1]):
            self.month = i32(10 * (int(substr[0]) - 48) +
                             (int(substr[1]) - 48))
            if self.month < i32(1) or self.month > i32(12):
                raise ValueError(
                    f"Month out of range in datetime string \"{s}\"")
            substr += 2
            sublen -= 2
        else:
            parse_error(s, substr)
        if sublen == 0:
            bestunit = _FR_M
            return bestunit
        elif substr[0] == byte(45):
            substr += 1
            sublen -= 1
        else:
            parse_error(s, substr)
        if sublen == 0:
            parse_error(s, substr)
        # Parse day
        if sublen >= 2 and isdigit(substr[0]) and isdigit(substr[1]):
            self.day = i32(10 * (int(substr[0]) - 48) + (int(substr[1]) - 48))
            if self.day < i32(1) or self.day > i32(
                    _days_per_month_table[int(year_leap)][int(self.month) -
                                                          1]):
                raise ValueError(
                    f"Day out of range in datetime string \"{s}\"")
            substr += 2
            sublen -= 2
        else:
            parse_error(s, substr)
        # Next char must be 'T', ' ' or end of string
        if sublen == 0:
            bestunit = _FR_D
            return bestunit
        elif substr[0] != byte(84) and substr[0] != byte(32):
            parse_error(s, substr)
        else:
            substr += 1
            sublen -= 1
        # Parse hour
        if sublen >= 2 and isdigit(substr[0]) and isdigit(substr[1]):
            self.hour = i32(10 * (int(substr[0]) - 48) + (int(substr[1]) - 48))
            if self.hour >= i32(24):
                raise ValueError(
                    f"Hours out of range in datetime string \"{s}\"")
            substr += 2
            sublen -= 2
        else:
            parse_error(s, substr)
        if sublen > 0 and substr[0] == byte(58):
            substr += 1
            sublen -= 1
        else:
            bestunit = _FR_h
            parse_timezone(self, s, substr, sublen)
            return bestunit
        if sublen == 0:
            parse_error(s, substr)
        # Parse minutes
        if sublen >= 2 and isdigit(substr[0]) and isdigit(substr[1]):
            self.min = i32(10 * (int(substr[0]) - 48) + (int(substr[1]) - 48))
            if self.min >= i32(60):
                raise ValueError(
                    f"Minutes out of range in datetime string \"{s}\"")
            substr += 2
            sublen -= 2
        else:
            parse_error(s, substr)
        if sublen > 0 and substr[0] == byte(58):
            substr += 1
            sublen -= 1
        else:
            bestunit = _FR_m
            parse_timezone(self, s, substr, sublen)
            return bestunit
        if sublen == 0:
            parse_error(s, substr)
        # Parse seconds
        if sublen >= 2 and isdigit(substr[0]) and isdigit(substr[1]):
            self.sec = i32(10 * (int(substr[0]) - 48) + (int(substr[1]) - 48))
            if self.sec >= i32(60):
                raise ValueError(
                    f"Seconds out of range in datetime string \"{s}\"")
            substr += 2
            sublen -= 2
        else:
            parse_error(s, substr)
        if sublen > 0 and substr[0] == byte(46):
            substr += 1
            sublen -= 1
        else:
            bestunit = _FR_s
            parse_timezone(self, s, substr, sublen)
            return bestunit
        # Parse microsec
        # Always multiplies six times so that missing digits count as
        # trailing zeros (".5" becomes 500000 us).
        numdigits = 0
        for i in range(6):
            self.us *= i32(10)
            if sublen > 0 and isdigit(substr[0]):
                self.us += i32(int(substr[0]) - 48)
                substr += 1
                sublen -= 1
                numdigits += 1
        if sublen == 0 or not isdigit(substr[0]):
            if numdigits > 3:
                bestunit = _FR_us
            else:
                bestunit = _FR_ms
            parse_timezone(self, s, substr, sublen)
            return bestunit
        # Parse picosec
        numdigits = 0
        for i in range(6):
            self.ps *= i32(10)
            if sublen > 0 and isdigit(substr[0]):
                self.ps += i32(int(substr[0]) - 48)
                substr += 1
                sublen -= 1
                numdigits += 1
        if sublen == 0 or not isdigit(substr[0]):
            if numdigits > 3:
                bestunit = _FR_ps
            else:
                bestunit = _FR_ns
            parse_timezone(self, s, substr, sublen)
            return bestunit
        # Parse attosec
        numdigits = 0
        for i in range(6):
            self.as_ *= i32(10)
            if sublen > 0 and isdigit(substr[0]):
                self.as_ += i32(int(substr[0]) - 48)
                substr += 1
                sublen -= 1
                numdigits += 1
        if numdigits > 3:
            bestunit = _FR_as
        else:
            bestunit = _FR_fs
        parse_timezone(self, s, substr, sublen)
        return bestunit

    def to_str(self,
               unit: int,
               local: bool = False,
               utc: bool = False,
               tzoffset: int = -1):
        """Format the record as an ISO 8601 style string.

        `unit` is the unit code giving the precision to print; _FR_ERROR
        selects a unit automatically. With `local` a "+hhmm"/"-hhmm" suffix
        is appended, using `tzoffset` minutes when it is not -1 and the
        system local time otherwise; with `utc` a "Z" is appended. Units
        coarser than hours never get a suffix. A NaT record formats as "NaT".
        """

        def date_length(unit: int):
            # Number of characters needed for the date/time part.
            if unit == _FR_Y:
                return 4
            if unit == _FR_M:
                return 7
            if unit == _FR_D or unit == _FR_W:
                return 10
            if unit == _FR_h:
                return 13
            if unit == _FR_m:
                return 16
            if unit == _FR_s:
                return 19
            if unit == _FR_ms:
                return 23
            if unit == _FR_us:
                return 26
            if unit == _FR_ns:
                return 29
            if unit == _FR_ps:
                return 32
            if unit == _FR_fs:
                return 35
            if unit == _FR_as:
                return 38
            return 0

        def timezone_length(local: bool, utc: bool):
            # Number of characters needed for the timezone suffix.
            if local:
                return 5
            elif utc:
                return 1
            else:
                return 0

        def write_int(buf: _strbuf, n, width: int):
            # Append `n` in decimal, left-padded with zeros to `width`
            # digits; a '-' is written first for negative values.
            n = int(n)
            if n < 0:
                buf.append('-')
                n = -n
            elif n == 0:
                for i in range(width):
                    buf.append('0')
                return
            # compute and append leading 0s
            m = n
            digits = 0
            while m != 0:
                digits += 1
                m //= 10
            for i in range(width - digits):
                buf.append('0')
            # append digits
            i1 = buf.n
            while n != 0:
                digit = n % 10
                b = byte(48 + digit)
                buf.append(str(__ptr__(b), 1))
                n //= 10
            # reverse added digits
            i2 = buf.n - 1
            while i1 < i2:
                c1 = buf.data[i1]
                c2 = buf.data[i2]
                buf.data[i1] = c2
                buf.data[i2] = c1
                i1 += 1
                i2 -= 1

        def add_timezone(buf: _strbuf, local: bool, utc: bool,
                         timezone_offset: int):
            # Append "+hhmm"/"-hhmm" for local time or "Z" for UTC.
            if local:
                if timezone_offset < 0:
                    buf.append('-')
                    timezone_offset = -timezone_offset
                else:
                    buf.append('+')
                c0 = byte(_cmod(_cdiv(timezone_offset, 10 * 60), 10) + 48)
                c1 = byte(_cmod(_cdiv(timezone_offset, 60), 10) + 48)
                c2 = byte(
                    _cmod(_cdiv(_cmod(timezone_offset, 60), 10), 10) + 48)
                c3 = byte(_cmod(_cmod(timezone_offset, 60), 10) + 48)
                buf.append(str(__ptr__(c0), 1))
                buf.append(str(__ptr__(c1), 1))
                buf.append(str(__ptr__(c2), 1))
                buf.append(str(__ptr__(c3), 1))
            elif utc:
                buf.append('Z')

        if self.year == _DATETIME_NAT:
            return "NaT"
        # System local-time lookup is skipped for years outside [1970, 10000).
        if (self.year < 1970 or self.year >= 10000) and tzoffset == -1:
            local = False
        if unit == _FR_ERROR:
            # Automatic unit selection.
            unit = self.lossless_unit()
            # With a timezone use at least minutes, and never stop at hours.
            if (unit < _FR_m and local) or unit == _FR_h:
                unit = _FR_m
            # Otherwise never print less than a full date.
            elif unit < _FR_D:
                unit = _FR_D
        # Weeks are printed with day precision.
        if unit == _FR_W:
            unit = _FR_D
        timezone_offset = 0
        if local:
            # Work on a shifted copy so this record stays untouched.
            tmp = datetimestruct()
            if tzoffset == -1:
                timezone_offset = self.utc_to_local(tmp)
            else:
                self.copy_to(tmp)
                timezone_offset = tzoffset
                tmp.add_minutes(i32(timezone_offset))
            self = tmp
        buf = _strbuf(capacity=(date_length(unit) +
                                timezone_length(local=local, utc=utc)))
        # Emit components from coarse to fine, stopping at `unit`.
        write_int(buf, self.year, 4)
        if unit == _FR_Y:
            return buf.__str__()
        buf.append('-')
        write_int(buf, self.month, 2)
        if unit == _FR_M:
            return buf.__str__()
        buf.append('-')
        write_int(buf, self.day, 2)
        if unit == _FR_D:
            return buf.__str__()
        buf.append('T')
        write_int(buf, self.hour, 2)
        if unit == _FR_h:
            add_timezone(buf,
                         local=local,
                         utc=utc,
                         timezone_offset=timezone_offset)
            return buf.__str__()
        buf.append(':')
        write_int(buf, self.min, 2)
        if unit == _FR_m:
            add_timezone(buf,
                         local=local,
                         utc=utc,
                         timezone_offset=timezone_offset)
            return buf.__str__()
        buf.append(':')
        write_int(buf, self.sec, 2)
        if unit == _FR_s:
            add_timezone(buf,
                         local=local,
                         utc=utc,
                         timezone_offset=timezone_offset)
            return buf.__str__()
        buf.append('.')
        # Each sub-second field is printed as two groups of three digits.
        write_int(buf, self.us // i32(1000), 3)
        if unit == _FR_ms:
            add_timezone(buf,
                         local=local,
                         utc=utc,
                         timezone_offset=timezone_offset)
            return buf.__str__()
        write_int(buf, self.us % i32(1000), 3)
        if unit == _FR_us:
            add_timezone(buf,
                         local=local,
                         utc=utc,
                         timezone_offset=timezone_offset)
            return buf.__str__()
        write_int(buf, self.ps // i32(1000), 3)
        if unit == _FR_ns:
            add_timezone(buf,
                         local=local,
                         utc=utc,
                         timezone_offset=timezone_offset)
            return buf.__str__()
        write_int(buf, self.ps % i32(1000), 3)
        if unit == _FR_ps:
            add_timezone(buf,
                         local=local,
                         utc=utc,
                         timezone_offset=timezone_offset)
            return buf.__str__()
        write_int(buf, self.as_ // i32(1000), 3)
        if unit == _FR_fs:
            add_timezone(buf,
                         local=local,
                         utc=utc,
                         timezone_offset=timezone_offset)
            return buf.__str__()
        write_int(buf, self.as_ % i32(1000), 3)
        add_timezone(buf,
                     local=local,
                     utc=utc,
                     timezone_offset=timezone_offset)
        return buf.__str__()

    def __str__(self):
        """Format the record at attosecond precision."""
        return self.to_str(_FR_as)
# Busday functionality
# Roll policies for business-day adjustment, i.e. what to do with a date that
# is not a valid business day.
# FORWARD / FOLLOWING are two names for the same code, as are
# BACKWARD / PRECEDING.
_BUSDAY_FORWARD: Static[int] = 0
_BUSDAY_FOLLOWING: Static[int] = _BUSDAY_FORWARD
_BUSDAY_BACKWARD: Static[int] = 1
_BUSDAY_PRECEDING: Static[int] = _BUSDAY_BACKWARD
# NOTE(review): presumably the "modified" variants roll the other way when the
# adjusted date would leave the starting month -- confirm against the roll code.
_BUSDAY_MODIFIEDFOLLOWING: Static[int] = 2
_BUSDAY_MODIFIEDPRECEDING: Static[int] = 3
# NOTE(review): presumably yields NaT for a non-business day -- confirm.
_BUSDAY_NAT: Static[int] = 4
# With this policy a NaT input raises ValueError in the roll routine.
_BUSDAY_RAISE: Static[int] = 5
@tuple
class _weekmask:
    # Set of weekdays packed into the bits of one byte: bit `i` stands for
    # weekday index `i`.
    mask: u8

    def __getitem__(self, idx: int):
        """Return True when the bit for weekday index `idx` is set."""
        selector = u8(1 << idx)
        return bool(self.mask & selector)

    @property
    def count(self):
        """Number of set bits in the mask."""
        bits = self.mask
        return bits.popcnt()
def _get_day_of_week(date: datetime64['D', 1]):
    """Return the weekday index of ``date`` as a value in 0..6.

    The day count is shifted by 4 before taking the remainder. ``_cmod`` is
    a C-style remainder and may be negative, so negative results are wrapped
    back into range. The index is the one used to test bits of ``_weekmask``.
    """
    remainder = _cmod(date.value - 4, 7)
    return remainder + 7 if remainder < 0 else remainder
def _is_holiday(date: datetime64['D', 1],
                holidays_begin: Ptr[datetime64['D', 1]],
                holidays_end: Ptr[datetime64['D', 1]]):
    """Bisect ``[holidays_begin, holidays_end)`` for ``date``.

    The range must be sorted ascending for the bisection to be meaningful.
    Returns True when an element with the same value is found, else False.
    """
    lo = holidays_begin
    hi = holidays_end
    target = date.value
    while lo < hi:
        mid = lo + (hi - lo) // 2
        current = mid[0].value
        if current == target:
            return True
        if target < current:
            hi = mid
        else:
            lo = mid + 1
    return False
def _find_earliest_holiday_on_or_after(date: datetime64['D', 1],
                                       holidays_begin: Ptr[datetime64['D', 1]],
                                       holidays_end: Ptr[datetime64['D', 1]]):
    """Bisect the sorted holiday range for the first entry ``>= date``.

    Returns a pointer to the entry equal to ``date`` when the search hits
    one, otherwise the position where the search range collapsed (which is
    ``holidays_end`` when every entry is smaller than ``date``).
    """
    lo = holidays_begin
    hi = holidays_end
    target = date.value
    while lo < hi:
        mid = lo + (hi - lo) // 2
        current = mid[0].value
        if current == target:
            return mid
        if target < current:
            hi = mid
        else:
            lo = mid + 1
    return lo
def _find_earliest_holiday_after(date: datetime64['D', 1],
                                 holidays_begin: Ptr[datetime64['D', 1]],
                                 holidays_end: Ptr[datetime64['D', 1]]):
    """Bisect the sorted holiday range for the first entry ``> date``.

    When an entry equal to ``date`` is hit, the pointer just past it is
    returned; otherwise the position where the search range collapsed is
    returned (``holidays_end`` when no entry is larger than ``date``).
    """
    lo = holidays_begin
    hi = holidays_end
    target = date.value
    while lo < hi:
        mid = lo + (hi - lo) // 2
        current = mid[0].value
        if current == target:
            return mid + 1
        if target < current:
            hi = mid
        else:
            lo = mid + 1
    return lo
def _roll_to_next_busday(date: datetime64['D', 1], day_of_week: int,
                         weekmask: _weekmask,
                         holidays_begin: Ptr[datetime64['D', 1]],
                         holidays_end: Ptr[datetime64['D', 1]]):
    # Step forward one day at a time (at least once) until the date is on
    # the weekmask and is not a holiday; returns (date, day_of_week).
    while True:
        date += 1
        day_of_week += 1
        if day_of_week == 7:
            day_of_week = 0
        if weekmask[day_of_week] and not _is_holiday(
                date, holidays_begin, holidays_end):
            break
    return date, day_of_week


def _roll_to_prev_busday(date: datetime64['D', 1], day_of_week: int,
                         weekmask: _weekmask,
                         holidays_begin: Ptr[datetime64['D', 1]],
                         holidays_end: Ptr[datetime64['D', 1]]):
    # Step backward one day at a time (at least once) until the date is on
    # the weekmask and is not a holiday; returns (date, day_of_week).
    while True:
        date -= 1
        day_of_week -= 1
        if day_of_week == -1:
            day_of_week = 6
        if weekmask[day_of_week] and not _is_holiday(
                date, holidays_begin, holidays_end):
            break
    return date, day_of_week


def _apply_busines_day_roll(date: datetime64['D', 1], roll: int,
                            weekmask: _weekmask,
                            holidays_begin: Ptr[datetime64['D', 1]],
                            holidays_end: Ptr[datetime64['D', 1]]):
    """Move ``date`` onto a valid business day according to ``roll``.

    A date is valid when its weekday bit is set in ``weekmask`` and it is
    not found in the sorted holiday range ``[holidays_begin, holidays_end)``.

    Args:
        date: Date to adjust.
        roll: One of the ``_BUSDAY_*`` policy constants.
        weekmask: Valid-weekday mask.
        holidays_begin: Start of the holiday range.
        holidays_end: One past the end of the holiday range.

    Returns:
        ``(date, day_of_week)``. A valid input date is returned unchanged.
        NaT input is returned as ``(date, 0)`` unless ``roll`` is
        ``_BUSDAY_RAISE``. With ``_BUSDAY_NAT`` the date is NaT and the
        weekday is that of the original date. An unrecognised ``roll``
        leaves the date untouched.

    Raises:
        ValueError: If ``roll`` is ``_BUSDAY_RAISE`` and the input is NaT or
            is not a valid business day.
    """
    if date._nat:
        if roll == _BUSDAY_RAISE:
            raise ValueError("NaT input in busday_offset")
        return date, 0

    day_of_week = _get_day_of_week(date)
    if weekmask[day_of_week] and not _is_holiday(date, holidays_begin,
                                                 holidays_end):
        # Already a business day: nothing to roll.
        return date, day_of_week

    if roll == _BUSDAY_FOLLOWING or roll == _BUSDAY_MODIFIEDFOLLOWING:
        rolled, rolled_dow = _roll_to_next_busday(date, day_of_week, weekmask,
                                                  holidays_begin, holidays_end)
        # "Modified" variant: if rolling forward changed the month number,
        # restart from the original date and roll backward instead.
        if (roll == _BUSDAY_MODIFIEDFOLLOWING and
                _days_to_month_number(date.value) !=
                _days_to_month_number(rolled.value)):
            rolled, rolled_dow = _roll_to_prev_busday(date, day_of_week,
                                                      weekmask, holidays_begin,
                                                      holidays_end)
        return rolled, rolled_dow

    if roll == _BUSDAY_PRECEDING or roll == _BUSDAY_MODIFIEDPRECEDING:
        rolled, rolled_dow = _roll_to_prev_busday(date, day_of_week, weekmask,
                                                  holidays_begin, holidays_end)
        # "Modified" variant: if rolling backward changed the month number,
        # restart from the original date and roll forward instead.
        if (roll == _BUSDAY_MODIFIEDPRECEDING and
                _days_to_month_number(date.value) !=
                _days_to_month_number(rolled.value)):
            rolled, rolled_dow = _roll_to_next_busday(date, day_of_week,
                                                      weekmask, holidays_begin,
                                                      holidays_end)
        return rolled, rolled_dow

    if roll == _BUSDAY_NAT:
        return datetime64(_DATETIME_NAT, 'D'), day_of_week

    if roll == _BUSDAY_RAISE:
        raise ValueError("Non-business day date in busday_offset")

    return date, day_of_week
def _apply_busines_day_offset(date: datetime64['D', 1], offset: int, roll: int,
                              weekmask: _weekmask, busdays_in_weekmask: int,
                              holidays_begin: Ptr[datetime64['D', 1]],
                              holidays_end: Ptr[datetime64['D', 1]]):
    # Roll `date` onto a valid business day (policy `roll`), then move it by
    # `offset` business days: forward for positive offsets, backward for
    # negative ones. [holidays_begin, holidays_end) is searched by bisection,
    # so it must be sorted ascending. `busdays_in_weekmask` is used as the
    # number of business days per 7-day week (presumably weekmask.count --
    # confirm at the call site).
    # Returns the resulting date; a NaT produced by the roll step is returned
    # as is.
    date, day_of_week = _apply_busines_day_roll(date, roll, weekmask,
                                                holidays_begin, holidays_end)
    if date._nat:
        return date
    if offset > 0:
        # Holidays before the start date can never be crossed; drop them.
        holidays_begin = _find_earliest_holiday_on_or_after(
            date, holidays_begin, holidays_end)
        # Jump whole weeks in one step, ignoring holidays for now.
        date += _cdiv(offset, busdays_in_weekmask) * 7
        offset = _cmod(offset, busdays_in_weekmask)
        # Each holiday inside the jumped span was wrongly counted as a
        # business day, so add that many days back onto the remaining offset.
        holidays_temp = _find_earliest_holiday_after(date, holidays_begin,
                                                     holidays_end)
        offset += holidays_temp - holidays_begin
        holidays_begin = holidays_temp
        # Walk the remainder day by day, counting only valid business days.
        while offset > 0:
            date += 1
            day_of_week += 1
            if day_of_week == 7:
                day_of_week = 0
            if weekmask[day_of_week] and not _is_holiday(
                    date, holidays_begin, holidays_end):
                offset -= 1
    elif offset < 0:
        # Mirror image of the forward case: drop holidays after the start.
        holidays_end = _find_earliest_holiday_after(date, holidays_begin,
                                                    holidays_end)
        # _cdiv/_cmod truncate toward zero, so the week jump is <= 0 and the
        # remaining offset stays <= 0.
        date += _cdiv(offset, busdays_in_weekmask) * 7
        offset = _cmod(offset, busdays_in_weekmask)
        # Compensate for the holidays crossed by the backward jump.
        holidays_temp = _find_earliest_holiday_on_or_after(
            date, holidays_begin, holidays_end)
        offset -= holidays_end - holidays_temp
        holidays_end = holidays_temp
        # Walk the remainder day by day, counting only valid business days.
        while offset < 0:
            date -= 1
            day_of_week -= 1
            if day_of_week == -1:
                day_of_week = 6
            if weekmask[day_of_week] and not _is_holiday(
                    date, holidays_begin, holidays_end):
                offset += 1
    return date
def _apply_busines_day_count(date_begin: datetime64['D', 1],
                             date_end: datetime64['D', 1], weekmask: _weekmask,
                             busdays_in_weekmask: int,
                             holidays_begin: Ptr[datetime64['D', 1]],
                             holidays_end: Ptr[datetime64['D', 1]]):
    # Count the business days between two dates.
    # For date_begin < date_end the half-open range [date_begin, date_end) is
    # counted. For date_begin > date_end the dates are swapped and shifted so
    # that (date_end, date_begin] is counted, and the result is negated.
    # Every holiday in the range is subtracted once, which assumes each listed
    # holiday falls on a weekmask day (busdaycalendar.__init__ filters its
    # list that way).
    # Raises ValueError when either date is NaT.
    if date_begin._nat or date_end._nat:
        raise ValueError(
            "Cannot compute a business day count with a NaT (not-a-time) date")
    swapped = False
    if date_begin.value == date_end.value:
        return 0
    elif date_begin.value > date_end.value:
        tmp = date_begin
        date_begin = date_end
        date_end = tmp
        swapped = True
        # After the swap, shift both ends by one day so the original
        # date_end is excluded and the original date_begin is included.
        date_begin += 1
        date_end += 1
    # Narrow the holiday window to entries inside [date_begin, date_end).
    holidays_begin = _find_earliest_holiday_on_or_after(
        date_begin, holidays_begin, holidays_end)
    holidays_end = _find_earliest_holiday_on_or_after(date_end, holidays_begin,
                                                      holidays_end)
    # Start from minus the number of holidays in the range.
    count = -(holidays_end - holidays_begin)
    # Account for all complete weeks at once.
    whole_weeks = _cdiv(date_end.value - date_begin.value, 7)
    count += whole_weeks * busdays_in_weekmask
    date_begin += whole_weeks * 7
    if date_begin < date_end:
        # Fewer than 7 days remain: count them one at a time by weekday.
        day_of_week = _get_day_of_week(date_begin)
        while date_begin.value < date_end.value:
            if weekmask[day_of_week]:
                count += 1
            date_begin += 1
            day_of_week += 1
            if day_of_week == 7:
                day_of_week = 0
    if swapped:
        count = -count
    return count
def _apply_is_business_day(date: datetime64['D', 1], weekmask: _weekmask,
                           holidays_begin: Ptr[datetime64['D', 1]],
                           holidays_end: Ptr[datetime64['D', 1]]):
    """Return True when ``date`` is a valid business day.

    The date must fall on a weekday enabled in ``weekmask``, must not be in
    the holiday range ``[holidays_begin, holidays_end)``, and must not be
    NaT.
    """
    weekday = _get_day_of_week(date)
    if not weekmask[weekday]:
        return False
    if _is_holiday(date, holidays_begin, holidays_end):
        return False
    return not date._nat
class busdaycalendar:
_wm: _weekmask
_holidays: Ptr[datetime64['D', 1]]
_nholidays: int
def __init__(self, weekmask='1111100', holidays=None):
    """Build a business-day calendar from a weekmask and optional holidays.

    Args:
        weekmask: One of
            - a 7-character string of '0'/'1' flags,
            - a string of three-letter day names ("Mon" .. "Sun"),
              optionally separated by whitespace, or
            - a length-7 sequence of ints (0 or 1) or bools.
            Position 0 / bit 0 corresponds to "Mon".
        holidays: Optional indexable collection of dates. Elements that are
            not already ``datetime64['D', 1]`` are converted with
            ``datetime64(x, 'D')``. The stored list is sorted, without
            duplicates or NaT, and keeps only dates whose weekday is
            enabled in the weekmask.

    Raises:
        ValueError: If the weekmask is malformed, has the wrong length,
            contains values other than 0/1, or enables no day at all.
    """
    def invalid(weekmask):
        raise ValueError(
            f"Invalid business day weekmask string \"{weekmask}\""
        )

    n = len(weekmask)
    mask = u8(0)
    if isinstance(weekmask, str):
        if n == 7 and (weekmask[0] == '0' or weekmask[0] == '1'):
            # Form 1: seven '0'/'1' characters, one per weekday.
            for i in range(7):
                w = weekmask[i]
                if w == '1':
                    mask |= u8(1 << i)
                elif w != '0':
                    invalid(weekmask)
        else:
            # Form 2: three-letter day names such as "MonTue" or "Sat Sun".
            i = 0
            while i < n:
                while i < n and weekmask[i].isspace():
                    i += 1
                if i == n:
                    break
                elif i + 2 >= n:
                    # Fewer than three characters left: not a day name.
                    invalid(weekmask)
                w = weekmask[i:i+3]
                j = 0
                if w == "Mon":
                    j = 0
                elif w == "Tue":
                    j = 1
                elif w == "Wed":
                    j = 2
                elif w == "Thu":
                    j = 3
                elif w == "Fri":
                    j = 4
                elif w == "Sat":
                    j = 5
                elif w == "Sun":
                    j = 6
                else:
                    invalid(weekmask)
                mask |= u8(1 << j)
                i += 3
    else:
        # Form 3: a length-7 sequence of int/bool flags.
        if n != 7:
            raise ValueError(
                "A business day weekmask array must have length 7")
        for i in range(7):
            w = weekmask[i]
            if isinstance(w, bool):
                if w:
                    mask |= u8(1 << i)
            elif isinstance(w, int):
                if w == 1:
                    mask |= u8(1 << i)
                elif w != 0:
                    raise ValueError(
                        "A business day weekmask array must have all 1's and 0's"
                    )
            else:
                compile_error(
                    "weekmask array elements must be int or bool")
    if not mask:
        raise ValueError(
            "Cannot construct a numpy.busdaycal with a weekmask of all zeros"
        )
    self._wm = _weekmask(mask)
    if holidays is not None:
        lastdate = _DATETIME_NAT
        count = len(holidays)
        hlist = List[datetime64['D', 1]](capacity=count)
        for i in range(count):
            if isinstance(holidays[i], datetime64['D', 1]):
                hlist.append(holidays[i])
            else:
                hlist.append(datetime64(holidays[i], 'D'))
        hlist.sort()
        # Compact the sorted list in place inside its backing array:
        # drop NaT, duplicates, and dates that are off the weekmask.
        hptr = hlist.arr.ptr
        trimcount = 0
        for i in range(count):
            date = hptr[i]
            if not date._nat and date.value != lastdate:
                # Use the shared weekday helper rather than repeating its
                # arithmetic here.
                day_of_week = _get_day_of_week(date)
                if self._wm[day_of_week]:
                    hptr[trimcount] = date
                    trimcount += 1
                    lastdate = date.value
        self._holidays = hptr
        self._nholidays = trimcount
    else:
        self._nholidays = 0
        self._holidays = Ptr[datetime64['D', 1]]()