1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Helpful utilities related to the stream module.""" 15 16import logging 17import threading 18 19from grpc.framework.foundation import stream 20 21_NO_VALUE = object() 22logging.basicConfig() 23_LOGGER = logging.getLogger(__name__) 24 25 26class TransformingConsumer(stream.Consumer): 27 """A stream.Consumer that passes a transformation of its input to another.""" 28 29 def __init__(self, transformation, downstream): 30 self._transformation = transformation 31 self._downstream = downstream 32 33 def consume(self, value): 34 self._downstream.consume(self._transformation(value)) 35 36 def terminate(self): 37 self._downstream.terminate() 38 39 def consume_and_terminate(self, value): 40 self._downstream.consume_and_terminate(self._transformation(value)) 41 42 43class IterableConsumer(stream.Consumer): 44 """A Consumer that when iterated over emits the values it has consumed.""" 45 46 def __init__(self): 47 self._condition = threading.Condition() 48 self._values = [] 49 self._active = True 50 51 def consume(self, value): 52 with self._condition: 53 if self._active: 54 self._values.append(value) 55 self._condition.notify() 56 57 def terminate(self): 58 with self._condition: 59 self._active = False 60 self._condition.notify() 61 62 def consume_and_terminate(self, value): 63 with self._condition: 64 if self._active: 65 self._values.append(value) 66 self._active = False 67 self._condition.notify() 68 69 def __iter__(self): 70 return self 71 72 def __next__(self): 73 return self.next() 74 75 def next(self): 76 with self._condition: 77 while self._active and not self._values: 78 self._condition.wait() 79 if self._values: 80 return self._values.pop(0) 81 else: 82 raise StopIteration() 83 84 85class ThreadSwitchingConsumer(stream.Consumer): 86 """A Consumer decorator that affords serialization and asynchrony.""" 87 88 def __init__(self, sink, pool): 89 self._lock = threading.Lock() 90 self._sink = sink 91 self._pool = pool 92 # True if self._spin has been submitted to the pool to be called once and 93 # that call has not yet returned, False otherwise. 94 self._spinning = False 95 self._values = [] 96 self._active = True 97 98 def _spin(self, sink, value, terminate): 99 while True: 100 try: 101 if value is _NO_VALUE: 102 sink.terminate() 103 elif terminate: 104 sink.consume_and_terminate(value) 105 else: 106 sink.consume(value) 107 except Exception as e: # pylint:disable=broad-except 108 _LOGGER.exception(e) 109 110 with self._lock: 111 if terminate: 112 self._spinning = False 113 return 114 elif self._values: 115 value = self._values.pop(0) 116 terminate = not self._values and not self._active 117 elif not self._active: 118 value = _NO_VALUE 119 terminate = True 120 else: 121 self._spinning = False 122 return 123 124 def consume(self, value): 125 with self._lock: 126 if self._active: 127 if self._spinning: 128 self._values.append(value) 129 else: 130 self._pool.submit(self._spin, self._sink, value, False) 131 self._spinning = True 132 133 def terminate(self): 134 with self._lock: 135 if self._active: 136 self._active = False 137 if not self._spinning: 138 self._pool.submit(self._spin, self._sink, _NO_VALUE, True) 139 self._spinning = True 140 141 def consume_and_terminate(self, value): 142 with self._lock: 143 if self._active: 144 self._active = False 145 if self._spinning: 146 self._values.append(value) 147 else: 148 self._pool.submit(self._spin, self._sink, value, True) 149 self._spinning = True 150