1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"fmt"
23	"math"
24	"sync"
25	"sync/atomic"
26	"time"
27)
28
29const (
30	// The default value of flow control window size in HTTP2 spec.
31	defaultWindowSize = 65535
32	// The initial window size for flow control.
33	initialWindowSize             = defaultWindowSize // for an RPC
34	infinity                      = time.Duration(math.MaxInt64)
35	defaultClientKeepaliveTime    = infinity
36	defaultClientKeepaliveTimeout = 20 * time.Second
37	defaultMaxStreamsClient       = 100
38	defaultMaxConnectionIdle      = infinity
39	defaultMaxConnectionAge       = infinity
40	defaultMaxConnectionAgeGrace  = infinity
41	defaultServerKeepaliveTime    = 2 * time.Hour
42	defaultServerKeepaliveTimeout = 20 * time.Second
43	defaultKeepalivePolicyMinTime = 5 * time.Minute
44	// max window limit set by HTTP2 Specs.
45	maxWindowSize = math.MaxInt32
46	// defaultWriteQuota is the default value for number of data
47	// bytes that each stream can schedule before some of it being
48	// flushed out.
49	defaultWriteQuota = 64 * 1024
50)
51
52// writeQuota is a soft limit on the amount of data a stream can
53// schedule before some of it is written out.
54type writeQuota struct {
55	quota int32
56	// get waits on read from when quota goes less than or equal to zero.
57	// replenish writes on it when quota goes positive again.
58	ch chan struct{}
59	// done is triggered in error case.
60	done <-chan struct{}
61	// replenish is called by loopyWriter to give quota back to.
62	// It is implemented as a field so that it can be updated
63	// by tests.
64	replenish func(n int)
65}
66
67func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
68	w := &writeQuota{
69		quota: sz,
70		ch:    make(chan struct{}, 1),
71		done:  done,
72	}
73	w.replenish = w.realReplenish
74	return w
75}
76
77func (w *writeQuota) get(sz int32) error {
78	for {
79		if atomic.LoadInt32(&w.quota) > 0 {
80			atomic.AddInt32(&w.quota, -sz)
81			return nil
82		}
83		select {
84		case <-w.ch:
85			continue
86		case <-w.done:
87			return errStreamDone
88		}
89	}
90}
91
92func (w *writeQuota) realReplenish(n int) {
93	sz := int32(n)
94	a := atomic.AddInt32(&w.quota, sz)
95	b := a - sz
96	if b <= 0 && a > 0 {
97		select {
98		case w.ch <- struct{}{}:
99		default:
100		}
101	}
102}
103
104type trInFlow struct {
105	limit               uint32
106	unacked             uint32
107	effectiveWindowSize uint32
108}
109
110func (f *trInFlow) newLimit(n uint32) uint32 {
111	d := n - f.limit
112	f.limit = n
113	f.updateEffectiveWindowSize()
114	return d
115}
116
117func (f *trInFlow) onData(n uint32) uint32 {
118	f.unacked += n
119	if f.unacked >= f.limit/4 {
120		w := f.unacked
121		f.unacked = 0
122		f.updateEffectiveWindowSize()
123		return w
124	}
125	f.updateEffectiveWindowSize()
126	return 0
127}
128
129func (f *trInFlow) reset() uint32 {
130	w := f.unacked
131	f.unacked = 0
132	f.updateEffectiveWindowSize()
133	return w
134}
135
136func (f *trInFlow) updateEffectiveWindowSize() {
137	atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
138}
139
140func (f *trInFlow) getSize() uint32 {
141	return atomic.LoadUint32(&f.effectiveWindowSize)
142}
143
144// TODO(mmukhi): Simplify this code.
145// inFlow deals with inbound flow control
146type inFlow struct {
147	mu sync.Mutex
148	// The inbound flow control limit for pending data.
149	limit uint32
150	// pendingData is the overall data which have been received but not been
151	// consumed by applications.
152	pendingData uint32
153	// The amount of data the application has consumed but grpc has not sent
154	// window update for them. Used to reduce window update frequency.
155	pendingUpdate uint32
156	// delta is the extra window update given by receiver when an application
157	// is reading data bigger in size than the inFlow limit.
158	delta uint32
159}
160
161// newLimit updates the inflow window to a new value n.
162// It assumes that n is always greater than the old limit.
163func (f *inFlow) newLimit(n uint32) uint32 {
164	f.mu.Lock()
165	d := n - f.limit
166	f.limit = n
167	f.mu.Unlock()
168	return d
169}
170
171func (f *inFlow) maybeAdjust(n uint32) uint32 {
172	if n > uint32(math.MaxInt32) {
173		n = uint32(math.MaxInt32)
174	}
175	f.mu.Lock()
176	// estSenderQuota is the receiver's view of the maximum number of bytes the sender
177	// can send without a window update.
178	estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
179	// estUntransmittedData is the maximum number of bytes the sends might not have put
180	// on the wire yet. A value of 0 or less means that we have already received all or
181	// more bytes than the application is requesting to read.
182	estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
183	// This implies that unless we send a window update, the sender won't be able to send all the bytes
184	// for this message. Therefore we must send an update over the limit since there's an active read
185	// request from the application.
186	if estUntransmittedData > estSenderQuota {
187		// Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
188		if f.limit+n > maxWindowSize {
189			f.delta = maxWindowSize - f.limit
190		} else {
191			// Send a window update for the whole message and not just the difference between
192			// estUntransmittedData and estSenderQuota. This will be helpful in case the message
193			// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
194			f.delta = n
195		}
196		f.mu.Unlock()
197		return f.delta
198	}
199	f.mu.Unlock()
200	return 0
201}
202
203// onData is invoked when some data frame is received. It updates pendingData.
204func (f *inFlow) onData(n uint32) error {
205	f.mu.Lock()
206	f.pendingData += n
207	if f.pendingData+f.pendingUpdate > f.limit+f.delta {
208		limit := f.limit
209		rcvd := f.pendingData + f.pendingUpdate
210		f.mu.Unlock()
211		return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
212	}
213	f.mu.Unlock()
214	return nil
215}
216
217// onRead is invoked when the application reads the data. It returns the window size
218// to be sent to the peer.
219func (f *inFlow) onRead(n uint32) uint32 {
220	f.mu.Lock()
221	if f.pendingData == 0 {
222		f.mu.Unlock()
223		return 0
224	}
225	f.pendingData -= n
226	if n > f.delta {
227		n -= f.delta
228		f.delta = 0
229	} else {
230		f.delta -= n
231		n = 0
232	}
233	f.pendingUpdate += n
234	if f.pendingUpdate >= f.limit/4 {
235		wu := f.pendingUpdate
236		f.pendingUpdate = 0
237		f.mu.Unlock()
238		return wu
239	}
240	f.mu.Unlock()
241	return 0
242}
243