Temporal specific additions to base/go

This commit is contained in:
Mustafa Gezen 2023-08-27 06:02:06 +02:00
parent 2fd0fdf5f5
commit bfa41e2970
Signed by: mustafa
GPG Key ID: DCDF010D946438C1
3 changed files with 120 additions and 0 deletions

View File

@ -27,6 +27,7 @@ go_library(
"pb.go",
"pointer.go",
"slice.go",
"temporal.go",
"wrapper_helpers.go",
],
embedsrcs = ["assets/oh_no_unauthenticated.png"],
@ -43,8 +44,11 @@ go_library(
"//vendor/github.com/urfave/cli/v2:cli",
"//vendor/github.com/wk8/go-ordered-map/v2:go-ordered-map",
"//vendor/go.ciq.dev/pika",
"//vendor/go.temporal.io/api/workflowservice/v1:workflowservice",
"//vendor/go.temporal.io/sdk/client",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",

View File

@ -17,6 +17,7 @@ package base
import (
"github.com/coreos/go-oidc/v3/oidc"
"github.com/urfave/cli/v2"
"go.temporal.io/sdk/client"
"os"
)
@ -34,7 +35,13 @@ const (
EnvVarFrontendRequiredOIDCGroup EnvVar = "FRONTEND_REQUIRED_OIDC_GROUP"
EnvVarTemporalNamespace EnvVar = "TEMPORAL_NAMESPACE"
EnvVarTemporalAddress EnvVar = "TEMPORAL_ADDRESS"
EnvVarTemporalTaskQueue EnvVar = "TEMPORAL_TASK_QUEUE"
EnvVarFrontendSelf EnvVar = "FRONTEND_SELF"
EnvVarStorageEndpoint EnvVar = "STORAGE_ENDPOINT"
EnvVarStorageConnectionString EnvVar = "STORAGE_CONNECTION_STRING"
EnvVarStorageRegion EnvVar = "STORAGE_REGION"
EnvVarStorageSecure EnvVar = "STORAGE_SECURE"
EnvVarStoragePathStyle EnvVar = "STORAGE_PATH_STYLE"
)
var defaultCliFlagsDatabaseOnly = []cli.Flag{
@ -62,6 +69,12 @@ var defaultCliFlagsTemporal = append(defaultCliFlagsDatabaseOnly, []cli.Flag{
EnvVars: []string{string(EnvVarTemporalAddress)},
Value: "localhost:7233",
},
&cli.StringFlag{
Name: "temporal-task-queue",
Aliases: []string{"q"},
Usage: "temporal task queue",
EnvVars: []string{string(EnvVarTemporalTaskQueue)},
},
}...)
var defaultCliFlagsNoAuth = append(defaultCliFlagsDatabaseOnly, []cli.Flag{
@ -136,6 +149,39 @@ var defaultFrontendCliFlags = append(defaultFrontendNoAuthCliFlags, []cli.Flag{
},
}...)
var storageFlags = []cli.Flag{
&cli.StringFlag{
Name: "storage-endpoint",
Usage: "storage endpoint",
EnvVars: []string{string(EnvVarStorageEndpoint)},
Value: "",
},
&cli.StringFlag{
Name: "storage-connection-string",
Usage: "storage connection string",
EnvVars: []string{string(EnvVarStorageConnectionString)},
},
&cli.StringFlag{
Name: "storage-region",
Usage: "storage region",
EnvVars: []string{string(EnvVarStorageRegion)},
// RESF default region
Value: "us-east-2",
},
&cli.BoolFlag{
Name: "storage-secure",
Usage: "storage secure",
EnvVars: []string{string(EnvVarStorageSecure)},
Value: true,
},
&cli.BoolFlag{
Name: "storage-path-style",
Usage: "storage path style",
EnvVars: []string{string(EnvVarStoragePathStyle)},
Value: false,
},
}
// WithDefaultCliFlags adds the default cli flags to the app.
func WithDefaultCliFlags(flags ...cli.Flag) []cli.Flag {
return append(defaultCliFlags, flags...)
@ -166,6 +212,11 @@ func WithDefaultFrontendCliFlags(flags ...cli.Flag) []cli.Flag {
return append(defaultFrontendCliFlags, flags...)
}
// WithStorageFlags adds the storage flags to the app.
func WithStorageFlags(flags ...cli.Flag) []cli.Flag {
return append(storageFlags, flags...)
}
// FlagsToGRPCServerOptions converts the cli flags to gRPC server options.
func FlagsToGRPCServerOptions(ctx *cli.Context) []GRPCServerOption {
return []GRPCServerOption{
@ -211,6 +262,11 @@ func GetDBFromFlags(ctx *cli.Context) *DB {
return db
}
// GetTemporalClientFromFlags gets the temporal client from the cli flags.
func GetTemporalClientFromFlags(ctx *cli.Context, opts client.Options) (client.Client, error) {
return NewTemporalClient(ctx.String("temporal-address"), ctx.String("temporal-namespace"), opts)
}
// ChangeDefaultForEnvVar changes the default value of a flag based on an environment variable.
func ChangeDefaultForEnvVar(envVar EnvVar, newDefault string) {
// Check if the environment variable is set.
@ -224,6 +280,11 @@ func ChangeDefaultForEnvVar(envVar EnvVar, newDefault string) {
}
}
// RareUseChangeDefault changes the default value of an arbitrary environment variable.
func RareUseChangeDefault(envVar string, newDefault string) {
ChangeDefaultForEnvVar(EnvVar(envVar), newDefault)
}
// ChangeDefaultDatabaseURL changes the default value of the database url based on an environment variable.
func ChangeDefaultDatabaseURL(appName string) {
ChangeDefaultForEnvVar(EnvVarDatabaseURL, "postgres://postgres:postgres@localhost:5432/"+appName+"?sslmode=disable")

55
base/go/temporal.go Normal file
View File

@ -0,0 +1,55 @@
package base
import (
"context"
"crypto/tls"
"github.com/pkg/errors"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"strings"
"time"
)
func NewTemporalClient(host string, namespace string, opts client.Options) (client.Client, error) {
// If host contains :443, then use TLS
if strings.Contains(host, ":443") {
opts.ConnectionOptions = client.ConnectionOptions{
DialOptions: []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})),
},
}
}
LogInfof("Connecting to Temporal at %s", host)
opts.HostPort = host
// Register namespace (ignore error if already exists)
nscl, err := client.NewNamespaceClient(opts)
if err != nil {
return nil, errors.Wrap(err, "failed to create namespace client")
}
// Set default retention period to 5 days
dur := 5 * 24 * time.Hour
err = nscl.Register(context.TODO(), &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
WorkflowExecutionRetentionPeriod: &dur,
})
if err != nil && !strings.Contains(err.Error(), "Namespace already exists") {
return nil, errors.Wrap(err, "failed to register namespace")
}
// Set namespace in opts
opts.Namespace = namespace
// Dial Temporal
cl, err := client.Dial(opts)
if err != nil {
return nil, errors.Wrap(err, "failed to dial temporal")
}
return cl, nil
}