Add Temporal related CLI flags

This commit is contained in:
Mustafa Gezen 2023-08-31 10:19:02 +02:00
parent 1b2dd037be
commit 36d3fe32c7
Signed by: mustafa
GPG Key ID: DCDF010D946438C1
2 changed files with 76 additions and 2 deletions

View File

@ -92,6 +92,21 @@ var defaultCliFlagsNoAuth = append(defaultCliFlagsDatabaseOnly, []cli.Flag{
},
}...)
var defaultCliFlagsNoAuthTemporal = append(defaultCliFlagsTemporal, []cli.Flag{
&cli.IntFlag{
Name: "grpc-port",
Usage: "gRPC port",
EnvVars: []string{string(EnvVarGRPCPort)},
Value: 8080,
},
&cli.IntFlag{
Name: "gateway-port",
Usage: "gRPC gateway port",
EnvVars: []string{string(EnvVarGatewayPort)},
Value: 8081,
},
}...)
var defaultCliFlags = append(defaultCliFlagsNoAuth, []cli.Flag{
&cli.StringFlag{
Name: "oidc-issuer",
@ -106,6 +121,20 @@ var defaultCliFlags = append(defaultCliFlagsNoAuth, []cli.Flag{
},
}...)
var defaultCliFlagsTemporalClient = append(defaultCliFlagsNoAuthTemporal, []cli.Flag{
&cli.StringFlag{
Name: "oidc-issuer",
Usage: "OIDC issuer",
EnvVars: []string{string(EnvVarFrontendOIDCIssuer)},
Value: "https://accounts.rockylinux.org/auth/realms/rocky",
},
&cli.StringFlag{
Name: "required-oidc-group",
Usage: "OIDC group that is required to access the frontend",
EnvVars: []string{string(EnvVarFrontendRequiredOIDCGroup)},
},
}...)
var defaultFrontendNoAuthCliFlags = []cli.Flag{
&cli.IntFlag{
Name: "port",
@ -187,11 +216,21 @@ func WithDefaultCliFlags(flags ...cli.Flag) []cli.Flag {
return append(defaultCliFlags, flags...)
}
// WithDefaultCliFlagsTemporalClient adds the default cli flags to the app.
func WithDefaultCliFlagsTemporalClient(flags ...cli.Flag) []cli.Flag {
return append(defaultCliFlagsTemporalClient, flags...)
}
// WithDefaultCliFlagsNoAuth adds the default cli flags to the app.
func WithDefaultCliFlagsNoAuth(flags ...cli.Flag) []cli.Flag {
return append(defaultCliFlagsNoAuth, flags...)
}
// WithDefaultCliFlagsNoAuthTemporal adds the default cli flags to the app.
func WithDefaultCliFlagsNoAuthTemporal(flags ...cli.Flag) []cli.Flag {
return append(defaultCliFlagsNoAuthTemporal, flags...)
}
// WithDefaultCliFlagsTemporal adds the default cli flags to the app.
func WithDefaultCliFlagsTemporal(flags ...cli.Flag) []cli.Flag {
return append(defaultCliFlagsTemporal, flags...)
@ -264,7 +303,12 @@ func GetDBFromFlags(ctx *cli.Context) *DB {
// GetTemporalClientFromFlags gets the temporal client from the cli flags.
func GetTemporalClientFromFlags(ctx *cli.Context, opts client.Options) (client.Client, error) {
return NewTemporalClient(ctx.String("temporal-address"), ctx.String("temporal-namespace"), opts)
return NewTemporalClient(
ctx.String("temporal-address"),
ctx.String("temporal-namespace"),
ctx.String("temporal-task-queue"),
opts,
)
}
// ChangeDefaultForEnvVar changes the default value of a flag based on an environment variable.

View File

@ -20,13 +20,35 @@ import (
"github.com/pkg/errors"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/interceptor"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"strings"
"time"
)
func NewTemporalClient(host string, namespace string, opts client.Options) (client.Client, error) {
type temporalTQInterceptor struct {
interceptor.ClientInterceptorBase
interceptor.ClientOutboundInterceptorBase
taskQueue string
}
func (tqi *temporalTQInterceptor) InterceptClient(next interceptor.ClientOutboundInterceptor) interceptor.ClientOutboundInterceptor {
return &temporalTQInterceptor{
ClientOutboundInterceptorBase: interceptor.ClientOutboundInterceptorBase{
Next: next,
},
taskQueue: tqi.taskQueue,
}
}
func (tqi *temporalTQInterceptor) ExecuteWorkflow(ctx context.Context, in *interceptor.ClientExecuteWorkflowInput) (client.WorkflowRun, error) {
in.Options.TaskQueue = tqi.taskQueue
return tqi.Next.ExecuteWorkflow(ctx, in)
}
func NewTemporalClient(host string, namespace string, taskQueue string, opts client.Options) (client.Client, error) {
// If host contains :443, then use TLS
if strings.Contains(host, ":443") {
opts.ConnectionOptions = client.ConnectionOptions{
@ -59,6 +81,14 @@ func NewTemporalClient(host string, namespace string, opts client.Options) (clie
// Set namespace in opts
opts.Namespace = namespace
// Set interceptor to set task queue
if opts.Interceptors == nil {
opts.Interceptors = []interceptor.ClientInterceptor{}
}
opts.Interceptors = append(opts.Interceptors, &temporalTQInterceptor{
taskQueue: taskQueue,
})
LogInfof("Connecting to Temporal at %s", host)
// Dial Temporal