1import struct
2
3
def load_tzdata(key):
    """Open the ``tzdata`` resource for *key* and return the binary stream.

    The key is split on ``/``: the final segment is the resource name and
    every preceding segment becomes a sub-package of ``tzdata.zoneinfo``
    (so ``"America/New_York"`` is looked up as resource ``New_York`` in
    package ``tzdata.zoneinfo.America``).

    Raises:
        ZoneInfoNotFoundError: If the package or the resource cannot be
            located for this key.
    """
    from importlib import resources

    *parent_parts, resource_name = key.split("/")
    package_name = ".".join(["tzdata.zoneinfo", *parent_parts])

    try:
        stream = resources.open_binary(package_name, resource_name)
    except (ImportError, FileNotFoundError, UnicodeEncodeError):
        # Each of these failures means "this key does not exist":
        #
        # ImportError: package_name cannot be imported -- tzdata is not
        #   installed, or the folder part of the key is wrong
        #   (e.g. Amrica/New_York).
        # FileNotFoundError: the package exists but has no resource called
        #   resource_name (e.g. Europe/Krasnoy).
        # UnicodeEncodeError: package_name or resource_name is not valid
        #   UTF-8, such as a key containing a surrogate character.
        raise ZoneInfoNotFoundError(f"No time zone found with key {key}")

    return stream
25
26
def load_data(fobj):
    """Parse a TZif stream and return its transition data.

    Args:
        fobj: Binary file-like object positioned at the start of a TZif
            file. Only ``read(n)`` and ``seek(offset, 1)`` are used.

    Returns:
        A 6-tuple ``(trans_idx, trans_list_utc, utcoff, isdst, abbr, tz_str)``:

        * ``trans_idx`` -- for each transition, the index of the local time
          type record that applies after it.
        * ``trans_list_utc`` -- the transition times.
        * ``utcoff``, ``isdst``, ``abbr`` -- one entry per local time type
          record: UTC offset, DST flag and abbreviation string.
        * ``tz_str`` -- the footer TZ string as ``bytes`` for version 2+
          files, ``None`` for version 1 files.

        All sequences are empty tuples when the corresponding count in the
        header is zero.

    Raises:
        ValueError: If a header is invalid, or if the version 2+ footer does
            not start with a newline or is not newline-terminated.
        struct.error: If a data block is shorter than the header claims.
    """
    header = _TZifHeader.from_file(fobj)

    if header.version == 1:
        time_size = 4
        time_type = "l"
    else:
        # Version 2+ has 64-bit integer transition times
        time_size = 8
        time_type = "q"

        # Version 2+ also starts with a Version 1 header and data, which
        # we need to skip now
        skip_bytes = (
            header.timecnt * 5  # Transition times and types
            + header.typecnt * 6  # Local time type records
            + header.charcnt  # Time zone designations
            + header.leapcnt * 8  # Leap second records
            + header.isstdcnt  # Standard/wall indicators
            + header.isutcnt  # UT/local indicators
        )

        fobj.seek(skip_bytes, 1)

        # Now we need to read the second header, which is not the same
        # as the first
        header = _TZifHeader.from_file(fobj)

    typecnt = header.typecnt
    timecnt = header.timecnt
    charcnt = header.charcnt

    # The data portion starts with timecnt transitions and indices
    if timecnt:
        trans_list_utc = struct.unpack(
            f">{timecnt}{time_type}", fobj.read(timecnt * time_size)
        )
        trans_idx = struct.unpack(f">{timecnt}B", fobj.read(timecnt))
    else:
        trans_list_utc = ()
        trans_idx = ()

    # Read the ttinfo struct, (utoff, isdst, abbrind). The designation index
    # is an unsigned octet, so it is unpacked with "B"; a signed read would
    # turn indices >= 128 into negative offsets.
    if typecnt:
        utcoff, isdst, abbrind = zip(
            *(struct.unpack(">lbB", fobj.read(6)) for _ in range(typecnt))
        )
    else:
        utcoff = ()
        isdst = ()
        abbrind = ()

    # Now read the abbreviations. They are null-terminated strings, indexed
    # not by position in the array but by position in the unsplit
    # abbreviation string. I suppose this makes more sense in C, which uses
    # null to terminate the strings, but it's inconvenient here...
    abbr_vals = {}
    abbr_chars = fobj.read(charcnt)

    def get_abbr(idx):
        # Gets a string starting at idx and running until the next \x00
        #
        # We cannot pre-populate abbr_vals by splitting on \x00 because there
        # are some zones that use subsets of longer abbreviations, like so:
        #
        #  LMT\x00AHST\x00HDT\x00
        #
        # Where the idx to abbr mapping should be:
        #
        # {0: "LMT", 4: "AHST", 5: "HST", 9: "HDT"}
        if idx not in abbr_vals:
            span_end = abbr_chars.find(b"\x00", idx)
            if span_end < 0:
                # No terminator after idx: take the rest of the block.
                # Slicing with the -1 from find() would silently drop the
                # final byte of the abbreviation.
                span_end = len(abbr_chars)
            abbr_vals[idx] = abbr_chars[idx:span_end].decode()

        return abbr_vals[idx]

    abbr = tuple(get_abbr(idx) for idx in abbrind)

    # The remainder of the file consists of leap seconds (currently unused) and
    # the standard/wall and ut/local indicators, which are metadata we don't need.
    # In version 2 files, we need to skip the unnecessary data to get at the TZ string:
    if header.version >= 2:
        # Each leap second record has size (time_size + 4)
        skip_bytes = header.isutcnt + header.isstdcnt + header.leapcnt * 12
        fobj.seek(skip_bytes, 1)

        # The footer is "\n<TZ string>\n". Validate with a real exception
        # rather than an assert so the check survives "python -O".
        c = fobj.read(1)
        if c != b"\n":
            raise ValueError(
                f"Invalid TZif file: expected newline before TZ string, got {c!r}"
            )

        tz_chars = bytearray()
        while (c := fobj.read(1)) != b"\n":
            if not c:
                # read() returns b"" at end of file; without this check a
                # truncated footer would make the loop spin forever.
                raise ValueError("Invalid TZif file: unterminated TZ string")
            tz_chars += c

        tz_str = bytes(tz_chars)
    else:
        tz_str = None

    return trans_idx, trans_list_utc, utcoff, isdst, abbr, tz_str
125
126
class _TZifHeader:
    """Parsed TZif header: the format version plus the six count fields."""

    # The first slot is the version; the remaining six follow the order in
    # which the count fields are laid out in the file, so the unpacked values
    # can be assigned positionally.
    __slots__ = [
        "version",
        "isutcnt",
        "isstdcnt",
        "leapcnt",
        "timecnt",
        "typecnt",
        "charcnt",
    ]

    def __init__(self, *args):
        """Assign *args* to the slots positionally (one value per slot)."""
        slot_names = self.__slots__
        assert len(slot_names) == len(args)
        for name, value in zip(slot_names, args):
            setattr(self, name, value)

    @classmethod
    def from_file(cls, stream):
        """Read one 44-byte header from *stream* and return a new instance.

        Raises:
            ValueError: If the stream does not start with the ``TZif`` magic.
        """
        # Every header opens with a 4-byte "magic" value.
        magic = stream.read(4)
        if magic != b"TZif":
            raise ValueError("Invalid TZif file: magic not found")

        # A NUL version byte means version 1; any other value is the version
        # number written as an ASCII digit.
        version_byte = stream.read(1)
        version = 1 if version_byte == b"\x00" else int(version_byte)

        # The next 15 bytes are read and discarded.
        stream.read(15)

        # Six big-endian 32-bit counts, in the same order as the slots.
        counts = struct.unpack(">6l", stream.read(24))

        return cls(version, *counts)
162
163
class ZoneInfoNotFoundError(KeyError):
    """Exception raised when a ZoneInfo key is not found.

    Subclasses ``KeyError``, so handlers written for ``KeyError`` also catch
    it. ``load_tzdata`` raises it when the key cannot be resolved to a
    resource inside the ``tzdata.zoneinfo`` package.
    """
166