file input plugin: define watching_dir from include patterns (#567)
* file input plugin info url

* file input plugin: new format for patterns

* file input plugin: new format for patterns

* define watching_dir for file watcher

* check dir files in notify by base paths from include patterns

* use filepath.Walk

* watching_dir is not required

* fix doc for new format in file plugin

* add debug messages for watcher

* fix after rebase

* check file path patterns on start
DmitryRomanov authored Oct 9, 2024
1 parent e02c4a6 commit b897426
Showing 12 changed files with 451 additions and 125 deletions.
1 change: 1 addition & 0 deletions go.mod
@@ -12,6 +12,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/alicebob/miniredis/v2 v2.30.5
github.com/bitly/go-simplejson v0.5.1
github.com/bmatcuk/doublestar/v4 v4.0.2
github.com/bufbuild/protocompile v0.13.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cespare/xxhash/v2 v2.2.0
2 changes: 2 additions & 0 deletions go.sum
@@ -29,6 +29,8 @@ github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pg
github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
github.com/bufbuild/protocompile v0.13.0 h1:6cwUB0Y2tSvmNxsbunwzmIto3xOlJOV7ALALuVOs92M=
github.com/bufbuild/protocompile v0.13.0/go.mod h1:dr++fGGeMPWHv7jPeT06ZKukm45NJscd7rUxQVzEKRk=
github.com/bmatcuk/doublestar/v4 v4.0.2 h1:X0krlUVAVmtr2cRoTqR8aDMrDqnB36ht8wpWTiQ3jsA=
github.com/bmatcuk/doublestar/v4 v4.0.2/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
12 changes: 0 additions & 12 deletions plugin/README.md
@@ -30,18 +30,6 @@ But update events don't work with symlinks, so watcher also periodically manuall
> ⚠ Use add_file_name plugin if you want to add filename to events.
**Reading docker container log files:**
```yaml
pipelines:
  example_docker_pipeline:
    input:
      type: file
      watching_dir: /var/lib/docker/containers
      offsets_file: /data/offsets.yaml
      filename_pattern: "*-json.log"
      persistence_mode: async
```
[More details...](plugin/input/file/README.md)
## http
Reads events from HTTP requests with the body delimited by a new line.
12 changes: 0 additions & 12 deletions plugin/input/README.md
@@ -29,18 +29,6 @@ But update events don't work with symlinks, so watcher also periodically manuall
> ⚠ Use add_file_name plugin if you want to add filename to events.
**Reading docker container log files:**
```yaml
pipelines:
  example_docker_pipeline:
    input:
      type: file
      watching_dir: /var/lib/docker/containers
      offsets_file: /data/offsets.yaml
      filename_pattern: "*-json.log"
      persistence_mode: async
```
[More details...](plugin/input/file/README.md)
## http
Reads events from HTTP requests with the body delimited by a new line.
15 changes: 15 additions & 0 deletions plugin/input/file/README.idoc.md
@@ -1,6 +1,21 @@
# File plugin
@introduction

**Reading docker container log files:**
```yaml
pipelines:
  example_docker_pipeline:
    input:
      type: file
      paths:
        include:
          - '/var/lib/docker/containers/**/*-json.log'
        exclude:
          - '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
      offsets_file: /data/offsets.yaml
      persistence_mode: async
```
### Config params
@config-params|description
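
A minimal sketch of how the `paths` block in the example above could be applied to a candidate file, assuming a file is selected when it matches at least one `include` pattern and no `exclude` pattern. The commit matches patterns with `github.com/bmatcuk/doublestar/v4`, which understands `**`; to stay dependency-free this sketch uses the standard library's `path.Match`, which does not, so the include pattern here uses a single `*` for the container directory. The `selected` and `matchAny` helpers and the sample file names are hypothetical and are not part of the plugin.

```go
package main

import (
	"fmt"
	"path"
)

// Paths mirrors the struct this commit adds to file.go.
type Paths struct {
	Include []string `json:"include"`
	Exclude []string `json:"exclude"`
}

// matchAny reports whether name matches at least one of the patterns.
// A malformed pattern is returned as an error.
func matchAny(patterns []string, name string) (bool, error) {
	for _, pattern := range patterns {
		ok, err := path.Match(pattern, name)
		if err != nil {
			return false, fmt.Errorf("bad pattern %q: %w", pattern, err)
		}
		if ok {
			return true, nil
		}
	}
	return false, nil
}

// selected is a hypothetical helper: a file is picked up when it matches
// an include pattern and does not match any exclude pattern.
func selected(paths Paths, name string) (bool, error) {
	included, err := matchAny(paths.Include, name)
	if err != nil || !included {
		return false, err
	}
	excluded, err := matchAny(paths.Exclude, name)
	if err != nil {
		return false, err
	}
	return !excluded, nil
}

func main() {
	paths := Paths{
		// path.Match has no "**", so one "*" stands in for the container ID directory.
		Include: []string{"/var/lib/docker/containers/*/*-json.log"},
		Exclude: []string{"/var/lib/docker/containers/19aa5027343f4*/*-json.log"},
	}
	files := []string{
		"/var/lib/docker/containers/7bc1/7bc1-json.log",
		"/var/lib/docker/containers/19aa5027343f4abc/19aa5027343f4abc-json.log",
	}
	for _, name := range files {
		ok, err := selected(paths, name)
		fmt.Println(name, ok, err)
	}
}
```

Keeping the match function behind `matchAny` means the stdlib call could be swapped for a `**`-aware matcher without touching the include/exclude logic.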
54 changes: 33 additions & 21 deletions plugin/input/file/README.md
@@ -25,19 +25,22 @@ pipelines:
  example_docker_pipeline:
    input:
      type: file
      watching_dir: /var/lib/docker/containers
      paths:
        include:
          - '/var/lib/docker/containers/**/*-json.log'
        exclude:
          - '/var/lib/docker/containers/19aa5027343f4*/*-json.log'
      offsets_file: /data/offsets.yaml
      filename_pattern: "*-json.log"
      persistence_mode: async
```
### Config params
**`watching_dir`** *`string`* *`required`*
**`paths`** *`Paths`*

The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
different directories, it's recommended to setup separate pipelines for each.
Set paths in glob format

* `include` *`[]string`*
* `exclude` *`[]string`*

<br>

@@ -48,20 +51,6 @@ The filename to store offsets of processed files. Offsets are loaded only on ini

<br>

**`filename_pattern`** *`string`* *`default=*`*

Files that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>

**`dir_pattern`** *`string`* *`default=*`*

Dirs that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>

**`persistence_mode`** *`string`* *`default=async`* *`options=async|sync`*

It defines how to save the offsets file:
@@ -147,6 +136,29 @@ Example: ```filename: '{{ .filename }}'```

<br>

**`watching_dir`** **Deprecated format**

The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
`/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
different directories, it's recommended to setup separate pipelines for each.

<br>

**`filename_pattern`** *`string`* *`default=*`*

Files that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>

**`dir_pattern`** *`string`* *`default=*`*

Dirs that don't meet this pattern will be ignored.
> Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.

<br>


### Meta params
**`filename`**
74 changes: 45 additions & 29 deletions plugin/input/file/file.go
@@ -5,6 +5,7 @@ import (
"path/filepath"
"time"

"github.com/bmatcuk/doublestar/v4"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
@@ -34,18 +35,6 @@ But update events don't work with symlinks, so watcher also periodically manuall
> By default the plugin is notified only on file creations. Note that following for changes is more CPU intensive.
> ⚠ Use add_file_name plugin if you want to add filename to events.
**Reading docker container log files:**
```yaml
pipelines:
  example_docker_pipeline:
    input:
      type: file
      watching_dir: /var/lib/docker/containers
      offsets_file: /data/offsets.yaml
      filename_pattern: "*-json.log"
      persistence_mode: async
```
}*/

type Plugin struct {
@@ -81,17 +70,22 @@ const (
offsetsOpReset // * `reset` – resets an offset to the beginning of the file
)

type Paths struct {
	Include []string `json:"include"`
	Exclude []string `json:"exclude"`
}

type Config struct {
// ! config-params
// ^ config-params

// > @3@4@5@6
// >
// > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
// > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
// > different directories, it's recommended to setup separate pipelines for each.
WatchingDir string `json:"watching_dir" required:"true"` // *
// > Set paths in glob format
// >
// > * `include` *`[]string`*
// > * `exclude` *`[]string`*
Paths Paths `json:"paths"` // *

// > @3@4@5@6
// >
@@ -100,18 +94,6 @@ type Config struct {
OffsetsFile string `json:"offsets_file" required:"true"` // *
OffsetsFileTmp string

// > @3@4@5@6
// >
// > Files that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
FilenamePattern string `json:"filename_pattern" default:"*"` // *

// > @3@4@5@6
// >
// > Dirs that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
DirPattern string `json:"dir_pattern" default:"*"` // *

// > @3@4@5@6
// >
// > It defines how to save the offsets file:
@@ -184,6 +166,26 @@ type Config struct {
// >
// > Example: ```filename: '{{ .filename }}'```
Meta cfg.MetaTemplates `json:"meta"` // *

// > **Deprecated format**
// >
// > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
// > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
// > different directories, it's recommended to setup separate pipelines for each.
WatchingDir string `json:"watching_dir"` // *

// > @3@4@5@6
// >
// > Files that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
FilenamePattern string `json:"filename_pattern" default:"*"` // *

// > @3@4@5@6
// >
// > Dirs that don't meet this pattern will be ignored.
// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
DirPattern string `json:"dir_pattern" default:"*"` // *
}

var offsetFiles = make(map[string]string)
@@ -222,6 +224,20 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
offsetFiles[offsetFilePath] = params.PipelineName
}

for _, pattern := range p.config.Paths.Include {
	_, err := doublestar.PathMatch(pattern, ".")
	if err != nil {
		p.logger.Fatalf("wrong paths include pattern %q: %s", pattern, err.Error())
	}
}

for _, pattern := range p.config.Paths.Exclude {
	_, err := doublestar.PathMatch(pattern, ".")
	if err != nil {
		p.logger.Fatalf("wrong paths exclude pattern %q: %s", pattern, err.Error())
	}
}

p.jobProvider = NewJobProvider(
p.config,
newMetricCollection(
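
The start-up check in the hunk above calls `doublestar.PathMatch` against a dummy name purely to surface pattern syntax errors before any file is watched. The same idea can be sketched with only the standard library, assuming Go 1.16 or later, where `filepath.Match` reports a malformed pattern even when the name does not match. `validatePatterns` is a hypothetical helper, not code from the commit, and `filepath.Match` treats `**` as ordinary stars, so this checks syntax only. It returns an error rather than calling `Fatalf`, leaving the failure policy to the caller.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// validatePatterns is a hypothetical helper that returns an error for the
// first syntactically invalid glob. kind ("include" or "exclude") is used
// only to build the error message.
func validatePatterns(kind string, patterns []string) error {
	for _, pattern := range patterns {
		// The name "." is a dummy value: only the returned error matters here.
		if _, err := filepath.Match(pattern, "."); err != nil {
			return fmt.Errorf("wrong paths %s pattern %q: %w", kind, pattern, err)
		}
	}
	return nil
}

func main() {
	include := []string{"/var/lib/docker/containers/**/*-json.log"}
	exclude := []string{"/var/log/["} // unterminated character class

	fmt.Println(validatePatterns("include", include))
	fmt.Println(validatePatterns("exclude", exclude))
}
```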
12 changes: 9 additions & 3 deletions plugin/input/file/info.go
@@ -45,10 +45,16 @@ func (ir *InfoRegistry) Info(w http.ResponseWriter, r *http.Request) {

_, _ = w.Write([]byte(jobsInfo))

watcherInfo := logger.Header("watch_dirs")
watcherInfo := logger.Header("watch_paths")
for _, source := range plugin.jobProvider.watcher.basePaths {
watcherInfo += fmt.Sprintf(
"%s\n",
source,
)
}
watcherInfo += fmt.Sprintf(
"%s\n",
plugin.jobProvider.watcher.path,
"commonPath: %s\n",
plugin.jobProvider.watcher.commonPath,
)
_, _ = w.Write([]byte(watcherInfo))

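
The watcher code that fills `basePaths` and `commonPath` is not part of this excerpt, so the following is only a guess at the idea behind the commit title ("define watching_dir from include patterns"). It assumes a base path is the literal directory prefix of a pattern, up to the first segment containing glob syntax, and that the common path is the longest directory prefix shared by all base paths. The names `basePath`, `commonPath`, and `globMeta` are hypothetical.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// globMeta lists characters assumed to start glob syntax in a pattern.
const globMeta = "*?[{\\"

// basePath returns the literal directory prefix of a slash-separated pattern,
// i.e. everything before the first segment that contains glob syntax.
func basePath(pattern string) string {
	segments := strings.Split(pattern, "/")
	for i, seg := range segments {
		if strings.ContainsAny(seg, globMeta) {
			base := strings.Join(segments[:i], "/")
			if base == "" {
				if strings.HasPrefix(pattern, "/") {
					return "/"
				}
				return "."
			}
			return base
		}
	}
	// No glob syntax: the pattern names a single file, so use its directory.
	return path.Dir(pattern)
}

// commonPath returns the longest directory prefix shared by all paths.
// It returns "" when relative paths share nothing.
func commonPath(paths []string) string {
	if len(paths) == 0 {
		return ""
	}
	common := strings.Split(paths[0], "/")
	for _, p := range paths[1:] {
		segs := strings.Split(p, "/")
		n := 0
		for n < len(common) && n < len(segs) && common[n] == segs[n] {
			n++
		}
		common = common[:n]
	}
	joined := strings.Join(common, "/")
	if joined == "" && strings.HasPrefix(paths[0], "/") {
		return "/"
	}
	return joined
}

func main() {
	patterns := []string{
		"/var/log/nginx/**/*.log",
		"/var/log/app/*.log",
	}
	var bases []string
	for _, pattern := range patterns {
		bases = append(bases, basePath(pattern))
	}
	fmt.Println(bases)
	fmt.Println(commonPath(bases))
}
```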
17 changes: 15 additions & 2 deletions plugin/input/file/provider.go
@@ -145,10 +145,23 @@ func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.Su
numberOfCurrentJobsMetric: metrics.numberOfCurrentJobsMetric,
}

if len(config.Paths.Include) == 0 {
	if config.DirPattern == "*" {
		config.Paths.Include = append(
			config.Paths.Include,
			filepath.Join(config.WatchingDir, filepath.Join("**", config.FilenamePattern)),
		)
	} else {
		config.Paths.Include = append(
			config.Paths.Include,
			filepath.Join(config.WatchingDir, filepath.Join(config.DirPattern, config.FilenamePattern)),
		)
	}
}

jp.watcher = NewWatcher(
config.WatchingDir,
config.FilenamePattern,
config.DirPattern,
config.Paths,
jp.processNotification,
config.ShouldWatchChanges,
metrics.notifyChannelLengthMetric,
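
The fallback in `NewJobProvider` above turns the deprecated `watching_dir`, `dir_pattern`, and `filename_pattern` settings into a single include pattern when `paths.include` is empty. A standalone sketch of that mapping follows; `legacyInclude` is a hypothetical name for logic the commit writes inline, and the expected values in the usage note assume a Unix-style path separator.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// legacyInclude builds the include pattern that corresponds to the deprecated
// settings. The default dir_pattern "*" becomes "**", so files at any depth
// under watchingDir are covered; any other dir_pattern is kept as one level.
func legacyInclude(watchingDir, dirPattern, filenamePattern string) string {
	if dirPattern == "*" {
		return filepath.Join(watchingDir, "**", filenamePattern)
	}
	return filepath.Join(watchingDir, dirPattern, filenamePattern)
}

func main() {
	// Old docker example: watching_dir plus filename_pattern, default dir_pattern.
	fmt.Println(legacyInclude("/var/lib/docker/containers", "*", "*-json.log"))

	// A custom dir_pattern stays a single directory level.
	fmt.Println(legacyInclude("/var/my-logs", "nginx-*", "*.log"))
}
```

With the old docker example this yields `/var/lib/docker/containers/**/*-json.log`, the same include pattern the updated README uses.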