1#!/usr/bin/env python
2
3'''
4Multithreaded video processing sample.
5Usage:
6   video_threaded.py {<video device number>|<video file name>}
7
8   Shows how python threading capabilities can be used
9   to organize parallel captured frame processing pipeline
10   for smoother playback.
11
12Keyboard shortcuts:
13
14   ESC - exit
15   space - switch between multi and single threaded processing
16'''
17
18
19import numpy as np
20import cv2
21
22from multiprocessing.pool import ThreadPool
23from collections import deque
24
25from common import clock, draw_str, StatValue
26import video
27
28
29class DummyTask:
30    def __init__(self, data):
31        self.data = data
32    def ready(self):
33        return True
34    def get(self):
35        return self.data
36
37if __name__ == '__main__':
38    import sys
39
40    print __doc__
41
42    try:
43        fn = sys.argv[1]
44    except:
45        fn = 0
46    cap = video.create_capture(fn)
47
48
49    def process_frame(frame, t0):
50        # some intensive computation...
51        frame = cv2.medianBlur(frame, 19)
52        frame = cv2.medianBlur(frame, 19)
53        return frame, t0
54
55    threadn = cv2.getNumberOfCPUs()
56    pool = ThreadPool(processes = threadn)
57    pending = deque()
58
59    threaded_mode = True
60
61    latency = StatValue()
62    frame_interval = StatValue()
63    last_frame_time = clock()
64    while True:
65        while len(pending) > 0 and pending[0].ready():
66            res, t0 = pending.popleft().get()
67            latency.update(clock() - t0)
68            draw_str(res, (20, 20), "threaded      :  " + str(threaded_mode))
69            draw_str(res, (20, 40), "latency        :  %.1f ms" % (latency.value*1000))
70            draw_str(res, (20, 60), "frame interval :  %.1f ms" % (frame_interval.value*1000))
71            cv2.imshow('threaded video', res)
72        if len(pending) < threadn:
73            ret, frame = cap.read()
74            t = clock()
75            frame_interval.update(t - last_frame_time)
76            last_frame_time = t
77            if threaded_mode:
78                task = pool.apply_async(process_frame, (frame.copy(), t))
79            else:
80                task = DummyTask(process_frame(frame, t))
81            pending.append(task)
82        ch = 0xFF & cv2.waitKey(1)
83        if ch == ord(' '):
84            threaded_mode = not threaded_mode
85        if ch == 27:
86            break
87cv2.destroyAllWindows()
88