# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
3
4from __future__ import with_statement
5import string
6import sys
7import time
8import random
9import unittest
10from test import test_support as support
11try:
12    import thread
13    import threading
14except ImportError:
15    thread = None
16    threading = None
17# Skip this test if the _testcapi module isn't available.
18_testcapi = support.import_module('_testcapi')
19
class CAPITest(unittest.TestCase):
    # Python-level test methods that delegate to individual functions
    # exported by _testcapi.

    def test_buildvalue_N(self):
        # Look the exported test function up first, then invoke it; any
        # exception it raises propagates and fails this test.
        run_exported_test = _testcapi.test_buildvalue_N
        run_exported_test()
24
25
26@unittest.skipUnless(threading, 'Threading required for this test.')
27class TestPendingCalls(unittest.TestCase):
28
29    def pendingcalls_submit(self, l, n):
30        def callback():
31            #this function can be interrupted by thread switching so let's
32            #use an atomic operation
33            l.append(None)
34
35        for i in range(n):
36            time.sleep(random.random()*0.02) #0.01 secs on average
37            #try submitting callback until successful.
38            #rely on regular interrupt to flush queue if we are
39            #unsuccessful.
40            while True:
41                if _testcapi._pending_threadfunc(callback):
42                    break;
43
44    def pendingcalls_wait(self, l, n, context = None):
45        #now, stick around until l[0] has grown to 10
46        count = 0;
47        while len(l) != n:
48            #this busy loop is where we expect to be interrupted to
49            #run our callbacks.  Note that callbacks are only run on the
50            #main thread
51            if False and support.verbose:
52                print "(%i)"%(len(l),),
53            for i in xrange(1000):
54                a = i*i
55            if context and not context.event.is_set():
56                continue
57            count += 1
58            self.assertTrue(count < 10000,
59                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
60        if False and support.verbose:
61            print "(%i)"%(len(l),)
62
63    def test_pendingcalls_threaded(self):
64        #do every callback on a separate thread
65        n = 32 #total callbacks
66        threads = []
67        class foo(object):pass
68        context = foo()
69        context.l = []
70        context.n = 2 #submits per thread
71        context.nThreads = n // context.n
72        context.nFinished = 0
73        context.lock = threading.Lock()
74        context.event = threading.Event()
75
76        threads = [threading.Thread(target=self.pendingcalls_thread,
77                                    args=(context,))
78                   for i in range(context.nThreads)]
79        with support.start_threads(threads):
80            self.pendingcalls_wait(context.l, n, context)
81
82    def pendingcalls_thread(self, context):
83        try:
84            self.pendingcalls_submit(context.l, context.n)
85        finally:
86            with context.lock:
87                context.nFinished += 1
88                nFinished = context.nFinished
89                if False and support.verbose:
90                    print "finished threads: ", nFinished
91            if nFinished == context.nThreads:
92                context.event.set()
93
94    def test_pendingcalls_non_threaded(self):
95        #again, just using the main thread, likely they will all be dispatched at
96        #once.  It is ok to ask for too many, because we loop until we find a slot.
97        #the loop can be interrupted to dispatch.
98        #there are only 32 dispatch slots, so we go for twice that!
99        l = []
100        n = 64
101        self.pendingcalls_submit(l, n)
102        self.pendingcalls_wait(l, n)
103
104
105class TestGetIndices(unittest.TestCase):
106
107    def test_get_indices(self):
108        self.assertEqual(_testcapi.get_indices(slice(10L, 20, 1), 100), (0, 10, 20, 1))
109        self.assertEqual(_testcapi.get_indices(slice(10.1, 20, 1), 100), None)
110        self.assertEqual(_testcapi.get_indices(slice(10, 20L, 1), 100), (0, 10, 20, 1))
111        self.assertEqual(_testcapi.get_indices(slice(10, 20.1, 1), 100), None)
112
113        self.assertEqual(_testcapi.get_indices(slice(10L, 20, 1L), 100), (0, 10, 20, 1))
114        self.assertEqual(_testcapi.get_indices(slice(10.1, 20, 1L), 100), None)
115        self.assertEqual(_testcapi.get_indices(slice(10, 20L, 1L), 100), (0, 10, 20, 1))
116        self.assertEqual(_testcapi.get_indices(slice(10, 20.1, 1L), 100), None)
117
118
@unittest.skipUnless(threading and thread, 'Threading required for this test.')
class TestThreadState(unittest.TestCase):
    # Drives _testcapi._test_thread_state, once from the thread running the
    # test and once from a separately started Python thread.

    @support.reap_threads
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            # Idents of the threads on which callback() was invoked.
            idents = []

            def callback():
                # Record which thread is running the callback.
                idents.append(thread.get_ident())

            _testcapi._test_thread_state(callback)
            # NOTE(review): a and b are never read; presumably they only
            # hold extra references to callback -- confirm before removing.
            a = b = callback
            # NOTE(review): presumably gives callbacks still running on
            # other threads time to finish -- confirm.
            time.sleep(1)
            # Check that the ident of the thread running target() (not
            # necessarily the main thread: target() is also run in a
            # worker thread below) is in the list exactly 3 times.
            # NOTE(review): the expected count of 3 depends on how
            # _test_thread_state invokes the callback; verify against the
            # _testcapi implementation.
            self.assertEqual(idents.count(thread.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run the check in the current thread first, then once more in a
        # freshly started thread, waiting for that thread to finish.
        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()
142
143
class Test_testcapi(unittest.TestCase):
    # Every attribute of _testcapi whose name starts with 'test_' and does
    # not end with '_code' is copied into the class namespace, so that each
    # one is picked up as a test method of this class.
    locals().update({
        attr: getattr(_testcapi, attr)
        for attr in dir(_testcapi)
        if attr.startswith('test_') and not attr.endswith('_code')
    })
148
149
def test_main():
    # Hand the test case classes listed here to the test support runner,
    # in this order.
    test_classes = (
        CAPITest,
        TestPendingCalls,
        TestThreadState,
        TestGetIndices,
        Test_testcapi,
    )
    support.run_unittest(*test_classes)
153
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
156