mirror of
https://github.com/rocky-linux/peridot.git
synced 2024-12-23 03:08:29 +00:00
1476 lines
51 KiB
Go
1476 lines
51 KiB
Go
// The MIT License
|
|
//
|
|
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
|
|
//
|
|
// Copyright (c) 2020 Uber Technologies, Inc.
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
// of this software and associated documentation files (the "Software"), to deal
|
|
// in the Software without restriction, including without limitation the rights
|
|
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
// copies of the Software, and to permit persons to whom the Software is
|
|
// furnished to do so, subject to the following conditions:
|
|
//
|
|
// The above copyright notice and this permission notice shall be included in
|
|
// all copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
// THE SOFTWARE.
|
|
|
|
package internal
|
|
|
|
// All code in this file is private to the package.
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
commandpb "go.temporal.io/api/command/v1"
|
|
commonpb "go.temporal.io/api/common/v1"
|
|
enumspb "go.temporal.io/api/enums/v1"
|
|
failurepb "go.temporal.io/api/failure/v1"
|
|
historypb "go.temporal.io/api/history/v1"
|
|
taskqueuepb "go.temporal.io/api/taskqueue/v1"
|
|
"go.temporal.io/sdk/converter"
|
|
"go.temporal.io/sdk/internal/common"
|
|
"go.temporal.io/sdk/internal/common/metrics"
|
|
ilog "go.temporal.io/sdk/internal/log"
|
|
"go.temporal.io/sdk/log"
|
|
)
|
|
|
|
// queryResultSizeLimit is the size limit applied to query results: 2MB.
const queryResultSizeLimit = 2000000
|
|
|
|
// Assert that structs do indeed implement the interfaces
|
|
var _ WorkflowEnvironment = (*workflowEnvironmentImpl)(nil)
|
|
var _ workflowExecutionEventHandler = (*workflowExecutionEventHandlerImpl)(nil)
|
|
|
|
type (
|
|
// completionHandler Handler to indicate completion result
|
|
completionHandler func(result *commonpb.Payloads, err error)
|
|
|
|
// workflowExecutionEventHandlerImpl handler to handle workflowExecutionEventHandler
|
|
workflowExecutionEventHandlerImpl struct {
|
|
*workflowEnvironmentImpl
|
|
workflowDefinition WorkflowDefinition
|
|
}
|
|
|
|
scheduledTimer struct {
|
|
callback ResultHandler
|
|
handled bool
|
|
}
|
|
|
|
scheduledActivity struct {
|
|
callback ResultHandler
|
|
waitForCancelRequest bool
|
|
handled bool
|
|
activityType ActivityType
|
|
}
|
|
|
|
scheduledChildWorkflow struct {
|
|
resultCallback ResultHandler
|
|
startedCallback func(r WorkflowExecution, e error)
|
|
waitForCancellation bool
|
|
handled bool
|
|
}
|
|
|
|
scheduledCancellation struct {
|
|
callback ResultHandler
|
|
handled bool
|
|
}
|
|
|
|
scheduledSignal struct {
|
|
callback ResultHandler
|
|
handled bool
|
|
}
|
|
|
|
// workflowEnvironmentImpl an implementation of WorkflowEnvironment represents a environment for workflow execution.
|
|
workflowEnvironmentImpl struct {
|
|
workflowInfo *WorkflowInfo
|
|
|
|
commandsHelper *commandsHelper
|
|
sideEffectResult map[int64]*commonpb.Payloads
|
|
changeVersions map[string]Version
|
|
pendingLaTasks map[string]*localActivityTask
|
|
mutableSideEffect map[string]*commonpb.Payloads
|
|
unstartedLaTasks map[string]struct{}
|
|
openSessions map[string]*SessionInfo
|
|
|
|
// LocalActivities have a separate, individual counter instead of relying on actual commandEventIDs.
|
|
// This is because command IDs are only incremented on activity completion, which breaks
|
|
// local activities that are spawned in parallel as they would all share the same command ID
|
|
localActivityCounterID int64
|
|
|
|
sideEffectCounterID int64
|
|
|
|
currentReplayTime time.Time // Indicates current replay time of the command.
|
|
currentLocalTime time.Time // Local time when currentReplayTime was updated.
|
|
|
|
completeHandler completionHandler // events completion handler
|
|
cancelHandler func() // A cancel handler to be invoked on a cancel notification
|
|
signalHandler func(name string, input *commonpb.Payloads, header *commonpb.Header) error // A signal handler to be invoked on a signal event
|
|
queryHandler func(queryType string, queryArgs *commonpb.Payloads, header *commonpb.Header) (*commonpb.Payloads, error)
|
|
|
|
logger log.Logger
|
|
isReplay bool // flag to indicate if workflow is in replay mode
|
|
enableLoggingInReplay bool // flag to indicate if workflow should enable logging in replay mode
|
|
|
|
metricsHandler metrics.Handler
|
|
registry *registry
|
|
dataConverter converter.DataConverter
|
|
contextPropagators []ContextPropagator
|
|
deadlockDetectionTimeout time.Duration
|
|
}
|
|
|
|
localActivityTask struct {
|
|
sync.Mutex
|
|
workflowTask *workflowTask
|
|
activityID string
|
|
params *ExecuteLocalActivityParams
|
|
callback LocalActivityResultHandler
|
|
wc *workflowExecutionContextImpl
|
|
canceled bool
|
|
cancelFunc func()
|
|
attempt int32 // attempt starting from 1
|
|
retryPolicy *RetryPolicy
|
|
expireTime time.Time
|
|
header *commonpb.Header
|
|
}
|
|
|
|
localActivityMarkerData struct {
|
|
ActivityID string
|
|
ActivityType string
|
|
ReplayTime time.Time
|
|
Attempt int32 // record attempt, starting from 1.
|
|
Backoff time.Duration // retry backoff duration.
|
|
}
|
|
)
|
|
|
|
// Sentinel errors reported for marker events.
var (
	// ErrUnknownMarkerName is returned when the history contains a marker with
	// an unknown name.
	ErrUnknownMarkerName = errors.New("unknown marker name")

	// ErrMissingMarkerDetails is returned when the marker details are nil.
	ErrMissingMarkerDetails = errors.New("marker details are nil")

	// ErrMissingMarkerDataKey is returned when the marker details do not
	// contain the data key.
	ErrMissingMarkerDataKey = errors.New("marker key is missing in details")
)
|
|
|
|
func newWorkflowExecutionEventHandler(
|
|
workflowInfo *WorkflowInfo,
|
|
completeHandler completionHandler,
|
|
logger log.Logger,
|
|
enableLoggingInReplay bool,
|
|
metricsHandler metrics.Handler,
|
|
registry *registry,
|
|
dataConverter converter.DataConverter,
|
|
contextPropagators []ContextPropagator,
|
|
deadlockDetectionTimeout time.Duration,
|
|
) workflowExecutionEventHandler {
|
|
context := &workflowEnvironmentImpl{
|
|
workflowInfo: workflowInfo,
|
|
commandsHelper: newCommandsHelper(),
|
|
sideEffectResult: make(map[int64]*commonpb.Payloads),
|
|
mutableSideEffect: make(map[string]*commonpb.Payloads),
|
|
changeVersions: make(map[string]Version),
|
|
pendingLaTasks: make(map[string]*localActivityTask),
|
|
unstartedLaTasks: make(map[string]struct{}),
|
|
openSessions: make(map[string]*SessionInfo),
|
|
completeHandler: completeHandler,
|
|
enableLoggingInReplay: enableLoggingInReplay,
|
|
registry: registry,
|
|
dataConverter: dataConverter,
|
|
contextPropagators: contextPropagators,
|
|
deadlockDetectionTimeout: deadlockDetectionTimeout,
|
|
}
|
|
context.logger = ilog.NewReplayLogger(
|
|
log.With(logger,
|
|
tagWorkflowType, workflowInfo.WorkflowType.Name,
|
|
tagWorkflowID, workflowInfo.WorkflowExecution.ID,
|
|
tagRunID, workflowInfo.WorkflowExecution.RunID,
|
|
tagAttempt, workflowInfo.Attempt,
|
|
),
|
|
&context.isReplay,
|
|
&context.enableLoggingInReplay)
|
|
|
|
if metricsHandler != nil {
|
|
context.metricsHandler = metrics.NewReplayAwareHandler(&context.isReplay, metricsHandler).
|
|
WithTags(metrics.WorkflowTags(workflowInfo.WorkflowType.Name))
|
|
}
|
|
|
|
return &workflowExecutionEventHandlerImpl{context, nil}
|
|
}
|
|
|
|
func (s *scheduledTimer) handle(result *commonpb.Payloads, err error) {
|
|
if s.handled {
|
|
panic(fmt.Sprintf("timer already handled %v", s))
|
|
}
|
|
s.handled = true
|
|
s.callback(result, err)
|
|
}
|
|
|
|
func (s *scheduledActivity) handle(result *commonpb.Payloads, err error) {
|
|
if s.handled {
|
|
panic(fmt.Sprintf("activity already handled %v", s))
|
|
}
|
|
s.handled = true
|
|
s.callback(result, err)
|
|
}
|
|
|
|
func (s *scheduledChildWorkflow) handle(result *commonpb.Payloads, err error) {
|
|
if s.handled {
|
|
panic(fmt.Sprintf("child workflow already handled %v", s))
|
|
}
|
|
s.handled = true
|
|
s.resultCallback(result, err)
|
|
}
|
|
|
|
func (s *scheduledChildWorkflow) handleFailedToStart(result *commonpb.Payloads, err error) {
|
|
if s.handled {
|
|
panic(fmt.Sprintf("child workflow already handled %v", s))
|
|
}
|
|
s.handled = true
|
|
s.resultCallback(result, err)
|
|
s.startedCallback(WorkflowExecution{}, err)
|
|
}
|
|
|
|
func (t *localActivityTask) cancel() {
|
|
t.Lock()
|
|
t.canceled = true
|
|
if t.cancelFunc != nil {
|
|
t.cancelFunc()
|
|
}
|
|
t.Unlock()
|
|
}
|
|
|
|
func (s *scheduledCancellation) handle(result *commonpb.Payloads, err error) {
|
|
if s.handled {
|
|
panic(fmt.Sprintf("cancellation already handled %v", s))
|
|
}
|
|
s.handled = true
|
|
s.callback(result, err)
|
|
}
|
|
|
|
func (s *scheduledSignal) handle(result *commonpb.Payloads, err error) {
|
|
if s.handled {
|
|
panic(fmt.Sprintf("signal already handled %v", s))
|
|
}
|
|
s.handled = true
|
|
s.callback(result, err)
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) getNextLocalActivityID() string {
|
|
wc.localActivityCounterID++
|
|
return getStringID(wc.localActivityCounterID)
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) getNextSideEffectID() int64 {
|
|
wc.sideEffectCounterID++
|
|
return wc.sideEffectCounterID
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo {
|
|
return wc.workflowInfo
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) Complete(result *commonpb.Payloads, err error) {
|
|
wc.completeHandler(result, err)
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) RequestCancelChildWorkflow(namespace string, workflowID string) {
|
|
// For cancellation of child workflow only, we do not use cancellation ID and run ID
|
|
wc.commandsHelper.requestCancelExternalWorkflowExecution(namespace, workflowID, "", "", true)
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback ResultHandler) {
|
|
// for cancellation of external workflow, we have to use cancellation ID and set isChildWorkflowOnly to false
|
|
cancellationID := wc.GenerateSequenceID()
|
|
command := wc.commandsHelper.requestCancelExternalWorkflowExecution(namespace, workflowID, runID, cancellationID, false)
|
|
command.setData(&scheduledCancellation{callback: callback})
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) SignalExternalWorkflow(
|
|
namespace string,
|
|
workflowID string,
|
|
runID string,
|
|
signalName string,
|
|
input *commonpb.Payloads,
|
|
_ /* THIS IS FOR TEST FRAMEWORK. DO NOT USE HERE. */ interface{},
|
|
header *commonpb.Header,
|
|
childWorkflowOnly bool,
|
|
callback ResultHandler,
|
|
) {
|
|
|
|
signalID := wc.GenerateSequenceID()
|
|
command := wc.commandsHelper.signalExternalWorkflowExecution(namespace, workflowID, runID, signalName, input,
|
|
header, signalID, childWorkflowOnly)
|
|
command.setData(&scheduledSignal{callback: callback})
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) UpsertSearchAttributes(attributes map[string]interface{}) error {
|
|
// This has to be used in WorkflowEnvironment implementations instead of in Workflow for testsuite mock purpose.
|
|
attr, err := validateAndSerializeSearchAttributes(attributes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var upsertID string
|
|
if changeVersion, ok := attributes[TemporalChangeVersion]; ok {
|
|
// to ensure backward compatibility on searchable GetVersion, use latest changeVersion as upsertID
|
|
upsertID = changeVersion.([]string)[0]
|
|
} else {
|
|
upsertID = wc.GenerateSequenceID()
|
|
}
|
|
|
|
wc.commandsHelper.upsertSearchAttributes(upsertID, attr)
|
|
wc.updateWorkflowInfoWithSearchAttributes(attr) // this is for getInfo correctness
|
|
return nil
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) updateWorkflowInfoWithSearchAttributes(attributes *commonpb.SearchAttributes) {
|
|
wc.workflowInfo.SearchAttributes = mergeSearchAttributes(wc.workflowInfo.SearchAttributes, attributes)
|
|
}
|
|
|
|
func mergeSearchAttributes(current, upsert *commonpb.SearchAttributes) *commonpb.SearchAttributes {
|
|
if current == nil || len(current.IndexedFields) == 0 {
|
|
if upsert == nil || len(upsert.IndexedFields) == 0 {
|
|
return nil
|
|
}
|
|
current = &commonpb.SearchAttributes{
|
|
IndexedFields: make(map[string]*commonpb.Payload),
|
|
}
|
|
}
|
|
|
|
fields := current.IndexedFields
|
|
for k, v := range upsert.IndexedFields {
|
|
fields[k] = v
|
|
}
|
|
return current
|
|
}
|
|
|
|
func validateAndSerializeSearchAttributes(attributes map[string]interface{}) (*commonpb.SearchAttributes, error) {
|
|
if len(attributes) == 0 {
|
|
return nil, errSearchAttributesNotSet
|
|
}
|
|
attr, err := serializeSearchAttributes(attributes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return attr, nil
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) RegisterCancelHandler(handler func()) {
|
|
wrappedHandler := func() {
|
|
wc.commandsHelper.workflowExecutionIsCancelling = true
|
|
handler()
|
|
}
|
|
wc.cancelHandler = wrappedHandler
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
|
|
params ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error)) {
|
|
if params.WorkflowID == "" {
|
|
params.WorkflowID = wc.workflowInfo.WorkflowExecution.RunID + "_" + wc.GenerateSequenceID()
|
|
}
|
|
memo, err := getWorkflowMemo(params.Memo, wc.dataConverter)
|
|
if err != nil {
|
|
callback(nil, err)
|
|
return
|
|
}
|
|
searchAttr, err := serializeSearchAttributes(params.SearchAttributes)
|
|
if err != nil {
|
|
callback(nil, err)
|
|
return
|
|
}
|
|
|
|
attributes := &commandpb.StartChildWorkflowExecutionCommandAttributes{}
|
|
|
|
attributes.Namespace = params.Namespace
|
|
attributes.TaskQueue = &taskqueuepb.TaskQueue{Name: params.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
|
|
attributes.WorkflowId = params.WorkflowID
|
|
attributes.WorkflowExecutionTimeout = ¶ms.WorkflowExecutionTimeout
|
|
attributes.WorkflowRunTimeout = ¶ms.WorkflowRunTimeout
|
|
attributes.WorkflowTaskTimeout = ¶ms.WorkflowTaskTimeout
|
|
attributes.Input = params.Input
|
|
attributes.WorkflowType = &commonpb.WorkflowType{Name: params.WorkflowType.Name}
|
|
attributes.WorkflowIdReusePolicy = params.WorkflowIDReusePolicy
|
|
attributes.ParentClosePolicy = params.ParentClosePolicy
|
|
attributes.RetryPolicy = params.RetryPolicy
|
|
attributes.Header = params.Header
|
|
attributes.Memo = memo
|
|
attributes.SearchAttributes = searchAttr
|
|
if len(params.CronSchedule) > 0 {
|
|
attributes.CronSchedule = params.CronSchedule
|
|
}
|
|
|
|
command := wc.commandsHelper.startChildWorkflowExecution(attributes)
|
|
command.setData(&scheduledChildWorkflow{
|
|
resultCallback: callback,
|
|
startedCallback: startedHandler,
|
|
waitForCancellation: params.WaitForCancellation,
|
|
})
|
|
|
|
wc.logger.Debug("ExecuteChildWorkflow",
|
|
tagChildWorkflowID, params.WorkflowID,
|
|
tagWorkflowType, params.WorkflowType.Name)
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) RegisterSignalHandler(
|
|
handler func(name string, input *commonpb.Payloads, header *commonpb.Header) error,
|
|
) {
|
|
wc.signalHandler = handler
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) RegisterQueryHandler(
|
|
handler func(string, *commonpb.Payloads, *commonpb.Header) (*commonpb.Payloads, error),
|
|
) {
|
|
wc.queryHandler = handler
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) GetLogger() log.Logger {
|
|
return wc.logger
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) GetMetricsHandler() metrics.Handler {
|
|
return wc.metricsHandler
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) GetDataConverter() converter.DataConverter {
|
|
return wc.dataConverter
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) GetContextPropagators() []ContextPropagator {
|
|
return wc.contextPropagators
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) IsReplaying() bool {
|
|
return wc.isReplay
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) GenerateSequenceID() string {
|
|
return getStringID(wc.GenerateSequence())
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) GenerateSequence() int64 {
|
|
return wc.commandsHelper.getNextID()
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) CreateNewCommand(commandType enumspb.CommandType) *commandpb.Command {
|
|
return &commandpb.Command{
|
|
CommandType: commandType,
|
|
}
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivityParams, callback ResultHandler) ActivityID {
|
|
scheduleTaskAttr := &commandpb.ScheduleActivityTaskCommandAttributes{}
|
|
scheduleID := wc.GenerateSequence()
|
|
if parameters.ActivityID == "" {
|
|
scheduleTaskAttr.ActivityId = getStringID(scheduleID)
|
|
} else {
|
|
scheduleTaskAttr.ActivityId = parameters.ActivityID
|
|
}
|
|
activityID := scheduleTaskAttr.GetActivityId()
|
|
scheduleTaskAttr.ActivityType = &commonpb.ActivityType{Name: parameters.ActivityType.Name}
|
|
scheduleTaskAttr.TaskQueue = &taskqueuepb.TaskQueue{Name: parameters.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
|
|
scheduleTaskAttr.Input = parameters.Input
|
|
scheduleTaskAttr.ScheduleToCloseTimeout = ¶meters.ScheduleToCloseTimeout
|
|
scheduleTaskAttr.StartToCloseTimeout = ¶meters.StartToCloseTimeout
|
|
scheduleTaskAttr.ScheduleToStartTimeout = ¶meters.ScheduleToStartTimeout
|
|
scheduleTaskAttr.HeartbeatTimeout = ¶meters.HeartbeatTimeout
|
|
scheduleTaskAttr.RetryPolicy = parameters.RetryPolicy
|
|
scheduleTaskAttr.Header = parameters.Header
|
|
|
|
command := wc.commandsHelper.scheduleActivityTask(scheduleID, scheduleTaskAttr)
|
|
command.setData(&scheduledActivity{
|
|
callback: callback,
|
|
waitForCancelRequest: parameters.WaitForCancellation,
|
|
activityType: parameters.ActivityType,
|
|
})
|
|
|
|
wc.logger.Debug("ExecuteActivity",
|
|
tagActivityID, activityID,
|
|
tagActivityType, scheduleTaskAttr.ActivityType.GetName())
|
|
|
|
return ActivityID{id: activityID}
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) RequestCancelActivity(activityID ActivityID) {
|
|
command := wc.commandsHelper.requestCancelActivityTask(activityID.id)
|
|
activity := command.getData().(*scheduledActivity)
|
|
if activity.handled {
|
|
return
|
|
}
|
|
|
|
if command.isDone() || !activity.waitForCancelRequest {
|
|
activity.handle(nil, ErrCanceled)
|
|
}
|
|
|
|
wc.logger.Debug("RequestCancelActivity", tagActivityID, activityID)
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) ExecuteLocalActivity(params ExecuteLocalActivityParams, callback LocalActivityResultHandler) LocalActivityID {
|
|
activityID := wc.getNextLocalActivityID()
|
|
task := newLocalActivityTask(params, callback, activityID)
|
|
wc.pendingLaTasks[activityID] = task
|
|
wc.unstartedLaTasks[activityID] = struct{}{}
|
|
return LocalActivityID{id: activityID}
|
|
}
|
|
|
|
func newLocalActivityTask(params ExecuteLocalActivityParams, callback LocalActivityResultHandler, activityID string) *localActivityTask {
|
|
task := &localActivityTask{
|
|
activityID: activityID,
|
|
params: ¶ms,
|
|
callback: callback,
|
|
retryPolicy: params.RetryPolicy,
|
|
attempt: params.Attempt,
|
|
header: params.Header,
|
|
}
|
|
|
|
if params.ScheduleToCloseTimeout > 0 {
|
|
task.expireTime = params.ScheduledTime.Add(params.ScheduleToCloseTimeout)
|
|
}
|
|
return task
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) RequestCancelLocalActivity(activityID LocalActivityID) {
|
|
if task, ok := wc.pendingLaTasks[activityID.id]; ok {
|
|
task.cancel()
|
|
}
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) SetCurrentReplayTime(replayTime time.Time) {
|
|
if replayTime.Before(wc.currentReplayTime) {
|
|
return
|
|
}
|
|
wc.currentReplayTime = replayTime
|
|
wc.currentLocalTime = time.Now()
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) Now() time.Time {
|
|
return wc.currentReplayTime
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) NewTimer(d time.Duration, callback ResultHandler) *TimerID {
|
|
if d < 0 {
|
|
callback(nil, fmt.Errorf("negative duration provided %v", d))
|
|
return nil
|
|
}
|
|
if d == 0 {
|
|
callback(nil, nil)
|
|
return nil
|
|
}
|
|
|
|
timerID := wc.GenerateSequenceID()
|
|
startTimerAttr := &commandpb.StartTimerCommandAttributes{}
|
|
startTimerAttr.TimerId = timerID
|
|
startTimerAttr.StartToFireTimeout = &d
|
|
|
|
command := wc.commandsHelper.startTimer(startTimerAttr)
|
|
command.setData(&scheduledTimer{callback: callback})
|
|
|
|
wc.logger.Debug("NewTimer",
|
|
tagTimerID, startTimerAttr.GetTimerId(),
|
|
"Duration", d)
|
|
|
|
return &TimerID{id: timerID}
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) RequestCancelTimer(timerID TimerID) {
|
|
command := wc.commandsHelper.cancelTimer(timerID)
|
|
timer := command.getData().(*scheduledTimer)
|
|
if timer != nil {
|
|
if timer.handled {
|
|
return
|
|
}
|
|
timer.handle(nil, ErrCanceled)
|
|
}
|
|
wc.logger.Debug("RequestCancelTimer", tagTimerID, timerID)
|
|
}
|
|
|
|
func validateVersion(changeID string, version, minSupported, maxSupported Version) {
|
|
if version < minSupported {
|
|
panic(fmt.Sprintf("Workflow code removed support of version %v. "+
|
|
"for \"%v\" changeID. The oldest supported version is %v",
|
|
version, changeID, minSupported))
|
|
}
|
|
if version > maxSupported {
|
|
panic(fmt.Sprintf("Workflow code is too old to support version %v "+
|
|
"for \"%v\" changeID. The maximum supported version is %v",
|
|
version, changeID, maxSupported))
|
|
}
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) Version {
|
|
if version, ok := wc.changeVersions[changeID]; ok {
|
|
validateVersion(changeID, version, minSupported, maxSupported)
|
|
return version
|
|
}
|
|
|
|
var version Version
|
|
if wc.isReplay {
|
|
// GetVersion for changeID is called first time in replay mode, use DefaultVersion
|
|
version = DefaultVersion
|
|
} else {
|
|
// GetVersion for changeID is called first time (non-replay mode), generate a marker command for it.
|
|
// Also upsert search attributes to enable ability to search by changeVersion.
|
|
version = maxSupported
|
|
wc.commandsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter())
|
|
_ = wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions))
|
|
}
|
|
|
|
validateVersion(changeID, version, minSupported, maxSupported)
|
|
wc.changeVersions[changeID] = version
|
|
return version
|
|
}
|
|
|
|
func createSearchAttributesForChangeVersion(changeID string, version Version, existingChangeVersions map[string]Version) map[string]interface{} {
|
|
return map[string]interface{}{
|
|
TemporalChangeVersion: getChangeVersions(changeID, version, existingChangeVersions),
|
|
}
|
|
}
|
|
|
|
func getChangeVersions(changeID string, version Version, existingChangeVersions map[string]Version) []string {
|
|
res := []string{getChangeVersion(changeID, version)}
|
|
for k, v := range existingChangeVersions {
|
|
res = append(res, getChangeVersion(k, v))
|
|
}
|
|
return res
|
|
}
|
|
|
|
func getChangeVersion(changeID string, version Version) string {
|
|
return fmt.Sprintf("%s-%v", changeID, version)
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler) {
|
|
sideEffectID := wc.getNextSideEffectID()
|
|
var result *commonpb.Payloads
|
|
if wc.isReplay {
|
|
var ok bool
|
|
result, ok = wc.sideEffectResult[sideEffectID]
|
|
if !ok {
|
|
keys := make([]int64, 0, len(wc.sideEffectResult))
|
|
for k := range wc.sideEffectResult {
|
|
keys = append(keys, k)
|
|
}
|
|
panic(fmt.Sprintf("No cached result found for side effectID=%v. KnownSideEffects=%v",
|
|
sideEffectID, keys))
|
|
}
|
|
|
|
// Once the SideEffect has been consumed, we can free the referenced payload
|
|
// to reduce memory pressure
|
|
delete(wc.sideEffectResult, sideEffectID)
|
|
wc.logger.Debug("SideEffect returning already calculated result.",
|
|
tagSideEffectID, sideEffectID)
|
|
} else {
|
|
var err error
|
|
result, err = f()
|
|
if err != nil {
|
|
callback(result, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
wc.commandsHelper.recordSideEffectMarker(sideEffectID, result, wc.dataConverter)
|
|
|
|
callback(result, nil)
|
|
wc.logger.Debug("SideEffect Marker added", tagSideEffectID, sideEffectID)
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool) converter.EncodedValue {
|
|
if result, ok := wc.mutableSideEffect[id]; ok {
|
|
encodedResult := newEncodedValue(result, wc.GetDataConverter())
|
|
if wc.isReplay {
|
|
return encodedResult
|
|
}
|
|
|
|
newValue := f()
|
|
if wc.isEqualValue(newValue, result, equals) {
|
|
return encodedResult
|
|
}
|
|
|
|
return wc.recordMutableSideEffect(id, wc.encodeValue(newValue))
|
|
}
|
|
|
|
if wc.isReplay {
|
|
// This should not happen
|
|
panic(fmt.Sprintf("Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id))
|
|
}
|
|
|
|
return wc.recordMutableSideEffect(id, wc.encodeValue(f()))
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) isEqualValue(newValue interface{}, encodedOldValue *commonpb.Payloads, equals func(a, b interface{}) bool) bool {
|
|
if newValue == nil {
|
|
// new value is nil
|
|
newEncodedValue := wc.encodeValue(nil)
|
|
return proto.Equal(newEncodedValue, encodedOldValue)
|
|
}
|
|
|
|
oldValue := decodeValue(newEncodedValue(encodedOldValue, wc.GetDataConverter()), newValue)
|
|
return equals(newValue, oldValue)
|
|
}
|
|
|
|
func decodeValue(encodedValue converter.EncodedValue, value interface{}) interface{} {
|
|
// We need to decode oldValue out of encodedValue, first we need to prepare valuePtr as the same type as value
|
|
valuePtr := reflect.New(reflect.TypeOf(value)).Interface()
|
|
if err := encodedValue.Get(valuePtr); err != nil {
|
|
panic(err)
|
|
}
|
|
decodedValue := reflect.ValueOf(valuePtr).Elem().Interface()
|
|
return decodedValue
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) encodeValue(value interface{}) *commonpb.Payloads {
|
|
payload, err := wc.encodeArg(value)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return payload
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) encodeArg(arg interface{}) (*commonpb.Payloads, error) {
|
|
return wc.GetDataConverter().ToPayloads(arg)
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) recordMutableSideEffect(id string, data *commonpb.Payloads) converter.EncodedValue {
|
|
details, err := encodeArgs(wc.GetDataConverter(), []interface{}{id, data})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
wc.commandsHelper.recordMutableSideEffectMarker(id, details, wc.dataConverter)
|
|
wc.mutableSideEffect[id] = data
|
|
return newEncodedValue(data, wc.GetDataConverter())
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) AddSession(sessionInfo *SessionInfo) {
|
|
wc.openSessions[sessionInfo.SessionID] = sessionInfo
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) RemoveSession(sessionID string) {
|
|
delete(wc.openSessions, sessionID)
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) getOpenSessions() []*SessionInfo {
|
|
openSessions := make([]*SessionInfo, 0, len(wc.openSessions))
|
|
for _, info := range wc.openSessions {
|
|
openSessions = append(openSessions, info)
|
|
}
|
|
return openSessions
|
|
}
|
|
|
|
func (wc *workflowEnvironmentImpl) GetRegistry() *registry {
|
|
return wc.registry
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
|
|
event *historypb.HistoryEvent,
|
|
isReplay bool,
|
|
isLast bool,
|
|
) (err error) {
|
|
if event == nil {
|
|
return errors.New("nil event provided")
|
|
}
|
|
defer func() {
|
|
if p := recover(); p != nil {
|
|
weh.metricsHandler.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
|
|
topLine := fmt.Sprintf("process event for %s [panic]:", weh.workflowInfo.TaskQueueName)
|
|
st := getStackTraceRaw(topLine, 7, 0)
|
|
weh.Complete(nil, newWorkflowPanicError(p, st))
|
|
}
|
|
}()
|
|
|
|
weh.isReplay = isReplay
|
|
traceLog(func() {
|
|
weh.logger.Debug("ProcessEvent",
|
|
tagEventID, event.GetEventId(),
|
|
tagEventType, event.GetEventType().String())
|
|
})
|
|
|
|
switch event.GetEventType() {
|
|
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
|
|
err = weh.handleWorkflowExecutionStarted(event.GetWorkflowExecutionStartedEventAttributes())
|
|
|
|
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
|
|
// No Operation
|
|
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
|
|
// No Operation
|
|
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
|
|
// No Operation
|
|
case enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
|
|
// No Operation
|
|
case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
|
|
// Set replay clock.
|
|
weh.SetCurrentReplayTime(common.TimeValue(event.GetEventTime()))
|
|
// Reset the counter on command helper used for generating ID for commands
|
|
weh.commandsHelper.setCurrentWorkflowTaskStartedEventID(event.GetEventId())
|
|
weh.workflowDefinition.OnWorkflowTaskStarted(weh.deadlockDetectionTimeout)
|
|
|
|
case enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
|
|
// No Operation
|
|
case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
|
|
// No Operation
|
|
case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
|
|
// No Operation
|
|
case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
|
|
weh.commandsHelper.handleActivityTaskScheduled(
|
|
event.GetActivityTaskScheduledEventAttributes().GetActivityId(), event.GetEventId())
|
|
|
|
case enumspb.EVENT_TYPE_ACTIVITY_TASK_STARTED:
|
|
// No Operation
|
|
|
|
case enumspb.EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
|
|
err = weh.handleActivityTaskCompleted(event)
|
|
|
|
case enumspb.EVENT_TYPE_ACTIVITY_TASK_FAILED:
|
|
err = weh.handleActivityTaskFailed(event)
|
|
|
|
case enumspb.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
|
|
err = weh.handleActivityTaskTimedOut(event)
|
|
|
|
case enumspb.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED:
|
|
weh.commandsHelper.handleActivityTaskCancelRequested(
|
|
event.GetActivityTaskCancelRequestedEventAttributes().GetScheduledEventId())
|
|
|
|
case enumspb.EVENT_TYPE_ACTIVITY_TASK_CANCELED:
|
|
err = weh.handleActivityTaskCanceled(event)
|
|
|
|
case enumspb.EVENT_TYPE_TIMER_STARTED:
|
|
weh.commandsHelper.handleTimerStarted(event.GetTimerStartedEventAttributes().GetTimerId())
|
|
|
|
case enumspb.EVENT_TYPE_TIMER_FIRED:
|
|
weh.handleTimerFired(event)
|
|
|
|
case enumspb.EVENT_TYPE_TIMER_CANCELED:
|
|
weh.commandsHelper.handleTimerCanceled(event.GetTimerCanceledEventAttributes().GetTimerId())
|
|
|
|
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
|
|
weh.handleWorkflowExecutionCancelRequested()
|
|
|
|
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
|
|
// No Operation.
|
|
|
|
case enumspb.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
|
|
_ = weh.handleRequestCancelExternalWorkflowExecutionInitiated(event)
|
|
|
|
case enumspb.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
|
|
_ = weh.handleRequestCancelExternalWorkflowExecutionFailed(event)
|
|
|
|
case enumspb.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
|
|
_ = weh.handleExternalWorkflowExecutionCancelRequested(event)
|
|
|
|
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
|
|
// No Operation.
|
|
|
|
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
|
|
err = weh.handleWorkflowExecutionSignaled(event.GetWorkflowExecutionSignaledEventAttributes())
|
|
|
|
case enumspb.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
|
|
signalID := event.GetSignalExternalWorkflowExecutionInitiatedEventAttributes().Control
|
|
weh.commandsHelper.handleSignalExternalWorkflowExecutionInitiated(event.GetEventId(), signalID)
|
|
|
|
case enumspb.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
|
|
_ = weh.handleSignalExternalWorkflowExecutionFailed(event)
|
|
|
|
case enumspb.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED:
|
|
_ = weh.handleSignalExternalWorkflowExecutionCompleted(event)
|
|
|
|
case enumspb.EVENT_TYPE_MARKER_RECORDED:
|
|
err = weh.handleMarkerRecorded(event.GetEventId(), event.GetMarkerRecordedEventAttributes())
|
|
|
|
case enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
|
|
weh.commandsHelper.handleStartChildWorkflowExecutionInitiated(
|
|
event.GetStartChildWorkflowExecutionInitiatedEventAttributes().GetWorkflowId())
|
|
|
|
case enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
|
|
err = weh.handleStartChildWorkflowExecutionFailed(event)
|
|
|
|
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
|
|
err = weh.handleChildWorkflowExecutionStarted(event)
|
|
|
|
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
|
|
err = weh.handleChildWorkflowExecutionCompleted(event)
|
|
|
|
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
|
|
err = weh.handleChildWorkflowExecutionFailed(event)
|
|
|
|
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
|
|
err = weh.handleChildWorkflowExecutionCanceled(event)
|
|
|
|
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
|
|
err = weh.handleChildWorkflowExecutionTimedOut(event)
|
|
|
|
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
|
|
err = weh.handleChildWorkflowExecutionTerminated(event)
|
|
|
|
case enumspb.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
|
|
weh.handleUpsertWorkflowSearchAttributes(event)
|
|
|
|
default:
|
|
weh.logger.Error("unknown event type",
|
|
tagEventID, event.GetEventId(),
|
|
tagEventType, event.GetEventType().String())
|
|
// Do not fail to be forward compatible with new events
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// When replaying histories to get stack trace or current state the last event might be not
|
|
// workflow task started. So always call OnWorkflowTaskStarted on the last event.
|
|
// Don't call for EventType_WorkflowTaskStarted as it was already called when handling it.
|
|
if isLast && event.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
|
|
weh.workflowDefinition.OnWorkflowTaskStarted(weh.deadlockDetectionTimeout)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(
|
|
queryType string,
|
|
queryArgs *commonpb.Payloads,
|
|
header *commonpb.Header,
|
|
) (*commonpb.Payloads, error) {
|
|
switch queryType {
|
|
case QueryTypeStackTrace:
|
|
return weh.encodeArg(weh.StackTrace())
|
|
case QueryTypeOpenSessions:
|
|
return weh.encodeArg(weh.getOpenSessions())
|
|
default:
|
|
result, err := weh.queryHandler(queryType, queryArgs, header)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if result.Size() > queryResultSizeLimit {
|
|
weh.logger.Error("Query result size exceeds limit.",
|
|
tagQueryType, queryType,
|
|
tagWorkflowID, weh.workflowInfo.WorkflowExecution.ID,
|
|
tagRunID, weh.workflowInfo.WorkflowExecution.RunID)
|
|
return nil, fmt.Errorf("query result size (%v) exceeds limit (%v)", result.Size(), queryResultSizeLimit)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) StackTrace() string {
|
|
return weh.workflowDefinition.StackTrace()
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) Close() {
|
|
if weh.workflowDefinition != nil {
|
|
weh.workflowDefinition.Close()
|
|
}
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionStarted(
|
|
attributes *historypb.WorkflowExecutionStartedEventAttributes) (err error) {
|
|
weh.workflowDefinition, err = weh.registry.getWorkflowDefinition(
|
|
weh.workflowInfo.WorkflowType,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Invoke the workflow.
|
|
weh.workflowDefinition.Execute(weh, attributes.Header, attributes.Input)
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCompleted(event *historypb.HistoryEvent) error {
|
|
activityID, scheduledEventID := weh.commandsHelper.getActivityAndScheduledEventIDs(event)
|
|
command := weh.commandsHelper.handleActivityTaskClosed(activityID, scheduledEventID)
|
|
activity := command.getData().(*scheduledActivity)
|
|
if activity.handled {
|
|
return nil
|
|
}
|
|
activity.handle(event.GetActivityTaskCompletedEventAttributes().Result, nil)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskFailed(event *historypb.HistoryEvent) error {
|
|
activityID, scheduledEventID := weh.commandsHelper.getActivityAndScheduledEventIDs(event)
|
|
command := weh.commandsHelper.handleActivityTaskClosed(activityID, scheduledEventID)
|
|
activity := command.getData().(*scheduledActivity)
|
|
if activity.handled {
|
|
return nil
|
|
}
|
|
|
|
attributes := event.GetActivityTaskFailedEventAttributes()
|
|
activityTaskErr := NewActivityError(
|
|
attributes.GetScheduledEventId(),
|
|
attributes.GetStartedEventId(),
|
|
attributes.GetIdentity(),
|
|
&commonpb.ActivityType{Name: activity.activityType.Name},
|
|
activityID,
|
|
attributes.GetRetryState(),
|
|
ConvertFailureToError(attributes.GetFailure(), weh.GetDataConverter()),
|
|
)
|
|
|
|
activity.handle(nil, activityTaskErr)
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskTimedOut(event *historypb.HistoryEvent) error {
|
|
activityID, scheduledEventID := weh.commandsHelper.getActivityAndScheduledEventIDs(event)
|
|
command := weh.commandsHelper.handleActivityTaskClosed(activityID, scheduledEventID)
|
|
activity := command.getData().(*scheduledActivity)
|
|
if activity.handled {
|
|
return nil
|
|
}
|
|
|
|
attributes := event.GetActivityTaskTimedOutEventAttributes()
|
|
timeoutError := ConvertFailureToError(attributes.GetFailure(), weh.GetDataConverter())
|
|
|
|
activityTaskErr := NewActivityError(
|
|
attributes.GetScheduledEventId(),
|
|
attributes.GetStartedEventId(),
|
|
"",
|
|
&commonpb.ActivityType{Name: activity.activityType.Name},
|
|
activityID,
|
|
attributes.GetRetryState(),
|
|
timeoutError,
|
|
)
|
|
|
|
activity.handle(nil, activityTaskErr)
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCanceled(event *historypb.HistoryEvent) error {
|
|
activityID, scheduledEventID := weh.commandsHelper.getActivityAndScheduledEventIDs(event)
|
|
command := weh.commandsHelper.handleActivityTaskCanceled(activityID, scheduledEventID)
|
|
activity := command.getData().(*scheduledActivity)
|
|
if activity.handled {
|
|
return nil
|
|
}
|
|
|
|
if command.isDone() || !activity.waitForCancelRequest {
|
|
// Clear this so we don't have a recursive call that while executing might call the cancel one.
|
|
|
|
attributes := event.GetActivityTaskCanceledEventAttributes()
|
|
details := newEncodedValues(attributes.GetDetails(), weh.GetDataConverter())
|
|
|
|
activityTaskErr := NewActivityError(
|
|
attributes.GetScheduledEventId(),
|
|
attributes.GetStartedEventId(),
|
|
attributes.GetIdentity(),
|
|
&commonpb.ActivityType{Name: activity.activityType.Name},
|
|
activityID,
|
|
enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE,
|
|
NewCanceledError(details),
|
|
)
|
|
|
|
activity.handle(nil, activityTaskErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleTimerFired(event *historypb.HistoryEvent) {
|
|
timerID := event.GetTimerFiredEventAttributes().GetTimerId()
|
|
command := weh.commandsHelper.handleTimerClosed(timerID)
|
|
timer := command.getData().(*scheduledTimer)
|
|
if timer.handled {
|
|
return
|
|
}
|
|
|
|
timer.handle(nil, nil)
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionCancelRequested() {
|
|
weh.cancelHandler()
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded(
|
|
eventID int64,
|
|
attributes *historypb.MarkerRecordedEventAttributes,
|
|
) error {
|
|
var err error
|
|
if attributes.GetDetails() == nil {
|
|
err = ErrMissingMarkerDetails
|
|
} else {
|
|
switch attributes.GetMarkerName() {
|
|
case sideEffectMarkerName:
|
|
if sideEffectIDPayload, ok := attributes.GetDetails()[sideEffectMarkerIDName]; !ok {
|
|
err = fmt.Errorf("key %q: %w", sideEffectMarkerIDName, ErrMissingMarkerDataKey)
|
|
} else {
|
|
if sideEffectData, ok := attributes.GetDetails()[sideEffectMarkerDataName]; !ok {
|
|
err = fmt.Errorf("key %q: %w", sideEffectMarkerDataName, ErrMissingMarkerDataKey)
|
|
} else {
|
|
var sideEffectID int64
|
|
_ = weh.dataConverter.FromPayloads(sideEffectIDPayload, &sideEffectID)
|
|
weh.sideEffectResult[sideEffectID] = sideEffectData
|
|
}
|
|
}
|
|
case versionMarkerName:
|
|
if changeIDPayload, ok := attributes.GetDetails()[versionMarkerChangeIDName]; !ok {
|
|
err = fmt.Errorf("key %q: %w", versionMarkerChangeIDName, ErrMissingMarkerDataKey)
|
|
} else {
|
|
if versionPayload, ok := attributes.GetDetails()[versionMarkerDataName]; !ok {
|
|
err = fmt.Errorf("key %q: %w", versionMarkerDataName, ErrMissingMarkerDataKey)
|
|
} else {
|
|
var changeID string
|
|
_ = weh.dataConverter.FromPayloads(changeIDPayload, &changeID)
|
|
var version Version
|
|
_ = weh.dataConverter.FromPayloads(versionPayload, &version)
|
|
weh.changeVersions[changeID] = version
|
|
weh.commandsHelper.handleVersionMarker(eventID, changeID)
|
|
}
|
|
}
|
|
case localActivityMarkerName:
|
|
err = weh.handleLocalActivityMarker(attributes.GetDetails(), attributes.GetFailure())
|
|
case mutableSideEffectMarkerName:
|
|
if sideEffectIDPayload, ok := attributes.GetDetails()[sideEffectMarkerIDName]; !ok {
|
|
err = fmt.Errorf("key %q: %w", sideEffectMarkerIDName, ErrMissingMarkerDataKey)
|
|
} else {
|
|
if sideEffectData, ok := attributes.GetDetails()[sideEffectMarkerDataName]; !ok {
|
|
err = fmt.Errorf("key %q: %w", sideEffectMarkerDataName, ErrMissingMarkerDataKey)
|
|
} else {
|
|
var sideEffectID string
|
|
_ = weh.dataConverter.FromPayloads(sideEffectIDPayload, &sideEffectID)
|
|
weh.mutableSideEffect[sideEffectID] = sideEffectData
|
|
}
|
|
}
|
|
default:
|
|
err = ErrUnknownMarkerName
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("marker name %q for eventId %d: %w", attributes.GetMarkerName(), eventID, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(details map[string]*commonpb.Payloads, failure *failurepb.Failure) error {
|
|
var markerData *commonpb.Payloads
|
|
var ok bool
|
|
if markerData, ok = details[localActivityMarkerDataName]; !ok {
|
|
return fmt.Errorf("key %q: %w", localActivityMarkerDataName, ErrMissingMarkerDataKey)
|
|
}
|
|
|
|
lamd := localActivityMarkerData{}
|
|
if err := weh.dataConverter.FromPayloads(markerData, &lamd); err != nil {
|
|
return err
|
|
}
|
|
|
|
if la, ok := weh.pendingLaTasks[lamd.ActivityID]; ok {
|
|
if len(lamd.ActivityType) > 0 && lamd.ActivityType != la.params.ActivityType {
|
|
// history marker mismatch to the current code.
|
|
panicMsg := fmt.Sprintf("code execute local activity %v, but history event found %v, markerData: %v", la.params.ActivityType, lamd.ActivityType, markerData)
|
|
panicIllegalState(panicMsg)
|
|
}
|
|
weh.commandsHelper.recordLocalActivityMarker(lamd.ActivityID, details, failure)
|
|
delete(weh.pendingLaTasks, lamd.ActivityID)
|
|
delete(weh.unstartedLaTasks, lamd.ActivityID)
|
|
lar := &LocalActivityResultWrapper{}
|
|
if failure != nil {
|
|
lar.Attempt = lamd.Attempt
|
|
lar.Backoff = lamd.Backoff
|
|
lar.Err = ConvertFailureToError(failure, weh.GetDataConverter())
|
|
} else {
|
|
// Result might not be there if local activity doesn't have return value.
|
|
lar.Result = details[localActivityResultName]
|
|
}
|
|
la.callback(lar)
|
|
|
|
// update time
|
|
weh.SetCurrentReplayTime(lamd.ReplayTime)
|
|
|
|
// resume workflow execution after apply local activity result
|
|
weh.workflowDefinition.OnWorkflowTaskStarted(weh.deadlockDetectionTimeout)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *localActivityResult) error {
|
|
details := make(map[string]*commonpb.Payloads)
|
|
|
|
// convert local activity result and error to marker data
|
|
lamd := localActivityMarkerData{
|
|
ActivityID: lar.task.activityID,
|
|
ActivityType: lar.task.params.ActivityType,
|
|
ReplayTime: weh.currentReplayTime.Add(time.Since(weh.currentLocalTime)),
|
|
Attempt: lar.task.attempt,
|
|
}
|
|
if lar.err != nil {
|
|
lamd.Backoff = lar.backoff
|
|
} else if lar.result != nil {
|
|
details[localActivityResultName] = lar.result
|
|
}
|
|
|
|
// encode marker data
|
|
markerData, err := weh.encodeArg(lamd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
details[localActivityMarkerDataName] = markerData
|
|
|
|
// create marker event for local activity result
|
|
markerEvent := &historypb.HistoryEvent{
|
|
EventType: enumspb.EVENT_TYPE_MARKER_RECORDED,
|
|
Attributes: &historypb.HistoryEvent_MarkerRecordedEventAttributes{MarkerRecordedEventAttributes: &historypb.MarkerRecordedEventAttributes{
|
|
MarkerName: localActivityMarkerName,
|
|
Failure: ConvertErrorToFailure(lar.err, weh.GetDataConverter()),
|
|
Details: details,
|
|
}},
|
|
}
|
|
|
|
// apply the local activity result to workflow
|
|
return weh.ProcessEvent(markerEvent, false, false)
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionSignaled(
|
|
attributes *historypb.WorkflowExecutionSignaledEventAttributes) error {
|
|
return weh.signalHandler(attributes.GetSignalName(), attributes.Input, attributes.Header)
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleStartChildWorkflowExecutionFailed(event *historypb.HistoryEvent) error {
|
|
attributes := event.GetStartChildWorkflowExecutionFailedEventAttributes()
|
|
childWorkflowID := attributes.GetWorkflowId()
|
|
command := weh.commandsHelper.handleStartChildWorkflowExecutionFailed(childWorkflowID)
|
|
childWorkflow := command.getData().(*scheduledChildWorkflow)
|
|
if childWorkflow.handled {
|
|
return nil
|
|
}
|
|
|
|
if attributes.GetCause() == enumspb.START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS {
|
|
err := NewChildWorkflowExecutionError(
|
|
attributes.GetNamespace(),
|
|
attributes.GetWorkflowId(),
|
|
"",
|
|
attributes.GetWorkflowType().GetName(),
|
|
attributes.GetInitiatedEventId(),
|
|
0,
|
|
enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE,
|
|
&ChildWorkflowExecutionAlreadyStartedError{},
|
|
)
|
|
childWorkflow.handleFailedToStart(nil, err)
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("unknown cause: %v", attributes.GetCause())
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionStarted(event *historypb.HistoryEvent) error {
|
|
attributes := event.GetChildWorkflowExecutionStartedEventAttributes()
|
|
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
|
|
childRunID := attributes.WorkflowExecution.GetRunId()
|
|
command := weh.commandsHelper.handleChildWorkflowExecutionStarted(childWorkflowID)
|
|
childWorkflow := command.getData().(*scheduledChildWorkflow)
|
|
if childWorkflow.handled {
|
|
return nil
|
|
}
|
|
|
|
childWorkflowExecution := WorkflowExecution{
|
|
ID: childWorkflowID,
|
|
RunID: childRunID,
|
|
}
|
|
childWorkflow.startedCallback(childWorkflowExecution, nil)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCompleted(event *historypb.HistoryEvent) error {
|
|
attributes := event.GetChildWorkflowExecutionCompletedEventAttributes()
|
|
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
|
|
command := weh.commandsHelper.handleChildWorkflowExecutionClosed(childWorkflowID)
|
|
childWorkflow := command.getData().(*scheduledChildWorkflow)
|
|
if childWorkflow.handled {
|
|
return nil
|
|
}
|
|
childWorkflow.handle(attributes.Result, nil)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionFailed(event *historypb.HistoryEvent) error {
|
|
attributes := event.GetChildWorkflowExecutionFailedEventAttributes()
|
|
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
|
|
command := weh.commandsHelper.handleChildWorkflowExecutionClosed(childWorkflowID)
|
|
childWorkflow := command.getData().(*scheduledChildWorkflow)
|
|
if childWorkflow.handled {
|
|
return nil
|
|
}
|
|
|
|
childWorkflowExecutionError := NewChildWorkflowExecutionError(
|
|
attributes.GetNamespace(),
|
|
attributes.GetWorkflowExecution().GetWorkflowId(),
|
|
attributes.GetWorkflowExecution().GetRunId(),
|
|
attributes.GetWorkflowType().GetName(),
|
|
attributes.GetInitiatedEventId(),
|
|
attributes.GetStartedEventId(),
|
|
attributes.GetRetryState(),
|
|
ConvertFailureToError(attributes.GetFailure(), weh.GetDataConverter()),
|
|
)
|
|
childWorkflow.handle(nil, childWorkflowExecutionError)
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCanceled(event *historypb.HistoryEvent) error {
|
|
attributes := event.GetChildWorkflowExecutionCanceledEventAttributes()
|
|
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
|
|
command := weh.commandsHelper.handleChildWorkflowExecutionCanceled(childWorkflowID)
|
|
childWorkflow := command.getData().(*scheduledChildWorkflow)
|
|
if childWorkflow.handled {
|
|
return nil
|
|
}
|
|
details := newEncodedValues(attributes.Details, weh.GetDataConverter())
|
|
|
|
childWorkflowExecutionError := NewChildWorkflowExecutionError(
|
|
attributes.GetNamespace(),
|
|
attributes.GetWorkflowExecution().GetWorkflowId(),
|
|
attributes.GetWorkflowExecution().GetRunId(),
|
|
attributes.GetWorkflowType().GetName(),
|
|
attributes.GetInitiatedEventId(),
|
|
attributes.GetStartedEventId(),
|
|
enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE,
|
|
NewCanceledError(details),
|
|
)
|
|
childWorkflow.handle(nil, childWorkflowExecutionError)
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTimedOut(event *historypb.HistoryEvent) error {
|
|
attributes := event.GetChildWorkflowExecutionTimedOutEventAttributes()
|
|
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
|
|
command := weh.commandsHelper.handleChildWorkflowExecutionClosed(childWorkflowID)
|
|
childWorkflow := command.getData().(*scheduledChildWorkflow)
|
|
if childWorkflow.handled {
|
|
return nil
|
|
}
|
|
|
|
childWorkflowExecutionError := NewChildWorkflowExecutionError(
|
|
attributes.GetNamespace(),
|
|
attributes.GetWorkflowExecution().GetWorkflowId(),
|
|
attributes.GetWorkflowExecution().GetRunId(),
|
|
attributes.GetWorkflowType().GetName(),
|
|
attributes.GetInitiatedEventId(),
|
|
attributes.GetStartedEventId(),
|
|
attributes.GetRetryState(),
|
|
NewTimeoutError("Child workflow timeout", enumspb.TIMEOUT_TYPE_START_TO_CLOSE, nil),
|
|
)
|
|
childWorkflow.handle(nil, childWorkflowExecutionError)
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTerminated(event *historypb.HistoryEvent) error {
|
|
attributes := event.GetChildWorkflowExecutionTerminatedEventAttributes()
|
|
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
|
|
command := weh.commandsHelper.handleChildWorkflowExecutionClosed(childWorkflowID)
|
|
childWorkflow := command.getData().(*scheduledChildWorkflow)
|
|
if childWorkflow.handled {
|
|
return nil
|
|
}
|
|
|
|
childWorkflowExecutionError := NewChildWorkflowExecutionError(
|
|
attributes.GetNamespace(),
|
|
attributes.GetWorkflowExecution().GetWorkflowId(),
|
|
attributes.GetWorkflowExecution().GetRunId(),
|
|
attributes.GetWorkflowType().GetName(),
|
|
attributes.GetInitiatedEventId(),
|
|
attributes.GetStartedEventId(),
|
|
enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE,
|
|
newTerminatedError(),
|
|
)
|
|
childWorkflow.handle(nil, childWorkflowExecutionError)
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleUpsertWorkflowSearchAttributes(event *historypb.HistoryEvent) {
|
|
weh.updateWorkflowInfoWithSearchAttributes(event.GetUpsertWorkflowSearchAttributesEventAttributes().SearchAttributes)
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflowExecutionInitiated(event *historypb.HistoryEvent) error {
|
|
// For cancellation of child workflow only, we do not use cancellation ID
|
|
// for cancellation of external workflow, we have to use cancellation ID
|
|
attribute := event.GetRequestCancelExternalWorkflowExecutionInitiatedEventAttributes()
|
|
workflowID := attribute.WorkflowExecution.GetWorkflowId()
|
|
cancellationID := attribute.Control
|
|
weh.commandsHelper.handleRequestCancelExternalWorkflowExecutionInitiated(event.GetEventId(), workflowID, cancellationID)
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleExternalWorkflowExecutionCancelRequested(event *historypb.HistoryEvent) error {
|
|
// For cancellation of child workflow only, we do not use cancellation ID
|
|
// for cancellation of external workflow, we have to use cancellation ID
|
|
attributes := event.GetExternalWorkflowExecutionCancelRequestedEventAttributes()
|
|
workflowID := attributes.WorkflowExecution.GetWorkflowId()
|
|
isExternal, command := weh.commandsHelper.handleExternalWorkflowExecutionCancelRequested(attributes.GetInitiatedEventId(), workflowID)
|
|
if isExternal {
|
|
// for cancel external workflow, we need to set the future
|
|
cancellation := command.getData().(*scheduledCancellation)
|
|
if cancellation.handled {
|
|
return nil
|
|
}
|
|
cancellation.handle(nil, nil)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflowExecutionFailed(event *historypb.HistoryEvent) error {
|
|
// For cancellation of child workflow only, we do not use cancellation ID
|
|
// for cancellation of external workflow, we have to use cancellation ID
|
|
attributes := event.GetRequestCancelExternalWorkflowExecutionFailedEventAttributes()
|
|
workflowID := attributes.WorkflowExecution.GetWorkflowId()
|
|
isExternal, command := weh.commandsHelper.handleRequestCancelExternalWorkflowExecutionFailed(attributes.GetInitiatedEventId(), workflowID)
|
|
if isExternal {
|
|
// for cancel external workflow, we need to set the future
|
|
cancellation := command.getData().(*scheduledCancellation)
|
|
if cancellation.handled {
|
|
return nil
|
|
}
|
|
err := fmt.Errorf("cancel external workflow failed, %v", attributes.GetCause())
|
|
cancellation.handle(nil, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionCompleted(event *historypb.HistoryEvent) error {
|
|
attributes := event.GetExternalWorkflowExecutionSignaledEventAttributes()
|
|
command := weh.commandsHelper.handleSignalExternalWorkflowExecutionCompleted(attributes.GetInitiatedEventId())
|
|
signal := command.getData().(*scheduledSignal)
|
|
if signal.handled {
|
|
return nil
|
|
}
|
|
signal.handle(nil, nil)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionFailed(event *historypb.HistoryEvent) error {
|
|
attributes := event.GetSignalExternalWorkflowExecutionFailedEventAttributes()
|
|
command := weh.commandsHelper.handleSignalExternalWorkflowExecutionFailed(attributes.GetInitiatedEventId())
|
|
signal := command.getData().(*scheduledSignal)
|
|
if signal.handled {
|
|
return nil
|
|
}
|
|
|
|
var err error
|
|
switch attributes.GetCause() {
|
|
case enumspb.SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND:
|
|
err = newUnknownExternalWorkflowExecutionError()
|
|
default:
|
|
err = fmt.Errorf("signal external workflow failed, %v", attributes.GetCause())
|
|
}
|
|
|
|
signal.handle(nil, err)
|
|
|
|
return nil
|
|
}
|