1import contextlib
2import sys
3import os
4import unittest
5from test import support
6import time
7
8resource = support.import_module('resource')
9
10# This test is checking a few specific problem spots with the resource module.
11
12class ResourceTest(unittest.TestCase):
13
14    def test_args(self):
15        self.assertRaises(TypeError, resource.getrlimit)
16        self.assertRaises(TypeError, resource.getrlimit, 42, 42)
17        self.assertRaises(TypeError, resource.setrlimit)
18        self.assertRaises(TypeError, resource.setrlimit, 42, 42, 42)
19
20    def test_fsize_ismax(self):
21        try:
22            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
23        except AttributeError:
24            pass
25        else:
26            # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really big
27            # number on a platform with large file support.  On these platforms,
28            # we need to test that the get/setrlimit functions properly convert
29            # the number to a C long long and that the conversion doesn't raise
30            # an error.
31            self.assertEqual(resource.RLIM_INFINITY, max)
32            resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
33
34    def test_fsize_enforced(self):
35        try:
36            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
37        except AttributeError:
38            pass
39        else:
40            # Check to see what happens when the RLIMIT_FSIZE is small.  Some
41            # versions of Python were terminated by an uncaught SIGXFSZ, but
42            # pythonrun.c has been fixed to ignore that exception.  If so, the
43            # write() should return EFBIG when the limit is exceeded.
44
45            # At least one platform has an unlimited RLIMIT_FSIZE and attempts
46            # to change it raise ValueError instead.
47            try:
48                try:
49                    resource.setrlimit(resource.RLIMIT_FSIZE, (1024, max))
50                    limit_set = True
51                except ValueError:
52                    limit_set = False
53                f = open(support.TESTFN, "wb")
54                try:
55                    f.write(b"X" * 1024)
56                    try:
57                        f.write(b"Y")
58                        f.flush()
59                        # On some systems (e.g., Ubuntu on hppa) the flush()
60                        # doesn't always cause the exception, but the close()
61                        # does eventually.  Try flushing several times in
62                        # an attempt to ensure the file is really synced and
63                        # the exception raised.
64                        for i in range(5):
65                            time.sleep(.1)
66                            f.flush()
67                    except OSError:
68                        if not limit_set:
69                            raise
70                    if limit_set:
71                        # Close will attempt to flush the byte we wrote
72                        # Restore limit first to avoid getting a spurious error
73                        resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
74                finally:
75                    f.close()
76            finally:
77                if limit_set:
78                    resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
79                support.unlink(support.TESTFN)
80
81    def test_fsize_toobig(self):
82        # Be sure that setrlimit is checking for really large values
83        too_big = 10**50
84        try:
85            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
86        except AttributeError:
87            pass
88        else:
89            try:
90                resource.setrlimit(resource.RLIMIT_FSIZE, (too_big, max))
91            except (OverflowError, ValueError):
92                pass
93            try:
94                resource.setrlimit(resource.RLIMIT_FSIZE, (max, too_big))
95            except (OverflowError, ValueError):
96                pass
97
98    def test_getrusage(self):
99        self.assertRaises(TypeError, resource.getrusage)
100        self.assertRaises(TypeError, resource.getrusage, 42, 42)
101        usageself = resource.getrusage(resource.RUSAGE_SELF)
102        usagechildren = resource.getrusage(resource.RUSAGE_CHILDREN)
103        # May not be available on all systems.
104        try:
105            usageboth = resource.getrusage(resource.RUSAGE_BOTH)
106        except (ValueError, AttributeError):
107            pass
108        try:
109            usage_thread = resource.getrusage(resource.RUSAGE_THREAD)
110        except (ValueError, AttributeError):
111            pass
112
113    # Issue 6083: Reference counting bug
114    def test_setrusage_refcount(self):
115        try:
116            limits = resource.getrlimit(resource.RLIMIT_CPU)
117        except AttributeError:
118            pass
119        else:
120            class BadSequence:
121                def __len__(self):
122                    return 2
123                def __getitem__(self, key):
124                    if key in (0, 1):
125                        return len(tuple(range(1000000)))
126                    raise IndexError
127
128            resource.setrlimit(resource.RLIMIT_CPU, BadSequence())
129
130    def test_pagesize(self):
131        pagesize = resource.getpagesize()
132        self.assertIsInstance(pagesize, int)
133        self.assertGreaterEqual(pagesize, 0)
134
135    @unittest.skipUnless(sys.platform == 'linux', 'test requires Linux')
136    def test_linux_constants(self):
137        for attr in ['MSGQUEUE', 'NICE', 'RTPRIO', 'RTTIME', 'SIGPENDING']:
138            with contextlib.suppress(AttributeError):
139                self.assertIsInstance(getattr(resource, 'RLIMIT_' + attr), int)
140
141    def test_freebsd_contants(self):
142        for attr in ['SWAP', 'SBSIZE', 'NPTS']:
143            with contextlib.suppress(AttributeError):
144                self.assertIsInstance(getattr(resource, 'RLIMIT_' + attr), int)
145
146    @unittest.skipUnless(hasattr(resource, 'prlimit'), 'no prlimit')
147    @support.requires_linux_version(2, 6, 36)
148    def test_prlimit(self):
149        self.assertRaises(TypeError, resource.prlimit)
150        self.assertRaises(ProcessLookupError, resource.prlimit,
151                          -1, resource.RLIMIT_AS)
152        limit = resource.getrlimit(resource.RLIMIT_AS)
153        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS), limit)
154        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS, limit),
155                         limit)
156
157    # Issue 20191: Reference counting bug
158    @unittest.skipUnless(hasattr(resource, 'prlimit'), 'no prlimit')
159    @support.requires_linux_version(2, 6, 36)
160    def test_prlimit_refcount(self):
161        class BadSeq:
162            def __len__(self):
163                return 2
164            def __getitem__(self, key):
165                return limits[key] - 1  # new reference
166
167        limits = resource.getrlimit(resource.RLIMIT_AS)
168        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS, BadSeq()),
169                         limits)
170
171
172def test_main(verbose=None):
173    support.run_unittest(ResourceTest)
174
175if __name__ == "__main__":
176    test_main()
177