2024-02-24 00:34:55 +00:00
// Copyright 2015 Google LLC.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package grpc supports network connections to GRPC servers.
// This package is not intended for use by end developers. Use the
// google.golang.org/api/option package to configure API clients.
package grpc
import (
"context"
"errors"
"log"
"net"
"os"
"strings"
"sync"
"time"
2024-10-16 10:54:40 +00:00
"cloud.google.com/go/auth"
"cloud.google.com/go/auth/credentials"
"cloud.google.com/go/auth/grpctransport"
"cloud.google.com/go/auth/oauth2adapt"
2024-02-24 00:34:55 +00:00
"cloud.google.com/go/compute/metadata"
"go.opencensus.io/plugin/ocgrpc"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/oauth2"
"golang.org/x/time/rate"
"google.golang.org/api/internal"
"google.golang.org/api/option"
"google.golang.org/grpc"
grpcgoogle "google.golang.org/grpc/credentials/google"
grpcinsecure "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/stats"
// Install grpclb, which is required for direct path.
_ "google.golang.org/grpc/balancer/grpclb"
)
// Check env to disable DirectPath traffic.
const disableDirectPath = "GOOGLE_CLOUD_DISABLE_DIRECT_PATH"
// Check env to decide if using google-c2p resolver for DirectPath traffic.
const enableDirectPathXds = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS"
// Set at init time by dial_socketopt.go. If nil, socketopt is not supported.
var timeoutDialerOption grpc . DialOption
// Log rate limiter
var logRateLimiter = rate . Sometimes { Interval : 1 * time . Second }
// Assign to var for unit test replacement
var dialContext = grpc . DialContext
// otelStatsHandler is a singleton otelgrpc.clientHandler to be used across
// all dial connections to avoid the memory leak documented in
// https://github.com/open-telemetry/opentelemetry-go-contrib/issues/4226
//
// TODO: If 4226 has been fixed in opentelemetry-go-contrib, replace this
// singleton with inline usage for simplicity.
var (
initOtelStatsHandlerOnce sync . Once
otelStatsHandler stats . Handler
)
// otelGRPCStatsHandler returns singleton otelStatsHandler for reuse across all
// dial connections.
func otelGRPCStatsHandler ( ) stats . Handler {
initOtelStatsHandlerOnce . Do ( func ( ) {
otelStatsHandler = otelgrpc . NewClientHandler ( )
} )
return otelStatsHandler
}
// Dial returns a GRPC connection for use communicating with a Google cloud
// service, configured with the given ClientOptions.
func Dial ( ctx context . Context , opts ... option . ClientOption ) ( * grpc . ClientConn , error ) {
o , err := processAndValidateOpts ( opts )
if err != nil {
return nil , err
}
if o . GRPCConnPool != nil {
return o . GRPCConnPool . Conn ( ) , nil
}
2024-10-16 10:54:40 +00:00
if o . IsNewAuthLibraryEnabled ( ) {
pool , err := dialPoolNewAuth ( ctx , true , 1 , o )
if err != nil {
return nil , err
}
return pool . Connection ( ) , nil
}
2024-02-24 00:34:55 +00:00
// NOTE(cbro): We removed support for option.WithGRPCConnPool (GRPCConnPoolSize)
// on 2020-02-12 because RoundRobin and WithBalancer are deprecated and we need to remove usages of it.
//
// Connection pooling is only done via DialPool.
return dial ( ctx , false , o )
}
// DialInsecure returns an insecure GRPC connection for use communicating
// with fake or mock Google cloud service implementations, such as emulators.
// The connection is configured with the given ClientOptions.
func DialInsecure ( ctx context . Context , opts ... option . ClientOption ) ( * grpc . ClientConn , error ) {
o , err := processAndValidateOpts ( opts )
if err != nil {
return nil , err
}
2024-10-16 10:54:40 +00:00
if o . IsNewAuthLibraryEnabled ( ) {
pool , err := dialPoolNewAuth ( ctx , false , 1 , o )
if err != nil {
return nil , err
}
return pool . Connection ( ) , nil
}
2024-02-24 00:34:55 +00:00
return dial ( ctx , true , o )
}
// DialPool returns a pool of GRPC connections for the given service.
// This differs from the connection pooling implementation used by Dial, which uses a custom GRPC load balancer.
// DialPool should be used instead of Dial when a pool is used by default or a different custom GRPC load balancer is needed.
// The context and options are shared between each Conn in the pool.
// The pool size is configured using the WithGRPCConnectionPool option.
//
// This API is subject to change as we further refine requirements. It will go away if gRPC stubs accept an interface instead of the concrete ClientConn type. See https://github.com/grpc/grpc-go/issues/1287.
func DialPool ( ctx context . Context , opts ... option . ClientOption ) ( ConnPool , error ) {
o , err := processAndValidateOpts ( opts )
if err != nil {
return nil , err
}
if o . GRPCConnPool != nil {
return o . GRPCConnPool , nil
}
2024-10-16 10:54:40 +00:00
if o . IsNewAuthLibraryEnabled ( ) {
if o . GRPCConn != nil {
return & singleConnPool { o . GRPCConn } , nil
}
pool , err := dialPoolNewAuth ( ctx , true , o . GRPCConnPoolSize , o )
if err != nil {
return nil , err
}
return & poolAdapter { pool } , nil
}
2024-02-24 00:34:55 +00:00
poolSize := o . GRPCConnPoolSize
if o . GRPCConn != nil {
// WithGRPCConn is technically incompatible with WithGRPCConnectionPool.
// Always assume pool size is 1 when a grpc.ClientConn is explicitly used.
poolSize = 1
}
o . GRPCConnPoolSize = 0 // we don't *need* to set this to zero, but it's safe to.
if poolSize == 0 || poolSize == 1 {
// Fast path for common case for a connection pool with a single connection.
conn , err := dial ( ctx , false , o )
if err != nil {
return nil , err
}
return & singleConnPool { conn } , nil
}
pool := & roundRobinConnPool { }
for i := 0 ; i < poolSize ; i ++ {
conn , err := dial ( ctx , false , o )
if err != nil {
defer pool . Close ( ) // NOTE: error from Close is ignored.
return nil , err
}
pool . conns = append ( pool . conns , conn )
}
return pool , nil
}
2024-10-16 10:54:40 +00:00
// dialPoolNewAuth is an adapter to call new auth library.
func dialPoolNewAuth ( ctx context . Context , secure bool , poolSize int , ds * internal . DialSettings ) ( grpctransport . GRPCClientConnPool , error ) {
// honor options if set
var creds * auth . Credentials
if ds . InternalCredentials != nil {
creds = oauth2adapt . AuthCredentialsFromOauth2Credentials ( ds . InternalCredentials )
} else if ds . Credentials != nil {
creds = oauth2adapt . AuthCredentialsFromOauth2Credentials ( ds . Credentials )
} else if ds . AuthCredentials != nil {
creds = ds . AuthCredentials
} else if ds . TokenSource != nil {
credOpts := & auth . CredentialsOptions {
TokenProvider : oauth2adapt . TokenProviderFromTokenSource ( ds . TokenSource ) ,
}
if ds . QuotaProject != "" {
credOpts . QuotaProjectIDProvider = auth . CredentialsPropertyFunc ( func ( ctx context . Context ) ( string , error ) {
return ds . QuotaProject , nil
} )
}
creds = auth . NewCredentials ( credOpts )
}
var skipValidation bool
// If our clients explicitly setup the credential skip validation as it is
// assumed correct
if ds . SkipValidation || ds . InternalCredentials != nil {
skipValidation = true
}
var aud string
if len ( ds . Audiences ) > 0 {
aud = ds . Audiences [ 0 ]
}
metadata := map [ string ] string { }
if ds . QuotaProject != "" {
metadata [ "X-goog-user-project" ] = ds . QuotaProject
}
if ds . RequestReason != "" {
metadata [ "X-goog-request-reason" ] = ds . RequestReason
}
// Defaults for older clients that don't set this value yet
defaultEndpointTemplate := ds . DefaultEndpointTemplate
if defaultEndpointTemplate == "" {
defaultEndpointTemplate = ds . DefaultEndpoint
}
tokenURL , oauth2Client , err := internal . GetOAuth2Configuration ( ctx , ds )
if err != nil {
return nil , err
}
pool , err := grpctransport . Dial ( ctx , secure , & grpctransport . Options {
DisableTelemetry : ds . TelemetryDisabled ,
DisableAuthentication : ds . NoAuth ,
Endpoint : ds . Endpoint ,
Metadata : metadata ,
GRPCDialOpts : ds . GRPCDialOpts ,
PoolSize : poolSize ,
Credentials : creds ,
APIKey : ds . APIKey ,
DetectOpts : & credentials . DetectOptions {
Scopes : ds . Scopes ,
Audience : aud ,
CredentialsFile : ds . CredentialsFile ,
CredentialsJSON : ds . CredentialsJSON ,
TokenURL : tokenURL ,
Client : oauth2Client ,
} ,
InternalOptions : & grpctransport . InternalOptions {
EnableNonDefaultSAForDirectPath : ds . AllowNonDefaultServiceAccount ,
EnableDirectPath : ds . EnableDirectPath ,
EnableDirectPathXds : ds . EnableDirectPathXds ,
EnableJWTWithScope : ds . EnableJwtWithScope ,
DefaultAudience : ds . DefaultAudience ,
DefaultEndpointTemplate : defaultEndpointTemplate ,
DefaultMTLSEndpoint : ds . DefaultMTLSEndpoint ,
DefaultScopes : ds . DefaultScopes ,
SkipValidation : skipValidation ,
} ,
} )
return pool , err
}
2024-02-24 00:34:55 +00:00
func dial ( ctx context . Context , insecure bool , o * internal . DialSettings ) ( * grpc . ClientConn , error ) {
if o . HTTPClient != nil {
return nil , errors . New ( "unsupported HTTP client specified" )
}
if o . GRPCConn != nil {
return o . GRPCConn , nil
}
transportCreds , endpoint , err := internal . GetGRPCTransportConfigAndEndpoint ( o )
if err != nil {
return nil , err
}
if insecure {
transportCreds = grpcinsecure . NewCredentials ( )
}
// Initialize gRPC dial options with transport-level security options.
grpcOpts := [ ] grpc . DialOption {
grpc . WithTransportCredentials ( transportCreds ) ,
}
// Authentication can only be sent when communicating over a secure connection.
//
// TODO: Should we be more lenient in the future and allow sending credentials
// when dialing an insecure connection?
if ! o . NoAuth && ! insecure {
if o . APIKey != "" {
grpcOpts = append ( grpcOpts , grpc . WithPerRPCCredentials ( grpcAPIKey {
apiKey : o . APIKey ,
requestReason : o . RequestReason ,
} ) )
} else {
creds , err := internal . Creds ( ctx , o )
if err != nil {
return nil , err
}
if o . TokenSource == nil {
// We only validate non-tokensource creds, as TokenSource-based credentials
// don't propagate universe.
credsUniverseDomain , err := internal . GetUniverseDomain ( creds )
if err != nil {
return nil , err
}
if o . GetUniverseDomain ( ) != credsUniverseDomain {
return nil , internal . ErrUniverseNotMatch ( o . GetUniverseDomain ( ) , credsUniverseDomain )
}
}
grpcOpts = append ( grpcOpts , grpc . WithPerRPCCredentials ( grpcTokenSource {
TokenSource : oauth . TokenSource { TokenSource : creds . TokenSource } ,
quotaProject : internal . GetQuotaProject ( creds , o . QuotaProject ) ,
requestReason : o . RequestReason ,
} ) )
// Attempt Direct Path:
logRateLimiter . Do ( func ( ) {
logDirectPathMisconfig ( endpoint , creds . TokenSource , o )
} )
if isDirectPathEnabled ( endpoint , o ) && isTokenSourceDirectPathCompatible ( creds . TokenSource , o ) && metadata . OnGCE ( ) {
// Overwrite all of the previously specific DialOptions, DirectPath uses its own set of credentials and certificates.
grpcOpts = [ ] grpc . DialOption {
grpc . WithCredentialsBundle ( grpcgoogle . NewDefaultCredentialsWithOptions (
grpcgoogle . DefaultCredentialsOptions {
PerRPCCreds : oauth . TokenSource { TokenSource : creds . TokenSource } ,
} ) ) ,
}
if timeoutDialerOption != nil {
grpcOpts = append ( grpcOpts , timeoutDialerOption )
}
// Check if google-c2p resolver is enabled for DirectPath
if isDirectPathXdsUsed ( o ) {
// google-c2p resolver target must not have a port number
if addr , _ , err := net . SplitHostPort ( endpoint ) ; err == nil {
endpoint = "google-c2p:///" + addr
} else {
endpoint = "google-c2p:///" + endpoint
}
} else {
if ! strings . HasPrefix ( endpoint , "dns:///" ) {
endpoint = "dns:///" + endpoint
}
grpcOpts = append ( grpcOpts ,
// For now all DirectPath go clients will be using the following lb config, but in future
// when different services need different configs, then we should change this to a
// per-service config.
grpc . WithDisableServiceConfig ( ) ,
grpc . WithDefaultServiceConfig ( ` { "loadBalancingConfig":[ { "grpclb": { "childPolicy":[ { "pick_first": { }}]}}]} ` ) )
}
// TODO(cbro): add support for system parameters (quota project, request reason) via chained interceptor.
}
}
}
// Add tracing, but before the other options, so that clients can override the
// gRPC stats handler.
// This assumes that gRPC options are processed in order, left to right.
grpcOpts = addOCStatsHandler ( grpcOpts , o )
grpcOpts = addOpenTelemetryStatsHandler ( grpcOpts , o )
grpcOpts = append ( grpcOpts , o . GRPCDialOpts ... )
if o . UserAgent != "" {
grpcOpts = append ( grpcOpts , grpc . WithUserAgent ( o . UserAgent ) )
}
return dialContext ( ctx , endpoint , grpcOpts ... )
}
func addOCStatsHandler ( opts [ ] grpc . DialOption , settings * internal . DialSettings ) [ ] grpc . DialOption {
if settings . TelemetryDisabled {
return opts
}
return append ( opts , grpc . WithStatsHandler ( & ocgrpc . ClientHandler { } ) )
}
func addOpenTelemetryStatsHandler ( opts [ ] grpc . DialOption , settings * internal . DialSettings ) [ ] grpc . DialOption {
if settings . TelemetryDisabled {
return opts
}
return append ( opts , grpc . WithStatsHandler ( otelGRPCStatsHandler ( ) ) )
}
// grpcTokenSource supplies PerRPCCredentials from an oauth.TokenSource.
type grpcTokenSource struct {
oauth . TokenSource
// Additional metadata attached as headers.
quotaProject string
requestReason string
}
// GetRequestMetadata gets the request metadata as a map from a grpcTokenSource.
func ( ts grpcTokenSource ) GetRequestMetadata ( ctx context . Context , uri ... string ) (
map [ string ] string , error ) {
metadata , err := ts . TokenSource . GetRequestMetadata ( ctx , uri ... )
if err != nil {
return nil , err
}
// Attach system parameter
if ts . quotaProject != "" {
metadata [ "X-goog-user-project" ] = ts . quotaProject
}
if ts . requestReason != "" {
metadata [ "X-goog-request-reason" ] = ts . requestReason
}
return metadata , nil
}
// grpcAPIKey supplies PerRPCCredentials from an API Key.
type grpcAPIKey struct {
apiKey string
// Additional metadata attached as headers.
requestReason string
}
// GetRequestMetadata gets the request metadata as a map from a grpcAPIKey.
func ( ts grpcAPIKey ) GetRequestMetadata ( ctx context . Context , uri ... string ) (
map [ string ] string , error ) {
metadata := map [ string ] string {
"X-goog-api-key" : ts . apiKey ,
}
if ts . requestReason != "" {
metadata [ "X-goog-request-reason" ] = ts . requestReason
}
return metadata , nil
}
// RequireTransportSecurity indicates whether the credentials requires transport security.
func ( ts grpcAPIKey ) RequireTransportSecurity ( ) bool {
return true
}
func isDirectPathEnabled ( endpoint string , o * internal . DialSettings ) bool {
if ! o . EnableDirectPath {
return false
}
if ! checkDirectPathEndPoint ( endpoint ) {
return false
}
if strings . EqualFold ( os . Getenv ( disableDirectPath ) , "true" ) {
return false
}
return true
}
func isDirectPathXdsUsed ( o * internal . DialSettings ) bool {
// Method 1: Enable DirectPath xDS by env;
if strings . EqualFold ( os . Getenv ( enableDirectPathXds ) , "true" ) {
return true
}
// Method 2: Enable DirectPath xDS by option;
if o . EnableDirectPathXds {
return true
}
return false
}
func isTokenSourceDirectPathCompatible ( ts oauth2 . TokenSource , o * internal . DialSettings ) bool {
if ts == nil {
return false
}
tok , err := ts . Token ( )
if err != nil {
return false
}
if tok == nil {
return false
}
if o . AllowNonDefaultServiceAccount {
return true
}
if source , _ := tok . Extra ( "oauth2.google.tokenSource" ) . ( string ) ; source != "compute-metadata" {
return false
}
if acct , _ := tok . Extra ( "oauth2.google.serviceAccount" ) . ( string ) ; acct != "default" {
return false
}
return true
}
func checkDirectPathEndPoint ( endpoint string ) bool {
// Only [dns:///]host[:port] is supported, not other schemes (e.g., "tcp://" or "unix://").
// Also don't try direct path if the user has chosen an alternate name resolver
// (i.e., via ":///" prefix).
//
// TODO(cbro): once gRPC has introspectible options, check the user hasn't
// provided a custom dialer in gRPC options.
if strings . Contains ( endpoint , "://" ) && ! strings . HasPrefix ( endpoint , "dns:///" ) {
return false
}
if endpoint == "" {
return false
}
return true
}
func logDirectPathMisconfig ( endpoint string , ts oauth2 . TokenSource , o * internal . DialSettings ) {
if isDirectPathXdsUsed ( o ) {
// Case 1: does not enable DirectPath
if ! isDirectPathEnabled ( endpoint , o ) {
log . Println ( "WARNING: DirectPath is misconfigured. Please set the EnableDirectPath option along with the EnableDirectPathXds option." )
} else {
// Case 2: credential is not correctly set
if ! isTokenSourceDirectPathCompatible ( ts , o ) {
log . Println ( "WARNING: DirectPath is misconfigured. Please make sure the token source is fetched from GCE metadata server and the default service account is used." )
}
// Case 3: not running on GCE
if ! metadata . OnGCE ( ) {
log . Println ( "WARNING: DirectPath is misconfigured. DirectPath is only available in a GCE environment." )
}
}
}
}
func processAndValidateOpts ( opts [ ] option . ClientOption ) ( * internal . DialSettings , error ) {
var o internal . DialSettings
for _ , opt := range opts {
opt . Apply ( & o )
}
if err := o . Validate ( ) ; err != nil {
return nil , err
}
return & o , nil
}
type connPoolOption struct { ConnPool }
// WithConnPool returns a ClientOption that specifies the ConnPool
// connection to use as the basis of communications.
//
// This is only to be used by Google client libraries internally, for example
// when creating a longrunning API client that shares the same connection pool
// as a service client.
func WithConnPool ( p ConnPool ) option . ClientOption {
return connPoolOption { p }
}
func ( o connPoolOption ) Apply ( s * internal . DialSettings ) {
s . GRPCConnPool = o . ConnPool
}