Skip to content

Commit

Permalink
Fixing issues #45 #46 #47
Browse files Browse the repository at this point in the history
  • Loading branch information
gianmarcomennecozzi committed Jun 14, 2020
1 parent 3227332 commit bdfe698
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 72 deletions.
9 changes: 0 additions & 9 deletions api/ct.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,6 @@ func (s *Server) StoreLogEntries(str api.CtApi_StoreLogEntriesServer) error {
}
}()
}
//todo change the response to the client
//errs := s.Store.MapBatchWithCacheAndDB()
//if len(errs) != 0 {
// fmt.Println(errs)
//}
//err = s.Store.StoreBatchPostHook()
//if err != nil {
// fmt.Println(err)
//}

log.Info().Msgf("%v", s.Store.Counter)
s.Store.ResetCounter()
Expand Down
7 changes: 0 additions & 7 deletions api/measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func (s *Server) StopMeasurement(ctx context.Context, muid *api.MeasurementId) (
return nil, status.Error(codes.Internal, err.Error())
}

if err := s.Store.RunPostHooks(); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

log.Debug().Str("muid", muid.Id).Msgf("stopped measurement")

return &api.Empty{}, nil
Expand Down Expand Up @@ -78,9 +74,6 @@ func (s *Server) StopStage(ctx context.Context, muid *api.MeasurementId) (*api.E
return nil, status.Error(codes.Internal, err.Error())
}

if err := s.Store.RunPostHooks(); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
log.Debug().Str("muid", muid.Id).Msgf("stopped stage")

return &api.Empty{}, nil
Expand Down
103 changes: 57 additions & 46 deletions store/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package store

import (
"crypto/sha256"
"errors"
"fmt"

"github.com/rs/zerolog/log"

"github.com/aau-network-security/gollector/store/models"
"github.com/go-pg/pg"
)
Expand All @@ -20,9 +21,7 @@ type certstruct struct {
sid uint
}

var cacheNotFound = errors.New("not found in the cache")

type HashMapDB struct {
type hashMapDB struct {
tldByName map[string]*domainstruct
tldAnonByName map[string]*domainstruct
publicSuffixByName map[string]*domainstruct
Expand All @@ -34,8 +33,8 @@ type HashMapDB struct {
certByFingerprint map[string]*certstruct
}

func NewBatchQueryDB() HashMapDB {
return HashMapDB{
func NewBatchQueryDB() hashMapDB {
return hashMapDB{
tldByName: make(map[string]*domainstruct),
tldAnonByName: make(map[string]*domainstruct),
publicSuffixByName: make(map[string]*domainstruct),
Expand Down Expand Up @@ -69,7 +68,7 @@ func (s *Store) MapEntry(muid string, entry LogEntry) error {
for _, d := range entry.Cert.DNSNames {
domain, err := NewDomain(d)
if err != nil {
// TODO: handle error (return all erors in a list perhaps)
log.Info().Msgf("error creating domain [%s]: %s", d, err)
continue
}
s.hashMapDB.fqdnByName[domain.fqdn.normal] = &domainstruct{
Expand All @@ -88,28 +87,16 @@ func (s *Store) MapEntry(muid string, entry LogEntry) error {
return s.conditionalPostHooks()
}

func (s *Store) MapBatchWithCacheAndDB() []error {
func (s *Store) MapBatchWithCacheAndDB() {

var errs []error
if err := s.mapCert(); err != nil {
errs = append(errs, err)
}
if err := s.mapFQDN(); err != nil {
errs = append(errs, err)
}
if err := s.mapApex(); err != nil {
errs = append(errs, err)
}
if err := s.mapPublicSuffix(); err != nil {
errs = append(errs, err)
}
if err := s.mapTLD(); err != nil {
errs = append(errs, err)
}
return errs
s.mapCert()
s.mapFQDN()
s.mapApex()
s.mapPublicSuffix()
s.mapTLD()
}

func (s *Store) mapCert() error {
func (s *Store) mapCert() {

var certsNotFoundInCache []string
//map from cache
Expand All @@ -125,23 +112,27 @@ func (s *Store) mapCert() error {
s.hashMapDB.certByFingerprint[k] = existing
}

// Cache not full so the certificate can not be in the DB
if s.cache.certByFingerprint.Len() < s.cacheOpts.CertSize {
return
}

//map with DB
var certsFoundInDB []*models.Certificate

if err := s.db.Model(&certsFoundInDB).Where("sha256_fingerprint in (?)", pg.In(certsNotFoundInCache)).Select(); err != nil {
return err
log.Error().Msgf("error retrieve Certificates from DB [mapCert]: %s", err)
}

for _, c := range certsFoundInDB {
existing := s.hashMapDB.certByFingerprint[c.Sha256Fingerprint]
existing.cert = c
s.hashMapDB.certByFingerprint[c.Sha256Fingerprint] = existing
s.cache.certByFingerprint.Add(c.Sha256Fingerprint, c)
}

return nil
}

func (s *Store) mapFQDN() error {
func (s *Store) mapFQDN() {

var fqndNotFoundInCache []string

Expand All @@ -157,22 +148,27 @@ func (s *Store) mapFQDN() error {
s.hashMapDB.fqdnByName[k] = existing
}

// Cache not full so the certificate can not be in the DB
if s.cache.fqdnByName.Len() < s.cacheOpts.FQDNSize {
return
}

//map with DB
var fqdnFoundInDB []*models.Fqdn

if err := s.db.Model(&fqdnFoundInDB).Where("fqdn in (?)", pg.In(fqndNotFoundInCache)).Select(); err != nil {
return err
log.Error().Msgf("error retrieve FQDN from DB [mapFQDN]: %s", err)
}

for _, f := range fqdnFoundInDB {
existing := s.hashMapDB.fqdnByName[f.Fqdn]
existing.obj = f
s.hashMapDB.fqdnByName[f.Fqdn] = existing
s.cache.fqdnByName.Add(f.Fqdn, f)
}
return nil
}

func (s *Store) mapApex() error {
func (s *Store) mapApex() {

var apexNotFoundInCache []string

Expand All @@ -188,22 +184,27 @@ func (s *Store) mapApex() error {
s.hashMapDB.apexByName[k] = existing
}

// Cache not full so the certificate can not be in the DB
if s.cache.apexByName.Len() < s.cacheOpts.ApexSize {
return
}

//map with DB
var apexFoundInDB []*models.Apex

if err := s.db.Model(&apexFoundInDB).Where("apex in (?)", pg.In(apexNotFoundInCache)).Select(); err != nil {
return err
log.Error().Msgf("error retrieve Apex from DB [mapApex]: %s", err)
}

for _, a := range apexFoundInDB {
existing := s.hashMapDB.apexByName[a.Apex]
existing.obj = a
s.hashMapDB.apexByName[a.Apex] = existing
s.cache.apexByName.Add(a.Apex, a)
}
return nil
}

func (s *Store) mapPublicSuffix() error {
func (s *Store) mapPublicSuffix() {

var psNotFoundInCache []string

Expand All @@ -219,22 +220,26 @@ func (s *Store) mapPublicSuffix() error {
s.hashMapDB.publicSuffixByName[k] = existing
}

// Cache not full so the certificate can not be in the DB
if s.cache.publicSuffixByName.Len() < s.cacheOpts.PSuffSize {
return
}

//map with DB
var psFoundInDB []*models.PublicSuffix

if err := s.db.Model(&psFoundInDB).Where("public_suffix in (?)", pg.In(psNotFoundInCache)).Select(); err != nil {
return err
log.Error().Msgf("error retrieve PS from DB [mapPublicSuffix]: %s", err)
}

for _, ps := range psFoundInDB {
existing := s.hashMapDB.publicSuffixByName[ps.PublicSuffix]
existing.obj = ps
s.hashMapDB.publicSuffixByName[ps.PublicSuffix] = existing
}
return nil
}

func (s *Store) mapTLD() error {
func (s *Store) mapTLD() {

var tldNotFoundInCache []string

Expand All @@ -250,19 +255,25 @@ func (s *Store) mapTLD() error {
s.hashMapDB.tldByName[k] = existing
}

// Cache not full so the certificate can not be in the DB
if s.cache.tldByName.Len() < s.cacheOpts.TLDSize {
return
}

//map with DB
var tldFoundInDB []*models.Tld

if err := s.db.Model(&tldFoundInDB).Where("tld in (?)", pg.In(tldNotFoundInCache)).Select(); err != nil {
return err
log.Error().Msgf("error retrieve TLD from DB [mapTLD]: %s", err)
}

for _, tld := range tldFoundInDB {
existing := s.hashMapDB.tldByName[tld.Tld]
existing.obj = tld
s.hashMapDB.tldByName[tld.Tld] = existing
s.cache.tldByName.Add(tld.Tld, tld)
}
return nil
return
}

func (s *Store) StoreBatchPostHook() error {
Expand All @@ -277,6 +288,7 @@ func (s *Store) StoreBatchPostHook() error {
str.obj = res
s.hashMapDB.tldByName[k] = str
s.ids.tlds++
s.cache.tldByName.Add(k, res)
}
}

Expand All @@ -294,6 +306,7 @@ func (s *Store) StoreBatchPostHook() error {
s.inserts.publicSuffix = append(s.inserts.publicSuffix, res)
s.hashMapDB.publicSuffixByName[k] = str
s.ids.suffixes++
s.cache.publicSuffixByName.Add(k, res)
}
}

Expand All @@ -317,6 +330,7 @@ func (s *Store) StoreBatchPostHook() error {
s.inserts.apexes[res.ID] = res
s.hashMapDB.apexByName[k] = str
s.ids.apexes++
s.cache.apexByName.Add(k, res)
}
}

Expand Down Expand Up @@ -344,6 +358,7 @@ func (s *Store) StoreBatchPostHook() error {
s.inserts.fqdns = append(s.inserts.fqdns, res)
s.hashMapDB.fqdnByName[k] = str
s.ids.fqdns++
s.cache.fqdnByName.Add(k, res)
}
}

Expand All @@ -360,7 +375,6 @@ func (s *Store) StoreBatchPostHook() error {
for _, d := range certstr.entry.Cert.DNSNames {
domain, err := NewDomain(d)
if err != nil {
// todo: handle error
continue
}
fqdnstr := s.hashMapDB.fqdnByName[domain.fqdn.normal]
Expand All @@ -372,11 +386,11 @@ func (s *Store) StoreBatchPostHook() error {
}
s.inserts.certToFqdns = append(s.inserts.certToFqdns, &ctof)
}
// TODO: do not forget to insert in cache

certstr.cert = cert
s.inserts.certs = append(s.inserts.certs, cert)
s.ids.certs++
s.cache.certByFingerprint.Add(k, cert)

}

Expand All @@ -397,8 +411,5 @@ func (s *Store) StoreBatchPostHook() error {
s.inserts.logEntries = append(s.inserts.logEntries, &le)
}

// log entries

//todo return s.conditionalPostHooks()
return nil
}
17 changes: 7 additions & 10 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func newCache(opts CacheOpts) cache {
logByUrl: newLRUCache(opts.LogSize), //make(map[string]*models.Log),
certByFingerprint: newLRUCache(opts.CertSize), //make(map[string]*models.Certificate),
passiveEntryByFqdn: newSplunkEntryMap(),
recordTypeByName: newLRUCache(opts.ApexSize), //make(map[string]*models.RecordType), //todo ask kaspar
recordTypeByName: newLRUCache(opts.TLDSize), //make(map[string]*models.RecordType),
}
}

Expand Down Expand Up @@ -266,7 +266,7 @@ type Store struct {
anonymizer *Anonymizer
Ready *Ready
Counter counter
hashMapDB HashMapDB
hashMapDB hashMapDB
}

type counter struct {
Expand Down Expand Up @@ -665,12 +665,9 @@ func NewStore(conf Config, opts Opts) (*Store, error) {

func storeCachedValuePosthook() postHook {
return func(s *Store) error {
//todo change the response to the client
erorrs := s.MapBatchWithCacheAndDB()
if len(erorrs) != 0 {
//todo return err too
fmt.Println(erorrs)
}

s.MapBatchWithCacheAndDB()

err := s.StoreBatchPostHook()
if err != nil {
return err
Expand Down Expand Up @@ -779,13 +776,13 @@ func storeCachedValuePosthook() postHook {

s.updates = NewModelSet()
s.inserts = NewModelSet()
s.hashMapDB = NewBatchQueryDB()

if err := tx.Commit(); err != nil {
return errs.Wrap(err, "committing transaction")
}

s.hashMapDB = NewBatchQueryDB()

log.Info().Msgf("successfully wrote to the database")
return nil
}
}

0 comments on commit bdfe698

Please sign in to comment.