-
Notifications
You must be signed in to change notification settings - Fork 220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add saga interceptor for worker hosting saga #936
base: master
Are you sure you want to change the base?
Conversation
I am not sure this has value as a completely separate module in the SDK. It is a single, simple file. Can you add a new |
One reason of this PR is to ask for discussion of Saga pattern impl. I think Go sdk should ship Saga util in some form, not just samples, and interceptor is my proposal. Compensations should be done in a framework way instead of handling errors manually. |
This general conversation may be best at temporalio/features#50 where we're tracking our cross-SDK features instead of specific to Go SDK. I am not sure it is best to separate the invocation of an activity from its compensation declaration or rollback or error handling. Also we often discourage hidden logic in interceptors as they make it hard for a developer to understand what is actually happening in their workflow when trying to follow the code while looking at history. A helper to call on defer is often clearer since it's more explicit. If there's a general purpose Go Saga implementation accepted by the community, it could easily be used with Temporal. But I think you'll usually find explicit error handling and rollbacks to be the norm in the ecosystem. There are a few issues with the code if you'd like me to give it a code review anyways. |
Thanks for the answers, I will move the code to samples. For my team, we can get the conclusion that SAGAs is trivial to be built on top of workflow engines, and transaction solutions like Seata is tuned for performance issue not for general business purpose. |
Note that the "high performance" in the first bullet of Seata SAGA is compared to their other modes, which involve locking. Temporal doesn't require locking, is very high throughout, and it's easy to implement compensation logic—easier and more flexible to implement in Temporal workflow code than in Seata's JSON state machines. |
func testWorkflow(ctx workflow.Context, a int) error { | ||
zap.L().Debug("enter workflow") | ||
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ | ||
StartToCloseTimeout: time.Minute, | ||
}) | ||
var id string | ||
if err := workflow.ExecuteActivity(ctx, "createOrder", a).Get(ctx, &id); err != nil { | ||
return err | ||
} | ||
zap.L().Debug("create order, id:", zap.String("id", id)) | ||
if err := workflow.ExecuteActivity(ctx, "stockDeduct", a).Get(ctx, nil); err != nil { | ||
return err | ||
} | ||
if err := workflow.ExecuteActivity(ctx, "createPay", a).Get(ctx, nil); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func testWorkflow(ctx workflow.Context, a int) error { | |
zap.L().Debug("enter workflow") | |
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ | |
StartToCloseTimeout: time.Minute, | |
}) | |
var id string | |
if err := workflow.ExecuteActivity(ctx, "createOrder", a).Get(ctx, &id); err != nil { | |
return err | |
} | |
zap.L().Debug("create order, id:", zap.String("id", id)) | |
if err := workflow.ExecuteActivity(ctx, "stockDeduct", a).Get(ctx, nil); err != nil { | |
return err | |
} | |
if err := workflow.ExecuteActivity(ctx, "createPay", a).Get(ctx, nil); err != nil { | |
return err | |
} | |
return nil | |
} | |
func testWorkflow(ctx workflow.Context, a int) error { | |
var txn txn | |
zap.L().Debug("enter workflow") | |
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ | |
StartToCloseTimeout: time.Minute, | |
}) | |
var id string | |
if err := workflow.ExecuteActivity(ctx, "createOrder", a).Get(ctx, &id); err != nil { | |
return err | |
} | |
zap.L().Debug("create order, id:", zap.String("id", id)) | |
defer txn.executeRollbackActivity(ctx, "deleteOrder") | |
if err := workflow.ExecuteActivity(ctx, "stockDeduct", a).Get(ctx, nil); err != nil { | |
return err | |
} | |
defer txn.executeRollbackActivity(ctx, "stockInc") | |
if err := workflow.ExecuteActivity(ctx, "createPay", a).Get(ctx, nil); err != nil { | |
return err | |
} | |
txn.markSuccessful() | |
return nil | |
} | |
type txn bool | |
func (t *txn) markSuccessful() { *s = true } | |
func (t *txn) executeRollbackActivity(ctx workflow.Context, activity interface{}, args ...interface{}) { | |
if !*txn { | |
ctx, cancel := workflow.NewDisconnectedContext(ctx) | |
defer cancel() | |
if err := workflow.ExecuteActivity(ctx, activity, args...).Get(ctx); err != nil { | |
workflow.GetLogger(ctx).Error("failed to convert to compensate req", zap.Error(err)) | |
} | |
} | |
} |
Untested (just typed here in issue), but this is a much simpler pattern than hiding things behind interceptors and of course you can customize it if you need to convert arguments or whatever. Notice this is just using Go defer
as it should be used which is much simpler/clearer to Go developers than separate interceptors.
We have tested the throughput of some frameworks for distributed transaction. With same hardware resources and SQL storage backend, these frameworks can get 1k tps per core, that's much better than temporal. |
Oh interesting! If you write a post about the test, I'd be interested to read it. I wonder whether they made any reliability or flexibility tradeoffs to get there. |
What was changed
add worker interceptor for hosting saga workflow
Why?
saga sample has many duplicate error handle,
besides, every one will have to learn Saga pattern and then to read© code from others, it sucks.
Interceptor and local state is suitable for temporal to impl Saga pattern.
Checklist
Closes
How was this tested: