1#
2# A test of `multiprocessing.Pool` class
3#
4# Copyright (c) 2006-2008, R Oudkerk
5# All rights reserved.
6#
7
8import multiprocessing
9import time
10import random
11import sys
12
13#
14# Functions used by test code
15#
16
17def calculate(func, args):
18    result = func(*args)
19    return '%s says that %s%s = %s' % (
20        multiprocessing.current_process().name,
21        func.__name__, args, result
22        )
23
24def calculatestar(args):
25    return calculate(*args)
26
27def mul(a, b):
28    time.sleep(0.5*random.random())
29    return a * b
30
31def plus(a, b):
32    time.sleep(0.5*random.random())
33    return a + b
34
35def f(x):
36    return 1.0 / (x-5.0)
37
38def pow3(x):
39    return x**3
40
41def noop(x):
42    pass
43
44#
45# Test code
46#
47
48def test():
49    print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
50
51    #
52    # Create pool
53    #
54
55    PROCESSES = 4
56    print 'Creating pool with %d processes\n' % PROCESSES
57    pool = multiprocessing.Pool(PROCESSES)
58    print 'pool = %s' % pool
59    print
60
61    #
62    # Tests
63    #
64
65    TASKS = [(mul, (i, 7)) for i in range(10)] + \
66            [(plus, (i, 8)) for i in range(10)]
67
68    results = [pool.apply_async(calculate, t) for t in TASKS]
69    imap_it = pool.imap(calculatestar, TASKS)
70    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
71
72    print 'Ordered results using pool.apply_async():'
73    for r in results:
74        print '\t', r.get()
75    print
76
77    print 'Ordered results using pool.imap():'
78    for x in imap_it:
79        print '\t', x
80    print
81
82    print 'Unordered results using pool.imap_unordered():'
83    for x in imap_unordered_it:
84        print '\t', x
85    print
86
87    print 'Ordered results using pool.map() --- will block till complete:'
88    for x in pool.map(calculatestar, TASKS):
89        print '\t', x
90    print
91
92    #
93    # Simple benchmarks
94    #
95
96    N = 100000
97    print 'def pow3(x): return x**3'
98
99    t = time.time()
100    A = map(pow3, xrange(N))
101    print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
102          (N, time.time() - t)
103
104    t = time.time()
105    B = pool.map(pow3, xrange(N))
106    print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
107          (N, time.time() - t)
108
109    t = time.time()
110    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
111    print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
112          ' seconds' % (N, N//8, time.time() - t)
113
114    assert A == B == C, (len(A), len(B), len(C))
115    print
116
117    L = [None] * 1000000
118    print 'def noop(x): pass'
119    print 'L = [None] * 1000000'
120
121    t = time.time()
122    A = map(noop, L)
123    print '\tmap(noop, L):\n\t\t%s seconds' % \
124          (time.time() - t)
125
126    t = time.time()
127    B = pool.map(noop, L)
128    print '\tpool.map(noop, L):\n\t\t%s seconds' % \
129          (time.time() - t)
130
131    t = time.time()
132    C = list(pool.imap(noop, L, chunksize=len(L)//8))
133    print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
134          (len(L)//8, time.time() - t)
135
136    assert A == B == C, (len(A), len(B), len(C))
137    print
138
139    del A, B, C, L
140
141    #
142    # Test error handling
143    #
144
145    print 'Testing error handling:'
146
147    try:
148        print pool.apply(f, (5,))
149    except ZeroDivisionError:
150        print '\tGot ZeroDivisionError as expected from pool.apply()'
151    else:
152        raise AssertionError('expected ZeroDivisionError')
153
154    try:
155        print pool.map(f, range(10))
156    except ZeroDivisionError:
157        print '\tGot ZeroDivisionError as expected from pool.map()'
158    else:
159        raise AssertionError('expected ZeroDivisionError')
160
161    try:
162        print list(pool.imap(f, range(10)))
163    except ZeroDivisionError:
164        print '\tGot ZeroDivisionError as expected from list(pool.imap())'
165    else:
166        raise AssertionError('expected ZeroDivisionError')
167
168    it = pool.imap(f, range(10))
169    for i in range(10):
170        try:
171            x = it.next()
172        except ZeroDivisionError:
173            if i == 5:
174                pass
175        except StopIteration:
176            break
177        else:
178            if i == 5:
179                raise AssertionError('expected ZeroDivisionError')
180
181    assert i == 9
182    print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
183    print
184
185    #
186    # Testing timeouts
187    #
188
189    print 'Testing ApplyResult.get() with timeout:',
190    res = pool.apply_async(calculate, TASKS[0])
191    while 1:
192        sys.stdout.flush()
193        try:
194            sys.stdout.write('\n\t%s' % res.get(0.02))
195            break
196        except multiprocessing.TimeoutError:
197            sys.stdout.write('.')
198    print
199    print
200
201    print 'Testing IMapIterator.next() with timeout:',
202    it = pool.imap(calculatestar, TASKS)
203    while 1:
204        sys.stdout.flush()
205        try:
206            sys.stdout.write('\n\t%s' % it.next(0.02))
207        except StopIteration:
208            break
209        except multiprocessing.TimeoutError:
210            sys.stdout.write('.')
211    print
212    print
213
214    #
215    # Testing callback
216    #
217
218    print 'Testing callback:'
219
220    A = []
221    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
222
223    r = pool.apply_async(mul, (7, 8), callback=A.append)
224    r.wait()
225
226    r = pool.map_async(pow3, range(10), callback=A.extend)
227    r.wait()
228
229    if A == B:
230        print '\tcallbacks succeeded\n'
231    else:
232        print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
233
234    #
235    # Check there are no outstanding tasks
236    #
237
238    assert not pool._cache, 'cache = %r' % pool._cache
239
240    #
241    # Check close() methods
242    #
243
244    print 'Testing close():'
245
246    for worker in pool._pool:
247        assert worker.is_alive()
248
249    result = pool.apply_async(time.sleep, [0.5])
250    pool.close()
251    pool.join()
252
253    assert result.get() is None
254
255    for worker in pool._pool:
256        assert not worker.is_alive()
257
258    print '\tclose() succeeded\n'
259
260    #
261    # Check terminate() method
262    #
263
264    print 'Testing terminate():'
265
266    pool = multiprocessing.Pool(2)
267    DELTA = 0.1
268    ignore = pool.apply(pow3, [2])
269    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
270    pool.terminate()
271    pool.join()
272
273    for worker in pool._pool:
274        assert not worker.is_alive()
275
276    print '\tterminate() succeeded\n'
277
278    #
279    # Check garbage collection
280    #
281
282    print 'Testing garbage collection:'
283
284    pool = multiprocessing.Pool(2)
285    DELTA = 0.1
286    processes = pool._pool
287    ignore = pool.apply(pow3, [2])
288    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
289
290    results = pool = None
291
292    time.sleep(DELTA * 2)
293
294    for worker in processes:
295        assert not worker.is_alive()
296
297    print '\tgarbage collection succeeded\n'
298
299
300if __name__ == '__main__':
301    multiprocessing.freeze_support()
302
303    assert len(sys.argv) in (1, 2)
304
305    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
306        print ' Using processes '.center(79, '-')
307    elif sys.argv[1] == 'threads':
308        print ' Using threads '.center(79, '-')
309        import multiprocessing.dummy as multiprocessing
310    else:
311        print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
312        raise SystemExit(2)
313
314    test()
315