1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"io"
23	"math"
24	"net"
25	"strings"
26	"sync"
27	"sync/atomic"
28	"time"
29
30	"golang.org/x/net/context"
31	"golang.org/x/net/http2"
32	"golang.org/x/net/http2/hpack"
33
34	"google.golang.org/grpc/codes"
35	"google.golang.org/grpc/credentials"
36	"google.golang.org/grpc/internal/channelz"
37	"google.golang.org/grpc/keepalive"
38	"google.golang.org/grpc/metadata"
39	"google.golang.org/grpc/peer"
40	"google.golang.org/grpc/stats"
41	"google.golang.org/grpc/status"
42)
43
// http2Client implements the ClientTransport interface with HTTP2.
//
// Fields fall into a few groups: immutable connection identity set at
// construction, flow-control/keepalive configuration, the stream-quota
// counters (manipulated inside controlBuf.executeAndPut callbacks), the
// mu-guarded stream table and transport state, and the czmu-guarded channelz
// counters.
type http2Client struct {
	ctx        context.Context
	cancel     context.CancelFunc
	ctxDone    <-chan struct{} // Cache the ctx.Done() chan.
	userAgent  string
	md         interface{}
	conn       net.Conn // underlying communication channel
	loopy      *loopyWriter
	remoteAddr net.Addr
	localAddr  net.Addr
	authInfo   credentials.AuthInfo // auth info about the connection

	readerDone chan struct{} // sync point to enable testing.
	writerDone chan struct{} // sync point to enable testing.
	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
	// that the server sent GoAway on this transport.
	goAway chan struct{}
	// awakenKeepalive is used to wake up the keepalive goroutine after it has
	// gone dormant. It is a 1-buffered channel that is kept full so that
	// sends fail, except while keepalive is waiting to be woken.
	awakenKeepalive chan struct{}

	framer *framer
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	// The scheme used: https if TLS is on, http otherwise.
	scheme string

	isSecure bool

	creds []credentials.PerRPCCredentials

	// Boolean to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	activity         uint32 // Accessed atomically.
	kp               keepalive.ClientParameters
	keepaliveEnabled bool

	statsHandler stats.Handler

	initialWindowSize int32

	bdpEst *bdpEstimator
	// onSuccess is a callback that client transport calls upon
	// receiving server preface to signal that a successful HTTP2
	// connection was established.
	onSuccess func()

	// Stream-quota bookkeeping. These are read and written from callbacks
	// passed to controlBuf.executeAndPut (presumably serialized by
	// controlBuf's own lock — confirm in controlbuf.go). streamQuota is the
	// number of streams that may still be opened and can go negative if the
	// server lowers its limit; waitingStreams counts NewStream callers
	// blocked on streamsQuotaAvailable; nextID is the next client stream ID
	// (odd, starting at 1, advanced by 2).
	maxConcurrentStreams  uint32
	streamQuota           int64
	streamsQuotaAvailable chan struct{}
	waitingStreams        uint32
	nextID                uint32

	mu            sync.Mutex // guard the following variables
	state         transportState
	activeStreams map[uint32]*Stream
	// prevGoAwayID records the Last-Stream-ID in the previous GOAway frame.
	prevGoAwayID uint32
	// goAwayReason records the http2.ErrCode and debug data received with the
	// GoAway frame.
	goAwayReason GoAwayReason

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czmu       sync.RWMutex
	kpCount    int64
	// The number of streams that have started, including already finished ones.
	streamsStarted int64
	// The number of streams that have ended successfully by receiving EoS bit set
	// frame from server.
	streamsSucceeded  int64
	streamsFailed     int64
	lastStreamCreated time.Time
	msgSent           int64
	msgRecv           int64
	lastMsgSent       time.Time
	lastMsgRecv       time.Time
}
124
125func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
126	if fn != nil {
127		return fn(ctx, addr)
128	}
129	return dialContext(ctx, "tcp", addr)
130}
131
// isTemporary reports whether err is worth retrying.
//
// An error exposing Temporary() is judged by that method alone. Otherwise an
// error exposing Timeout() is judged by that, since a timeout may clear up on
// retry. Errors exposing neither, including nil, are assumed temporary.
func isTemporary(err error) bool {
	if te, ok := err.(interface{ Temporary() bool }); ok {
		return te.Temporary()
	}
	if to, ok := err.(interface{ Timeout() bool }); ok {
		return to.Timeout()
	}
	return true
}
147
148// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
149// and starts to receive messages on it. Non-nil error returns if construction
150// fails.
151func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) {
152	scheme := "http"
153	ctx, cancel := context.WithCancel(ctx)
154	defer func() {
155		if err != nil {
156			cancel()
157		}
158	}()
159
160	conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
161	if err != nil {
162		if opts.FailOnNonTempDialError {
163			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
164		}
165		return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
166	}
167	// Any further errors will close the underlying connection
168	defer func(conn net.Conn) {
169		if err != nil {
170			conn.Close()
171		}
172	}(conn)
173	var (
174		isSecure bool
175		authInfo credentials.AuthInfo
176	)
177	if creds := opts.TransportCredentials; creds != nil {
178		scheme = "https"
179		conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Authority, conn)
180		if err != nil {
181			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
182		}
183		isSecure = true
184	}
185	kp := opts.KeepaliveParams
186	// Validate keepalive parameters.
187	if kp.Time == 0 {
188		kp.Time = defaultClientKeepaliveTime
189	}
190	if kp.Timeout == 0 {
191		kp.Timeout = defaultClientKeepaliveTimeout
192	}
193	dynamicWindow := true
194	icwz := int32(initialWindowSize)
195	if opts.InitialConnWindowSize >= defaultWindowSize {
196		icwz = opts.InitialConnWindowSize
197		dynamicWindow = false
198	}
199	writeBufSize := defaultWriteBufSize
200	if opts.WriteBufferSize > 0 {
201		writeBufSize = opts.WriteBufferSize
202	}
203	readBufSize := defaultReadBufSize
204	if opts.ReadBufferSize > 0 {
205		readBufSize = opts.ReadBufferSize
206	}
207	t := &http2Client{
208		ctx:                   ctx,
209		ctxDone:               ctx.Done(), // Cache Done chan.
210		cancel:                cancel,
211		userAgent:             opts.UserAgent,
212		md:                    addr.Metadata,
213		conn:                  conn,
214		remoteAddr:            conn.RemoteAddr(),
215		localAddr:             conn.LocalAddr(),
216		authInfo:              authInfo,
217		readerDone:            make(chan struct{}),
218		writerDone:            make(chan struct{}),
219		goAway:                make(chan struct{}),
220		awakenKeepalive:       make(chan struct{}, 1),
221		framer:                newFramer(conn, writeBufSize, readBufSize),
222		fc:                    &trInFlow{limit: uint32(icwz)},
223		scheme:                scheme,
224		activeStreams:         make(map[uint32]*Stream),
225		isSecure:              isSecure,
226		creds:                 opts.PerRPCCredentials,
227		kp:                    kp,
228		statsHandler:          opts.StatsHandler,
229		initialWindowSize:     initialWindowSize,
230		onSuccess:             onSuccess,
231		nextID:                1,
232		maxConcurrentStreams:  defaultMaxStreamsClient,
233		streamQuota:           defaultMaxStreamsClient,
234		streamsQuotaAvailable: make(chan struct{}, 1),
235	}
236	t.controlBuf = newControlBuffer(t.ctxDone)
237	if opts.InitialWindowSize >= defaultWindowSize {
238		t.initialWindowSize = opts.InitialWindowSize
239		dynamicWindow = false
240	}
241	if dynamicWindow {
242		t.bdpEst = &bdpEstimator{
243			bdp:               initialWindowSize,
244			updateFlowControl: t.updateFlowControl,
245		}
246	}
247	// Make sure awakenKeepalive can't be written upon.
248	// keepalive routine will make it writable, if need be.
249	t.awakenKeepalive <- struct{}{}
250	if t.statsHandler != nil {
251		t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
252			RemoteAddr: t.remoteAddr,
253			LocalAddr:  t.localAddr,
254		})
255		connBegin := &stats.ConnBegin{
256			Client: true,
257		}
258		t.statsHandler.HandleConn(t.ctx, connBegin)
259	}
260	if channelz.IsOn() {
261		t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, "")
262	}
263	if t.kp.Time != infinity {
264		t.keepaliveEnabled = true
265		go t.keepalive()
266	}
267	// Start the reader goroutine for incoming message. Each transport has
268	// a dedicated goroutine which reads HTTP2 frame from network. Then it
269	// dispatches the frame to the corresponding stream entity.
270	go t.reader()
271	// Send connection preface to server.
272	n, err := t.conn.Write(clientPreface)
273	if err != nil {
274		t.Close()
275		return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
276	}
277	if n != len(clientPreface) {
278		t.Close()
279		return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
280	}
281	if t.initialWindowSize != defaultWindowSize {
282		err = t.framer.fr.WriteSettings(http2.Setting{
283			ID:  http2.SettingInitialWindowSize,
284			Val: uint32(t.initialWindowSize),
285		})
286	} else {
287		err = t.framer.fr.WriteSettings()
288	}
289	if err != nil {
290		t.Close()
291		return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
292	}
293	// Adjust the connection flow control window if needed.
294	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
295		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
296			t.Close()
297			return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
298		}
299	}
300	t.framer.writer.Flush()
301	go func() {
302		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
303		err := t.loopy.run()
304		if err != nil {
305			errorf("transport: loopyWriter.run returning. Err: %v", err)
306		}
307		// If it's a connection error, let reader goroutine handle it
308		// since there might be data in the buffers.
309		if _, ok := err.(net.Error); !ok {
310			t.conn.Close()
311		}
312		close(t.writerDone)
313	}()
314	return t, nil
315}
316
317func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
318	// TODO(zhaoq): Handle uint32 overflow of Stream.id.
319	s := &Stream{
320		done:           make(chan struct{}),
321		method:         callHdr.Method,
322		sendCompress:   callHdr.SendCompress,
323		buf:            newRecvBuffer(),
324		headerChan:     make(chan struct{}),
325		contentSubtype: callHdr.ContentSubtype,
326	}
327	s.wq = newWriteQuota(defaultWriteQuota, s.done)
328	s.requestRead = func(n int) {
329		t.adjustWindow(s, uint32(n))
330	}
331	// The client side stream context should have exactly the same life cycle with the user provided context.
332	// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
333	// So we use the original context here instead of creating a copy.
334	s.ctx = ctx
335	s.trReader = &transportReader{
336		reader: &recvBufferReader{
337			ctx:     s.ctx,
338			ctxDone: s.ctx.Done(),
339			recv:    s.buf,
340		},
341		windowHandler: func(n int) {
342			t.updateWindow(s, uint32(n))
343		},
344	}
345	return s
346}
347
348func (t *http2Client) getPeer() *peer.Peer {
349	pr := &peer.Peer{
350		Addr: t.remoteAddr,
351	}
352	// Attach Auth info if there is any.
353	if t.authInfo != nil {
354		pr.AuthInfo = t.authInfo
355	}
356	return pr
357}
358
359func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
360	aud := t.createAudience(callHdr)
361	authData, err := t.getTrAuthData(ctx, aud)
362	if err != nil {
363		return nil, err
364	}
365	callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
366	if err != nil {
367		return nil, err
368	}
369	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
370	// first and create a slice of that exact size.
371	// Make the slice of certain predictable size to reduce allocations made by append.
372	hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
373	hfLen += len(authData) + len(callAuthData)
374	headerFields := make([]hpack.HeaderField, 0, hfLen)
375	headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
376	headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
377	headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
378	headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
379	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
380	headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
381	headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
382
383	if callHdr.SendCompress != "" {
384		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
385	}
386	if dl, ok := ctx.Deadline(); ok {
387		// Send out timeout regardless its value. The server can detect timeout context by itself.
388		// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
389		timeout := dl.Sub(time.Now())
390		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
391	}
392	for k, v := range authData {
393		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
394	}
395	for k, v := range callAuthData {
396		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
397	}
398	if b := stats.OutgoingTags(ctx); b != nil {
399		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
400	}
401	if b := stats.OutgoingTrace(ctx); b != nil {
402		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
403	}
404
405	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
406		var k string
407		for _, vv := range added {
408			for i, v := range vv {
409				if i%2 == 0 {
410					k = v
411					continue
412				}
413				// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
414				if isReservedHeader(k) {
415					continue
416				}
417				headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
418			}
419		}
420		for k, vv := range md {
421			// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
422			if isReservedHeader(k) {
423				continue
424			}
425			for _, v := range vv {
426				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
427			}
428		}
429	}
430	if md, ok := t.md.(*metadata.MD); ok {
431		for k, vv := range *md {
432			if isReservedHeader(k) {
433				continue
434			}
435			for _, v := range vv {
436				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
437			}
438		}
439	}
440	return headerFields, nil
441}
442
443func (t *http2Client) createAudience(callHdr *CallHdr) string {
444	// Create an audience string only if needed.
445	if len(t.creds) == 0 && callHdr.Creds == nil {
446		return ""
447	}
448	// Construct URI required to get auth request metadata.
449	// Omit port if it is the default one.
450	host := strings.TrimSuffix(callHdr.Host, ":443")
451	pos := strings.LastIndex(callHdr.Method, "/")
452	if pos == -1 {
453		pos = len(callHdr.Method)
454	}
455	return "https://" + host + callHdr.Method[:pos]
456}
457
458func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
459	authData := map[string]string{}
460	for _, c := range t.creds {
461		data, err := c.GetRequestMetadata(ctx, audience)
462		if err != nil {
463			if _, ok := status.FromError(err); ok {
464				return nil, err
465			}
466
467			return nil, streamErrorf(codes.Unauthenticated, "transport: %v", err)
468		}
469		for k, v := range data {
470			// Capital header names are illegal in HTTP/2.
471			k = strings.ToLower(k)
472			authData[k] = v
473		}
474	}
475	return authData, nil
476}
477
478func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
479	callAuthData := map[string]string{}
480	// Check if credentials.PerRPCCredentials were provided via call options.
481	// Note: if these credentials are provided both via dial options and call
482	// options, then both sets of credentials will be applied.
483	if callCreds := callHdr.Creds; callCreds != nil {
484		if !t.isSecure && callCreds.RequireTransportSecurity() {
485			return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
486		}
487		data, err := callCreds.GetRequestMetadata(ctx, audience)
488		if err != nil {
489			return nil, streamErrorf(codes.Internal, "transport: %v", err)
490		}
491		for k, v := range data {
492			// Capital header names are illegal in HTTP/2
493			k = strings.ToLower(k)
494			callAuthData[k] = v
495		}
496	}
497	return callAuthData, nil
498}
499
// NewStream creates a stream and registers it into the transport as "active"
// streams.
//
// The header frame is queued on controlBuf via executeAndPut together with
// checkForStreamQuota, which (when quota is available) takes one unit of
// stream quota and assigns the next odd stream ID. Registration in
// activeStreams happens later, in the header frame's initStream callback
// (presumably invoked by the writer when the header is actually sent —
// confirm in controlbuf.go). If no quota is available, NewStream blocks until
// quota is signalled, the stream ctx is done, the server sends GoAway, or the
// transport context is done.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
	ctx = peer.NewContext(ctx, t.getPeer())
	headerFields, err := t.createHeaderFields(ctx, callHdr)
	if err != nil {
		return nil, err
	}
	s := t.newStream(ctx, callHdr)
	// cleanup finishes a stream that never made it to the server: it marks
	// the stream done (at most once), flags it as unprocessed, delivers err
	// to readers and unblocks writers and header waiters.
	cleanup := func(err error) {
		if s.swapState(streamDone) == streamDone {
			// If it was already done, return.
			return
		}
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
		s.write(recvMsg{err: err})
		close(s.done)
		// If headerChan isn't closed, then close it.
		if atomic.SwapUint32(&s.headerDone, 1) == 0 {
			close(s.headerChan)
		}

	}
	hdr := &headerFrame{
		hf:        headerFields,
		endStream: false,
		// initStream registers s under id if the transport is still
		// reachable. The returned bool asks for a keepalive ping when this
		// stream woke a dormant keepalive goroutine.
		initStream: func(id uint32) (bool, error) {
			t.mu.Lock()
			if state := t.state; state != reachable {
				t.mu.Unlock()
				// Do a quick cleanup.
				err := error(errStreamDrain)
				if state == closing {
					err = ErrConnClosing
				}
				cleanup(err)
				return false, err
			}
			t.activeStreams[id] = s
			if channelz.IsOn() {
				t.czmu.Lock()
				t.streamsStarted++
				t.lastStreamCreated = time.Now()
				t.czmu.Unlock()
			}
			var sendPing bool
			// If the number of active streams change from 0 to 1, then check if keepalive
			// has gone dormant. If so, wake it up.
			if len(t.activeStreams) == 1 && t.keepaliveEnabled {
				select {
				case t.awakenKeepalive <- struct{}{}:
					sendPing = true
					// Fill the awakenKeepalive channel again as this channel must be
					// kept non-writable except at the point that the keepalive()
					// goroutine is waiting either to be awaken or shutdown.
					t.awakenKeepalive <- struct{}{}
				default:
				}
			}
			t.mu.Unlock()
			return sendPing, nil
		},
		onOrphaned: cleanup,
		wq:         s.wq,
	}
	firstTry := true
	var ch chan struct{}
	// checkForStreamQuota runs inside controlBuf.executeAndPut. It returns
	// false (and records the channel to wait on in ch) when no quota is
	// left; waitingStreams is incremented only on the first failed attempt
	// and decremented once a retry finally succeeds. On success it consumes
	// quota, assigns the stream ID and creates the stream's inbound flow
	// control, then passes the wake-up on if other waiters can proceed.
	checkForStreamQuota := func(it interface{}) bool {
		if t.streamQuota <= 0 { // Can go negative if server decreases it.
			if firstTry {
				t.waitingStreams++
			}
			ch = t.streamsQuotaAvailable
			return false
		}
		if !firstTry {
			t.waitingStreams--
		}
		t.streamQuota--
		h := it.(*headerFrame)
		h.streamID = t.nextID
		t.nextID += 2
		s.id = h.streamID
		s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	for {
		success, err := t.controlBuf.executeAndPut(checkForStreamQuota, hdr)
		if err != nil {
			return nil, err
		}
		if success {
			break
		}
		firstTry = false
		select {
		case <-ch:
		case <-s.ctx.Done():
			return nil, ContextErr(s.ctx.Err())
		case <-t.goAway:
			return nil, errStreamDrain
		case <-t.ctx.Done():
			return nil, ErrConnClosing
		}
	}
	if t.statsHandler != nil {
		outHeader := &stats.OutHeader{
			Client:      true,
			FullMethod:  callHdr.Method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: callHdr.SendCompress,
		}
		t.statsHandler.HandleRPC(s.ctx, outHeader)
	}
	return s, nil
}
624
625// CloseStream clears the footprint of a stream when the stream is not needed any more.
626// This must not be executed in reader's goroutine.
627func (t *http2Client) CloseStream(s *Stream, err error) {
628	var (
629		rst     bool
630		rstCode http2.ErrCode
631	)
632	if err != nil {
633		rst = true
634		rstCode = http2.ErrCodeCancel
635	}
636	t.closeStream(s, err, rst, rstCode, nil, nil, false)
637}
638
// closeStream finalizes s. Only the first call for a given stream has any
// effect (guarded by swapState).
//
// It records the final status st and (non-empty) trailers mdata, delivers
// err to readers when non-nil, unblocks writers (s.done) and header waiters,
// and then queues a cleanupStream item on controlBuf. rst/rstCode are carried
// on that item (presumably so the writer emits RST_STREAM — confirm in
// controlbuf.go). When the item's onWrite runs, s is removed from
// activeStreams and channelz counts it as succeeded iff eosReceived. The
// stream's quota unit is returned by addBackStreamQuota, which also wakes a
// waiting NewStream if quota is now available.
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
	// Set stream status to done.
	if s.swapState(streamDone) == streamDone {
		// If it was already done, return.
		return
	}
	// status and trailers can be updated here without any synchronization because the stream goroutine will
	// only read it after it sees an io.EOF error from read or write and we'll write those errors
	// only after updating this.
	s.status = st
	if len(mdata) > 0 {
		s.trailer = mdata
	}
	if err != nil {
		// This will unblock reads eventually.
		s.write(recvMsg{err: err})
	}
	// This will unblock write.
	close(s.done)
	// If headerChan isn't closed, then close it.
	if atomic.SwapUint32(&s.headerDone, 1) == 0 {
		close(s.headerChan)
	}
	cleanup := &cleanupStream{
		streamID: s.id,
		onWrite: func() {
			t.mu.Lock()
			// activeStreams is nil once Close has run.
			if t.activeStreams != nil {
				delete(t.activeStreams, s.id)
			}
			t.mu.Unlock()
			if channelz.IsOn() {
				t.czmu.Lock()
				if eosReceived {
					t.streamsSucceeded++
				} else {
					t.streamsFailed++
				}
				t.czmu.Unlock()
			}
		},
		rst:     rst,
		rstCode: rstCode,
	}
	addBackStreamQuota := func(interface{}) bool {
		t.streamQuota++
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
}
695
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// Repeated calls are no-ops that return nil. The first call does the
// following, in order:
//   - marks the transport closing and detaches activeStreams;
//   - finishes controlBuf and cancels the transport context;
//   - closes the connection and deregisters from channelz;
//   - closes every previously active stream with ErrConnClosing;
//   - reports ConnEnd to the stats handler.
//
// The returned error is the one from conn.Close.
func (t *http2Client) Close() error {
	t.mu.Lock()
	// Make sure we only Close once.
	if t.state == closing {
		t.mu.Unlock()
		return nil
	}
	t.state = closing
	// Take ownership of the stream table; later closeStream cleanups see a
	// nil map and skip the delete.
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	t.cancel()
	err := t.conn.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(t.channelzID)
	}
	// Notify all active streams.
	for _, s := range streams {
		t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, nil, nil, false)
	}
	if t.statsHandler != nil {
		connEnd := &stats.ConnEnd{
			Client: true,
		}
		t.statsHandler.HandleConn(t.ctx, connEnd)
	}
	return err
}
728
729// GracefulClose sets the state to draining, which prevents new streams from
730// being created and causes the transport to be closed when the last active
731// stream is closed.  If there are no active streams, the transport is closed
732// immediately.  This does nothing if the transport is already draining or
733// closing.
734func (t *http2Client) GracefulClose() error {
735	t.mu.Lock()
736	// Make sure we move to draining only from active.
737	if t.state == draining || t.state == closing {
738		t.mu.Unlock()
739		return nil
740	}
741	t.state = draining
742	active := len(t.activeStreams)
743	t.mu.Unlock()
744	if active == 0 {
745		return t.Close()
746	}
747	t.controlBuf.put(&incomingGoAway{})
748	return nil
749}
750
751// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
752// should proceed only if Write returns nil.
753func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
754	if opts.Last {
755		// If it's the last message, update stream state.
756		if !s.compareAndSwapState(streamActive, streamWriteDone) {
757			return errStreamDone
758		}
759	} else if s.getState() != streamActive {
760		return errStreamDone
761	}
762	df := &dataFrame{
763		streamID:  s.id,
764		endStream: opts.Last,
765	}
766	if hdr != nil || data != nil { // If it's not an empty data frame.
767		// Add some data to grpc message header so that we can equally
768		// distribute bytes across frames.
769		emptyLen := http2MaxFrameLen - len(hdr)
770		if emptyLen > len(data) {
771			emptyLen = len(data)
772		}
773		hdr = append(hdr, data[:emptyLen]...)
774		data = data[emptyLen:]
775		df.h, df.d = hdr, data
776		// TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
777		if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
778			return err
779		}
780	}
781	return t.controlBuf.put(df)
782}
783
784func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
785	t.mu.Lock()
786	defer t.mu.Unlock()
787	s, ok := t.activeStreams[f.Header().StreamID]
788	return s, ok
789}
790
791// adjustWindow sends out extra window update over the initial window size
792// of stream if the application is requesting data larger in size than
793// the window.
794func (t *http2Client) adjustWindow(s *Stream, n uint32) {
795	if w := s.fc.maybeAdjust(n); w > 0 {
796		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
797	}
798}
799
800// updateWindow adjusts the inbound quota for the stream.
801// Window updates will be sent out when the cumulative quota
802// exceeds the corresponding threshold.
803func (t *http2Client) updateWindow(s *Stream, n uint32) {
804	if w := s.fc.onRead(n); w > 0 {
805		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
806	}
807}
808
809// updateFlowControl updates the incoming flow control windows
810// for the transport and the stream based on the current bdp
811// estimation.
812func (t *http2Client) updateFlowControl(n uint32) {
813	t.mu.Lock()
814	for _, s := range t.activeStreams {
815		s.fc.newLimit(n)
816	}
817	t.mu.Unlock()
818	updateIWS := func(interface{}) bool {
819		t.initialWindowSize = int32(n)
820		return true
821	}
822	t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
823	t.controlBuf.put(&outgoingSettings{
824		ss: []http2.Setting{
825			{
826				ID:  http2.SettingInitialWindowSize,
827				Val: n,
828			},
829		},
830	})
831}
832
833func (t *http2Client) handleData(f *http2.DataFrame) {
834	size := f.Header().Length
835	var sendBDPPing bool
836	if t.bdpEst != nil {
837		sendBDPPing = t.bdpEst.add(size)
838	}
839	// Decouple connection's flow control from application's read.
840	// An update on connection's flow control should not depend on
841	// whether user application has read the data or not. Such a
842	// restriction is already imposed on the stream's flow control,
843	// and therefore the sender will be blocked anyways.
844	// Decoupling the connection flow control will prevent other
845	// active(fast) streams from starving in presence of slow or
846	// inactive streams.
847	//
848	if w := t.fc.onData(size); w > 0 {
849		t.controlBuf.put(&outgoingWindowUpdate{
850			streamID:  0,
851			increment: w,
852		})
853	}
854	if sendBDPPing {
855		// Avoid excessive ping detection (e.g. in an L7 proxy)
856		// by sending a window update prior to the BDP ping.
857
858		if w := t.fc.reset(); w > 0 {
859			t.controlBuf.put(&outgoingWindowUpdate{
860				streamID:  0,
861				increment: w,
862			})
863		}
864
865		t.controlBuf.put(bdpPing)
866	}
867	// Select the right stream to dispatch.
868	s, ok := t.getStream(f)
869	if !ok {
870		return
871	}
872	if size > 0 {
873		if err := s.fc.onData(size); err != nil {
874			t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
875			return
876		}
877		if f.Header().Flags.Has(http2.FlagDataPadded) {
878			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
879				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
880			}
881		}
882		// TODO(bradfitz, zhaoq): A copy is required here because there is no
883		// guarantee f.Data() is consumed before the arrival of next frame.
884		// Can this copy be eliminated?
885		if len(f.Data()) > 0 {
886			data := make([]byte, len(f.Data()))
887			copy(data, f.Data())
888			s.write(recvMsg{data: data})
889		}
890	}
891	// The server has closed the stream without sending trailers.  Record that
892	// the read direction is closed, and set the status appropriately.
893	if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
894		t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
895	}
896}
897
898func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
899	s, ok := t.getStream(f)
900	if !ok {
901		return
902	}
903	if f.ErrCode == http2.ErrCodeRefusedStream {
904		// The stream was unprocessed by the server.
905		atomic.StoreUint32(&s.unprocessed, 1)
906	}
907	statusCode, ok := http2ErrConvTab[f.ErrCode]
908	if !ok {
909		warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
910		statusCode = codes.Unknown
911	}
912	t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
913}
914
915func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
916	if f.IsAck() {
917		return
918	}
919	var maxStreams *uint32
920	var ss []http2.Setting
921	f.ForeachSetting(func(s http2.Setting) error {
922		if s.ID == http2.SettingMaxConcurrentStreams {
923			maxStreams = new(uint32)
924			*maxStreams = s.Val
925			return nil
926		}
927		ss = append(ss, s)
928		return nil
929	})
930	if isFirst && maxStreams == nil {
931		maxStreams = new(uint32)
932		*maxStreams = math.MaxUint32
933	}
934	sf := &incomingSettings{
935		ss: ss,
936	}
937	if maxStreams == nil {
938		t.controlBuf.put(sf)
939		return
940	}
941	updateStreamQuota := func(interface{}) bool {
942		delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
943		t.maxConcurrentStreams = *maxStreams
944		t.streamQuota += delta
945		if delta > 0 && t.waitingStreams > 0 {
946			close(t.streamsQuotaAvailable) // wake all of them up.
947			t.streamsQuotaAvailable = make(chan struct{}, 1)
948		}
949		return true
950	}
951	t.controlBuf.executeAndPut(updateStreamQuota, sf)
952}
953
954func (t *http2Client) handlePing(f *http2.PingFrame) {
955	if f.IsAck() {
956		// Maybe it's a BDP ping.
957		if t.bdpEst != nil {
958			t.bdpEst.calculate(f.Data)
959		}
960		return
961	}
962	pingAck := &ping{ack: true}
963	copy(pingAck.data[:], f.Data[:])
964	t.controlBuf.put(pingAck)
965}
966
// handleGoAway processes a GOAWAY frame from the server.
//
// Frames received while the transport is closing are ignored. A non-zero,
// even LastStreamID cannot name a client-initiated (odd-numbered) stream, so
// such a frame closes the transport.
//
// On the first GOAWAY, the following happens in order:
//   - the reason is recorded;
//   - t.goAway is closed to notify listeners;
//   - the state moves to draining;
//   - an incomingGoAway item is queued on the control buffer.
//
// A later GOAWAY whose LastStreamID exceeds the previously recorded one
// closes the transport.
//
// Active streams with IDs in (LastStreamID, upperLimit] are marked as
// unprocessed and closed with errStreamDrain. upperLimit is the previously
// recorded GOAWAY ID, or math.MaxUint32 when none is recorded (0). If
// len(t.activeStreams) is 0 afterwards, the transport is closed.
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
		infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
	}
	id := f.LastStreamID
	// Client-initiated stream IDs are odd; a non-zero even ID is invalid.
	if id > 0 && id%2 != 1 {
		t.mu.Unlock()
		t.Close()
		return
	}
	// A client can receive multiple GoAways from the server (see
	// https://github.com/grpc/grpc-go/issues/1387).  The idea is that the first
	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
	// sent after an RTT delay with the ID of the last stream the server will
	// process.
	//
	// Therefore, when we get the first GoAway we don't necessarily close any
	// streams. While in case of second GoAway we close all streams created after
	// the GoAwayId. This way streams that were in-flight while the GoAway from
	// server was being sent don't get killed.
	select {
	case <-t.goAway: // t.goAway has been closed (i.e., multiple GoAways).
		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
		if id > t.prevGoAwayID {
			t.mu.Unlock()
			t.Close()
			return
		}
	default:
		// First GoAway: record why, notify listeners, and start draining.
		// setGoAwayReason relies on t.mu being held here.
		t.setGoAwayReason(f)
		close(t.goAway)
		t.state = draining
		t.controlBuf.put(&incomingGoAway{})
	}
	// All streams with IDs greater than the GoAwayId
	// and no greater than the previous GoAway ID should be killed.
	upperLimit := t.prevGoAwayID
	// NOTE(review): this branch is also taken when a previous GoAway carried
	// ID 0, not only for the very first GoAway.
	if upperLimit == 0 { // This is the first GoAway Frame.
		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
	}
	for streamID, stream := range t.activeStreams {
		if streamID > id && streamID <= upperLimit {
			// The stream was unprocessed by the server.
			atomic.StoreUint32(&stream.unprocessed, 1)
			t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
		}
	}
	t.prevGoAwayID = id
	// NOTE(review): whether closeStream removes entries from activeStreams
	// synchronously is not visible here; `active` may still count streams
	// closed in the loop above.
	active := len(t.activeStreams)
	t.mu.Unlock()
	if active == 0 {
		t.Close()
	}
}
1026
1027// setGoAwayReason sets the value of t.goAwayReason based
1028// on the GoAway frame received.
1029// It expects a lock on transport's mutext to be held by
1030// the caller.
1031func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1032	t.goAwayReason = GoAwayNoReason
1033	switch f.ErrCode {
1034	case http2.ErrCodeEnhanceYourCalm:
1035		if string(f.DebugData()) == "too_many_pings" {
1036			t.goAwayReason = GoAwayTooManyPings
1037		}
1038	}
1039}
1040
1041func (t *http2Client) GetGoAwayReason() GoAwayReason {
1042	t.mu.Lock()
1043	defer t.mu.Unlock()
1044	return t.goAwayReason
1045}
1046
1047func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1048	t.controlBuf.put(&incomingWindowUpdate{
1049		streamID:  f.Header().StreamID,
1050		increment: f.Increment,
1051	})
1052}
1053
// operateHeaders takes action on the decoded headers.
//
// Frames for streams this transport no longer tracks are ignored.
//
// If decoding the header block fails, the stream is closed with rst=true and
// ErrCodeProtocol.
//
// Otherwise, the first headers frame on a stream closes s.headerChan. If that
// frame does not end the stream, its compression encoding and metadata are
// stored as the response header before the channel is closed. A frame with
// END_STREAM set is passed to closeStream together with the decoded status
// and metadata.
//
// When t.statsHandler is set, and only after successful decoding, a deferred
// call reports the frame on return:
//   - an InHeader event if the frame was recorded as the response header;
//   - an InTrailer event otherwise.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
	s, ok := t.getStream(frame)
	if !ok {
		return
	}
	// Flag that bytes have been received on this stream.
	atomic.StoreUint32(&s.bytesReceived, 1)
	var state decodeState
	if err := state.decodeResponseHeader(frame); err != nil {
		t.closeStream(s, err, true, http2.ErrCodeProtocol, nil, nil, false)
		// Something wrong. Stops reading even when there is remaining.
		return
	}

	endStream := frame.StreamEnded()
	// isHeader is set below when this frame is recorded as the response header;
	// the deferred stats report uses it to pick InHeader vs InTrailer.
	var isHeader bool
	defer func() {
		if t.statsHandler != nil {
			if isHeader {
				inHeader := &stats.InHeader{
					Client:     true,
					WireLength: int(frame.Header().Length),
				}
				t.statsHandler.HandleRPC(s.ctx, inHeader)
			} else {
				inTrailer := &stats.InTrailer{
					Client:     true,
					WireLength: int(frame.Header().Length),
				}
				t.statsHandler.HandleRPC(s.ctx, inTrailer)
			}
		}
	}()
	// If headers haven't been received yet.
	if atomic.SwapUint32(&s.headerDone, 1) == 0 {
		if !endStream {
			// Headers frame is not actually a trailers-only frame.
			isHeader = true
			// These values can be set without any synchronization because
			// stream goroutine will read it only after seeing a closed
			// headerChan which we'll close after setting this.
			s.recvCompress = state.encoding
			if len(state.mdata) > 0 {
				s.header = state.mdata
			}
		}
		close(s.headerChan)
	}
	if !endStream {
		return
	}
	t.closeStream(s, io.EOF, false, http2.ErrCodeNo, state.status(), state.mdata, true)
}
1107
1108// reader runs as a separate goroutine in charge of reading data from network
1109// connection.
1110//
1111// TODO(zhaoq): currently one reader per transport. Investigate whether this is
1112// optimal.
1113// TODO(zhaoq): Check the validity of the incoming frame sequence.
1114func (t *http2Client) reader() {
1115	defer close(t.readerDone)
1116	// Check the validity of server preface.
1117	frame, err := t.framer.fr.ReadFrame()
1118	if err != nil {
1119		t.Close()
1120		return
1121	}
1122	if t.keepaliveEnabled {
1123		atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1124	}
1125	sf, ok := frame.(*http2.SettingsFrame)
1126	if !ok {
1127		t.Close()
1128		return
1129	}
1130	t.onSuccess()
1131	t.handleSettings(sf, true)
1132
1133	// loop to keep reading incoming messages on this transport.
1134	for {
1135		frame, err := t.framer.fr.ReadFrame()
1136		if t.keepaliveEnabled {
1137			atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1138		}
1139		if err != nil {
1140			// Abort an active stream if the http2.Framer returns a
1141			// http2.StreamError. This can happen only if the server's response
1142			// is malformed http2.
1143			if se, ok := err.(http2.StreamError); ok {
1144				t.mu.Lock()
1145				s := t.activeStreams[se.StreamID]
1146				t.mu.Unlock()
1147				if s != nil {
1148					// use error detail to provide better err message
1149					t.closeStream(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail()), true, http2.ErrCodeProtocol, nil, nil, false)
1150				}
1151				continue
1152			} else {
1153				// Transport error.
1154				t.Close()
1155				return
1156			}
1157		}
1158		switch frame := frame.(type) {
1159		case *http2.MetaHeadersFrame:
1160			t.operateHeaders(frame)
1161		case *http2.DataFrame:
1162			t.handleData(frame)
1163		case *http2.RSTStreamFrame:
1164			t.handleRSTStream(frame)
1165		case *http2.SettingsFrame:
1166			t.handleSettings(frame, false)
1167		case *http2.PingFrame:
1168			t.handlePing(frame)
1169		case *http2.GoAwayFrame:
1170			t.handleGoAway(frame)
1171		case *http2.WindowUpdateFrame:
1172			t.handleWindowUpdate(frame)
1173		default:
1174			errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1175		}
1176	}
1177}
1178
// keepalive runs in a separate goroutine and makes sure the connection is
// alive by sending pings.
//
// Each time the kp.Time timer fires, it checks t.activity, which reader sets
// when frames are read and keepalive is enabled:
//   - If there was activity since the last check, the flag is cleared and
//     the timer is re-armed for kp.Time.
//   - Otherwise a ping is sent. If there are no active streams and
//     kp.PermitWithoutStream is false, the goroutine instead goes dormant
//     until awakenKeepalive is signalled.
//
// After the ping it waits kp.Timeout. If activity was recorded in that window,
// the kp.Time cycle resumes; otherwise the transport is closed. The goroutine
// returns when t.ctx is done.
func (t *http2Client) keepalive() {
	p := &ping{data: [8]byte{}}
	timer := time.NewTimer(t.kp.Time)
	for {
		select {
		case <-timer.C:
			// Activity since the last tick: clear the flag and wait another kp.Time.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				timer.Reset(t.kp.Time)
				continue
			}
			// Check if keepalive should go dormant.
			t.mu.Lock()
			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
				// Make awakenKeepalive writable.
				<-t.awakenKeepalive
				t.mu.Unlock()
				// NOTE(review): the code that sends on awakenKeepalive (and the
				// ping it is expected to send) is not visible in this chunk;
				// presumably it is stream creation — confirm.
				select {
				case <-t.awakenKeepalive:
					// If the control gets here a ping has been sent
					// need to reset the timer with keepalive.Timeout.
				case <-t.ctx.Done():
					return
				}
			} else {
				t.mu.Unlock()
				if channelz.IsOn() {
					t.czmu.Lock()
					t.kpCount++
					t.czmu.Unlock()
				}
				// Send ping.
				t.controlBuf.put(p)
			}

			// By the time control gets here a ping has been sent one way or the other.
			timer.Reset(t.kp.Timeout)
			select {
			case <-timer.C:
				// Activity observed within kp.Timeout: go back to the kp.Time cycle.
				if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
					timer.Reset(t.kp.Time)
					continue
				}
				// No activity within kp.Timeout: give up on the connection.
				t.Close()
				return
			case <-t.ctx.Done():
				// Stop the timer, draining its channel if it already fired.
				if !timer.Stop() {
					<-timer.C
				}
				return
			}
		case <-t.ctx.Done():
			if !timer.Stop() {
				<-timer.C
			}
			return
		}
	}
}
1238
1239func (t *http2Client) Error() <-chan struct{} {
1240	return t.ctx.Done()
1241}
1242
1243func (t *http2Client) GoAway() <-chan struct{} {
1244	return t.goAway
1245}
1246
1247func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
1248	t.czmu.RLock()
1249	s := channelz.SocketInternalMetric{
1250		StreamsStarted:                  t.streamsStarted,
1251		StreamsSucceeded:                t.streamsSucceeded,
1252		StreamsFailed:                   t.streamsFailed,
1253		MessagesSent:                    t.msgSent,
1254		MessagesReceived:                t.msgRecv,
1255		KeepAlivesSent:                  t.kpCount,
1256		LastLocalStreamCreatedTimestamp: t.lastStreamCreated,
1257		LastMessageSentTimestamp:        t.lastMsgSent,
1258		LastMessageReceivedTimestamp:    t.lastMsgRecv,
1259		LocalFlowControlWindow:          int64(t.fc.getSize()),
1260		SocketOptions:                   channelz.GetSocketOption(t.conn),
1261		LocalAddr:                       t.localAddr,
1262		RemoteAddr:                      t.remoteAddr,
1263		// RemoteName :
1264	}
1265	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1266		s.Security = au.GetSecurityValue()
1267	}
1268	t.czmu.RUnlock()
1269	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1270	return &s
1271}
1272
1273func (t *http2Client) IncrMsgSent() {
1274	t.czmu.Lock()
1275	t.msgSent++
1276	t.lastMsgSent = time.Now()
1277	t.czmu.Unlock()
1278}
1279
1280func (t *http2Client) IncrMsgRecv() {
1281	t.czmu.Lock()
1282	t.msgRecv++
1283	t.lastMsgRecv = time.Now()
1284	t.czmu.Unlock()
1285}
1286
1287func (t *http2Client) getOutFlowWindow() int64 {
1288	resp := make(chan uint32, 1)
1289	timer := time.NewTimer(time.Second)
1290	defer timer.Stop()
1291	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1292	select {
1293	case sz := <-resp:
1294		return int64(sz)
1295	case <-t.ctxDone:
1296		return -1
1297	case <-timer.C:
1298		return -2
1299	}
1300}
1301