From 01f77b3237772153bc39326e06c2781e18801b83 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Fri, 21 Jun 2024 21:03:55 +0100 Subject: [PATCH 1/6] core: Improve retry policy for uploads Have 2 tiers of retries: - one shorter cycle, when trying to upload to primary or backup, where we retry a couple times up to 1m - a longer cycle wrapping those 2, to sustain potentially long-running crises. We wait longer between each retry and keep trying for up to 1h The first loop solves for transient errors when saving to primary or backup storage, while the second loop solves for longer running incidents, for which it's probably better to keep trying for a long time instead of dropping the process and losing the recording. --- core/uploader.go | 54 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/core/uploader.go b/core/uploader.go index 8028077..3d77bca 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -29,17 +29,25 @@ func (bc *ByteCounter) Write(p []byte) (n int, err error) { return n, nil } -func newExponentialBackOffExecutor() *backoff.ExponentialBackOff { +func newExponentialBackOffExecutor(initial, max, totalMax time.Duration) *backoff.ExponentialBackOff { backOff := backoff.NewExponentialBackOff() - backOff.InitialInterval = 30 * time.Second - backOff.MaxInterval = 2 * time.Minute - backOff.MaxElapsedTime = 0 // don't impose a timeout as part of the retries + backOff.InitialInterval = initial + backOff.MaxInterval = max + backOff.MaxElapsedTime = totalMax backOff.Reset() return backOff } +func NoRetries() backoff.BackOff { + return &backoff.StopBackOff{} +} + func UploadRetryBackoff() backoff.BackOff { - return backoff.WithMaxRetries(newExponentialBackOffExecutor(), 2) + return newExponentialBackOffExecutor(2*time.Minute, 5*time.Minute, 1*time.Hour) +} + +func SingleRequestRetryBackoff() backoff.BackOff { + return newExponentialBackOffExecutor(5*time.Second, 10*time.Second, 30*time.Second) } 
const segmentWriteTimeout = 5 * time.Minute @@ -116,19 +124,31 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout } func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) { - out, bytesWritten, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries) - if primaryErr == nil { - return out, bytesWritten, nil + retryPolicy := NoRetries() + if withRetries { + retryPolicy = UploadRetryBackoff() } + err = backoff.Retry(func() error { + var primaryErr error + out, bytesWritten, primaryErr = uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries) + if primaryErr == nil { + return nil + } - backupURI, err := buildBackupURI(outputURI, storageFallbackURLs) - if err != nil { - glog.Errorf("failed to build backup URL: %v", err) - return nil, 0, primaryErr - } + backupURI, err := buildBackupURI(outputURI, storageFallbackURLs) + if err != nil { + glog.Errorf("failed to build backup URL: %v", err) + return err + } + glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr) - glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr) - return uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries) + out, bytesWritten, err = uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries) + if err == nil { + return nil + } + return fmt.Errorf("upload file errors: primary: %w; backup: %w", primaryErr, err) + }, retryPolicy) + return out, bytesWritten, err } func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (*url.URL, error) { @@ -161,9 +181,9 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro } session := driver.NewSession("") - var 
retryPolicy backoff.BackOff = &backoff.StopBackOff{} // no retries by default + retryPolicy := NoRetries() if withRetries { - retryPolicy = UploadRetryBackoff() + retryPolicy = SingleRequestRetryBackoff() } err = backoff.Retry(func() error { // To count how many bytes we are trying to read then write (upload) to s3 storage From bab205ff8c9eb4e0019bb4f52256e519735fb72e Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Fri, 21 Jun 2024 23:24:01 +0100 Subject: [PATCH 2/6] core: Avoid keeping files in memory --- core/uploader.go | 78 ++++++++++++++++++++++++++----------------- core/uploader_test.go | 5 ++- 2 files changed, 51 insertions(+), 32 deletions(-) diff --git a/core/uploader.go b/core/uploader.go index 3d77bca..16f53a6 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -57,20 +57,31 @@ var expiryField = map[string]string{ } func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string) (*drivers.SaveDataOutput, error) { - if strings.HasSuffix(outputURI.Path, ".ts") || strings.HasSuffix(outputURI.Path, ".mp4") { + ext := filepath.Ext(outputURI.Path) + inputFile, err := os.CreateTemp("", "upload-*"+ext) + if err != nil { + return nil, fmt.Errorf("failed to write to temp file: %w", err) + } + inputFileName := inputFile.Name() + defer os.Remove(inputFileName) + + if ext == ".ts" || ext == ".mp4" { // For segments we just write them in one go here and return early. // (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.) 
- fileContents, err := io.ReadAll(input) + _, err = io.Copy(inputFile, input) if err != nil { - return nil, fmt.Errorf("failed to read file") + return nil, fmt.Errorf("failed to write to temp file: %w", err) + } + if err := inputFile.Close(); err != nil { + return nil, fmt.Errorf("failed to close input file: %w", err) } - out, bytesWritten, err := uploadFileWithBackup(outputURI, fileContents, nil, segmentWriteTimeout, true, storageFallbackURLs) + out, bytesWritten, err := uploadFileWithBackup(outputURI, inputFileName, nil, segmentWriteTimeout, true, storageFallbackURLs) if err != nil { return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err) } - if err = extractThumb(outputURI, fileContents, storageFallbackURLs); err != nil { + if err = extractThumb(outputURI, inputFileName, storageFallbackURLs); err != nil { glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err) } return out, nil @@ -78,8 +89,11 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout // For the manifest files we want a very short cache ttl as the files are updating every few seconds fields := &drivers.FileProperties{CacheControl: "max-age=1"} - var fileContents []byte var lastWrite = time.Now() + // Keep the file handle closed while we wait for input data + if err := inputFile.Close(); err != nil { + return nil, fmt.Errorf("failed to close input file: %w", err) + } scanner := bufio.NewScanner(input) @@ -97,11 +111,21 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout for scanner.Scan() { b := scanner.Bytes() - fileContents = append(fileContents, b...) 
+ + inputFile, err = os.OpenFile(inputFileName, os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + if _, err := inputFile.Write(b); err != nil { + return nil, fmt.Errorf("failed to append to input file: %w", err) + } + if err := inputFile.Close(); err != nil { + return nil, fmt.Errorf("failed to close input file: %w", err) + } // Only write the latest version of the data that's been piped in if enough time has elapsed since the last write if lastWrite.Add(waitBetweenWrites).Before(time.Now()) { - if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageFallbackURLs); err != nil { + if _, _, err := uploadFileWithBackup(outputURI, inputFileName, fields, writeTimeout, false, storageFallbackURLs); err != nil { // Just log this error, since it'll effectively be retried after the next interval glog.Errorf("Failed to write: %v", err) } else { @@ -115,7 +139,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout } // We have to do this final write, otherwise there might be final data that's arrived since the last periodic write - if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, true, storageFallbackURLs); err != nil { + if _, _, err := uploadFileWithBackup(outputURI, inputFileName, fields, writeTimeout, false, storageFallbackURLs); err != nil { // Don't ignore this error, since there won't be any further attempts to write return nil, fmt.Errorf("failed to write final save: %w", err) } @@ -123,14 +147,14 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout return nil, nil } -func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) { +func uploadFileWithBackup(outputURI *url.URL, fileName 
string, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) { retryPolicy := NoRetries() if withRetries { retryPolicy = UploadRetryBackoff() } err = backoff.Retry(func() error { var primaryErr error - out, bytesWritten, primaryErr = uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries) + out, bytesWritten, primaryErr = uploadFile(outputURI, fileName, fields, writeTimeout, withRetries) if primaryErr == nil { return nil } @@ -142,7 +166,7 @@ func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drive } glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr) - out, bytesWritten, err = uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries) + out, bytesWritten, err = uploadFile(backupURI, fileName, fields, writeTimeout, withRetries) if err == nil { return nil } @@ -162,7 +186,7 @@ func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) ( return nil, fmt.Errorf("no backup URL found for %s", outputURI.Redacted()) } -func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) { +func uploadFile(outputURI *url.URL, fileName string, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) { outputStr := outputURI.String() // While we wait for storj to implement an easier method for global object deletion we are hacking something // here to allow us to have recording objects deleted after 7 days. 
@@ -186,9 +210,15 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro retryPolicy = SingleRequestRetryBackoff() } err = backoff.Retry(func() error { + file, err := os.Open(fileName) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + // To count how many bytes we are trying to read then write (upload) to s3 storage byteCounter := &ByteCounter{} - teeReader := io.TeeReader(bytes.NewReader(fileContents), byteCounter) + teeReader := io.TeeReader(file, byteCounter) out, err = session.SaveData(context.Background(), "", teeReader, fields, writeTimeout) bytesWritten = byteCounter.Count @@ -202,20 +232,16 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro return out, bytesWritten, err } -func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[string]string) error { +func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string) error { tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*") if err != nil { return fmt.Errorf("temp file creation failed: %w", err) } defer os.RemoveAll(tmpDir) outFile := filepath.Join(tmpDir, "out.jpg") - inFile := filepath.Join(tmpDir, filepath.Base(outputURI.Path)) - if err = os.WriteFile(inFile, segment, 0644); err != nil { - return fmt.Errorf("failed to write input file: %w", err) - } args := []string{ - "-i", inFile, + "-i", segmentFileName, "-ss", "00:00:00", "-vframes", "1", "-vf", "scale=854:480:force_original_aspect_ratio=decrease", @@ -237,16 +263,6 @@ func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[st return fmt.Errorf("ffmpeg failed[%s] [%s]: %w", outputBuf.String(), stdErr.String(), err) } - f, err := os.Open(outFile) - if err != nil { - return fmt.Errorf("opening file failed: %w", err) - } - defer f.Close() - thumbData, err := io.ReadAll(f) - if err != nil { - return fmt.Errorf("failed to read file: %w", err) - } - // two thumbs, one at 
session level, the other at stream level thumbURLs := []*url.URL{outputURI.JoinPath("../latest.jpg"), outputURI.JoinPath("../../../latest.jpg")} fields := &drivers.FileProperties{CacheControl: "max-age=5"} @@ -255,7 +271,7 @@ func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[st for _, thumbURL := range thumbURLs { thumbURL := thumbURL errGroup.Go(func() error { - _, _, err = uploadFileWithBackup(thumbURL, thumbData, fields, 10*time.Second, true, storageFallbackURLs) + _, _, err = uploadFileWithBackup(thumbURL, outFile, fields, 10*time.Second, true, storageFallbackURLs) if err != nil { return fmt.Errorf("saving thumbnail failed: %w", err) } diff --git a/core/uploader_test.go b/core/uploader_test.go index 649528b..86f3a76 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -67,6 +67,9 @@ func TestUploadFileWithBackup(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) + testFile := filepath.Join(dir, "input.txt") + require.NoError(t, os.WriteFile(testFile, []byte("test"), 0644)) + fakeStorage := "s3+https://fake.service.livepeer.com/bucket/" backupStorage := filepath.Join(dir, "backup") + "/" fakeOutput := fakeStorage + "hls/123/file.txt" @@ -75,7 +78,7 @@ func TestUploadFileWithBackup(t *testing.T) { storageFallbackURLs := map[string]string{ fakeStorage: "file://" + backupStorage, } - out, written, err := uploadFileWithBackup(mustParseURL(fakeOutput), []byte("test"), nil, 0, false, storageFallbackURLs) + out, written, err := uploadFileWithBackup(mustParseURL(fakeOutput), testFile, nil, 0, false, storageFallbackURLs) require.NoError(t, err) require.Equal(t, expectedOutFile, out.URL) require.Equal(t, int64(4), written) From 8e1577c718a701e3c3c35071d9b72c96814f7ebc Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 25 Jun 2024 18:35:41 +0100 Subject: [PATCH 3/6] uploader: Fix error returned when backup URL can't be built --- core/uploader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/core/uploader.go b/core/uploader.go index 16f53a6..77db5e1 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -162,7 +162,7 @@ func uploadFileWithBackup(outputURI *url.URL, fileName string, fields *drivers.F backupURI, err := buildBackupURI(outputURI, storageFallbackURLs) if err != nil { glog.Errorf("failed to build backup URL: %v", err) - return err + return primaryErr } glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr) From 38ab54229c53a4c173a6a8bf348199352fee4d70 Mon Sep 17 00:00:00 2001 From: Max Holland Date: Mon, 12 Aug 2024 10:42:10 +0100 Subject: [PATCH 4/6] tweak retries and add time taken log --- catalyst-uploader.go | 3 ++- core/uploader.go | 5 ++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/catalyst-uploader.go b/catalyst-uploader.go index ca95efd..cf843ac 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -99,6 +99,7 @@ func run() int { return 1 } + start := time.Now() out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs) if err != nil { glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err) @@ -109,7 +110,7 @@ func run() int { if out != nil { respHeaders = out.UploaderResponseHeaders } - glog.Infof("Uploader succeeded for %s. storageRequestID=%s Etag=%s", uri.Redacted(), respHeaders.Get("X-Amz-Request-Id"), respHeaders.Get("Etag")) + glog.Infof("Uploader succeeded for %s. 
storageRequestID=%s Etag=%s timeTaken=%vms", uri.Redacted(), respHeaders.Get("X-Amz-Request-Id"), respHeaders.Get("Etag"), time.Since(start).Milliseconds()) // success, write uploaded file details to stdout if glog.V(5) { err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri.Redacted()}) diff --git a/core/uploader.go b/core/uploader.go index c31e7d1..49ba0ab 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -13,11 +13,10 @@ import ( "strings" "time" - "golang.org/x/sync/errgroup" - "github.com/cenkalti/backoff/v4" "github.com/golang/glog" "github.com/livepeer/go-tools/drivers" + "golang.org/x/sync/errgroup" ) type ByteCounter struct { @@ -43,7 +42,7 @@ func NoRetries() backoff.BackOff { } func UploadRetryBackoff() backoff.BackOff { - return newExponentialBackOffExecutor(2*time.Minute, 5*time.Minute, 1*time.Hour) + return newExponentialBackOffExecutor(30*time.Second, 4*time.Minute, 15*time.Minute) } func SingleRequestRetryBackoff() backoff.BackOff { From 02957cad4182e5dd5c8d4288dc14aa8cd4bc9bce Mon Sep 17 00:00:00 2001 From: Max Holland Date: Thu, 22 Aug 2024 12:49:51 +0100 Subject: [PATCH 5/6] Add segment write timeout config param --- catalyst-uploader.go | 3 ++- core/uploader.go | 6 ++---- core/uploader_test.go | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/catalyst-uploader.go b/catalyst-uploader.go index cf843ac..9eb9842 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -39,6 +39,7 @@ func run() int { verbosity := fs.String("v", "", "Log verbosity. {4|5|6}") timeout := fs.Duration("t", 30*time.Second, "Upload timeout") storageFallbackURLs := CommaMapFlag(fs, "storage-fallback-urls", `Comma-separated map of primary to backup storage URLs. 
If a file fails uploading to one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`) + segTimeout := fs.Duration("segment-timeout", 5*time.Minute, "Segment write timeout") defaultConfigFile := "/etc/livepeer/catalyst_uploader.conf" if _, err := os.Stat(defaultConfigFile); os.IsNotExist(err) { @@ -100,7 +101,7 @@ func run() int { } start := time.Now() - out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs) + out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout) if err != nil { glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err) return 1 diff --git a/core/uploader.go b/core/uploader.go index 49ba0ab..d0e9746 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -49,13 +49,11 @@ func SingleRequestRetryBackoff() backoff.BackOff { return newExponentialBackOffExecutor(5*time.Second, 10*time.Second, 30*time.Second) } -const segmentWriteTimeout = 5 * time.Minute - var expiryField = map[string]string{ "Object-Expires": "+168h", // Objects will be deleted after 7 days } -func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string) (*drivers.SaveDataOutput, error) { +func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration) (*drivers.SaveDataOutput, error) { ext := filepath.Ext(outputURI.Path) inputFile, err := os.CreateTemp("", "upload-*"+ext) if err != nil { @@ -75,7 +73,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout return nil, fmt.Errorf("failed to close input file: %w", err) } - out, bytesWritten, err := uploadFileWithBackup(outputURI, inputFileName, nil, segmentWriteTimeout, true, storageFallbackURLs) + out, bytesWritten, err := uploadFileWithBackup(outputURI, inputFileName, nil, 
segTimeout, true, storageFallbackURLs) if err != nil { return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err) } diff --git a/core/uploader_test.go b/core/uploader_test.go index 86f3a76..b775ba9 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -36,7 +36,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) { go func() { u, err := url.Parse(outputFile.Name()) require.NoError(t, err) - _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil) + _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, nil) require.NoError(t, err, "") }() From 5d8ffd09672289b87c638cf50da8d5f029980e05 Mon Sep 17 00:00:00 2001 From: Max Holland Date: Thu, 22 Aug 2024 12:52:45 +0100 Subject: [PATCH 6/6] fix test --- core/uploader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/uploader_test.go b/core/uploader_test.go index b775ba9..9717b57 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -36,7 +36,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) { go func() { u, err := url.Parse(outputFile.Name()) require.NoError(t, err) - _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, nil) + _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute) require.NoError(t, err, "") }()