Skip to content

Commit

Permalink
aio(analog sensor): fix deadlock in cyclic reading
Browse files Browse the repository at this point in the history
  • Loading branch information
gen2thomas committed Nov 26, 2023
1 parent 3980845 commit 9745893
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 61 deletions.
89 changes: 49 additions & 40 deletions drivers/aio/analog_sensor_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ func NewAnalogSensorDriver(a AnalogReader, pin string, opts ...interface{}) *Ana
driver: newDriver(a, "AnalogSensor"),
sensorCfg: &sensorConfiguration{scale: func(input int) float64 { return float64(input) }},
pin: pin,
Eventer: gobot.NewEventer(),
halt: make(chan bool),
Eventer: gobot.NewEventer(), // needed early due to grove vibration sensor driver
}
d.afterStart = d.initialize
d.beforeHalt = d.shutdown
Expand All @@ -76,10 +75,6 @@ func NewAnalogSensorDriver(a AnalogReader, pin string, opts ...interface{}) *Ana
}
}

d.AddEvent(Data)
d.AddEvent(Value)
d.AddEvent(Error)

d.AddCommand("Read", func(params map[string]interface{}) interface{} {
val, err := d.Read()
return map[string]interface{}{"val": val, "err": err}
Expand Down Expand Up @@ -112,6 +107,37 @@ func (a *AnalogSensorDriver) SetScaler(scaler func(int) float64) {
WithSensorScaler(scaler).apply(a.sensorCfg)
}

// Pin returns the AnalogSensorDrivers pin
func (a *AnalogSensorDriver) Pin() string { return a.pin }

// Read returns the current reading from the sensor, scaled by the current scaler
func (a *AnalogSensorDriver) Read() (float64, error) {
_, value, err := a.analogRead()
return value, err
}

// ReadRaw returns the current reading from the sensor without scaling
func (a *AnalogSensorDriver) ReadRaw() (int, error) {
rawValue, _, err := a.analogRead()
return rawValue, err
}

// Value returns the last read value from the sensor
func (a *AnalogSensorDriver) Value() float64 {
a.mutex.Lock()
defer a.mutex.Unlock()

return a.lastValue
}

// RawValue returns the last read raw value from the sensor
func (a *AnalogSensorDriver) RawValue() int {
a.mutex.Lock()
defer a.mutex.Unlock()

return a.lastRawValue
}

// initialize the AnalogSensorDriver and if the cyclic reading is active, reads the sensor at the given interval.
// Emits the Events:
//
Expand All @@ -123,12 +149,27 @@ func (a *AnalogSensorDriver) initialize() error {
// cyclic reading deactivated
return nil
}

a.AddEvent(Data)
a.AddEvent(Value)
a.AddEvent(Error)

// A small buffer is needed to prevent mutex-channel-deadlock between Halt() and analogRead().
// This can happen, if the shutdown is in progress (mutex passed) and the go routine is calling
// the analogRead() in between, before the halt can be evaluated by the select statement.
// In this case the mutex of analogRead() blocks the reading of the halt channel and, without a small buffer,
// the writing to halt is blocked because there is no immediate read from channel.
// Please note, that this is special behavior caused by the first read is done immediately before the select
// statement.
a.halt = make(chan bool, 1)

oldRawValue := 0
oldValue := 0.0
go func() {
timer := time.NewTimer(a.sensorCfg.readInterval)
timer.Stop()
for {
// please note, that this ensures the first read is done immediately, but has drawbacks, see notes above
rawValue, value, err := a.analogRead()
if err != nil {
a.Publish(a.Event(Error), err)
Expand All @@ -142,8 +183,7 @@ func (a *AnalogSensorDriver) initialize() error {
oldValue = value
}
}

timer.Reset(a.sensorCfg.readInterval)
timer.Reset(a.sensorCfg.readInterval) // ensure that after each read is a wait, independent of duration of read
select {
case <-timer.C:
case <-a.halt:
Expand All @@ -157,45 +197,14 @@ func (a *AnalogSensorDriver) initialize() error {

// shutdown stops polling the analog sensor for new information
func (a *AnalogSensorDriver) shutdown() error {
if a.sensorCfg.readInterval == 0 {
if a.sensorCfg.readInterval == 0 || a.halt == nil {
// cyclic reading deactivated
return nil
}
a.halt <- true
return nil
}

// Pin returns the AnalogSensorDrivers pin
func (a *AnalogSensorDriver) Pin() string { return a.pin }

// Read returns the current reading from the sensor, scaled by the current scaler
func (a *AnalogSensorDriver) Read() (float64, error) {
_, value, err := a.analogRead()
return value, err
}

// ReadRaw returns the current reading from the sensor without scaling
func (a *AnalogSensorDriver) ReadRaw() (int, error) {
rawValue, _, err := a.analogRead()
return rawValue, err
}

// Value returns the last read value from the sensor
func (a *AnalogSensorDriver) Value() float64 {
a.mutex.Lock()
defer a.mutex.Unlock()

return a.lastValue
}

// RawValue returns the last read raw value from the sensor
func (a *AnalogSensorDriver) RawValue() int {
a.mutex.Lock()
defer a.mutex.Unlock()

return a.lastRawValue
}

// analogRead performs an reading from the sensor and sets the internal attributes and returns the raw and scaled value
func (a *AnalogSensorDriver) analogRead() (int, float64, error) {
a.mutex.Lock()
Expand Down
15 changes: 8 additions & 7 deletions drivers/aio/analog_sensor_driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestNewAnalogSensorDriver(t *testing.T) {
assert.Equal(t, pin, d.Pin())
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
assert.Equal(t, 0, d.lastRawValue)
assert.NotNil(t, d.halt)
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
assert.NotNil(t, d.Eventer)
require.NotNil(t, d.sensorCfg)
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
Expand Down Expand Up @@ -171,6 +171,9 @@ func TestAnalogSensor_WithSensorCyclicRead(t *testing.T) {
}
}

// act (start cyclic reading)
require.NoError(t, d.Start())

// arrange: expect raw value to be received
_ = d.Once(d.Event(Data), func(data interface{}) {
assert.Equal(t, 100, data.(int))
Expand All @@ -185,9 +188,6 @@ func TestAnalogSensor_WithSensorCyclicRead(t *testing.T) {
nextVal <- -1 // arrange: error in read function
})

// act (start cyclic reading)
require.NoError(t, d.Start())

// assert: both events within timeout
select {
case <-semDone:
Expand Down Expand Up @@ -233,13 +233,14 @@ func TestAnalogSensor_WithSensorCyclicRead(t *testing.T) {
func TestAnalogSensorHalt_WithSensorCyclicRead(t *testing.T) {
// arrange
d := NewAnalogSensorDriver(newAioTestAdaptor(), "1", WithSensorCyclicRead(10*time.Millisecond))
require.NoError(t, d.Start())
done := make(chan struct{})
// act & assert
go func() {
<-d.halt
require.NoError(t, d.Halt())
close(done)
}()
// act & assert
require.NoError(t, d.Halt())
// test that the halt is not blocked by any deadlock with mutex and/or channel
select {
case <-done:
case <-time.After(100 * time.Millisecond):
Expand Down
8 changes: 4 additions & 4 deletions drivers/aio/grove_drivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestNewGroveRotaryDriver(t *testing.T) {
assert.Equal(t, pin, d.Pin())
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
assert.Equal(t, 0, d.lastRawValue)
assert.NotNil(t, d.halt)
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
assert.NotNil(t, d.Eventer)
require.NotNil(t, d.sensorCfg)
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
Expand All @@ -65,7 +65,7 @@ func TestNewGroveLightSensorDriver(t *testing.T) {
assert.Equal(t, pin, d.Pin())
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
assert.Equal(t, 0, d.lastRawValue)
assert.NotNil(t, d.halt)
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
assert.NotNil(t, d.Eventer)
require.NotNil(t, d.sensorCfg)
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
Expand All @@ -91,7 +91,7 @@ func TestNewGrovePiezoVibrationSensorDriver(t *testing.T) {
assert.Equal(t, pin, d.Pin())
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
assert.Equal(t, 0, d.lastRawValue)
assert.NotNil(t, d.halt)
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
assert.NotNil(t, d.Eventer)
require.NotNil(t, d.sensorCfg)
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
Expand All @@ -117,7 +117,7 @@ func TestNewGroveSoundSensorDriver(t *testing.T) {
assert.Equal(t, pin, d.Pin())
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
assert.Equal(t, 0, d.lastRawValue)
assert.NotNil(t, d.halt)
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
assert.NotNil(t, d.Eventer)
require.NotNil(t, d.sensorCfg)
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
Expand Down
9 changes: 5 additions & 4 deletions drivers/aio/grove_temperature_sensor_driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestNewGroveTemperatureSensorDriver(t *testing.T) {
assert.Equal(t, pin, d.Pin())
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
assert.Equal(t, 0, d.lastRawValue)
assert.NotNil(t, d.halt)
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
assert.NotNil(t, d.Eventer)
require.NotNil(t, d.sensorCfg)
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
Expand Down Expand Up @@ -103,14 +103,15 @@ func TestGroveTemperatureSensor_publishesTemperatureInCelsius(t *testing.T) {
a.analogReadFunc = func() (int, error) {
return 585, nil
}

// act: start cyclic reading
require.NoError(t, d.Start())

_ = d.Once(d.Event(Value), func(data interface{}) {
assert.Equal(t, "31.62", fmt.Sprintf("%.2f", data.(float64)))
sem <- true
})

// act: start cyclic reading
require.NoError(t, d.Start())

// assert: value was published
select {
case <-sem:
Expand Down
14 changes: 8 additions & 6 deletions drivers/aio/temperature_sensor_driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestNewTemperatureSensorDriver(t *testing.T) {
assert.Equal(t, pin, d.Pin())
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
assert.Equal(t, 0, d.lastRawValue)
assert.NotNil(t, d.halt)
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
assert.NotNil(t, d.Eventer)
require.NotNil(t, d.sensorCfg)
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
Expand Down Expand Up @@ -139,11 +139,12 @@ func TestTemperatureSensorPublishesTemperatureInCelsius(t *testing.T) {
a.analogReadFunc = func() (int, error) {
return 585, nil
}

require.NoError(t, d.Start())
_ = d.Once(d.Event(Value), func(data interface{}) {
assert.Equal(t, "31.62", fmt.Sprintf("%.2f", data.(float64)))
sem <- true
})
require.NoError(t, d.Start())

select {
case <-sem:
Expand Down Expand Up @@ -184,16 +185,17 @@ func TestTemperatureSensorHalt_WithSensorCyclicRead(t *testing.T) {
// arrange
d := NewTemperatureSensorDriver(newAioTestAdaptor(), "1", WithSensorCyclicRead(10*time.Millisecond))
done := make(chan struct{})
require.NoError(t, d.Start())
// act & assert
go func() {
<-d.halt
require.NoError(t, d.Halt())
close(done)
}()
// act & assert
require.NoError(t, d.Halt())
// test that the halt is not blocked by any deadlock with mutex and/or channel
select {
case <-done:
case <-time.After(100 * time.Millisecond):
t.Errorf(" Temperature Sensor was not halted")
t.Errorf("Temperature Sensor was not halted")
}
}

Expand Down

0 comments on commit 9745893

Please sign in to comment.