From b4b05e764191a3bf6f5f2a5eb46eb0af2e58b65a Mon Sep 17 00:00:00 2001 From: Max Holland Date: Mon, 21 Oct 2024 12:13:20 +0100 Subject: [PATCH 1/2] Add support for saving thumbnails to a private location --- catalyst-uploader.go | 5 ++++- core/uploader.go | 23 ++++++++++++++++++++--- core/uploader_test.go | 2 +- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/catalyst-uploader.go b/catalyst-uploader.go index 6eff256..b404452 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -42,6 +42,8 @@ func run() int { segTimeout := fs.Duration("segment-timeout", 5*time.Minute, "Segment write timeout") disableRecording := CommaSliceFlag(fs, "disable-recording", `Comma-separated list of playbackIDs to disable recording for`) disableThumbs := CommaSliceFlag(fs, "disable-thumbs", `Comma-separated list of playbackIDs to disable thumbs for`) + privateThumbs := CommaSliceFlag(fs, "private-thumbs", `Comma-separated list of playbackIDs to save to private location`) + privateThumbsURLReplacement := CommaMapFlag(fs, "private-thumbs-replace", `Map for replacement URL to use when saving thumbnails to the private location`) defaultConfigFile := "/etc/livepeer/catalyst_uploader.conf" if _, err := os.Stat(defaultConfigFile); os.IsNotExist(err) { @@ -50,6 +52,7 @@ func run() int { _ = fs.String("config", defaultConfigFile, "config file (optional)") err = ff.Parse(fs, os.Args[1:], + ff.WithIgnoreUndefined(true), ff.WithConfigFileFlag("config"), ff.WithConfigFileParser(ff.PlainParser), ff.WithEnvVarPrefix("CATALYST_UPLOADER"), @@ -110,7 +113,7 @@ func run() int { } start := time.Now() - out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout, *disableThumbs) + out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout, *disableThumbs, *privateThumbs, *privateThumbsURLReplacement) if err != nil { glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err) return 1 diff --git a/core/uploader.go b/core/uploader.go index 1f4b918..af3f2a0 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -53,7 +53,7 @@ var expiryField = map[string]string{ "Object-Expires": "+168h", // Objects will be deleted after 7 days } -func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration, disableThumbs []string) (*drivers.SaveDataOutput, error) { +func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration, disableThumbs, privateThumbs []string, privateThumbsURLReplacement map[string]string) (*drivers.SaveDataOutput, error) { ext := filepath.Ext(outputURI.Path) inputFile, err := os.CreateTemp("", "upload-*"+ext) if err != nil { @@ -78,7 +78,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err) } - if err = extractThumb(outputURI, inputFileName, storageFallbackURLs, disableThumbs); err != nil { + if err = extractThumb(outputURI, inputFileName, storageFallbackURLs, disableThumbs, privateThumbs, privateThumbsURLReplacement); err != nil { glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err) } return out, nil @@ -229,13 +229,30 @@ func uploadFile(outputURI *url.URL, fileName string, fields *drivers.FilePropert return out, bytesWritten, err } -func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string, disableThumbs []string) error { +func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string, disableThumbs []string, privateThumbs []string, privateThumbsURLReplacement map[string]string) error { for _, playbackID := range disableThumbs { if strings.Contains(outputURI.Path, playbackID) { glog.Infof("Thumbnails disabled for %s", outputURI.Redacted()) return nil } } + for _, playbackID := range privateThumbs { + if strings.Contains(outputURI.Path, playbackID) { + glog.Infof("Saving thumbnail to private location for %s", outputURI.Redacted()) + outputURIStr := outputURI.String() + for original, private := range privateThumbsURLReplacement { + if strings.HasPrefix(outputURIStr, original) { + newURI, err := url.Parse(strings.Replace(outputURIStr, original, private, 1)) + if err != nil { + return fmt.Errorf("failed to parse thumbnail URL: %w", err) + } + outputURI = newURI + break + } + } + break + } + } tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*") if err != nil { diff --git a/core/uploader_test.go b/core/uploader_test.go index a917f00..905a8e2 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -36,7 +36,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) { go func() { u, err := url.Parse(outputFile.Name()) require.NoError(t, err) - _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute, nil) + _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute, nil, nil, nil) require.NoError(t, err, "") }() From d436ade69e4d2a9eb3e392e15b91786d23a82bd0 Mon Sep 17 00:00:00 2001 From: Max Holland Date: Mon, 21 Oct 2024 12:48:50 +0100 Subject: [PATCH 2/2] Adapt to a generic url replacement --- catalyst-uploader.go | 5 ++--- core/uploader.go | 33 ++++++++++++++++++--------------- core/uploader_test.go | 2 +- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/catalyst-uploader.go b/catalyst-uploader.go index b404452..8cc8915 100644 --- a/catalyst-uploader.go +++ b/catalyst-uploader.go @@ -42,8 +42,7 @@ func run() int { segTimeout := fs.Duration("segment-timeout", 5*time.Minute, "Segment write timeout") disableRecording := CommaSliceFlag(fs, "disable-recording", `Comma-separated list of playbackIDs to disable recording for`) disableThumbs := CommaSliceFlag(fs, "disable-thumbs", `Comma-separated list of playbackIDs to disable thumbs for`) - privateThumbs := CommaSliceFlag(fs, "private-thumbs", `Comma-separated list of playbackIDs to save to private location`) - privateThumbsURLReplacement := CommaMapFlag(fs, "private-thumbs-replace", `Map for replacement URL to use when saving thumbnails to the private location`) + thumbsURLReplacement := CommaMapFlag(fs, "thumbs-replace-urls", `Map of space separated playbackIDs to space separated URL replacement to use when saving thumbnails. E.g. playbackID1 playbackID2=oldURL newURL`) defaultConfigFile := "/etc/livepeer/catalyst_uploader.conf" if _, err := os.Stat(defaultConfigFile); os.IsNotExist(err) { @@ -113,7 +112,7 @@ func run() int { } start := time.Now() - out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout, *disableThumbs, *privateThumbs, *privateThumbsURLReplacement) + out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout, *disableThumbs, *thumbsURLReplacement) if err != nil { glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err) return 1 diff --git a/core/uploader.go b/core/uploader.go index af3f2a0..d2af9f6 100644 --- a/core/uploader.go +++ b/core/uploader.go @@ -53,7 +53,7 @@ var expiryField = map[string]string{ "Object-Expires": "+168h", // Objects will be deleted after 7 days } -func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration, disableThumbs, privateThumbs []string, privateThumbsURLReplacement map[string]string) (*drivers.SaveDataOutput, error) { +func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration, disableThumbs []string, thumbsURLReplacement map[string]string) (*drivers.SaveDataOutput, error) { ext := filepath.Ext(outputURI.Path) inputFile, err := os.CreateTemp("", "upload-*"+ext) if err != nil { @@ -78,7 +78,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err) } - if err = extractThumb(outputURI, inputFileName, storageFallbackURLs, disableThumbs, privateThumbs, privateThumbsURLReplacement); err != nil { + if err = extractThumb(outputURI, inputFileName, storageFallbackURLs, disableThumbs, thumbsURLReplacement); err != nil { glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err) } return out, nil @@ -229,28 +229,31 @@ func uploadFile(outputURI *url.URL, fileName string, fields *drivers.FilePropert return out, bytesWritten, err } -func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string, disableThumbs []string, privateThumbs []string, privateThumbsURLReplacement map[string]string) error { +func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string, disableThumbs []string, thumbsURLReplacement map[string]string) error { for _, playbackID := range disableThumbs { if strings.Contains(outputURI.Path, playbackID) { glog.Infof("Thumbnails disabled for %s", outputURI.Redacted()) return nil } } - for _, playbackID := range privateThumbs { - if strings.Contains(outputURI.Path, playbackID) { - glog.Infof("Saving thumbnail to private location for %s", outputURI.Redacted()) - outputURIStr := outputURI.String() - for original, private := range privateThumbsURLReplacement { - if strings.HasPrefix(outputURIStr, original) { - newURI, err := url.Parse(strings.Replace(outputURIStr, original, private, 1)) - if err != nil { - return fmt.Errorf("failed to parse thumbnail URL: %w", err) - } - outputURI = newURI + for playbackIDs, replacement := range thumbsURLReplacement { + for _, playbackID := range strings.Split(playbackIDs, " ") { + if strings.Contains(outputURI.Path, playbackID) { + outputURIStr := outputURI.String() + split := strings.Split(replacement, " ") + if len(split) != 2 { break } + original, replaceWith := split[0], split[1] + + newURI, err := url.Parse(strings.Replace(outputURIStr, original, replaceWith, 1)) + if err != nil { + return fmt.Errorf("failed to parse thumbnail URL: %w", err) + } + outputURI = newURI + glog.Infof("Replaced thumbnail location for %s", outputURI.Redacted()) + break } - break } } diff --git a/core/uploader_test.go b/core/uploader_test.go index 905a8e2..e7a18e2 100644 --- a/core/uploader_test.go +++ b/core/uploader_test.go @@ -36,7 +36,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) { go func() { u, err := url.Parse(outputFile.Name()) require.NoError(t, err) - _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute, nil, nil, nil) + _, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute, nil, nil) require.NoError(t, err, "") }()