Skip to content

Commit

Permalink
Merge pull request #43 from jortel/journal3
Browse files Browse the repository at this point in the history
Model: Journal improvements
  • Loading branch information
jortel authored Jan 27, 2021
2 parents bec9ad9 + 3f7a13e commit 7ce031d
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 95 deletions.
24 changes: 3 additions & 21 deletions pkg/inventory/model/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ type DB interface {
Watch(Model, EventHandler) (*Watch, error)
// End a watch.
EndWatch(watch *Watch)
// The journal
Journal() *Journal
}

//
Expand Down Expand Up @@ -99,7 +97,6 @@ func (r *Client) Close(purge bool) error {
if r.db == nil {
return nil
}
r.journal.Disable()
err := r.db.Close()
if err != nil {
return liberr.Wrap(err)
Expand Down Expand Up @@ -180,7 +177,7 @@ func (r *Client) Update(model Model) error {
r.dbMutex.Lock()
defer r.dbMutex.Unlock()
table := Table{r.db}
current := r.journal.copy(model)
current := Clone(model)
err := table.Get(current)
if err != nil {
return liberr.Wrap(err)
Expand Down Expand Up @@ -237,16 +234,7 @@ func (r *Client) Watch(model Model, handler EventHandler) (*Watch, error) {
return nil, liberr.Wrap(err)
}
list := listPtr.Elem()
for i := 0; i < list.Len(); i++ {
m := list.Index(i).Addr().Interface()
watch.notify(
&Event{
Model: m.(Model),
Action: Created,
})
}

watch.Start()
watch.Start(&list)

return watch, nil
}
Expand All @@ -257,12 +245,6 @@ func (r *Client) EndWatch(watch *Watch) {
r.journal.End(watch)
}

//
// The associated journal.
func (r *Client) Journal() *Journal {
return &r.journal
}

//
// Database transaction.
type Tx struct {
Expand Down Expand Up @@ -317,7 +299,7 @@ func (r *Tx) Insert(model Model) error {
// Update the model.
func (r *Tx) Update(model Model) error {
table := Table{r.real}
current := r.journal.copy(model)
current := Clone(model)
err := table.Get(current)
if err != nil {
return liberr.Wrap(err)
Expand Down
103 changes: 31 additions & 72 deletions pkg/inventory/model/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,20 @@ func (w *Watch) notify(event *Event) {
//
// Run the watch.
// Forward events to the `handler`.
func (w *Watch) Start() {
func (w *Watch) Start(list *reflect.Value) {
if w.started {
return
}
run := func() {
for i := 0; i < list.Len(); i++ {
m := list.Index(i).Addr().Interface()
w.Handler.Created(
Event{
Model: m.(Model),
Action: Created,
})
}
list = nil
for event := range w.queue {
switch event.Action {
case Created:
Expand Down Expand Up @@ -115,42 +124,9 @@ func (w *Watch) End() {
type Journal struct {
mutex sync.RWMutex
// List of registered watches.
watches []*Watch
watchList []*Watch
// Queue of staged events.
staged []*Event
// Enabled.
enabled bool
}

//
// The journal is enabled.
// Must be enabled for watch models.
func (r *Journal) Enabled() bool {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.enabled
}

//
// Enable the journal.
func (r *Journal) Enable() {
r.mutex.Lock()
defer r.mutex.Unlock()
r.enabled = true
}

//
// Disable the journal.
// End all watches and discard staged events.
func (r *Journal) Disable() {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, w := range r.watches {
w.End()
}
r.watches = []*Watch{}
r.staged = []*Event{}
r.enabled = false
}

//
Expand All @@ -160,14 +136,11 @@ func (r *Journal) Disable() {
func (r *Journal) Watch(model Model, handler EventHandler) (*Watch, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.enabled {
return nil, liberr.New("disabled")
}
watch := &Watch{
Handler: handler,
Model: model,
}
r.watches = append(r.watches, watch)
r.watchList = append(r.watchList, watch)
watch.queue = make(chan *Event, 10000)
return watch, nil
}
Expand All @@ -177,19 +150,16 @@ func (r *Journal) Watch(model Model, handler EventHandler) (*Watch, error) {
func (r *Journal) End(watch *Watch) {
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.enabled {
return
}
kept := []*Watch{}
for _, w := range r.watches {
for _, w := range r.watchList {
if w != watch {
kept = append(kept, w)
continue
}
w.End()
}

r.watches = kept
r.watchList = kept
}

//
Expand All @@ -198,13 +168,13 @@ func (r *Journal) End(watch *Watch) {
func (r *Journal) Created(model Model) {
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.enabled {
if !r.hasWatch(model) {
return
}
r.staged = append(
r.staged,
&Event{
Model: r.copy(model),
Model: Clone(model),
Action: Created,
})
}
Expand All @@ -215,14 +185,14 @@ func (r *Journal) Created(model Model) {
func (r *Journal) Updated(model Model, updated Model) {
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.enabled {
if !r.hasWatch(model) {
return
}
r.staged = append(
r.staged,
&Event{
Model: r.copy(model),
Updated: r.copy(updated),
Model: Clone(model),
Updated: Clone(updated),
Action: Updated,
})
}
Expand All @@ -233,13 +203,13 @@ func (r *Journal) Updated(model Model, updated Model) {
func (r *Journal) Deleted(model Model) {
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.enabled {
if !r.hasWatch(model) {
return
}
r.staged = append(
r.staged,
&Event{
Model: r.copy(model),
Model: Clone(model),
Action: Deleted,
})
}
Expand All @@ -249,11 +219,8 @@ func (r *Journal) Deleted(model Model) {
func (r *Journal) Commit() {
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.enabled {
return
}
for _, event := range r.staged {
for _, w := range r.watches {
for _, w := range r.watchList {
w.notify(event)
}
}
Expand All @@ -266,26 +233,18 @@ func (r *Journal) Commit() {
func (r *Journal) Unstage() {
r.mutex.Lock()
defer r.mutex.Unlock()
if !r.enabled {
return
}

r.staged = []*Event{}
}

//
// Copy the model.
// The model is a pointer must be protected against being
// changed at it origin or by the handlers.
func (r *Journal) copy(model Model) Model {
mt := reflect.TypeOf(model)
mv := reflect.ValueOf(model)
switch mt.Kind() {
case reflect.Ptr:
mt = mt.Elem()
mv = mv.Elem()
// Model is being watched.
// Determine if there a watch interested in the model.
func (r *Journal) hasWatch(model Model) bool {
for _, w := range r.watchList {
if w.Match(model) {
return true
}
}
new := reflect.New(mt).Elem()
new.Set(mv)
return new.Addr().Interface().(Model)

return false
}
15 changes: 15 additions & 0 deletions pkg/inventory/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,18 @@ type Base struct {
func (m *Base) Pk() string {
return m.PK
}

//
// Create new the model.
func Clone(model Model) Model {
mt := reflect.TypeOf(model)
mv := reflect.ValueOf(model)
switch mt.Kind() {
case reflect.Ptr:
mt = mt.Elem()
mv = mv.Elem()
}
new := reflect.New(mt).Elem()
new.Set(mv)
return new.Addr().Interface().(Model)
}
Loading

0 comments on commit 7ce031d

Please sign in to comment.