1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Helpful utilities related to the stream module."""
15
16import logging
17import threading
18
19from grpc.framework.foundation import stream
20
21_NO_VALUE = object()
22logging.basicConfig()
23_LOGGER = logging.getLogger(__name__)
24
25
26class TransformingConsumer(stream.Consumer):
27    """A stream.Consumer that passes a transformation of its input to another."""
28
29    def __init__(self, transformation, downstream):
30        self._transformation = transformation
31        self._downstream = downstream
32
33    def consume(self, value):
34        self._downstream.consume(self._transformation(value))
35
36    def terminate(self):
37        self._downstream.terminate()
38
39    def consume_and_terminate(self, value):
40        self._downstream.consume_and_terminate(self._transformation(value))
41
42
43class IterableConsumer(stream.Consumer):
44    """A Consumer that when iterated over emits the values it has consumed."""
45
46    def __init__(self):
47        self._condition = threading.Condition()
48        self._values = []
49        self._active = True
50
51    def consume(self, value):
52        with self._condition:
53            if self._active:
54                self._values.append(value)
55                self._condition.notify()
56
57    def terminate(self):
58        with self._condition:
59            self._active = False
60            self._condition.notify()
61
62    def consume_and_terminate(self, value):
63        with self._condition:
64            if self._active:
65                self._values.append(value)
66                self._active = False
67                self._condition.notify()
68
69    def __iter__(self):
70        return self
71
72    def __next__(self):
73        return self.next()
74
75    def next(self):
76        with self._condition:
77            while self._active and not self._values:
78                self._condition.wait()
79            if self._values:
80                return self._values.pop(0)
81            else:
82                raise StopIteration()
83
84
85class ThreadSwitchingConsumer(stream.Consumer):
86    """A Consumer decorator that affords serialization and asynchrony."""
87
88    def __init__(self, sink, pool):
89        self._lock = threading.Lock()
90        self._sink = sink
91        self._pool = pool
92        # True if self._spin has been submitted to the pool to be called once and
93        # that call has not yet returned, False otherwise.
94        self._spinning = False
95        self._values = []
96        self._active = True
97
98    def _spin(self, sink, value, terminate):
99        while True:
100            try:
101                if value is _NO_VALUE:
102                    sink.terminate()
103                elif terminate:
104                    sink.consume_and_terminate(value)
105                else:
106                    sink.consume(value)
107            except Exception as e:  # pylint:disable=broad-except
108                _LOGGER.exception(e)
109
110            with self._lock:
111                if terminate:
112                    self._spinning = False
113                    return
114                elif self._values:
115                    value = self._values.pop(0)
116                    terminate = not self._values and not self._active
117                elif not self._active:
118                    value = _NO_VALUE
119                    terminate = True
120                else:
121                    self._spinning = False
122                    return
123
124    def consume(self, value):
125        with self._lock:
126            if self._active:
127                if self._spinning:
128                    self._values.append(value)
129                else:
130                    self._pool.submit(self._spin, self._sink, value, False)
131                    self._spinning = True
132
133    def terminate(self):
134        with self._lock:
135            if self._active:
136                self._active = False
137                if not self._spinning:
138                    self._pool.submit(self._spin, self._sink, _NO_VALUE, True)
139                    self._spinning = True
140
141    def consume_and_terminate(self, value):
142        with self._lock:
143            if self._active:
144                self._active = False
145                if self._spinning:
146                    self._values.append(value)
147                else:
148                    self._pool.submit(self._spin, self._sink, value, True)
149                    self._spinning = True
150