Skip to content

Commit

Permalink
feat: shared meta
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 committed Dec 6, 2024
1 parent e2ac89e commit 6110760
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 32 deletions.
37 changes: 33 additions & 4 deletions pkg/client/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type adminClientImpl struct {
namespace string

subscriber SubscribeClient

sharedMeta map[string]string
}

func newAdmin(conn *grpc.ClientConn, opts *sharedClientOpts, subscriber SubscribeClient) AdminClient {
Expand All @@ -79,6 +81,7 @@ func newAdmin(conn *grpc.ClientConn, opts *sharedClientOpts, subscriber Subscrib
ctx: opts.ctxLoader,
namespace: opts.namespace,
subscriber: subscriber,
sharedMeta: opts.sharedMeta,
}
}

Expand Down Expand Up @@ -286,10 +289,12 @@ func (a *adminClientImpl) RunChildWorkflow(workflowName string, input interface{

childIndex := int32(opts.ChildIndex) // nolint: gosec

metadataBytes, err := json.Marshal(opts.AdditionalMetadata)
metadataBytes, err := a.getAdditionalMetaBytes(opts.AdditionalMetadata)

if err != nil {
return "", fmt.Errorf("could not marshal additional metadata: %w", err)
return "", fmt.Errorf("could not get additional metadata: %w", err)
}

metadata := string(metadataBytes)

res, err := a.client.TriggerWorkflow(a.ctx.newContext(context.Background()), &admincontracts.TriggerWorkflowRequest{
Expand Down Expand Up @@ -349,10 +354,12 @@ func (a *adminClientImpl) RunChildWorkflows(workflows []*RunChildWorkflowsOpts)
}
childIndex := int32(workflow.Opts.ChildIndex) // nolint: gosec

metadataBytes, err := json.Marshal(workflow.Opts.AdditionalMetadata)
metadataBytes, err := a.getAdditionalMetaBytes(workflow.Opts.AdditionalMetadata)

if err != nil {
return nil, fmt.Errorf("could not marshal additional metadata: %w", err)
return nil, fmt.Errorf("could not get additional metadata: %w", err)
}

metadata := string(metadataBytes)

triggerWorkflowRequests[i] = &admincontracts.TriggerWorkflowRequest{
Expand Down Expand Up @@ -574,3 +581,25 @@ func (a *adminClientImpl) getJobOpts(jobName string, job *types.WorkflowJob) (*a

return jobOpt, nil
}

func (a *adminClientImpl) getAdditionalMetaBytes(opt *map[string]string) ([]byte, error) {
additionalMeta := make(map[string]string)

for key, value := range a.sharedMeta {
additionalMeta[key] = value
}

if opt != nil {
for key, value := range *opt {
additionalMeta[key] = value
}
}

metadataBytes, err := json.Marshal(additionalMeta)

if err != nil {
return nil, fmt.Errorf("could not marshal additional metadata: %w", err)
}

return metadataBytes, nil
}
36 changes: 26 additions & 10 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type ClientOpts struct {
token string
namespace string
noGrpcRetry bool
sharedMeta map[string]string

cloudRegisterID *string
runnableActions []string
Expand Down Expand Up @@ -129,6 +130,7 @@ func defaultClientOpts(token *string, cf *client.ClientConfigFile) *ClientOpts {
cloudRegisterID: clientConfig.CloudRegisterID,
runnableActions: clientConfig.RunnableActions,
noGrpcRetry: clientConfig.NoGrpcRetry,
sharedMeta: make(map[string]string),
}
}

Expand Down Expand Up @@ -169,6 +171,18 @@ func WithNamespace(namespace string) ClientOpt {
}
}

func WithSharedMeta(meta map[string]string) ClientOpt {
return func(opts *ClientOpts) {
if opts.sharedMeta == nil {
opts.sharedMeta = make(map[string]string)
}

for k, v := range meta {
opts.sharedMeta[k] = v
}
}
}

func InitWorkflows() ClientOpt {
return func(opts *ClientOpts) {
opts.initWorkflows = true
Expand All @@ -186,11 +200,12 @@ func WithWorkflows(files []*types.Workflow) ClientOpt {
}

type sharedClientOpts struct {
tenantId string
namespace string
l *zerolog.Logger
v validator.Validator
ctxLoader *contextLoader
tenantId string
namespace string
l *zerolog.Logger
v validator.Validator
ctxLoader *contextLoader
sharedMeta map[string]string
}

// New creates a new client instance.
Expand Down Expand Up @@ -275,11 +290,12 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
}

shared := &sharedClientOpts{
tenantId: opts.tenantId,
namespace: opts.namespace,
l: opts.l,
v: opts.v,
ctxLoader: newContextLoader(opts.token),
tenantId: opts.tenantId,
namespace: opts.namespace,
l: opts.l,
v: opts.v,
ctxLoader: newContextLoader(opts.token),
sharedMeta: opts.sharedMeta,
}

subscribe := newSubscribe(conn, shared)
Expand Down
71 changes: 53 additions & 18 deletions pkg/client/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"encoding/json"
"fmt"

"github.com/rs/zerolog"
"google.golang.org/grpc"
Expand All @@ -12,7 +13,11 @@ import (
"github.com/hatchet-dev/hatchet/pkg/validator"
)

type PushOpFunc func(*eventcontracts.PushEventRequest) error
type pushOpt struct {
additionalMetadata map[string]string
}

type PushOpFunc func(*pushOpt) error

type BulkPushOpFunc func(*eventcontracts.BulkPushEventRequest) error

Expand Down Expand Up @@ -44,29 +49,25 @@ type eventClientImpl struct {
v validator.Validator

ctx *contextLoader

sharedMeta map[string]string
}

func newEvent(conn *grpc.ClientConn, opts *sharedClientOpts) EventClient {
return &eventClientImpl{
client: eventcontracts.NewEventsServiceClient(conn),
tenantId: opts.tenantId,
namespace: opts.namespace,
l: opts.l,
v: opts.v,
ctx: opts.ctxLoader,
client: eventcontracts.NewEventsServiceClient(conn),
tenantId: opts.tenantId,
namespace: opts.namespace,
l: opts.l,
v: opts.v,
ctx: opts.ctxLoader,
sharedMeta: opts.sharedMeta,
}
}

func WithEventMetadata(metadata interface{}) PushOpFunc {
return func(r *eventcontracts.PushEventRequest) error {
metadataBytes, err := json.Marshal(metadata)
if err != nil {
return err
}

metadataString := string(metadataBytes)

r.AdditionalMetadata = &metadataString
func WithEventMetadata(metadata map[string]string) PushOpFunc {
return func(r *pushOpt) error {
r.additionalMetadata = metadata

return nil
}
Expand All @@ -87,13 +88,25 @@ func (a *eventClientImpl) Push(ctx context.Context, eventKey string, payload int

request.Payload = string(payloadBytes)

opts := &pushOpt{}

for _, optionFunc := range options {
err = optionFunc(&request)
err = optionFunc(opts)
if err != nil {
return err
}
}

additionalMetaBytes, err := a.getAdditionalMetaBytes(&opts.additionalMetadata)

if err != nil {
return err
}

additionalMetaString := string(additionalMetaBytes)

request.AdditionalMetadata = &additionalMetaString

_, err = a.client.Push(a.ctx.newContext(ctx), &request)

if err != nil {
Expand Down Expand Up @@ -159,3 +172,25 @@ func (a *eventClientImpl) PutStreamEvent(ctx context.Context, stepRunId string,

return err
}

func (e *eventClientImpl) getAdditionalMetaBytes(opt *map[string]string) ([]byte, error) {
additionalMeta := make(map[string]string)

for key, value := range e.sharedMeta {
additionalMeta[key] = value
}

if opt != nil {
for key, value := range *opt {
additionalMeta[key] = value
}
}

metadataBytes, err := json.Marshal(additionalMeta)

if err != nil {
return nil, fmt.Errorf("could not marshal additional metadata: %w", err)
}

return metadataBytes, nil
}

0 comments on commit 6110760

Please sign in to comment.