Skip to content

Commit

Permalink
chore: unify the event time of messages to drop (#86)
Browse files Browse the repository at this point in the history
Set the event time of messages to drop to epoch(0) - 1ms across all our SDKs.

Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Oct 13, 2023
1 parent f0a6686 commit ab523a9
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
8 changes: 6 additions & 2 deletions pkg/sourcetransformer/messaget.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (

var (
DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
// Watermark are at millisecond granularity, hence we use epoch(0) - 1 to indicate watermark is not available.
// eventTimeForDrop is used to indicate that the message is dropped hence, excluded from watermark calculation
eventTimeForDrop = time.Unix(0, -int64(time.Millisecond))
)

// Message is used to wrap the data return by SourceTransformer functions.
// Compared with Message, Message contains one more field, the event time, usually extracted from the payload.
// Compared with Message of other UDFs, source transformer Message contains one more field,
// the event time, usually extracted from the payload.
type Message struct {
value []byte
eventTime time.Time
Expand Down Expand Up @@ -58,7 +62,7 @@ func (m Message) Tags() []string {

// MessageToDrop creates a Message to be dropped
func MessageToDrop() Message {
return Message{eventTime: time.Time{}, value: []byte{}, tags: []string{DROP}}
return Message{eventTime: eventTimeForDrop, value: []byte{}, tags: []string{DROP}}
}

type Messages []Message
Expand Down
2 changes: 1 addition & 1 deletion pkg/sourcetransformer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestService_sourceTransformFn(t *testing.T) {
want: &stpb.SourceTransformResponse{
Results: []*stpb.SourceTransformResponse_Result{
{
EventTime: timestamppb.New(time.Time{}),
EventTime: timestamppb.New(eventTimeForDrop),
Tags: []string{DROP},
Value: []byte{},
},
Expand Down

0 comments on commit ab523a9

Please sign in to comment.