ArmadaContext.Log Improvements (#2965)
* log error

* context log

* context log

* add cycle id

* typo

* lint

* refactor armadacontext to implement a FieldLogger

---------

Co-authored-by: Chris Martin <[email protected]>
d80tb7 authored Sep 13, 2023
1 parent ba1973f commit edc1426
Showing 18 changed files with 156 additions and 138 deletions.
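
For orientation, here is a minimal sketch of the call-site pattern this refactor enables. Because armadacontext.Context now embeds logrus.FieldLogger instead of exposing a separate Log *logrus.Entry field, logging methods are called on the context itself; the cycleId value below is purely illustrative and not taken from this commit.

package main

import (
	"github.com/armadaproject/armada/internal/common/armadacontext"
)

func main() {
	// The context doubles as a logger now that logrus.FieldLogger is embedded.
	ctx := armadacontext.Background()

	// Attach a field to the logger carried by the context (illustrative value).
	ctx = armadacontext.WithLogField(ctx, "cycleId", "example-cycle")

	// Log directly on the context; before this change this was ctx.Log.Infof(...).
	ctx.Infof("starting scheduling cycle")
}
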
4 changes: 2 additions & 2 deletions cmd/scheduler/cmd/prune_database.go
@@ -1,13 +1,13 @@
package cmd

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/clock"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/database"
schedulerdb "github.com/armadaproject/armada/internal/scheduler/database"
)
@@ -57,7 +57,7 @@ func pruneDatabase(cmd *cobra.Command, _ []string) error {
return errors.WithMessagef(err, "Failed to connect to database")
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), timeout)
defer cancel()
return schedulerdb.PruneDb(ctx, db, batchSize, expireAfter, clock.RealClock{})
}
12 changes: 6 additions & 6 deletions internal/armada/server/lease.go
@@ -344,7 +344,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
lastSeen,
)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf(
logging.WithStacktrace(ctx, err).Warnf(
"skipping node %s from executor %s", nodeInfo.GetName(), req.GetClusterId(),
)
continue
@@ -566,7 +566,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
if q.SchedulingContextRepository != nil {
sctx.ClearJobSpecs()
if err := q.SchedulingContextRepository.AddSchedulingContext(sctx); err != nil {
logging.WithStacktrace(ctx.Log, err).Error("failed to store scheduling context")
logging.WithStacktrace(ctx, err).Error("failed to store scheduling context")
}
}

@@ -641,7 +641,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
jobIdsToDelete := util.Map(jobsToDelete, func(job *api.Job) string { return job.Id })
log.Infof("deleting preempted jobs: %v", jobIdsToDelete)
if deletionResult, err := q.jobRepository.DeleteJobs(jobsToDelete); err != nil {
logging.WithStacktrace(ctx.Log, err).Error("failed to delete preempted jobs from Redis")
logging.WithStacktrace(ctx, err).Error("failed to delete preempted jobs from Redis")
} else {
deleteErrorByJobId := armadamaps.MapKeys(deletionResult, func(job *api.Job) string { return job.Id })
for jobId := range preemptedApiJobsById {
@@ -704,7 +704,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
}
if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, currentExecutorReport); err != nil {
logging.WithStacktrace(ctx.Log, err).Errorf("failed to update cluster usage")
logging.WithStacktrace(ctx, err).Errorf("failed to update cluster usage")
}

allocatedByQueueAndPriorityClassForPool = q.aggregateAllocationAcrossExecutor(reportsByExecutor, req.Pool)
@@ -728,7 +728,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
node, err := nodeDb.GetNode(nodeId)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf("failed to set node id selector on job %s: node with id %s not found", apiJob.Id, nodeId)
logging.WithStacktrace(ctx, err).Warnf("failed to set node id selector on job %s: node with id %s not found", apiJob.Id, nodeId)
continue
}
v := node.Labels[q.schedulingConfig.Preemption.NodeIdLabel]
@@ -764,7 +764,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
node, err := nodeDb.GetNode(nodeId)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf("failed to set node name on job %s: node with id %s not found", apiJob.Id, nodeId)
logging.WithStacktrace(ctx, err).Warnf("failed to set node name on job %s: node with id %s not found", apiJob.Id, nodeId)
continue
}
podSpec.NodeName = node.Name
8 changes: 4 additions & 4 deletions internal/armada/server/submit_from_log.go
@@ -125,12 +125,12 @@ func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error {
sequence, err := eventutil.UnmarshalEventSequence(ctxWithLogger, msg.Payload())
if err != nil {
srv.ack(ctx, msg)
logging.WithStacktrace(ctxWithLogger.Log, err).Warnf("processing message failed; ignoring")
logging.WithStacktrace(ctxWithLogger, err).Warnf("processing message failed; ignoring")
numErrored++
break
}

ctxWithLogger.Log.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
ctxWithLogger.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
// TODO: Improve retry logic.
srv.ProcessSequence(ctxWithLogger, sequence)
srv.ack(ctx, msg)
@@ -155,11 +155,11 @@ func (srv *SubmitFromLog) ProcessSequence(ctx *armadacontext.Context, sequence *
for i < len(sequence.Events) && time.Since(lastProgress) < timeout {
j, err := srv.ProcessSubSequence(ctx, i, sequence)
if err != nil {
logging.WithStacktrace(ctx.Log, err).WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Warnf("processing subsequence failed; ignoring")
logging.WithStacktrace(ctx, err).WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Warnf("processing subsequence failed; ignoring")
}

if j == i {
ctx.Log.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Info("made no progress")
ctx.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Info("made no progress")

// We should only get here if a transient error occurs.
// Sleep for a bit before retrying.
38 changes: 19 additions & 19 deletions internal/common/armadacontext/armada_context.go
@@ -13,22 +13,22 @@ import (
// while retaining type-safety
type Context struct {
context.Context
Log *logrus.Entry
logrus.FieldLogger
}

// Background creates an empty context with a default logger. It is analogous to context.Background()
func Background() *Context {
return &Context{
Context: context.Background(),
Log: logrus.NewEntry(logrus.New()),
Context: context.Background(),
FieldLogger: logrus.NewEntry(logrus.New()),
}
}

// TODO creates an empty context with a default logger. It is analogous to context.TODO()
func TODO() *Context {
return &Context{
Context: context.TODO(),
Log: logrus.NewEntry(logrus.New()),
Context: context.TODO(),
FieldLogger: logrus.NewEntry(logrus.New()),
}
}

@@ -42,17 +42,17 @@ func FromGrpcCtx(ctx context.Context) *Context {
// New returns an armada context that encapsulates both a go context and a logger
func New(ctx context.Context, log *logrus.Entry) *Context {
return &Context{
Context: ctx,
Log: log,
Context: ctx,
FieldLogger: log,
}
}

// WithCancel returns a copy of parent with a new Done channel. It is analogous to context.WithCancel()
func WithCancel(parent *Context) (*Context, context.CancelFunc) {
c, cancel := context.WithCancel(parent.Context)
return &Context{
Context: c,
Log: parent.Log,
Context: c,
FieldLogger: parent.FieldLogger,
}, cancel
}

@@ -61,8 +61,8 @@ func WithCancel(parent *Context) (*Context, context.CancelFunc) {
func WithDeadline(parent *Context, d time.Time) (*Context, context.CancelFunc) {
c, cancel := context.WithDeadline(parent.Context, d)
return &Context{
Context: c,
Log: parent.Log,
Context: c,
FieldLogger: parent.FieldLogger,
}, cancel
}

@@ -74,25 +74,25 @@ func WithTimeout(parent *Context, timeout time.Duration) (*Context, context.Canc
// WithLogField returns a copy of parent with the supplied key-value added to the logger
func WithLogField(parent *Context, key string, val interface{}) *Context {
return &Context{
Context: parent.Context,
Log: parent.Log.WithField(key, val),
Context: parent.Context,
FieldLogger: parent.FieldLogger.WithField(key, val),
}
}

// WithLogFields returns a copy of parent with the supplied key-values added to the logger
func WithLogFields(parent *Context, fields logrus.Fields) *Context {
return &Context{
Context: parent.Context,
Log: parent.Log.WithFields(fields),
Context: parent.Context,
FieldLogger: parent.FieldLogger.WithFields(fields),
}
}

// WithValue returns a copy of parent in which the value associated with key is
// val. It is analogous to context.WithValue()
func WithValue(parent *Context, key, val any) *Context {
return &Context{
Context: context.WithValue(parent, key, val),
Log: parent.Log,
Context: context.WithValue(parent, key, val),
FieldLogger: parent.FieldLogger,
}
}

@@ -101,7 +101,7 @@ func WithValue(parent *Context, key, val any) *Context {
func ErrGroup(ctx *Context) (*errgroup.Group, *Context) {
group, goctx := errgroup.WithContext(ctx)
return group, &Context{
Context: goctx,
Log: ctx.Log,
Context: goctx,
FieldLogger: ctx.FieldLogger,
}
}
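
A second sketch, under the same assumptions as above, showing that contexts derived via WithLogFields and ErrGroup carry the embedded FieldLogger forward; the component field is illustrative only.

package main

import (
	"github.com/sirupsen/logrus"

	"github.com/armadaproject/armada/internal/common/armadacontext"
)

func main() {
	// Fields added once are carried by every context derived from this one.
	ctx := armadacontext.WithLogFields(armadacontext.Background(), logrus.Fields{
		"component": "example", // illustrative field
	})

	// ErrGroup propagates both the Done channel and the FieldLogger.
	g, gctx := armadacontext.ErrGroup(ctx)
	g.Go(func() error {
		gctx.Info("working inside the errgroup") // logs with component=example
		return nil
	})
	if err := g.Wait(); err != nil {
		ctx.WithError(err).Error("group failed")
	}
}
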
8 changes: 4 additions & 4 deletions internal/common/armadacontext/armada_context_test.go
@@ -15,15 +15,15 @@ var defaultLogger = logrus.WithField("foo", "bar")

func TestNew(t *testing.T) {
ctx := New(context.Background(), defaultLogger)
require.Equal(t, defaultLogger, ctx.Log)
require.Equal(t, defaultLogger, ctx.FieldLogger)
require.Equal(t, context.Background(), ctx.Context)
}

func TestFromGrpcContext(t *testing.T) {
grpcCtx := ctxlogrus.ToContext(context.Background(), defaultLogger)
ctx := FromGrpcCtx(grpcCtx)
require.Equal(t, grpcCtx, ctx.Context)
require.Equal(t, defaultLogger, ctx.Log)
require.Equal(t, defaultLogger, ctx.FieldLogger)
}

func TestBackground(t *testing.T) {
@@ -39,13 +39,13 @@ func TestTODO(t *testing.T) {
func TestWithLogField(t *testing.T) {
ctx := WithLogField(Background(), "fish", "chips")
require.Equal(t, context.Background(), ctx.Context)
require.Equal(t, logrus.Fields{"fish": "chips"}, ctx.Log.Data)
require.Equal(t, logrus.Fields{"fish": "chips"}, ctx.FieldLogger.(*logrus.Entry).Data)
}

func TestWithLogFields(t *testing.T) {
ctx := WithLogFields(Background(), logrus.Fields{"fish": "chips", "salt": "pepper"})
require.Equal(t, context.Background(), ctx.Context)
require.Equal(t, logrus.Fields{"fish": "chips", "salt": "pepper"}, ctx.Log.Data)
require.Equal(t, logrus.Fields{"fish": "chips", "salt": "pepper"}, ctx.FieldLogger.(*logrus.Entry).Data)
}

func TestWithTimeout(t *testing.T) {
6 changes: 3 additions & 3 deletions internal/common/logging/stacktrace.go
@@ -10,9 +10,9 @@ type stackTracer interface {
StackTrace() errors.StackTrace
}

// WithStacktrace returns a new logrus.Entry obtained by adding error information and, if available, a stack trace
// as fields to the provided logrus.Entry.
func WithStacktrace(logger *logrus.Entry, err error) *logrus.Entry {
// WithStacktrace returns a new logrus.FieldLogger obtained by adding error information and, if available, a stack trace
// as fields to the provided logrus.FieldLogger.
func WithStacktrace(logger logrus.FieldLogger, err error) logrus.FieldLogger {
logger = logger.WithError(err)
if stackErr, ok := err.(stackTracer); ok {
return logger.WithField("stacktrace", stackErr.StackTrace())
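
Since WithStacktrace now accepts any logrus.FieldLogger, call sites can pass an armadacontext.Context directly, as the hunks above do. A minimal sketch; the error below is illustrative only.

package main

import (
	"github.com/pkg/errors"

	"github.com/armadaproject/armada/internal/common/armadacontext"
	"github.com/armadaproject/armada/internal/common/logging"
)

func main() {
	ctx := armadacontext.Background()

	// Errors from github.com/pkg/errors carry a stack trace, which
	// WithStacktrace surfaces as a "stacktrace" field on the log entry.
	err := errors.New("something went wrong") // illustrative error

	// The context satisfies logrus.FieldLogger, so it can be passed directly
	// where ctx.Log was previously required.
	logging.WithStacktrace(ctx, err).Warnf("processing failed; ignoring")
}
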
4 changes: 2 additions & 2 deletions internal/scheduler/api.go
@@ -103,7 +103,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
if err != nil {
return err
}
ctx.Log.Infof(
ctx.Infof(
"executor currently has %d job runs; sending %d cancellations and %d new runs",
len(requestRuns), len(runsToCancel), len(newRuns),
)
@@ -226,7 +226,7 @@ func (srv *ExecutorApi) executorFromLeaseRequest(ctx *armadacontext.Context, req
now := srv.clock.Now().UTC()
for _, nodeInfo := range req.Nodes {
if node, err := api.NewNodeFromNodeInfo(nodeInfo, req.ExecutorId, srv.allowedPriorities, now); err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf(
logging.WithStacktrace(ctx, err).Warnf(
"skipping node %s from executor %s", nodeInfo.GetName(), req.GetExecutorId(),
)
} else {
15 changes: 8 additions & 7 deletions internal/scheduler/database/db_pruner.go
@@ -1,21 +1,21 @@
package database

import (
ctx "context"
"time"

"github.com/jackc/pgx/v5"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/clock"

"github.com/armadaproject/armada/internal/common/armadacontext"
)

// PruneDb removes completed jobs (and related runs and errors) from the database if their `lastUpdateTime`
// is more than `keepAfterCompletion` in the past.
// Jobs are deleted in batches across transactions. This means that if this job fails midway through, it still
// may have deleted some jobs.
// The function will run until the supplied context is cancelled.
func PruneDb(ctx ctx.Context, db *pgx.Conn, batchLimit int, keepAfterCompletion time.Duration, clock clock.Clock) error {
func PruneDb(ctx *armadacontext.Context, db *pgx.Conn, batchLimit int, keepAfterCompletion time.Duration, clock clock.Clock) error {
start := time.Now()
cutOffTime := clock.Now().Add(-keepAfterCompletion)

@@ -40,11 +40,11 @@ func PruneDb(ctx ctx.Context, db *pgx.Conn, batchLimit int, keepAfterCompletion
return errors.WithStack(err)
}
if totalJobsToDelete == 0 {
log.Infof("Found no jobs to be deleted. Exiting")
ctx.Infof("Found no jobs to be deleted. Exiting")
return nil
}

log.Infof("Found %d jobs to be deleted", totalJobsToDelete)
ctx.Infof("Found %d jobs to be deleted", totalJobsToDelete)

// create temp table to hold a batch of results
_, err = db.Exec(ctx, "CREATE TEMP TABLE batch (job_id TEXT);")
@@ -93,9 +93,10 @@ func PruneDb(ctx ctx.Context, db *pgx.Conn, batchLimit int, keepAfterCompletion

taken := time.Now().Sub(batchStart)
jobsDeleted += batchSize
log.Infof("Deleted %d jobs in %s. Deleted %d jobs out of %d", batchSize, taken, jobsDeleted, totalJobsToDelete)
ctx.
Infof("Deleted %d jobs in %s. Deleted %d jobs out of %d", batchSize, taken, jobsDeleted, totalJobsToDelete)
}
taken := time.Now().Sub(start)
log.Infof("Deleted %d jobs in %s", jobsDeleted, taken)
ctx.Infof("Deleted %d jobs in %s", jobsDeleted, taken)
return nil
}
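
A sketch of driving PruneDb with a timeout-bounded armadacontext, mirroring cmd/scheduler/cmd/prune_database.go above; the connection string, timeout, batch size, and retention period are placeholders, not values taken from this commit.

package main

import (
	"time"

	"github.com/jackc/pgx/v5"
	"k8s.io/apimachinery/pkg/util/clock"

	"github.com/armadaproject/armada/internal/common/armadacontext"
	schedulerdb "github.com/armadaproject/armada/internal/scheduler/database"
)

func main() {
	// Bound the whole pruning run; PruneDb stops when the context is cancelled.
	ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Minute)
	defer cancel()

	// Placeholder connection string.
	db, err := pgx.Connect(ctx, "postgres://localhost:5432/scheduler")
	if err != nil {
		ctx.WithError(err).Fatal("failed to connect to database")
	}
	defer db.Close(ctx)

	// Delete completed jobs older than 7 days, 1000 rows per batch; progress is
	// logged through the context's embedded FieldLogger.
	if err := schedulerdb.PruneDb(ctx, db, 1000, 7*24*time.Hour, clock.RealClock{}); err != nil {
		ctx.WithError(err).Fatal("pruning failed")
	}
}
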
3 changes: 1 addition & 2 deletions internal/scheduler/database/util.go
@@ -6,7 +6,6 @@ import (
"time"

"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/database"
@@ -25,7 +24,7 @@ func Migrate(ctx *armadacontext.Context, db database.Querier) error {
if err != nil {
return err
}
log.Infof("Updated scheduler database in %s", time.Now().Sub(start))
ctx.Infof("Updated scheduler database in %s", time.Now().Sub(start))
return nil
}

8 changes: 4 additions & 4 deletions internal/scheduler/leader.go
@@ -145,7 +145,7 @@ func (lc *KubernetesLeaderController) Run(ctx *armadacontext.Context) error {
return ctx.Err()
default:
lock := lc.getNewLock()
ctx.Log.Infof("attempting to become leader")
ctx.Infof("attempting to become leader")
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
@@ -154,14 +154,14 @@
RetryPeriod: lc.config.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(c context.Context) {
ctx.Log.Infof("I am now leader")
ctx.Infof("I am now leader")
lc.token.Store(NewLeaderToken())
for _, listener := range lc.listeners {
listener.onStartedLeading(ctx)
}
},
OnStoppedLeading: func() {
ctx.Log.Infof("I am no longer leader")
ctx.Infof("I am no longer leader")
lc.token.Store(InvalidLeaderToken())
for _, listener := range lc.listeners {
listener.onStoppedLeading()
Expand All @@ -174,7 +174,7 @@ func (lc *KubernetesLeaderController) Run(ctx *armadacontext.Context) error {
},
},
})
ctx.Log.Infof("leader election round finished")
ctx.Infof("leader election round finished")
}
}
}