1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"errors"
24	"fmt"
25	"io"
26	"math"
27	"net"
28	"strconv"
29	"sync"
30	"sync/atomic"
31	"time"
32
33	"github.com/golang/protobuf/proto"
34	"golang.org/x/net/context"
35	"golang.org/x/net/http2"
36	"golang.org/x/net/http2/hpack"
37
38	"google.golang.org/grpc/codes"
39	"google.golang.org/grpc/credentials"
40	"google.golang.org/grpc/grpclog"
41	"google.golang.org/grpc/internal/channelz"
42	"google.golang.org/grpc/internal/grpcrand"
43	"google.golang.org/grpc/keepalive"
44	"google.golang.org/grpc/metadata"
45	"google.golang.org/grpc/peer"
46	"google.golang.org/grpc/stats"
47	"google.golang.org/grpc/status"
48	"google.golang.org/grpc/tap"
49)
50
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state. WriteHeader returns it when the header was already
// marked as sent for the stream or when the stream is already done.
var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
54
// http2Server implements the ServerTransport interface with HTTP2.
//
// One http2Server wraps a single net.Conn. Incoming frames are read by
// HandleStreams; outgoing work is queued on controlBuf and written by the
// loopyWriter that newHTTP2Server starts in its own goroutine.
//
// Locking: the fields declared after mu are guarded by mu. maxStreamID,
// although declared earlier, is read and written with mu held in this file.
// The channelz counters updated in this file (kpCount, streamsStarted,
// streamsSucceeded, streamsFailed, lastStreamCreated) are updated with czmu
// held.
type http2Server struct {
	ctx         context.Context
	ctxDone     <-chan struct{} // Cache the context.Done() chan
	cancel      context.CancelFunc
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// fc is the connection-level inbound flow control state; it is created
	// with the initial connection window size as its limit.
	fc    *trInFlow
	stats stats.Handler
	// Flag to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters

	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes uint32 // Accessed atomically.
	// initialWindowSize is used as the inbound flow control limit of every
	// newly created stream; updateFlowControl overwrites it under mu.
	initialWindowSize int32
	// bdpEst is nil unless dynamic window sizing is enabled, i.e. neither
	// window size in the ServerConfig was set to at least defaultWindowSize.
	bdpEst *bdpEstimator

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan chan struct{}
	state     transportState
	// activeStreams maps stream IDs to live streams. Close sets it to nil,
	// which other methods treat as "the transport is closing".
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czmu       sync.RWMutex
	kpCount    int64
	// The number of streams that have started, including already finished ones.
	streamsStarted int64
	// The number of streams that have ended successfully by sending frame with
	// EoS bit set.
	streamsSucceeded  int64
	streamsFailed     int64
	lastStreamCreated time.Time
	msgSent           int64
	msgRecv           int64
	lastMsgSent       time.Time
	lastMsgRecv       time.Time
}
129
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
//
// The function performs the server half of the HTTP/2 connection setup on
// conn: it writes the server SETTINGS frame (plus a connection-level
// WINDOW_UPDATE when the connection window is larger than defaultWindowSize),
// fills in defaults for unset keepalive parameters, reads and validates the
// client preface and the client's first frame (which must be SETTINGS), and
// finally starts the loopy writer and keepalive goroutines.
//
// One exception to the ConnectionError contract: an io.EOF or
// io.ErrUnexpectedEOF while reading the client's first frame is returned
// unwrapped.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := defaultWriteBufSize
	if config.WriteBufferSize > 0 {
		writeBufSize = config.WriteBufferSize
	}
	readBufSize := defaultReadBufSize
	if config.ReadBufferSize > 0 {
		readBufSize = config.ReadBufferSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize)
	// Send initial settings as connection preface to client.
	var isettings []http2.Setting
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// Dynamic (BDP-based) window sizing stays on only if neither window size
	// below is explicitly configured to at least defaultWindowSize.
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	// Only advertise the stream window size when it differs from
	// defaultWindowSize.
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Replace every unset (zero) keepalive parameter with its default.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	ctx, cancel := context.WithCancel(context.Background())
	t := &http2Server{
		ctx:               ctx,
		cancel:            cancel,
		ctxDone:           ctx.Done(),
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, "")
	}
	// Push the frames written above out to the connection. The result of
	// Flush is not checked here.
	t.framer.writer.Flush()

	// From here on, any error return tears the transport down. This relies
	// on the named result err being set by the return statements below.
	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	// The first frame after the preface must be the client's SETTINGS frame.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		// Returned as is, without wrapping in a ConnectionError.
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreUint32(&t.activity, 1)
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// Writer goroutine: runs the loopy writer until it returns, then closes
	// the connection and signals writerDone.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
286
287// operateHeader takes action on the decoded headers.
288func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
289	streamID := frame.Header().StreamID
290	var state decodeState
291	for _, hf := range frame.Fields {
292		if err := state.processHeaderField(hf); err != nil {
293			if se, ok := err.(StreamError); ok {
294				t.controlBuf.put(&cleanupStream{
295					streamID: streamID,
296					rst:      true,
297					rstCode:  statusCodeConvTab[se.Code],
298					onWrite:  func() {},
299				})
300			}
301			return
302		}
303	}
304
305	buf := newRecvBuffer()
306	s := &Stream{
307		id:             streamID,
308		st:             t,
309		buf:            buf,
310		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
311		recvCompress:   state.encoding,
312		method:         state.method,
313		contentSubtype: state.contentSubtype,
314	}
315	if frame.StreamEnded() {
316		// s is just created by the caller. No lock needed.
317		s.state = streamReadDone
318	}
319	if state.timeoutSet {
320		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
321	} else {
322		s.ctx, s.cancel = context.WithCancel(t.ctx)
323	}
324	pr := &peer.Peer{
325		Addr: t.remoteAddr,
326	}
327	// Attach Auth info if there is any.
328	if t.authInfo != nil {
329		pr.AuthInfo = t.authInfo
330	}
331	s.ctx = peer.NewContext(s.ctx, pr)
332	// Attach the received metadata to the context.
333	if len(state.mdata) > 0 {
334		s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
335	}
336	if state.statsTags != nil {
337		s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
338	}
339	if state.statsTrace != nil {
340		s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
341	}
342	if t.inTapHandle != nil {
343		var err error
344		info := &tap.Info{
345			FullMethodName: state.method,
346		}
347		s.ctx, err = t.inTapHandle(s.ctx, info)
348		if err != nil {
349			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
350			t.controlBuf.put(&cleanupStream{
351				streamID: s.id,
352				rst:      true,
353				rstCode:  http2.ErrCodeRefusedStream,
354				onWrite:  func() {},
355			})
356			return
357		}
358	}
359	t.mu.Lock()
360	if t.state != reachable {
361		t.mu.Unlock()
362		return
363	}
364	if uint32(len(t.activeStreams)) >= t.maxStreams {
365		t.mu.Unlock()
366		t.controlBuf.put(&cleanupStream{
367			streamID: streamID,
368			rst:      true,
369			rstCode:  http2.ErrCodeRefusedStream,
370			onWrite:  func() {},
371		})
372		return
373	}
374	if streamID%2 != 1 || streamID <= t.maxStreamID {
375		t.mu.Unlock()
376		// illegal gRPC stream id.
377		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
378		return true
379	}
380	t.maxStreamID = streamID
381	t.activeStreams[streamID] = s
382	if len(t.activeStreams) == 1 {
383		t.idle = time.Time{}
384	}
385	t.mu.Unlock()
386	if channelz.IsOn() {
387		t.czmu.Lock()
388		t.streamsStarted++
389		t.lastStreamCreated = time.Now()
390		t.czmu.Unlock()
391	}
392	s.requestRead = func(n int) {
393		t.adjustWindow(s, uint32(n))
394	}
395	s.ctx = traceCtx(s.ctx, s.method)
396	if t.stats != nil {
397		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
398		inHeader := &stats.InHeader{
399			FullMethod:  s.method,
400			RemoteAddr:  t.remoteAddr,
401			LocalAddr:   t.localAddr,
402			Compression: s.recvCompress,
403			WireLength:  int(frame.Header().Length),
404		}
405		t.stats.HandleRPC(s.ctx, inHeader)
406	}
407	s.ctxDone = s.ctx.Done()
408	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
409	s.trReader = &transportReader{
410		reader: &recvBufferReader{
411			ctx:     s.ctx,
412			ctxDone: s.ctxDone,
413			recv:    s.buf,
414		},
415		windowHandler: func(n int) {
416			t.updateWindow(s, uint32(n))
417		},
418	}
419	// Register the stream with loopy.
420	t.controlBuf.put(&registerStream{
421		streamID: s.id,
422		wq:       s.wq,
423	})
424	handle(s)
425	return
426}
427
428// HandleStreams receives incoming streams using the given handler. This is
429// typically run in a separate goroutine.
430// traceCtx attaches trace to ctx and returns the new context.
431func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
432	defer close(t.readerDone)
433	for {
434		frame, err := t.framer.fr.ReadFrame()
435		atomic.StoreUint32(&t.activity, 1)
436		if err != nil {
437			if se, ok := err.(http2.StreamError); ok {
438				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
439				t.mu.Lock()
440				s := t.activeStreams[se.StreamID]
441				t.mu.Unlock()
442				if s != nil {
443					t.closeStream(s, true, se.Code, nil, false)
444				} else {
445					t.controlBuf.put(&cleanupStream{
446						streamID: se.StreamID,
447						rst:      true,
448						rstCode:  se.Code,
449						onWrite:  func() {},
450					})
451				}
452				continue
453			}
454			if err == io.EOF || err == io.ErrUnexpectedEOF {
455				t.Close()
456				return
457			}
458			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
459			t.Close()
460			return
461		}
462		switch frame := frame.(type) {
463		case *http2.MetaHeadersFrame:
464			if t.operateHeaders(frame, handle, traceCtx) {
465				t.Close()
466				break
467			}
468		case *http2.DataFrame:
469			t.handleData(frame)
470		case *http2.RSTStreamFrame:
471			t.handleRSTStream(frame)
472		case *http2.SettingsFrame:
473			t.handleSettings(frame)
474		case *http2.PingFrame:
475			t.handlePing(frame)
476		case *http2.WindowUpdateFrame:
477			t.handleWindowUpdate(frame)
478		case *http2.GoAwayFrame:
479			// TODO: Handle GoAway from the client appropriately.
480		default:
481			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
482		}
483	}
484}
485
486func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
487	t.mu.Lock()
488	defer t.mu.Unlock()
489	if t.activeStreams == nil {
490		// The transport is closing.
491		return nil, false
492	}
493	s, ok := t.activeStreams[f.Header().StreamID]
494	if !ok {
495		// The stream is already done.
496		return nil, false
497	}
498	return s, true
499}
500
501// adjustWindow sends out extra window update over the initial window size
502// of stream if the application is requesting data larger in size than
503// the window.
504func (t *http2Server) adjustWindow(s *Stream, n uint32) {
505	if w := s.fc.maybeAdjust(n); w > 0 {
506		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
507	}
508
509}
510
511// updateWindow adjusts the inbound quota for the stream and the transport.
512// Window updates will deliver to the controller for sending when
513// the cumulative quota exceeds the corresponding threshold.
514func (t *http2Server) updateWindow(s *Stream, n uint32) {
515	if w := s.fc.onRead(n); w > 0 {
516		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
517			increment: w,
518		})
519	}
520}
521
522// updateFlowControl updates the incoming flow control windows
523// for the transport and the stream based on the current bdp
524// estimation.
525func (t *http2Server) updateFlowControl(n uint32) {
526	t.mu.Lock()
527	for _, s := range t.activeStreams {
528		s.fc.newLimit(n)
529	}
530	t.initialWindowSize = int32(n)
531	t.mu.Unlock()
532	t.controlBuf.put(&outgoingWindowUpdate{
533		streamID:  0,
534		increment: t.fc.newLimit(n),
535	})
536	t.controlBuf.put(&outgoingSettings{
537		ss: []http2.Setting{
538			{
539				ID:  http2.SettingInitialWindowSize,
540				Val: n,
541			},
542		},
543	})
544
545}
546
547func (t *http2Server) handleData(f *http2.DataFrame) {
548	size := f.Header().Length
549	var sendBDPPing bool
550	if t.bdpEst != nil {
551		sendBDPPing = t.bdpEst.add(size)
552	}
553	// Decouple connection's flow control from application's read.
554	// An update on connection's flow control should not depend on
555	// whether user application has read the data or not. Such a
556	// restriction is already imposed on the stream's flow control,
557	// and therefore the sender will be blocked anyways.
558	// Decoupling the connection flow control will prevent other
559	// active(fast) streams from starving in presence of slow or
560	// inactive streams.
561	if w := t.fc.onData(size); w > 0 {
562		t.controlBuf.put(&outgoingWindowUpdate{
563			streamID:  0,
564			increment: w,
565		})
566	}
567	if sendBDPPing {
568		// Avoid excessive ping detection (e.g. in an L7 proxy)
569		// by sending a window update prior to the BDP ping.
570		if w := t.fc.reset(); w > 0 {
571			t.controlBuf.put(&outgoingWindowUpdate{
572				streamID:  0,
573				increment: w,
574			})
575		}
576		t.controlBuf.put(bdpPing)
577	}
578	// Select the right stream to dispatch.
579	s, ok := t.getStream(f)
580	if !ok {
581		return
582	}
583	if size > 0 {
584		if err := s.fc.onData(size); err != nil {
585			t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
586			return
587		}
588		if f.Header().Flags.Has(http2.FlagDataPadded) {
589			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
590				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
591			}
592		}
593		// TODO(bradfitz, zhaoq): A copy is required here because there is no
594		// guarantee f.Data() is consumed before the arrival of next frame.
595		// Can this copy be eliminated?
596		if len(f.Data()) > 0 {
597			data := make([]byte, len(f.Data()))
598			copy(data, f.Data())
599			s.write(recvMsg{data: data})
600		}
601	}
602	if f.Header().Flags.Has(http2.FlagDataEndStream) {
603		// Received the end of stream from the client.
604		s.compareAndSwapState(streamActive, streamReadDone)
605		s.write(recvMsg{err: io.EOF})
606	}
607}
608
609func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
610	s, ok := t.getStream(f)
611	if !ok {
612		return
613	}
614	t.closeStream(s, false, 0, nil, false)
615}
616
617func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
618	if f.IsAck() {
619		return
620	}
621	var ss []http2.Setting
622	f.ForeachSetting(func(s http2.Setting) error {
623		ss = append(ss, s)
624		return nil
625	})
626	t.controlBuf.put(&incomingSettings{
627		ss: ss,
628	})
629}
630
// Limits used by handlePing when enforcing the keepalive policy.
const (
	// maxPingStrikes is the number of policy-violating pings tolerated;
	// handlePing sends a GoAway once pingStrikes exceeds this value.
	maxPingStrikes     = 2
	// defaultPingTimeout is the minimum interval handlePing requires between
	// pings when there are no active streams and PermitWithoutStream is false.
	defaultPingTimeout = 2 * time.Hour
)
635
636func (t *http2Server) handlePing(f *http2.PingFrame) {
637	if f.IsAck() {
638		if f.Data == goAwayPing.data && t.drainChan != nil {
639			close(t.drainChan)
640			return
641		}
642		// Maybe it's a BDP ping.
643		if t.bdpEst != nil {
644			t.bdpEst.calculate(f.Data)
645		}
646		return
647	}
648	pingAck := &ping{ack: true}
649	copy(pingAck.data[:], f.Data[:])
650	t.controlBuf.put(pingAck)
651
652	now := time.Now()
653	defer func() {
654		t.lastPingAt = now
655	}()
656	// A reset ping strikes means that we don't need to check for policy
657	// violation for this ping and the pingStrikes counter should be set
658	// to 0.
659	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
660		t.pingStrikes = 0
661		return
662	}
663	t.mu.Lock()
664	ns := len(t.activeStreams)
665	t.mu.Unlock()
666	if ns < 1 && !t.kep.PermitWithoutStream {
667		// Keepalive shouldn't be active thus, this new ping should
668		// have come after at least defaultPingTimeout.
669		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
670			t.pingStrikes++
671		}
672	} else {
673		// Check if keepalive policy is respected.
674		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
675			t.pingStrikes++
676		}
677	}
678
679	if t.pingStrikes > maxPingStrikes {
680		// Send goaway and close the connection.
681		errorf("transport: Got too many pings from the client, closing the connection.")
682		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
683	}
684}
685
686func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
687	t.controlBuf.put(&incomingWindowUpdate{
688		streamID:  f.Header().StreamID,
689		increment: f.Increment,
690	})
691}
692
693func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
694	for k, vv := range md {
695		if isReservedHeader(k) {
696			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
697			continue
698		}
699		for _, v := range vv {
700			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
701		}
702	}
703	return headerFields
704}
705
706// WriteHeader sends the header metedata md back to the client.
707func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
708	if s.updateHeaderSent() || s.getState() == streamDone {
709		return ErrIllegalHeaderWrite
710	}
711	s.hdrMu.Lock()
712	if md.Len() > 0 {
713		if s.header.Len() > 0 {
714			s.header = metadata.Join(s.header, md)
715		} else {
716			s.header = md
717		}
718	}
719	t.writeHeaderLocked(s)
720	s.hdrMu.Unlock()
721	return nil
722}
723
724func (t *http2Server) writeHeaderLocked(s *Stream) {
725	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
726	// first and create a slice of that exact size.
727	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
728	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
729	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
730	if s.sendCompress != "" {
731		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
732	}
733	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
734	t.controlBuf.put(&headerFrame{
735		streamID:  s.id,
736		hf:        headerFields,
737		endStream: false,
738		onWrite: func() {
739			atomic.StoreUint32(&t.resetPingStrikes, 1)
740		},
741	})
742	if t.stats != nil {
743		// Note: WireLength is not set in outHeader.
744		// TODO(mmukhi): Revisit this later, if needed.
745		outHeader := &stats.OutHeader{}
746		t.stats.HandleRPC(s.Context(), outHeader)
747	}
748}
749
750// WriteStatus sends stream status to the client and terminates the stream.
751// There is no further I/O operations being able to perform on this stream.
752// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
753// OK is adopted.
754func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
755	if s.getState() == streamDone {
756		return nil
757	}
758	s.hdrMu.Lock()
759	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
760	// first and create a slice of that exact size.
761	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
762	if !s.updateHeaderSent() {                      // No headers have been sent.
763		if len(s.header) > 0 { // Send a separate header frame.
764			t.writeHeaderLocked(s)
765		} else { // Send a trailer only response.
766			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
767			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
768		}
769	}
770	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
771	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
772
773	if p := st.Proto(); p != nil && len(p.Details) > 0 {
774		stBytes, err := proto.Marshal(p)
775		if err != nil {
776			// TODO: return error instead, when callers are able to handle it.
777			grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
778		} else {
779			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
780		}
781	}
782
783	// Attach the trailer metadata.
784	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
785	trailingHeader := &headerFrame{
786		streamID:  s.id,
787		hf:        headerFields,
788		endStream: true,
789		onWrite: func() {
790			atomic.StoreUint32(&t.resetPingStrikes, 1)
791		},
792	}
793	s.hdrMu.Unlock()
794	t.closeStream(s, false, 0, trailingHeader, true)
795	if t.stats != nil {
796		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
797	}
798	return nil
799}
800
801// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
802// is returns if it fails (e.g., framing error, transport error).
803func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
804	if !s.isHeaderSent() { // Headers haven't been written yet.
805		if err := t.WriteHeader(s, nil); err != nil {
806			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
807			return streamErrorf(codes.Internal, "transport: %v", err)
808		}
809	} else {
810		// Writing headers checks for this condition.
811		if s.getState() == streamDone {
812			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
813			s.cancel()
814			select {
815			case <-t.ctx.Done():
816				return ErrConnClosing
817			default:
818			}
819			return ContextErr(s.ctx.Err())
820		}
821	}
822	// Add some data to header frame so that we can equally distribute bytes across frames.
823	emptyLen := http2MaxFrameLen - len(hdr)
824	if emptyLen > len(data) {
825		emptyLen = len(data)
826	}
827	hdr = append(hdr, data[:emptyLen]...)
828	data = data[emptyLen:]
829	df := &dataFrame{
830		streamID: s.id,
831		h:        hdr,
832		d:        data,
833		onEachWrite: func() {
834			atomic.StoreUint32(&t.resetPingStrikes, 1)
835		},
836	}
837	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
838		select {
839		case <-t.ctx.Done():
840			return ErrConnClosing
841		default:
842		}
843		return ContextErr(s.ctx.Err())
844	}
845	return t.controlBuf.put(df)
846}
847
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// The goroutine exits when the transport context is done, after an idle
// drain, after the max-age handling has finished, or after closing a
// non-responsive connection.
func (t *http2Server) keepalive() {
	p := &ping{}
	// pingSent is true while a keepalive ping is outstanding, i.e. no read
	// activity has been observed since it was sent.
	var pingSent bool
	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
	keepalive := time.NewTimer(t.kp.Time)
	// NOTE: All exit paths of this function should reset their
	// respective timers. A failure to do so will cause the
	// following clean-up to deadlock and eventually leak.
	defer func() {
		// Stop reports false when the timer has already fired; the channel
		// is then drained. This blocks if the value was already consumed,
		// which is why exit paths re-arm the timer they just received from.
		if !maxIdle.Stop() {
			<-maxIdle.C
		}
		if !maxAge.Stop() {
			<-maxAge.C
		}
		if !keepalive.Stop() {
			<-keepalive.C
		}
	}()
	for {
		select {
		case <-maxIdle.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				maxIdle.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// val is the remaining idle time allowed before the connection
			// must be drained.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxIdle.Reset(infinity)
				return
			}
			maxIdle.Reset(val)
		case <-maxAge.C:
			t.drain(http2.ErrCodeNo, []byte{})
			// Reuse the same timer for the grace period.
			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-maxAge.C:
				// Close the connection after grace period.
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxAge.Reset(infinity)
			case <-t.ctx.Done():
			}
			return
		case <-keepalive.C:
			// Read activity since the last tick: clear the flag and start a
			// new keepalive interval without pinging.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				pingSent = false
				keepalive.Reset(t.kp.Time)
				continue
			}
			// No activity since the ping was sent: the peer is considered
			// non-responsive.
			if pingSent {
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				keepalive.Reset(infinity)
				return
			}
			pingSent = true
			if channelz.IsOn() {
				t.czmu.Lock()
				t.kpCount++
				t.czmu.Unlock()
			}
			t.controlBuf.put(p)
			// Wait keepalive.Timeout for any activity after the ping.
			keepalive.Reset(t.kp.Timeout)
		case <-t.ctx.Done():
			return
		}
	}
}
932
933// Close starts shutting down the http2Server transport.
934// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
935// could cause some resource issue. Revisit this later.
936func (t *http2Server) Close() error {
937	t.mu.Lock()
938	if t.state == closing {
939		t.mu.Unlock()
940		return errors.New("transport: Close() was already called")
941	}
942	t.state = closing
943	streams := t.activeStreams
944	t.activeStreams = nil
945	t.mu.Unlock()
946	t.controlBuf.finish()
947	t.cancel()
948	err := t.conn.Close()
949	if channelz.IsOn() {
950		channelz.RemoveEntry(t.channelzID)
951	}
952	// Cancel all active streams.
953	for _, s := range streams {
954		s.cancel()
955	}
956	if t.stats != nil {
957		connEnd := &stats.ConnEnd{}
958		t.stats.HandleConn(t.ctx, connEnd)
959	}
960	return err
961}
962
963// closeStream clears the footprint of a stream when the stream is not needed
964// any more.
965func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
966	if s.swapState(streamDone) == streamDone {
967		// If the stream was already done, return.
968		return
969	}
970	// In case stream sending and receiving are invoked in separate
971	// goroutines (e.g., bi-directional streaming), cancel needs to be
972	// called to interrupt the potential blocking on other goroutines.
973	s.cancel()
974	cleanup := &cleanupStream{
975		streamID: s.id,
976		rst:      rst,
977		rstCode:  rstCode,
978		onWrite: func() {
979			t.mu.Lock()
980			if t.activeStreams != nil {
981				delete(t.activeStreams, s.id)
982				if len(t.activeStreams) == 0 {
983					t.idle = time.Now()
984				}
985			}
986			t.mu.Unlock()
987			if channelz.IsOn() {
988				t.czmu.Lock()
989				if eosReceived {
990					t.streamsSucceeded++
991				} else {
992					t.streamsFailed++
993				}
994				t.czmu.Unlock()
995			}
996		},
997	}
998	if hdr != nil {
999		hdr.cleanup = cleanup
1000		t.controlBuf.put(hdr)
1001	} else {
1002		t.controlBuf.put(cleanup)
1003	}
1004}
1005
1006func (t *http2Server) RemoteAddr() net.Addr {
1007	return t.remoteAddr
1008}
1009
1010func (t *http2Server) Drain() {
1011	t.drain(http2.ErrCodeNo, []byte{})
1012}
1013
1014func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1015	t.mu.Lock()
1016	defer t.mu.Unlock()
1017	if t.drainChan != nil {
1018		return
1019	}
1020	t.drainChan = make(chan struct{})
1021	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1022}
1023
1024var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1025
1026// Handles outgoing GoAway and returns true if loopy needs to put itself
1027// in draining mode.
1028func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1029	t.mu.Lock()
1030	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1031		t.mu.Unlock()
1032		// The transport is closing.
1033		return false, ErrConnClosing
1034	}
1035	sid := t.maxStreamID
1036	if !g.headsUp {
1037		// Stop accepting more streams now.
1038		t.state = draining
1039		if len(t.activeStreams) == 0 {
1040			g.closeConn = true
1041		}
1042		t.mu.Unlock()
1043		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1044			return false, err
1045		}
1046		if g.closeConn {
1047			// Abruptly close the connection following the GoAway (via
1048			// loopywriter).  But flush out what's inside the buffer first.
1049			t.framer.writer.Flush()
1050			return false, fmt.Errorf("transport: Connection closing")
1051		}
1052		return true, nil
1053	}
1054	t.mu.Unlock()
1055	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1056	// Follow that with a ping and wait for the ack to come back or a timer
1057	// to expire. During this time accept new streams since they might have
1058	// originated before the GoAway reaches the client.
1059	// After getting the ack or timer expiration send out another GoAway this
1060	// time with an ID of the max stream server intends to process.
1061	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1062		return false, err
1063	}
1064	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1065		return false, err
1066	}
1067	go func() {
1068		timer := time.NewTimer(time.Minute)
1069		defer timer.Stop()
1070		select {
1071		case <-t.drainChan:
1072		case <-timer.C:
1073		case <-t.ctx.Done():
1074			return
1075		}
1076		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1077	}()
1078	return false, nil
1079}
1080
1081func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1082	t.czmu.RLock()
1083	s := channelz.SocketInternalMetric{
1084		StreamsStarted:                   t.streamsStarted,
1085		StreamsSucceeded:                 t.streamsSucceeded,
1086		StreamsFailed:                    t.streamsFailed,
1087		MessagesSent:                     t.msgSent,
1088		MessagesReceived:                 t.msgRecv,
1089		KeepAlivesSent:                   t.kpCount,
1090		LastRemoteStreamCreatedTimestamp: t.lastStreamCreated,
1091		LastMessageSentTimestamp:         t.lastMsgSent,
1092		LastMessageReceivedTimestamp:     t.lastMsgRecv,
1093		LocalFlowControlWindow:           int64(t.fc.getSize()),
1094		SocketOptions:                    channelz.GetSocketOption(t.conn),
1095		LocalAddr:                        t.localAddr,
1096		RemoteAddr:                       t.remoteAddr,
1097		// RemoteName :
1098	}
1099	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1100		s.Security = au.GetSecurityValue()
1101	}
1102	t.czmu.RUnlock()
1103	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1104	return &s
1105}
1106
1107func (t *http2Server) IncrMsgSent() {
1108	t.czmu.Lock()
1109	t.msgSent++
1110	t.lastMsgSent = time.Now()
1111	t.czmu.Unlock()
1112}
1113
1114func (t *http2Server) IncrMsgRecv() {
1115	t.czmu.Lock()
1116	t.msgRecv++
1117	t.lastMsgRecv = time.Now()
1118	t.czmu.Unlock()
1119}
1120
1121func (t *http2Server) getOutFlowWindow() int64 {
1122	resp := make(chan uint32)
1123	timer := time.NewTimer(time.Second)
1124	defer timer.Stop()
1125	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1126	select {
1127	case sz := <-resp:
1128		return int64(sz)
1129	case <-t.ctxDone:
1130		return -1
1131	case <-timer.C:
1132		return -2
1133	}
1134}
1135
1136func getJitter(v time.Duration) time.Duration {
1137	if v == infinity {
1138		return 0
1139	}
1140	// Generate a jitter between +/- 10% of the value.
1141	r := int64(v / 10)
1142	j := grpcrand.Int63n(2*r) - r
1143	return time.Duration(j)
1144}
1145