1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require 'spec_helper'
16
17Thread.abort_on_exception = true
18
19describe GRPC::Pool do
20  Pool = GRPC::Pool
21
22  describe '#new' do
23    it 'raises if a non-positive size is used' do
24      expect { Pool.new(0) }.to raise_error
25      expect { Pool.new(-1) }.to raise_error
26      expect { Pool.new(Object.new) }.to raise_error
27    end
28
29    it 'is constructed OK with a positive size' do
30      expect { Pool.new(1) }.not_to raise_error
31    end
32  end
33
34  describe '#ready_for_work?' do
35    it 'before start it is not ready' do
36      p = Pool.new(1)
37      expect(p.ready_for_work?).to be(false)
38    end
39
40    it 'it stops being ready after all workers are busy' do
41      p = Pool.new(5)
42      p.start
43
44      wait_mu = Mutex.new
45      wait_cv = ConditionVariable.new
46      wait = true
47
48      job = proc do
49        wait_mu.synchronize do
50          wait_cv.wait(wait_mu) while wait
51        end
52      end
53
54      5.times do
55        expect(p.ready_for_work?).to be(true)
56        p.schedule(&job)
57      end
58
59      expect(p.ready_for_work?).to be(false)
60
61      wait_mu.synchronize do
62        wait = false
63        wait_cv.broadcast
64      end
65    end
66  end
67
68  describe '#schedule' do
69    it 'return if the pool is already stopped' do
70      p = Pool.new(1)
71      p.stop
72      job = proc {}
73      expect { p.schedule(&job) }.to_not raise_error
74    end
75
76    it 'adds jobs that get run by the pool' do
77      p = Pool.new(1)
78      p.start
79      o, q = Object.new, Queue.new
80      job = proc { q.push(o) }
81      p.schedule(&job)
82      expect(q.pop).to be(o)
83      p.stop
84    end
85  end
86
87  describe '#stop' do
88    it 'works when there are no scheduled tasks' do
89      p = Pool.new(1)
90      expect { p.stop }.not_to raise_error
91    end
92
93    it 'stops jobs when there are long running jobs' do
94      p = Pool.new(1)
95      p.start
96
97      wait_forever_mu = Mutex.new
98      wait_forever_cv = ConditionVariable.new
99      wait_forever = true
100
101      job_running = Queue.new
102      job = proc do
103        job_running.push(Object.new)
104        wait_forever_mu.synchronize do
105          wait_forever_cv.wait while wait_forever
106        end
107      end
108      p.schedule(&job)
109      job_running.pop
110      expect { p.stop }.not_to raise_error
111    end
112  end
113
114  describe '#start' do
115    it 'runs jobs as they are scheduled' do
116      p = Pool.new(5)
117      o, q = Object.new, Queue.new
118      p.start
119      n = 5  # arbitrary
120      n.times do
121        p.schedule(o, &q.method(:push))
122        expect(q.pop).to be(o)
123      end
124      p.stop
125    end
126  end
127end
128