1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "io" 23 "math" 24 "net" 25 "strings" 26 "sync" 27 "sync/atomic" 28 "time" 29 30 "golang.org/x/net/context" 31 "golang.org/x/net/http2" 32 "golang.org/x/net/http2/hpack" 33 34 "google.golang.org/grpc/codes" 35 "google.golang.org/grpc/credentials" 36 "google.golang.org/grpc/internal/channelz" 37 "google.golang.org/grpc/keepalive" 38 "google.golang.org/grpc/metadata" 39 "google.golang.org/grpc/peer" 40 "google.golang.org/grpc/stats" 41 "google.golang.org/grpc/status" 42) 43 44// http2Client implements the ClientTransport interface with HTTP2. 45type http2Client struct { 46 ctx context.Context 47 cancel context.CancelFunc 48 ctxDone <-chan struct{} // Cache the ctx.Done() chan. 49 userAgent string 50 md interface{} 51 conn net.Conn // underlying communication channel 52 loopy *loopyWriter 53 remoteAddr net.Addr 54 localAddr net.Addr 55 authInfo credentials.AuthInfo // auth info about the connection 56 57 readerDone chan struct{} // sync point to enable testing. 58 writerDone chan struct{} // sync point to enable testing. 59 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) 60 // that the server sent GoAway on this transport. 61 goAway chan struct{} 62 // awakenKeepalive is used to wake up keepalive when after it has gone dormant. 63 awakenKeepalive chan struct{} 64 65 framer *framer 66 // controlBuf delivers all the control related tasks (e.g., window 67 // updates, reset streams, and various settings) to the controller. 68 controlBuf *controlBuffer 69 fc *trInFlow 70 // The scheme used: https if TLS is on, http otherwise. 71 scheme string 72 73 isSecure bool 74 75 creds []credentials.PerRPCCredentials 76 77 // Boolean to keep track of reading activity on transport. 78 // 1 is true and 0 is false. 79 activity uint32 // Accessed atomically. 80 kp keepalive.ClientParameters 81 keepaliveEnabled bool 82 83 statsHandler stats.Handler 84 85 initialWindowSize int32 86 87 bdpEst *bdpEstimator 88 // onSuccess is a callback that client transport calls upon 89 // receiving server preface to signal that a succefull HTTP2 90 // connection was established. 91 onSuccess func() 92 93 maxConcurrentStreams uint32 94 streamQuota int64 95 streamsQuotaAvailable chan struct{} 96 waitingStreams uint32 97 nextID uint32 98 99 mu sync.Mutex // guard the following variables 100 state transportState 101 activeStreams map[uint32]*Stream 102 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. 103 prevGoAwayID uint32 104 // goAwayReason records the http2.ErrCode and debug data received with the 105 // GoAway frame. 106 goAwayReason GoAwayReason 107 108 // Fields below are for channelz metric collection. 109 channelzID int64 // channelz unique identification number 110 czmu sync.RWMutex 111 kpCount int64 112 // The number of streams that have started, including already finished ones. 113 streamsStarted int64 114 // The number of streams that have ended successfully by receiving EoS bit set 115 // frame from server. 116 streamsSucceeded int64 117 streamsFailed int64 118 lastStreamCreated time.Time 119 msgSent int64 120 msgRecv int64 121 lastMsgSent time.Time 122 lastMsgRecv time.Time 123} 124 125func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { 126 if fn != nil { 127 return fn(ctx, addr) 128 } 129 return dialContext(ctx, "tcp", addr) 130} 131 132func isTemporary(err error) bool { 133 switch err := err.(type) { 134 case interface { 135 Temporary() bool 136 }: 137 return err.Temporary() 138 case interface { 139 Timeout() bool 140 }: 141 // Timeouts may be resolved upon retry, and are thus treated as 142 // temporary. 143 return err.Timeout() 144 } 145 return true 146} 147 148// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 149// and starts to receive messages on it. Non-nil error returns if construction 150// fails. 151func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) { 152 scheme := "http" 153 ctx, cancel := context.WithCancel(ctx) 154 defer func() { 155 if err != nil { 156 cancel() 157 } 158 }() 159 160 conn, err := dial(connectCtx, opts.Dialer, addr.Addr) 161 if err != nil { 162 if opts.FailOnNonTempDialError { 163 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) 164 } 165 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err) 166 } 167 // Any further errors will close the underlying connection 168 defer func(conn net.Conn) { 169 if err != nil { 170 conn.Close() 171 } 172 }(conn) 173 var ( 174 isSecure bool 175 authInfo credentials.AuthInfo 176 ) 177 if creds := opts.TransportCredentials; creds != nil { 178 scheme = "https" 179 conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Authority, conn) 180 if err != nil { 181 return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err) 182 } 183 isSecure = true 184 } 185 kp := opts.KeepaliveParams 186 // Validate keepalive parameters. 187 if kp.Time == 0 { 188 kp.Time = defaultClientKeepaliveTime 189 } 190 if kp.Timeout == 0 { 191 kp.Timeout = defaultClientKeepaliveTimeout 192 } 193 dynamicWindow := true 194 icwz := int32(initialWindowSize) 195 if opts.InitialConnWindowSize >= defaultWindowSize { 196 icwz = opts.InitialConnWindowSize 197 dynamicWindow = false 198 } 199 writeBufSize := defaultWriteBufSize 200 if opts.WriteBufferSize > 0 { 201 writeBufSize = opts.WriteBufferSize 202 } 203 readBufSize := defaultReadBufSize 204 if opts.ReadBufferSize > 0 { 205 readBufSize = opts.ReadBufferSize 206 } 207 t := &http2Client{ 208 ctx: ctx, 209 ctxDone: ctx.Done(), // Cache Done chan. 210 cancel: cancel, 211 userAgent: opts.UserAgent, 212 md: addr.Metadata, 213 conn: conn, 214 remoteAddr: conn.RemoteAddr(), 215 localAddr: conn.LocalAddr(), 216 authInfo: authInfo, 217 readerDone: make(chan struct{}), 218 writerDone: make(chan struct{}), 219 goAway: make(chan struct{}), 220 awakenKeepalive: make(chan struct{}, 1), 221 framer: newFramer(conn, writeBufSize, readBufSize), 222 fc: &trInFlow{limit: uint32(icwz)}, 223 scheme: scheme, 224 activeStreams: make(map[uint32]*Stream), 225 isSecure: isSecure, 226 creds: opts.PerRPCCredentials, 227 kp: kp, 228 statsHandler: opts.StatsHandler, 229 initialWindowSize: initialWindowSize, 230 onSuccess: onSuccess, 231 nextID: 1, 232 maxConcurrentStreams: defaultMaxStreamsClient, 233 streamQuota: defaultMaxStreamsClient, 234 streamsQuotaAvailable: make(chan struct{}, 1), 235 } 236 t.controlBuf = newControlBuffer(t.ctxDone) 237 if opts.InitialWindowSize >= defaultWindowSize { 238 t.initialWindowSize = opts.InitialWindowSize 239 dynamicWindow = false 240 } 241 if dynamicWindow { 242 t.bdpEst = &bdpEstimator{ 243 bdp: initialWindowSize, 244 updateFlowControl: t.updateFlowControl, 245 } 246 } 247 // Make sure awakenKeepalive can't be written upon. 248 // keepalive routine will make it writable, if need be. 249 t.awakenKeepalive <- struct{}{} 250 if t.statsHandler != nil { 251 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ 252 RemoteAddr: t.remoteAddr, 253 LocalAddr: t.localAddr, 254 }) 255 connBegin := &stats.ConnBegin{ 256 Client: true, 257 } 258 t.statsHandler.HandleConn(t.ctx, connBegin) 259 } 260 if channelz.IsOn() { 261 t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, "") 262 } 263 if t.kp.Time != infinity { 264 t.keepaliveEnabled = true 265 go t.keepalive() 266 } 267 // Start the reader goroutine for incoming message. Each transport has 268 // a dedicated goroutine which reads HTTP2 frame from network. Then it 269 // dispatches the frame to the corresponding stream entity. 270 go t.reader() 271 // Send connection preface to server. 272 n, err := t.conn.Write(clientPreface) 273 if err != nil { 274 t.Close() 275 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) 276 } 277 if n != len(clientPreface) { 278 t.Close() 279 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) 280 } 281 if t.initialWindowSize != defaultWindowSize { 282 err = t.framer.fr.WriteSettings(http2.Setting{ 283 ID: http2.SettingInitialWindowSize, 284 Val: uint32(t.initialWindowSize), 285 }) 286 } else { 287 err = t.framer.fr.WriteSettings() 288 } 289 if err != nil { 290 t.Close() 291 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) 292 } 293 // Adjust the connection flow control window if needed. 294 if delta := uint32(icwz - defaultWindowSize); delta > 0 { 295 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil { 296 t.Close() 297 return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) 298 } 299 } 300 t.framer.writer.Flush() 301 go func() { 302 t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst) 303 err := t.loopy.run() 304 if err != nil { 305 errorf("transport: loopyWriter.run returning. Err: %v", err) 306 } 307 // If it's a connection error, let reader goroutine handle it 308 // since there might be data in the buffers. 309 if _, ok := err.(net.Error); !ok { 310 t.conn.Close() 311 } 312 close(t.writerDone) 313 }() 314 return t, nil 315} 316 317func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { 318 // TODO(zhaoq): Handle uint32 overflow of Stream.id. 319 s := &Stream{ 320 done: make(chan struct{}), 321 method: callHdr.Method, 322 sendCompress: callHdr.SendCompress, 323 buf: newRecvBuffer(), 324 headerChan: make(chan struct{}), 325 contentSubtype: callHdr.ContentSubtype, 326 } 327 s.wq = newWriteQuota(defaultWriteQuota, s.done) 328 s.requestRead = func(n int) { 329 t.adjustWindow(s, uint32(n)) 330 } 331 // The client side stream context should have exactly the same life cycle with the user provided context. 332 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. 333 // So we use the original context here instead of creating a copy. 334 s.ctx = ctx 335 s.trReader = &transportReader{ 336 reader: &recvBufferReader{ 337 ctx: s.ctx, 338 ctxDone: s.ctx.Done(), 339 recv: s.buf, 340 }, 341 windowHandler: func(n int) { 342 t.updateWindow(s, uint32(n)) 343 }, 344 } 345 return s 346} 347 348func (t *http2Client) getPeer() *peer.Peer { 349 pr := &peer.Peer{ 350 Addr: t.remoteAddr, 351 } 352 // Attach Auth info if there is any. 353 if t.authInfo != nil { 354 pr.AuthInfo = t.authInfo 355 } 356 return pr 357} 358 359func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) { 360 aud := t.createAudience(callHdr) 361 authData, err := t.getTrAuthData(ctx, aud) 362 if err != nil { 363 return nil, err 364 } 365 callAuthData, err := t.getCallAuthData(ctx, aud, callHdr) 366 if err != nil { 367 return nil, err 368 } 369 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields 370 // first and create a slice of that exact size. 371 // Make the slice of certain predictable size to reduce allocations made by append. 372 hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te 373 hfLen += len(authData) + len(callAuthData) 374 headerFields := make([]hpack.HeaderField, 0, hfLen) 375 headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"}) 376 headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme}) 377 headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method}) 378 headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) 379 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)}) 380 headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) 381 headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"}) 382 383 if callHdr.SendCompress != "" { 384 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) 385 } 386 if dl, ok := ctx.Deadline(); ok { 387 // Send out timeout regardless its value. The server can detect timeout context by itself. 388 // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire. 389 timeout := dl.Sub(time.Now()) 390 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) 391 } 392 for k, v := range authData { 393 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 394 } 395 for k, v := range callAuthData { 396 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 397 } 398 if b := stats.OutgoingTags(ctx); b != nil { 399 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)}) 400 } 401 if b := stats.OutgoingTrace(ctx); b != nil { 402 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)}) 403 } 404 405 if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok { 406 var k string 407 for _, vv := range added { 408 for i, v := range vv { 409 if i%2 == 0 { 410 k = v 411 continue 412 } 413 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. 414 if isReservedHeader(k) { 415 continue 416 } 417 headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)}) 418 } 419 } 420 for k, vv := range md { 421 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. 422 if isReservedHeader(k) { 423 continue 424 } 425 for _, v := range vv { 426 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 427 } 428 } 429 } 430 if md, ok := t.md.(*metadata.MD); ok { 431 for k, vv := range *md { 432 if isReservedHeader(k) { 433 continue 434 } 435 for _, v := range vv { 436 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) 437 } 438 } 439 } 440 return headerFields, nil 441} 442 443func (t *http2Client) createAudience(callHdr *CallHdr) string { 444 // Create an audience string only if needed. 445 if len(t.creds) == 0 && callHdr.Creds == nil { 446 return "" 447 } 448 // Construct URI required to get auth request metadata. 449 // Omit port if it is the default one. 450 host := strings.TrimSuffix(callHdr.Host, ":443") 451 pos := strings.LastIndex(callHdr.Method, "/") 452 if pos == -1 { 453 pos = len(callHdr.Method) 454 } 455 return "https://" + host + callHdr.Method[:pos] 456} 457 458func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) { 459 authData := map[string]string{} 460 for _, c := range t.creds { 461 data, err := c.GetRequestMetadata(ctx, audience) 462 if err != nil { 463 if _, ok := status.FromError(err); ok { 464 return nil, err 465 } 466 467 return nil, streamErrorf(codes.Unauthenticated, "transport: %v", err) 468 } 469 for k, v := range data { 470 // Capital header names are illegal in HTTP/2. 471 k = strings.ToLower(k) 472 authData[k] = v 473 } 474 } 475 return authData, nil 476} 477 478func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) { 479 callAuthData := map[string]string{} 480 // Check if credentials.PerRPCCredentials were provided via call options. 481 // Note: if these credentials are provided both via dial options and call 482 // options, then both sets of credentials will be applied. 483 if callCreds := callHdr.Creds; callCreds != nil { 484 if !t.isSecure && callCreds.RequireTransportSecurity() { 485 return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection") 486 } 487 data, err := callCreds.GetRequestMetadata(ctx, audience) 488 if err != nil { 489 return nil, streamErrorf(codes.Internal, "transport: %v", err) 490 } 491 for k, v := range data { 492 // Capital header names are illegal in HTTP/2 493 k = strings.ToLower(k) 494 callAuthData[k] = v 495 } 496 } 497 return callAuthData, nil 498} 499 500// NewStream creates a stream and registers it into the transport as "active" 501// streams. 502func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { 503 ctx = peer.NewContext(ctx, t.getPeer()) 504 headerFields, err := t.createHeaderFields(ctx, callHdr) 505 if err != nil { 506 return nil, err 507 } 508 s := t.newStream(ctx, callHdr) 509 cleanup := func(err error) { 510 if s.swapState(streamDone) == streamDone { 511 // If it was already done, return. 512 return 513 } 514 // The stream was unprocessed by the server. 515 atomic.StoreUint32(&s.unprocessed, 1) 516 s.write(recvMsg{err: err}) 517 close(s.done) 518 // If headerChan isn't closed, then close it. 519 if atomic.SwapUint32(&s.headerDone, 1) == 0 { 520 close(s.headerChan) 521 } 522 523 } 524 hdr := &headerFrame{ 525 hf: headerFields, 526 endStream: false, 527 initStream: func(id uint32) (bool, error) { 528 t.mu.Lock() 529 if state := t.state; state != reachable { 530 t.mu.Unlock() 531 // Do a quick cleanup. 532 err := error(errStreamDrain) 533 if state == closing { 534 err = ErrConnClosing 535 } 536 cleanup(err) 537 return false, err 538 } 539 t.activeStreams[id] = s 540 if channelz.IsOn() { 541 t.czmu.Lock() 542 t.streamsStarted++ 543 t.lastStreamCreated = time.Now() 544 t.czmu.Unlock() 545 } 546 var sendPing bool 547 // If the number of active streams change from 0 to 1, then check if keepalive 548 // has gone dormant. If so, wake it up. 549 if len(t.activeStreams) == 1 && t.keepaliveEnabled { 550 select { 551 case t.awakenKeepalive <- struct{}{}: 552 sendPing = true 553 // Fill the awakenKeepalive channel again as this channel must be 554 // kept non-writable except at the point that the keepalive() 555 // goroutine is waiting either to be awaken or shutdown. 556 t.awakenKeepalive <- struct{}{} 557 default: 558 } 559 } 560 t.mu.Unlock() 561 return sendPing, nil 562 }, 563 onOrphaned: cleanup, 564 wq: s.wq, 565 } 566 firstTry := true 567 var ch chan struct{} 568 checkForStreamQuota := func(it interface{}) bool { 569 if t.streamQuota <= 0 { // Can go negative if server decreases it. 570 if firstTry { 571 t.waitingStreams++ 572 } 573 ch = t.streamsQuotaAvailable 574 return false 575 } 576 if !firstTry { 577 t.waitingStreams-- 578 } 579 t.streamQuota-- 580 h := it.(*headerFrame) 581 h.streamID = t.nextID 582 t.nextID += 2 583 s.id = h.streamID 584 s.fc = &inFlow{limit: uint32(t.initialWindowSize)} 585 if t.streamQuota > 0 && t.waitingStreams > 0 { 586 select { 587 case t.streamsQuotaAvailable <- struct{}{}: 588 default: 589 } 590 } 591 return true 592 } 593 for { 594 success, err := t.controlBuf.executeAndPut(checkForStreamQuota, hdr) 595 if err != nil { 596 return nil, err 597 } 598 if success { 599 break 600 } 601 firstTry = false 602 select { 603 case <-ch: 604 case <-s.ctx.Done(): 605 return nil, ContextErr(s.ctx.Err()) 606 case <-t.goAway: 607 return nil, errStreamDrain 608 case <-t.ctx.Done(): 609 return nil, ErrConnClosing 610 } 611 } 612 if t.statsHandler != nil { 613 outHeader := &stats.OutHeader{ 614 Client: true, 615 FullMethod: callHdr.Method, 616 RemoteAddr: t.remoteAddr, 617 LocalAddr: t.localAddr, 618 Compression: callHdr.SendCompress, 619 } 620 t.statsHandler.HandleRPC(s.ctx, outHeader) 621 } 622 return s, nil 623} 624 625// CloseStream clears the footprint of a stream when the stream is not needed any more. 626// This must not be executed in reader's goroutine. 627func (t *http2Client) CloseStream(s *Stream, err error) { 628 var ( 629 rst bool 630 rstCode http2.ErrCode 631 ) 632 if err != nil { 633 rst = true 634 rstCode = http2.ErrCodeCancel 635 } 636 t.closeStream(s, err, rst, rstCode, nil, nil, false) 637} 638 639func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) { 640 // Set stream status to done. 641 if s.swapState(streamDone) == streamDone { 642 // If it was already done, return. 643 return 644 } 645 // status and trailers can be updated here without any synchronization because the stream goroutine will 646 // only read it after it sees an io.EOF error from read or write and we'll write those errors 647 // only after updating this. 648 s.status = st 649 if len(mdata) > 0 { 650 s.trailer = mdata 651 } 652 if err != nil { 653 // This will unblock reads eventually. 654 s.write(recvMsg{err: err}) 655 } 656 // This will unblock write. 657 close(s.done) 658 // If headerChan isn't closed, then close it. 659 if atomic.SwapUint32(&s.headerDone, 1) == 0 { 660 close(s.headerChan) 661 } 662 cleanup := &cleanupStream{ 663 streamID: s.id, 664 onWrite: func() { 665 t.mu.Lock() 666 if t.activeStreams != nil { 667 delete(t.activeStreams, s.id) 668 } 669 t.mu.Unlock() 670 if channelz.IsOn() { 671 t.czmu.Lock() 672 if eosReceived { 673 t.streamsSucceeded++ 674 } else { 675 t.streamsFailed++ 676 } 677 t.czmu.Unlock() 678 } 679 }, 680 rst: rst, 681 rstCode: rstCode, 682 } 683 addBackStreamQuota := func(interface{}) bool { 684 t.streamQuota++ 685 if t.streamQuota > 0 && t.waitingStreams > 0 { 686 select { 687 case t.streamsQuotaAvailable <- struct{}{}: 688 default: 689 } 690 } 691 return true 692 } 693 t.controlBuf.executeAndPut(addBackStreamQuota, cleanup) 694} 695 696// Close kicks off the shutdown process of the transport. This should be called 697// only once on a transport. Once it is called, the transport should not be 698// accessed any more. 699func (t *http2Client) Close() error { 700 t.mu.Lock() 701 // Make sure we only Close once. 702 if t.state == closing { 703 t.mu.Unlock() 704 return nil 705 } 706 t.state = closing 707 streams := t.activeStreams 708 t.activeStreams = nil 709 t.mu.Unlock() 710 t.controlBuf.finish() 711 t.cancel() 712 err := t.conn.Close() 713 if channelz.IsOn() { 714 channelz.RemoveEntry(t.channelzID) 715 } 716 // Notify all active streams. 717 for _, s := range streams { 718 t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, nil, nil, false) 719 } 720 if t.statsHandler != nil { 721 connEnd := &stats.ConnEnd{ 722 Client: true, 723 } 724 t.statsHandler.HandleConn(t.ctx, connEnd) 725 } 726 return err 727} 728 729// GracefulClose sets the state to draining, which prevents new streams from 730// being created and causes the transport to be closed when the last active 731// stream is closed. If there are no active streams, the transport is closed 732// immediately. This does nothing if the transport is already draining or 733// closing. 734func (t *http2Client) GracefulClose() error { 735 t.mu.Lock() 736 // Make sure we move to draining only from active. 737 if t.state == draining || t.state == closing { 738 t.mu.Unlock() 739 return nil 740 } 741 t.state = draining 742 active := len(t.activeStreams) 743 t.mu.Unlock() 744 if active == 0 { 745 return t.Close() 746 } 747 t.controlBuf.put(&incomingGoAway{}) 748 return nil 749} 750 751// Write formats the data into HTTP2 data frame(s) and sends it out. The caller 752// should proceed only if Write returns nil. 753func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { 754 if opts.Last { 755 // If it's the last message, update stream state. 756 if !s.compareAndSwapState(streamActive, streamWriteDone) { 757 return errStreamDone 758 } 759 } else if s.getState() != streamActive { 760 return errStreamDone 761 } 762 df := &dataFrame{ 763 streamID: s.id, 764 endStream: opts.Last, 765 } 766 if hdr != nil || data != nil { // If it's not an empty data frame. 767 // Add some data to grpc message header so that we can equally 768 // distribute bytes across frames. 769 emptyLen := http2MaxFrameLen - len(hdr) 770 if emptyLen > len(data) { 771 emptyLen = len(data) 772 } 773 hdr = append(hdr, data[:emptyLen]...) 774 data = data[emptyLen:] 775 df.h, df.d = hdr, data 776 // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler. 777 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { 778 return err 779 } 780 } 781 return t.controlBuf.put(df) 782} 783 784func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { 785 t.mu.Lock() 786 defer t.mu.Unlock() 787 s, ok := t.activeStreams[f.Header().StreamID] 788 return s, ok 789} 790 791// adjustWindow sends out extra window update over the initial window size 792// of stream if the application is requesting data larger in size than 793// the window. 794func (t *http2Client) adjustWindow(s *Stream, n uint32) { 795 if w := s.fc.maybeAdjust(n); w > 0 { 796 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 797 } 798} 799 800// updateWindow adjusts the inbound quota for the stream. 801// Window updates will be sent out when the cumulative quota 802// exceeds the corresponding threshold. 803func (t *http2Client) updateWindow(s *Stream, n uint32) { 804 if w := s.fc.onRead(n); w > 0 { 805 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) 806 } 807} 808 809// updateFlowControl updates the incoming flow control windows 810// for the transport and the stream based on the current bdp 811// estimation. 812func (t *http2Client) updateFlowControl(n uint32) { 813 t.mu.Lock() 814 for _, s := range t.activeStreams { 815 s.fc.newLimit(n) 816 } 817 t.mu.Unlock() 818 updateIWS := func(interface{}) bool { 819 t.initialWindowSize = int32(n) 820 return true 821 } 822 t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)}) 823 t.controlBuf.put(&outgoingSettings{ 824 ss: []http2.Setting{ 825 { 826 ID: http2.SettingInitialWindowSize, 827 Val: n, 828 }, 829 }, 830 }) 831} 832 833func (t *http2Client) handleData(f *http2.DataFrame) { 834 size := f.Header().Length 835 var sendBDPPing bool 836 if t.bdpEst != nil { 837 sendBDPPing = t.bdpEst.add(size) 838 } 839 // Decouple connection's flow control from application's read. 840 // An update on connection's flow control should not depend on 841 // whether user application has read the data or not. Such a 842 // restriction is already imposed on the stream's flow control, 843 // and therefore the sender will be blocked anyways. 844 // Decoupling the connection flow control will prevent other 845 // active(fast) streams from starving in presence of slow or 846 // inactive streams. 847 // 848 if w := t.fc.onData(size); w > 0 { 849 t.controlBuf.put(&outgoingWindowUpdate{ 850 streamID: 0, 851 increment: w, 852 }) 853 } 854 if sendBDPPing { 855 // Avoid excessive ping detection (e.g. in an L7 proxy) 856 // by sending a window update prior to the BDP ping. 857 858 if w := t.fc.reset(); w > 0 { 859 t.controlBuf.put(&outgoingWindowUpdate{ 860 streamID: 0, 861 increment: w, 862 }) 863 } 864 865 t.controlBuf.put(bdpPing) 866 } 867 // Select the right stream to dispatch. 868 s, ok := t.getStream(f) 869 if !ok { 870 return 871 } 872 if size > 0 { 873 if err := s.fc.onData(size); err != nil { 874 t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false) 875 return 876 } 877 if f.Header().Flags.Has(http2.FlagDataPadded) { 878 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { 879 t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) 880 } 881 } 882 // TODO(bradfitz, zhaoq): A copy is required here because there is no 883 // guarantee f.Data() is consumed before the arrival of next frame. 884 // Can this copy be eliminated? 885 if len(f.Data()) > 0 { 886 data := make([]byte, len(f.Data())) 887 copy(data, f.Data()) 888 s.write(recvMsg{data: data}) 889 } 890 } 891 // The server has closed the stream without sending trailers. Record that 892 // the read direction is closed, and set the status appropriately. 893 if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { 894 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true) 895 } 896} 897 898func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { 899 s, ok := t.getStream(f) 900 if !ok { 901 return 902 } 903 if f.ErrCode == http2.ErrCodeRefusedStream { 904 // The stream was unprocessed by the server. 905 atomic.StoreUint32(&s.unprocessed, 1) 906 } 907 statusCode, ok := http2ErrConvTab[f.ErrCode] 908 if !ok { 909 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode) 910 statusCode = codes.Unknown 911 } 912 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false) 913} 914 915func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) { 916 if f.IsAck() { 917 return 918 } 919 var maxStreams *uint32 920 var ss []http2.Setting 921 f.ForeachSetting(func(s http2.Setting) error { 922 if s.ID == http2.SettingMaxConcurrentStreams { 923 maxStreams = new(uint32) 924 *maxStreams = s.Val 925 return nil 926 } 927 ss = append(ss, s) 928 return nil 929 }) 930 if isFirst && maxStreams == nil { 931 maxStreams = new(uint32) 932 *maxStreams = math.MaxUint32 933 } 934 sf := &incomingSettings{ 935 ss: ss, 936 } 937 if maxStreams == nil { 938 t.controlBuf.put(sf) 939 return 940 } 941 updateStreamQuota := func(interface{}) bool { 942 delta := int64(*maxStreams) - int64(t.maxConcurrentStreams) 943 t.maxConcurrentStreams = *maxStreams 944 t.streamQuota += delta 945 if delta > 0 && t.waitingStreams > 0 { 946 close(t.streamsQuotaAvailable) // wake all of them up. 947 t.streamsQuotaAvailable = make(chan struct{}, 1) 948 } 949 return true 950 } 951 t.controlBuf.executeAndPut(updateStreamQuota, sf) 952} 953 954func (t *http2Client) handlePing(f *http2.PingFrame) { 955 if f.IsAck() { 956 // Maybe it's a BDP ping. 957 if t.bdpEst != nil { 958 t.bdpEst.calculate(f.Data) 959 } 960 return 961 } 962 pingAck := &ping{ack: true} 963 copy(pingAck.data[:], f.Data[:]) 964 t.controlBuf.put(pingAck) 965} 966 967func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { 968 t.mu.Lock() 969 if t.state == closing { 970 t.mu.Unlock() 971 return 972 } 973 if f.ErrCode == http2.ErrCodeEnhanceYourCalm { 974 infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") 975 } 976 id := f.LastStreamID 977 if id > 0 && id%2 != 1 { 978 t.mu.Unlock() 979 t.Close() 980 return 981 } 982 // A client can receive multiple GoAways from the server (see 983 // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first 984 // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be 985 // sent after an RTT delay with the ID of the last stream the server will 986 // process. 987 // 988 // Therefore, when we get the first GoAway we don't necessarily close any 989 // streams. While in case of second GoAway we close all streams created after 990 // the GoAwayId. This way streams that were in-flight while the GoAway from 991 // server was being sent don't get killed. 992 select { 993 case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways). 994 // If there are multiple GoAways the first one should always have an ID greater than the following ones. 995 if id > t.prevGoAwayID { 996 t.mu.Unlock() 997 t.Close() 998 return 999 } 1000 default: 1001 t.setGoAwayReason(f) 1002 close(t.goAway) 1003 t.state = draining 1004 t.controlBuf.put(&incomingGoAway{}) 1005 } 1006 // All streams with IDs greater than the GoAwayId 1007 // and smaller than the previous GoAway ID should be killed. 1008 upperLimit := t.prevGoAwayID 1009 if upperLimit == 0 { // This is the first GoAway Frame. 1010 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. 1011 } 1012 for streamID, stream := range t.activeStreams { 1013 if streamID > id && streamID <= upperLimit { 1014 // The stream was unprocessed by the server. 1015 atomic.StoreUint32(&stream.unprocessed, 1) 1016 t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false) 1017 } 1018 } 1019 t.prevGoAwayID = id 1020 active := len(t.activeStreams) 1021 t.mu.Unlock() 1022 if active == 0 { 1023 t.Close() 1024 } 1025} 1026 1027// setGoAwayReason sets the value of t.goAwayReason based 1028// on the GoAway frame received. 1029// It expects a lock on transport's mutext to be held by 1030// the caller. 1031func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { 1032 t.goAwayReason = GoAwayNoReason 1033 switch f.ErrCode { 1034 case http2.ErrCodeEnhanceYourCalm: 1035 if string(f.DebugData()) == "too_many_pings" { 1036 t.goAwayReason = GoAwayTooManyPings 1037 } 1038 } 1039} 1040 1041func (t *http2Client) GetGoAwayReason() GoAwayReason { 1042 t.mu.Lock() 1043 defer t.mu.Unlock() 1044 return t.goAwayReason 1045} 1046 1047func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { 1048 t.controlBuf.put(&incomingWindowUpdate{ 1049 streamID: f.Header().StreamID, 1050 increment: f.Increment, 1051 }) 1052} 1053 1054// operateHeaders takes action on the decoded headers. 1055func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { 1056 s, ok := t.getStream(frame) 1057 if !ok { 1058 return 1059 } 1060 atomic.StoreUint32(&s.bytesReceived, 1) 1061 var state decodeState 1062 if err := state.decodeResponseHeader(frame); err != nil { 1063 t.closeStream(s, err, true, http2.ErrCodeProtocol, nil, nil, false) 1064 // Something wrong. Stops reading even when there is remaining. 1065 return 1066 } 1067 1068 endStream := frame.StreamEnded() 1069 var isHeader bool 1070 defer func() { 1071 if t.statsHandler != nil { 1072 if isHeader { 1073 inHeader := &stats.InHeader{ 1074 Client: true, 1075 WireLength: int(frame.Header().Length), 1076 } 1077 t.statsHandler.HandleRPC(s.ctx, inHeader) 1078 } else { 1079 inTrailer := &stats.InTrailer{ 1080 Client: true, 1081 WireLength: int(frame.Header().Length), 1082 } 1083 t.statsHandler.HandleRPC(s.ctx, inTrailer) 1084 } 1085 } 1086 }() 1087 // If headers haven't been received yet. 1088 if atomic.SwapUint32(&s.headerDone, 1) == 0 { 1089 if !endStream { 1090 // Headers frame is not actually a trailers-only frame. 1091 isHeader = true 1092 // These values can be set without any synchronization because 1093 // stream goroutine will read it only after seeing a closed 1094 // headerChan which we'll close after setting this. 1095 s.recvCompress = state.encoding 1096 if len(state.mdata) > 0 { 1097 s.header = state.mdata 1098 } 1099 } 1100 close(s.headerChan) 1101 } 1102 if !endStream { 1103 return 1104 } 1105 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, state.status(), state.mdata, true) 1106} 1107 1108// reader runs as a separate goroutine in charge of reading data from network 1109// connection. 1110// 1111// TODO(zhaoq): currently one reader per transport. Investigate whether this is 1112// optimal. 1113// TODO(zhaoq): Check the validity of the incoming frame sequence. 1114func (t *http2Client) reader() { 1115 defer close(t.readerDone) 1116 // Check the validity of server preface. 1117 frame, err := t.framer.fr.ReadFrame() 1118 if err != nil { 1119 t.Close() 1120 return 1121 } 1122 if t.keepaliveEnabled { 1123 atomic.CompareAndSwapUint32(&t.activity, 0, 1) 1124 } 1125 sf, ok := frame.(*http2.SettingsFrame) 1126 if !ok { 1127 t.Close() 1128 return 1129 } 1130 t.onSuccess() 1131 t.handleSettings(sf, true) 1132 1133 // loop to keep reading incoming messages on this transport. 1134 for { 1135 frame, err := t.framer.fr.ReadFrame() 1136 if t.keepaliveEnabled { 1137 atomic.CompareAndSwapUint32(&t.activity, 0, 1) 1138 } 1139 if err != nil { 1140 // Abort an active stream if the http2.Framer returns a 1141 // http2.StreamError. This can happen only if the server's response 1142 // is malformed http2. 1143 if se, ok := err.(http2.StreamError); ok { 1144 t.mu.Lock() 1145 s := t.activeStreams[se.StreamID] 1146 t.mu.Unlock() 1147 if s != nil { 1148 // use error detail to provide better err message 1149 t.closeStream(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail()), true, http2.ErrCodeProtocol, nil, nil, false) 1150 } 1151 continue 1152 } else { 1153 // Transport error. 1154 t.Close() 1155 return 1156 } 1157 } 1158 switch frame := frame.(type) { 1159 case *http2.MetaHeadersFrame: 1160 t.operateHeaders(frame) 1161 case *http2.DataFrame: 1162 t.handleData(frame) 1163 case *http2.RSTStreamFrame: 1164 t.handleRSTStream(frame) 1165 case *http2.SettingsFrame: 1166 t.handleSettings(frame, false) 1167 case *http2.PingFrame: 1168 t.handlePing(frame) 1169 case *http2.GoAwayFrame: 1170 t.handleGoAway(frame) 1171 case *http2.WindowUpdateFrame: 1172 t.handleWindowUpdate(frame) 1173 default: 1174 errorf("transport: http2Client.reader got unhandled frame type %v.", frame) 1175 } 1176 } 1177} 1178 1179// keepalive running in a separate goroutune makes sure the connection is alive by sending pings. 1180func (t *http2Client) keepalive() { 1181 p := &ping{data: [8]byte{}} 1182 timer := time.NewTimer(t.kp.Time) 1183 for { 1184 select { 1185 case <-timer.C: 1186 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { 1187 timer.Reset(t.kp.Time) 1188 continue 1189 } 1190 // Check if keepalive should go dormant. 1191 t.mu.Lock() 1192 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { 1193 // Make awakenKeepalive writable. 1194 <-t.awakenKeepalive 1195 t.mu.Unlock() 1196 select { 1197 case <-t.awakenKeepalive: 1198 // If the control gets here a ping has been sent 1199 // need to reset the timer with keepalive.Timeout. 1200 case <-t.ctx.Done(): 1201 return 1202 } 1203 } else { 1204 t.mu.Unlock() 1205 if channelz.IsOn() { 1206 t.czmu.Lock() 1207 t.kpCount++ 1208 t.czmu.Unlock() 1209 } 1210 // Send ping. 1211 t.controlBuf.put(p) 1212 } 1213 1214 // By the time control gets here a ping has been sent one way or the other. 1215 timer.Reset(t.kp.Timeout) 1216 select { 1217 case <-timer.C: 1218 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { 1219 timer.Reset(t.kp.Time) 1220 continue 1221 } 1222 t.Close() 1223 return 1224 case <-t.ctx.Done(): 1225 if !timer.Stop() { 1226 <-timer.C 1227 } 1228 return 1229 } 1230 case <-t.ctx.Done(): 1231 if !timer.Stop() { 1232 <-timer.C 1233 } 1234 return 1235 } 1236 } 1237} 1238 1239func (t *http2Client) Error() <-chan struct{} { 1240 return t.ctx.Done() 1241} 1242 1243func (t *http2Client) GoAway() <-chan struct{} { 1244 return t.goAway 1245} 1246 1247func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric { 1248 t.czmu.RLock() 1249 s := channelz.SocketInternalMetric{ 1250 StreamsStarted: t.streamsStarted, 1251 StreamsSucceeded: t.streamsSucceeded, 1252 StreamsFailed: t.streamsFailed, 1253 MessagesSent: t.msgSent, 1254 MessagesReceived: t.msgRecv, 1255 KeepAlivesSent: t.kpCount, 1256 LastLocalStreamCreatedTimestamp: t.lastStreamCreated, 1257 LastMessageSentTimestamp: t.lastMsgSent, 1258 LastMessageReceivedTimestamp: t.lastMsgRecv, 1259 LocalFlowControlWindow: int64(t.fc.getSize()), 1260 SocketOptions: channelz.GetSocketOption(t.conn), 1261 LocalAddr: t.localAddr, 1262 RemoteAddr: t.remoteAddr, 1263 // RemoteName : 1264 } 1265 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { 1266 s.Security = au.GetSecurityValue() 1267 } 1268 t.czmu.RUnlock() 1269 s.RemoteFlowControlWindow = t.getOutFlowWindow() 1270 return &s 1271} 1272 1273func (t *http2Client) IncrMsgSent() { 1274 t.czmu.Lock() 1275 t.msgSent++ 1276 t.lastMsgSent = time.Now() 1277 t.czmu.Unlock() 1278} 1279 1280func (t *http2Client) IncrMsgRecv() { 1281 t.czmu.Lock() 1282 t.msgRecv++ 1283 t.lastMsgRecv = time.Now() 1284 t.czmu.Unlock() 1285} 1286 1287func (t *http2Client) getOutFlowWindow() int64 { 1288 resp := make(chan uint32, 1) 1289 timer := time.NewTimer(time.Second) 1290 defer timer.Stop() 1291 t.controlBuf.put(&outFlowControlSizeRequest{resp}) 1292 select { 1293 case sz := <-resp: 1294 return int64(sz) 1295 case <-t.ctxDone: 1296 return -1 1297 case <-timer.C: 1298 return -2 1299 } 1300} 1301