Skip to content

Commit

Permalink
introducing DefaultPartitions()
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Dec 8, 2023
1 parent 329c5c7 commit 80d6321
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
4 changes: 3 additions & 1 deletion pkg/sourcer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ type Sourcer interface {
// When the return value is negative, it indicates the pending information is not available.
// With pending information being not available, the Numaflow platform doesn't auto-scale the source.
Pending(ctx context.Context) int64
//Partitions returns the partitions associated with the source.
//Partitions returns the partitions associated with the source, will be used by the platform to determine
// the partitions to which the watermark should be published. If the source doesn't have partitions,
// DefaultPartitions() can be used to return the default partitions.
Partitions(ctx context.Context) []int32
}

Expand Down
22 changes: 21 additions & 1 deletion pkg/sourcer/message.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package sourcer

import "time"
import (
"os"
"strconv"
"time"
)

// create default partition id from the environment variable "NUMAFLOW_REPLICA"
var defaultPartitionId, _ = strconv.Atoi(os.Getenv("NUMAFLOW_REPLICA"))

// Message is used to wrap the data return by UDSource
type Message struct {
Expand Down Expand Up @@ -51,6 +58,19 @@ func NewOffset(value []byte, partitionId int32) Offset {
return Offset{value: value, partitionId: partitionId}
}

// NewOffsetWithDefaultPartitionId creates an Offset with value and default partition id
func NewOffsetWithDefaultPartitionId(value []byte) Offset {
return Offset{value: value, partitionId: DefaultPartitions()[0]}
}

// DefaultPartitions returns default partitions for the source
// it can be used in the Partitions() function of the Sourcer implementation
// if the source doesn't have partitions, default partition will be pod replica
// index of the source.
func DefaultPartitions() []int32 {
return []int32{int32(defaultPartitionId)}
}

// Value returns value of the offset
func (o Offset) Value() []byte {
return o.value
Expand Down

0 comments on commit 80d6321

Please sign in to comment.