Skip to content

Commit

Permalink
Merge pull request #48 from moorereason/iss10
Browse files Browse the repository at this point in the history
Add concurrent DNS lookup support
  • Loading branch information
tierpod authored Apr 14, 2024
2 parents e47ca16 + 170b246 commit 8caa163
Show file tree
Hide file tree
Showing 66 changed files with 3,698 additions and 126 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ Copy config/config.dist.yaml to config.yaml and change parameters:

* **lookup_addr** (bool): perform reverse lookup? If enabled, may take some time.

* **lookup_limit** (int): limit lookup pool size; must be positive; default = 50

* **merge_reports** (bool): merge multiple similar reports to one?

* **log_debug** (bool): print debug log messages?
Expand Down
5 changes: 5 additions & 0 deletions cmd/dmarc-report-converter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type config struct {
Input Input `yaml:"input"`
Output Output `yaml:"output"`
LookupAddr bool `yaml:"lookup_addr"`
LookupLimit int `yaml:"lookup_limit"`
MergeReports bool `yaml:"merge_reports"`
LogDebug bool `yaml:"log_debug"`
LogDatetime bool `yaml:"log_datetime"`
Expand Down Expand Up @@ -73,6 +74,10 @@ func loadConfig(path string) (*config, error) {
return nil, err
}

if c.LookupLimit < 1 {
c.LookupLimit = 50
}

if c.Input.Dir == "" {
return nil, fmt.Errorf("input.dir is not configured")
}
Expand Down
16 changes: 6 additions & 10 deletions cmd/dmarc-report-converter/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@ import (
"github.com/tierpod/dmarc-report-converter/pkg/dmarc"
)

func readParse(r io.Reader, fname string, lookupAddr bool) (dmarc.Report, error) {
var report dmarc.Report
var err error

// readParse is a helper function that passes r, lookupAddr, and lookupLimit to
// dmarc.ReadParse.
//
// fname is the file name associated with r and is only used for debug logging.
func readParse(r io.Reader, fname string, lookupAddr bool, lookupLimit int) (dmarc.Report, error) {
log.Printf("[DEBUG] parse: %v", fname)

report, err = dmarc.ReadParse(r, lookupAddr)
if err != nil {
return dmarc.Report{}, err
}
return report, nil
return dmarc.ReadParse(r, lookupAddr, lookupLimit)
}
2 changes: 1 addition & 1 deletion cmd/dmarc-report-converter/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (c *filesConverter) convert() {
continue
}

report, err := readParse(file, f, c.cfg.LookupAddr)
report, err := readParse(file, f, c.cfg.LookupAddr, c.cfg.LookupLimit)
if err != nil {
file.Close()
log.Printf("[ERROR] files: %v in file %v, skip", err, f)
Expand Down
3 changes: 3 additions & 0 deletions config/config.dist.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ output:
# perform reverse lookups?
lookup_addr: no

# limit lookup pool size; must be positive; default = 50
#lookup_limit: 50

# merge multiple similar reports to one?
merge_reports: yes

Expand Down
13 changes: 11 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
module github.com/tierpod/dmarc-report-converter

go 1.13
go 1.18

require (
github.com/emersion/go-imap v1.2.1
github.com/emersion/go-message v0.18.0
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 // indirect
github.com/hashicorp/logutils v1.0.0
github.com/sourcegraph/conc v0.3.0
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 // indirect
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 // indirect
github.com/kr/text v0.2.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
22 changes: 21 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emersion/go-imap v1.2.1 h1:+s9ZjMEjOB8NzZMVTM3cCenz2JrQIGGo5j1df19WjTA=
github.com/emersion/go-imap v1.2.1/go.mod h1:Qlx1FSx2FTxjnjWpIlVNEuX+ylerZQNFE5NsmKFSejY=
github.com/emersion/go-message v0.15.0/go.mod h1:wQUEfE+38+7EW8p8aZ96ptg6bAb1iwdgej19uXASlE4=
Expand All @@ -10,7 +14,22 @@ github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594 h1:IbFBtwo
github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594/go.mod h1:aqO8z8wPrjkscevZJFVE1wXJrLpC5LtJG7fqLOsPb2U=
github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
Expand Down Expand Up @@ -43,7 +62,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
55 changes: 39 additions & 16 deletions pkg/dmarc/dmarc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"encoding/json"
"encoding/xml"
"fmt"
"log"
"math"
"net"
"sort"
"time"

"github.com/sourcegraph/conc/pool"
)

// ReportIDDateTime is the DateTime format for Report.ID
Expand Down Expand Up @@ -215,37 +218,57 @@ type SPFAuthResult struct {
Scope string `xml:"scope" json:"scope"`
}

// Parse parses input xml data b to Report struct. If lookupAddr is true, performs a reverse
// lookups for feedback>record>row>source_ip
func Parse(b []byte, lookupAddr bool) (Report, error) {
// Parse parses input xml data b to Report struct.
//
// If lookupAddr is true, performs reverse DNS lookups for all
// feedback>record>row>source_ip entries.
//
// lookupLimit is the maximum pool size for doing concurrent DNS lookups. Any
// lookupLimit value less than 1 will disable concurrency by setting the pool
// size to 1.
func Parse(b []byte, lookupAddr bool, lookupLimit int) (Report, error) {
var r Report
err := xml.Unmarshal(b, &r)
if err != nil {
return Report{}, err
}

r.SortRecords()

if lookupAddr {
doPTRlookup(&r)
doPTRLookups(&r, lookupLimit)
}

r.SortRecords()
r.CalculateStats()

return r, nil
}

func doPTRlookup(r *Report) error {
// doPTRLookups uses a limited goroutine pool to do concurrent DNS lookups for
// all record>row>source_ip entries in r.
//
// lookupLimit is the goroutine pool size. Any lookupLimit value less than 1
// will essentially disable concurrency by setting the pool size to 1.
func doPTRLookups(r *Report, lookupLimit int) {
if lookupLimit < 1 {
lookupLimit = 1
}

p := pool.New().WithMaxGoroutines(lookupLimit)

start := time.Now()

for i, record := range r.Records {
var hostname string
hostnames, err := net.LookupAddr(record.Row.SourceIP)
if err != nil {
hostname = ""
} else {
hostname = hostnames[0]
}
r.Records[i].Row.SourceHostname = hostname
i := i
record := record

p.Go(func() {
hostnames, err := net.LookupAddr(record.Row.SourceIP)
if err == nil {
r.Records[i].Row.SourceHostname = hostnames[0]
}
})
}
log.Printf("[INFO] Parse: completed %d DNS lookups in %v for report %s", len(r.Records), time.Since(start), r.ReportMetadata.ReportID)

return nil
p.Wait()
}
10 changes: 5 additions & 5 deletions pkg/dmarc/dmarc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestReadParseXML(t *testing.T) {
}
defer f.Close()

out, err := ReadParseXML(f, false)
out, err := ReadParseXML(f, false, 1)
if err != nil {
t.Fatalf("ReadParseXML: %v", err)
}
Expand All @@ -154,7 +154,7 @@ func TestReadParseGZIP(t *testing.T) {
}
defer f.Close()

out, err := ReadParseGZIP(f, false)
out, err := ReadParseGZIP(f, false, 1)
if err != nil {
t.Fatalf("ReadParseGZIP: %v", err)
}
Expand All @@ -171,7 +171,7 @@ func TestReadParseZIP(t *testing.T) {
}
defer f.Close()

out, err := ReadParseZIP(f, false)
out, err := ReadParseZIP(f, false, 1)
if err != nil {
t.Fatalf("ReadParseZIP: %v", err)
}
Expand All @@ -190,7 +190,7 @@ func TestReadParse(t *testing.T) {
}
defer f.Close()

out, err := ReadParse(f, false)
out, err := ReadParse(f, false, 1)
if err != nil {
t.Fatalf("ReadParse(%v): %v", testFile, err)
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestReadParse_Empty(t *testing.T) {
t.Fatalf("ReadParse(%v): %v", testFile, err)
}
defer f.Close()
out, err := ReadParse(f, false)
out, err := ReadParse(f, false, 1)
if err != nil {
t.Fatalf("ReadParse(%v): %v", testFile, err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/dmarc/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func TestReport_MergeRecord(t *testing.T) {
// we have tested this errors already
f, _ := os.Open("testdata/test.xml")
defer f.Close()
report, _ := ReadParseXML(f, false)
report, _ := ReadParseXML(f, false, 1)

// this record must be merged with the xmlRecord2
r1 := xmlRecord2
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestReport_MergeReport(t *testing.T) {
}
defer f.Close()

rep1, err := ReadParseXML(f, false)
rep1, err := ReadParseXML(f, false, 1)
if err != nil {
t.Fatalf("Report_MergeReport: %v", err)
}
Expand All @@ -91,7 +91,7 @@ func TestReport_MergeReport(t *testing.T) {
}
defer f.Close()

rep2, err := ReadParseXML(f, false)
rep2, err := ReadParseXML(f, false, 1)
if err != nil {
t.Fatalf("Report_MergeReport: %v", err)
}
Expand Down
Loading

0 comments on commit 8caa163

Please sign in to comment.