From 22fa4782999d5878a26c73fd9248bee85e453e15 Mon Sep 17 00:00:00 2001 From: justin hartman Date: Mon, 8 Aug 2022 11:17:37 -0400 Subject: [PATCH 01/10] Bugfix: fix 'panic: close of closed channel' by atomically swapping the 'closed' member of RPC client handler structs --- client/rpc_client.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index d643ea7c9..307fd8ff4 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -404,7 +404,7 @@ func (c *RPCClient) GetCoordinate(node string) (*coordinate.Coordinate, error) { type monitorHandler struct { client *RPCClient - closed bool + closed uint32 // atomic init bool initCh chan<- error logCh chan<- string @@ -434,7 +434,8 @@ func (mh *monitorHandler) Handle(resp *responseHeader) { } func (mh *monitorHandler) Cleanup() { - if !mh.closed { + closed := atomic.SwapUint32(&mh.closed, 1) + if closed == 0 { if !mh.init { mh.init = true mh.initCh <- fmt.Errorf("Stream closed") @@ -442,7 +443,6 @@ func (mh *monitorHandler) Cleanup() { if mh.logCh != nil { close(mh.logCh) } - mh.closed = true } } @@ -486,7 +486,7 @@ func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHa type streamHandler struct { client *RPCClient - closed bool + closed uint32 // atomic init bool initCh chan<- error eventCh chan<- map[string]interface{} @@ -516,7 +516,8 @@ func (sh *streamHandler) Handle(resp *responseHeader) { } func (sh *streamHandler) Cleanup() { - if !sh.closed { + closed := atomic.SwapUint32(&sh.closed, 1) + if closed == 0 { if !sh.init { sh.init = true sh.initCh <- fmt.Errorf("Stream closed") @@ -524,7 +525,6 @@ func (sh *streamHandler) Cleanup() { if sh.eventCh != nil { close(sh.eventCh) } - sh.closed = true } } @@ -568,7 +568,7 @@ func (c *RPCClient) Stream(filter string, ch chan<- map[string]interface{}) (Str type queryHandler struct { client *RPCClient - closed bool + closed uint32 // atomic init bool initCh chan<- error ackCh chan<- string @@ -617,7 +617,8 @@ func (qh *queryHandler) Handle(resp *responseHeader) { } func (qh *queryHandler) Cleanup() { - if !qh.closed { + closed := atomic.SwapUint32(&qh.closed, 1) + if closed == 0 { if !qh.init { qh.init = true qh.initCh <- fmt.Errorf("Stream closed") @@ -628,7 +629,6 @@ func (qh *queryHandler) Cleanup() { if qh.respCh != nil { close(qh.respCh) } - qh.closed = true } } From 7d73b27a09a6d80d515a5a83b8c2cac3eb519fa3 Mon Sep 17 00:00:00 2001 From: justin hartman Date: Mon, 8 Aug 2022 11:31:50 -0400 Subject: [PATCH 02/10] Also give the init members the same treatment, and use CompareAndSwap instead of just Swap --- client/rpc_client.go | 33 ++++++++++++--------------------- 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index 307fd8ff4..212ae65be 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -405,7 +405,7 @@ func (c *RPCClient) GetCoordinate(node string) (*coordinate.Coordinate, error) { type monitorHandler struct { client *RPCClient closed uint32 // atomic - init bool + init uint32 // atomic initCh chan<- error logCh chan<- string seq uint64 @@ -413,8 +413,7 @@ type monitorHandler struct { func (mh *monitorHandler) Handle(resp *responseHeader) { // Initialize on the first response - if !mh.init { - mh.init = true + if atomic.CompareAndSwapUint32(&mh.init, 0, 1) { mh.initCh <- strToError(resp.Error) return } @@ -434,10 +433,8 @@ func (mh *monitorHandler) Handle(resp *responseHeader) { } func (mh *monitorHandler) Cleanup() { - closed := atomic.SwapUint32(&mh.closed, 1) - if closed == 0 { - if !mh.init { - mh.init = true + if atomic.CompareAndSwapUint32(&mh.closed, 0, 1) { + if atomic.CompareAndSwapUint32(&mh.init, 0, 1) { mh.initCh <- fmt.Errorf("Stream closed") } if mh.logCh != nil { @@ -487,7 +484,7 @@ func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHa type streamHandler struct { client *RPCClient closed uint32 // atomic - init bool + init uint32 // atomic initCh chan<- error eventCh chan<- map[string]interface{} seq uint64 @@ -495,8 +492,7 @@ type streamHandler struct { func (sh *streamHandler) Handle(resp *responseHeader) { // Initialize on the first response - if !sh.init { - sh.init = true + if atomic.CompareAndSwapUint32(&sh.init, 0, 1) { sh.initCh <- strToError(resp.Error) return } @@ -516,10 +512,8 @@ func (sh *streamHandler) Handle(resp *responseHeader) { } func (sh *streamHandler) Cleanup() { - closed := atomic.SwapUint32(&sh.closed, 1) - if closed == 0 { - if !sh.init { - sh.init = true + if atomic.CompareAndSwapUint32(&sh.closed, 0, 1) { + if atomic.CompareAndSwapUint32(&sh.init, 0, 1) { sh.initCh <- fmt.Errorf("Stream closed") } if sh.eventCh != nil { @@ -569,7 +563,7 @@ func (c *RPCClient) Stream(filter string, ch chan<- map[string]interface{}) (Str type queryHandler struct { client *RPCClient closed uint32 // atomic - init bool + init uint32 // atomic initCh chan<- error ackCh chan<- string respCh chan<- NodeResponse @@ -578,8 +572,7 @@ type queryHandler struct { func (qh *queryHandler) Handle(resp *responseHeader) { // Initialize on the first response - if !qh.init { - qh.init = true + if atomic.CompareAndSwapUint32(&qh.init, 0, 1) { qh.initCh <- strToError(resp.Error) return } @@ -617,10 +610,8 @@ func (qh *queryHandler) Handle(resp *responseHeader) { } func (qh *queryHandler) Cleanup() { - closed := atomic.SwapUint32(&qh.closed, 1) - if closed == 0 { - if !qh.init { - qh.init = true + if atomic.CompareAndSwapUint32(&qh.closed, 0, 1) { + if atomic.CompareAndSwapUint32(&qh.init, 0, 1) { qh.initCh <- fmt.Errorf("Stream closed") } if qh.ackCh != nil { From a91ff5dad1359a67d14b3a6e393d134698c398bb Mon Sep 17 00:00:00 2001 From: justin hartman Date: Wed, 17 Aug 2022 07:02:27 -0400 Subject: [PATCH 03/10] Allow callers to use a custom logger when creating an RPC client --- client/rpc_client.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index d643ea7c9..efcf241c6 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -50,6 +50,11 @@ type Config struct { // If provided, overrides the DefaultTimeout used for // IO deadlines Timeout time.Duration + + // Logger is a custom logger which you provide. If Logger is set, it will use + // this for the internal logger. If Logger is not set, it will fall back to the + // default logger from the log package. + Logger *log.Logger } // RPCClient is used to make requests to the Agent using an RPC mechanism. @@ -65,6 +70,7 @@ type RPCClient struct { dec *codec.Decoder enc *codec.Encoder writeLock sync.Mutex + logger *log.Logger dispatch map[uint64]seqHandler dispatchLock sync.Mutex @@ -140,6 +146,10 @@ func ClientFromConfig(c *Config) (*RPCClient, error) { writer: bufio.NewWriter(conn), dispatch: make(map[uint64]seqHandler), shutdownCh: make(chan struct{}), + logger: c.Logger, + } + if client.logger == nil { + client.logger = log.Default() } client.dec = codec.NewDecoder(client.reader, &codec.MsgpackHandle{RawToString: true, WriteExt: true}) @@ -422,14 +432,14 @@ func (mh *monitorHandler) Handle(resp *responseHeader) { // Decode logs for all other responses var rec logRecord if err := mh.client.dec.Decode(&rec); err != nil { - log.Printf("[ERR] Failed to decode log: %v", err) + mh.client.logger.Printf("[ERR] Failed to decode log: %v", err) mh.client.deregisterHandler(mh.seq) return } select { case mh.logCh <- rec.Log: default: - log.Printf("[ERR] Dropping log! Monitor channel full") + mh.client.logger.Printf("[ERR] Dropping log! Monitor channel full") } } @@ -504,14 +514,14 @@ func (sh *streamHandler) Handle(resp *responseHeader) { // Decode logs for all other responses var rec map[string]interface{} if err := sh.client.dec.Decode(&rec); err != nil { - log.Printf("[ERR] Failed to decode stream record: %v", err) + sh.client.logger.Printf("[ERR] Failed to decode stream record: %v", err) sh.client.deregisterHandler(sh.seq) return } select { case sh.eventCh <- rec: default: - log.Printf("[ERR] Dropping event! Stream channel full") + sh.client.logger.Printf("[ERR] Dropping event! Stream channel full") } } @@ -587,7 +597,7 @@ func (qh *queryHandler) Handle(resp *responseHeader) { // Decode the query response var rec queryRecord if err := qh.client.dec.Decode(&rec); err != nil { - log.Printf("[ERR] Failed to decode query response: %v", err) + qh.client.logger.Printf("[ERR] Failed to decode query response: %v", err) qh.client.deregisterHandler(qh.seq) return } @@ -597,14 +607,14 @@ func (qh *queryHandler) Handle(resp *responseHeader) { select { case qh.ackCh <- rec.From: default: - log.Printf("[ERR] Dropping query ack, channel full") + qh.client.logger.Printf("[ERR] Dropping query ack, channel full") } case queryRecordResponse: select { case qh.respCh <- NodeResponse{rec.From, rec.Payload}: default: - log.Printf("[ERR] Dropping query response, channel full") + qh.client.logger.Printf("[ERR] Dropping query response, channel full") } case queryRecordDone: @@ -612,7 +622,7 @@ func (qh *queryHandler) Handle(resp *responseHeader) { qh.client.deregisterHandler(qh.seq) default: - log.Printf("[ERR] Unrecognized query record type: %s", rec.Type) + qh.client.logger.Printf("[ERR] Unrecognized query record type: %s", rec.Type) } } @@ -833,7 +843,7 @@ func (c *RPCClient) listen() { for { if err := c.dec.Decode(&respHeader); err != nil { if !c.shutdown { - log.Printf("[ERR] agent.client: Failed to decode response header: %v", err) + c.logger.Printf("[ERR] agent.client: Failed to decode response header: %v", err) } break } From 2a367e17305185be6b4dbcd7f0ff4cf9a3c31e62 Mon Sep 17 00:00:00 2001 From: justin hartman Date: Wed, 17 Aug 2022 14:48:58 -0400 Subject: [PATCH 04/10] Add mutex protection around 'closed' and response channel closing --- client/rpc_client.go | 156 +++++++++++++++++++++++++++++++++---------- 1 file changed, 121 insertions(+), 35 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index 212ae65be..f725d76e8 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -403,12 +403,20 @@ func (c *RPCClient) GetCoordinate(node string) (*coordinate.Coordinate, error) { } type monitorHandler struct { + // These fields are constant client *RPCClient - closed uint32 // atomic + seq uint64 + + // These fields relate to the initial response. Once the initial response has been received, init + // is atomically set and the initial response is put into initCh. init uint32 // atomic initCh chan<- error + + // These fields relate to whether or not the stream handler is still open and the log channel. + // The two following fields are protected by the mutex. + mtx sync.Mutex + closed bool logCh chan<- string - seq uint64 } func (mh *monitorHandler) Handle(resp *responseHeader) { @@ -418,13 +426,25 @@ func (mh *monitorHandler) Handle(resp *responseHeader) { return } - // Decode logs for all other responses + // Decode the log var rec logRecord if err := mh.client.dec.Decode(&rec); err != nil { log.Printf("[ERR] Failed to decode log: %v", err) mh.client.deregisterHandler(mh.seq) return } + + // Take the mutex for the remainder of this function to ensure safe access to member variables + mh.mtx.Lock() + defer mh.mtx.Unlock() + + // If we're closed, dump the response + if mh.closed { + log.Printf("[WARN] Dropping monitor response, handler closed") + return + } + + // Not closed, so feed the response to the log channel select { case mh.logCh <- rec.Log: default: @@ -433,14 +453,22 @@ func (mh *monitorHandler) Handle(resp *responseHeader) { } func (mh *monitorHandler) Cleanup() { - if atomic.CompareAndSwapUint32(&mh.closed, 0, 1) { - if atomic.CompareAndSwapUint32(&mh.init, 0, 1) { - mh.initCh <- fmt.Errorf("Stream closed") - } - if mh.logCh != nil { - close(mh.logCh) - } + if atomic.CompareAndSwapUint32(&mh.init, 0, 1) { + mh.initCh <- fmt.Errorf("Stream closed") + } + + mh.mtx.Lock() + defer mh.mtx.Unlock() + + if mh.closed { + return + } + + if mh.logCh != nil { + close(mh.logCh) } + + mh.closed = true } // Monitor is used to subscribe to the logs of the agent @@ -457,6 +485,7 @@ func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHa // Create a monitor handler initCh := make(chan error, 1) + defer close(initCh) handler := &monitorHandler{ client: c, initCh: initCh, @@ -482,12 +511,20 @@ func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHa } type streamHandler struct { - client *RPCClient - closed uint32 // atomic - init uint32 // atomic - initCh chan<- error + // These fields are constant + client *RPCClient + seq uint64 + + // These fields relate to the initial response. Once the initial response has been received, init + // is atomically set and the initial response is put into initCh. + init uint32 // atomic + initCh chan<- error + + // These fields relate to whether or not the stream handler is still open and the event channel. + // The two following fields are protected by the mutex. + mtx sync.Mutex + closed bool eventCh chan<- map[string]interface{} - seq uint64 } func (sh *streamHandler) Handle(resp *responseHeader) { @@ -497,13 +534,25 @@ func (sh *streamHandler) Handle(resp *responseHeader) { return } - // Decode logs for all other responses + // Decode the event var rec map[string]interface{} if err := sh.client.dec.Decode(&rec); err != nil { log.Printf("[ERR] Failed to decode stream record: %v", err) sh.client.deregisterHandler(sh.seq) return } + + // Take the mutex for the remainder of this function to ensure safe access to member variables + sh.mtx.Lock() + defer sh.mtx.Unlock() + + // If we're closed, dump the response + if sh.closed { + log.Printf("[WARN] Dropping stream response, handler closed") + return + } + + // Not closed, so feed the response to the event channel select { case sh.eventCh <- rec: default: @@ -512,14 +561,22 @@ func (sh *streamHandler) Handle(resp *responseHeader) { } func (sh *streamHandler) Cleanup() { - if atomic.CompareAndSwapUint32(&sh.closed, 0, 1) { - if atomic.CompareAndSwapUint32(&sh.init, 0, 1) { - sh.initCh <- fmt.Errorf("Stream closed") - } - if sh.eventCh != nil { - close(sh.eventCh) - } + if atomic.CompareAndSwapUint32(&sh.init, 0, 1) { + sh.initCh <- fmt.Errorf("Stream closed") + } + + sh.mtx.Lock() + defer sh.mtx.Unlock() + + if sh.closed { + return } + + if sh.eventCh != nil { + close(sh.eventCh) + } + + sh.closed = true } // Stream is used to subscribe to events @@ -536,6 +593,7 @@ func (c *RPCClient) Stream(filter string, ch chan<- map[string]interface{}) (Str // Create a monitor handler initCh := make(chan error, 1) + defer close(initCh) handler := &streamHandler{ client: c, initCh: initCh, @@ -561,13 +619,21 @@ func (c *RPCClient) Stream(filter string, ch chan<- map[string]interface{}) (Str } type queryHandler struct { + // These fields are constant client *RPCClient - closed uint32 // atomic + seq uint64 + + // These fields relate to the initial response. Once the initial response has been received, init + // is atomically set and the initial response is put into initCh. init uint32 // atomic initCh chan<- error + + // These fields relate to whether or not the query handler is still open and the ACK and response + // channels. The three following fields are protected by the mutex. + mtx sync.Mutex + closed bool ackCh chan<- string respCh chan<- NodeResponse - seq uint64 } func (qh *queryHandler) Handle(resp *responseHeader) { @@ -585,6 +651,17 @@ func (qh *queryHandler) Handle(resp *responseHeader) { return } + // Take the mutex for the remainder of this function to ensure safe access to member variables + qh.mtx.Lock() + defer qh.mtx.Unlock() + + // If we're closed, dump the response + if qh.closed { + log.Printf("[WARN] Dropping query response, handler closed") + return + } + + // Not closed, so feed the response to the appropriate channel switch rec.Type { case queryRecordAck: select { @@ -610,17 +687,25 @@ func (qh *queryHandler) Handle(resp *responseHeader) { } func (qh *queryHandler) Cleanup() { - if atomic.CompareAndSwapUint32(&qh.closed, 0, 1) { - if atomic.CompareAndSwapUint32(&qh.init, 0, 1) { - qh.initCh <- fmt.Errorf("Stream closed") - } - if qh.ackCh != nil { - close(qh.ackCh) - } - if qh.respCh != nil { - close(qh.respCh) - } + if atomic.CompareAndSwapUint32(&qh.init, 0, 1) { + qh.initCh <- fmt.Errorf("Stream closed") } + + qh.mtx.Lock() + defer qh.mtx.Unlock() + + if qh.closed { + return + } + + if qh.ackCh != nil { + close(qh.ackCh) + } + if qh.respCh != nil { + close(qh.respCh) + } + + qh.closed = true } // QueryParam is provided to query set various settings. @@ -659,6 +744,7 @@ func (c *RPCClient) Query(params *QueryParam) error { // Create a query handler initCh := make(chan error, 1) + defer close(initCh) handler := &queryHandler{ client: c, initCh: initCh, From 45ae5c05dc3b1c95ce62ccac89bce12fe6e5cf1d Mon Sep 17 00:00:00 2001 From: justin hartman Date: Wed, 17 Aug 2022 15:58:28 -0400 Subject: [PATCH 05/10] Fix mutex locking for queryHandler --- client/rpc_client.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index dc7f4818c..08847760a 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -663,10 +663,12 @@ func (qh *queryHandler) Handle(resp *responseHeader) { // Take the mutex for the remainder of this function to ensure safe access to member variables qh.mtx.Lock() - defer qh.mtx.Unlock() + // Note: not calling "defer qh.mtx.Unlock()" because we need to unlock it before calling + // deregisterHandler below. // If we're closed, dump the response if qh.closed { + qh.mtx.Unlock() qh.client.logger.Printf("[WARN] Dropping query response, handler closed") return } @@ -676,22 +678,28 @@ func (qh *queryHandler) Handle(resp *responseHeader) { case queryRecordAck: select { case qh.ackCh <- rec.From: + qh.mtx.Unlock() default: + qh.mtx.Unlock() qh.client.logger.Printf("[ERR] Dropping query ack, channel full") } case queryRecordResponse: select { case qh.respCh <- NodeResponse{rec.From, rec.Payload}: + qh.mtx.Unlock() default: + qh.mtx.Unlock() qh.client.logger.Printf("[ERR] Dropping query response, channel full") } case queryRecordDone: // No further records coming + qh.mtx.Unlock() qh.client.deregisterHandler(qh.seq) default: + qh.mtx.Unlock() qh.client.logger.Printf("[ERR] Unrecognized query record type: %s", rec.Type) } } From d468640ff5201a92cecc654848bc01217ba22a3e Mon Sep 17 00:00:00 2001 From: justin hartman Date: Tue, 23 Aug 2022 07:22:18 -0400 Subject: [PATCH 06/10] Add timeouts for Monitor, Stream, and Query requests --- client/rpc_client.go | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index 08847760a..435d5e7e6 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -2,7 +2,7 @@ package client import ( "bufio" - "fmt" + "errors" "log" "net" "sync" @@ -20,7 +20,9 @@ const ( ) var ( - clientClosed = fmt.Errorf("client closed") + errClientClosed = errors.New("client closed") + errStreamClosed = errors.New("stream closed") + errRequestTimeout = errors.New("request timeout") ) type seqCallback struct { @@ -87,7 +89,7 @@ func (c *RPCClient) send(header *requestHeader, obj interface{}) error { defer c.writeLock.Unlock() if c.shutdown { - return clientClosed + return errClientClosed } // Setup an IO deadline, this way we won't wait indefinitely @@ -464,7 +466,7 @@ func (mh *monitorHandler) Handle(resp *responseHeader) { func (mh *monitorHandler) Cleanup() { if atomic.CompareAndSwapUint32(&mh.init, 0, 1) { - mh.initCh <- fmt.Errorf("Stream closed") + mh.initCh <- errStreamClosed } mh.mtx.Lock() @@ -516,7 +518,9 @@ func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHa return StreamHandle(seq), err case <-c.shutdownCh: c.deregisterHandler(seq) - return 0, clientClosed + return 0, errClientClosed + case <-time.After(c.timeout): + return 0, errRequestTimeout } } @@ -572,7 +576,7 @@ func (sh *streamHandler) Handle(resp *responseHeader) { func (sh *streamHandler) Cleanup() { if atomic.CompareAndSwapUint32(&sh.init, 0, 1) { - sh.initCh <- fmt.Errorf("Stream closed") + sh.initCh <- errStreamClosed } sh.mtx.Lock() @@ -624,7 +628,9 @@ func (c *RPCClient) Stream(filter string, ch chan<- map[string]interface{}) (Str return StreamHandle(seq), err case <-c.shutdownCh: c.deregisterHandler(seq) - return 0, clientClosed + return 0, errClientClosed + case <-time.After(c.timeout): + return 0, errRequestTimeout } } @@ -706,7 +712,7 @@ func (qh *queryHandler) Handle(resp *responseHeader) { func (qh *queryHandler) Cleanup() { if atomic.CompareAndSwapUint32(&qh.init, 0, 1) { - qh.initCh <- fmt.Errorf("Stream closed") + qh.initCh <- errStreamClosed } qh.mtx.Lock() @@ -784,7 +790,9 @@ func (c *RPCClient) Query(params *QueryParam) error { return err case <-c.shutdownCh: c.deregisterHandler(seq) - return clientClosed + return errClientClosed + case <-time.After(c.timeout): + return errRequestTimeout } } @@ -860,14 +868,14 @@ func (c *RPCClient) genericRPC(header *requestHeader, req interface{}, resp inte case err := <-errCh: return err case <-c.shutdownCh: - return clientClosed + return errClientClosed } } // strToError converts a string to an error if not blank func strToError(s string) error { if s != "" { - return fmt.Errorf(s) + return errors.New(s) } return nil } From 98a78be59d3cf96a245025fc99ed43189e37bec7 Mon Sep 17 00:00:00 2001 From: justin hartman Date: Tue, 23 Aug 2022 08:58:08 -0400 Subject: [PATCH 07/10] Also want to deregister the handler --- client/rpc_client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/rpc_client.go b/client/rpc_client.go index 435d5e7e6..5f9411702 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -520,6 +520,7 @@ func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHa c.deregisterHandler(seq) return 0, errClientClosed case <-time.After(c.timeout): + c.deregisterHandler(seq) return 0, errRequestTimeout } } @@ -630,6 +631,7 @@ func (c *RPCClient) Stream(filter string, ch chan<- map[string]interface{}) (Str c.deregisterHandler(seq) return 0, errClientClosed case <-time.After(c.timeout): + c.deregisterHandler(seq) return 0, errRequestTimeout } } @@ -792,6 +794,7 @@ func (c *RPCClient) Query(params *QueryParam) error { c.deregisterHandler(seq) return errClientClosed case <-time.After(c.timeout): + c.deregisterHandler(seq) return errRequestTimeout } } From 63a526e6458633f03b17600e8f6abcee83be13da Mon Sep 17 00:00:00 2001 From: justin hartman Date: Tue, 23 Aug 2022 09:04:16 -0400 Subject: [PATCH 08/10] Use the lower of the client and query parameters timeout for Query --- client/rpc_client.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index 5f9411702..55260f3db 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -786,6 +786,12 @@ func (c *RPCClient) Query(params *QueryParam) error { return err } + // Use the lower of either the channel timeout of the query params timeout (if provided) + timeout := c.timeout + if params.Timeout != 0 && params.Timeout < timeout { + timeout = params.Timeout + } + // Wait for a response select { case err := <-initCh: @@ -793,7 +799,7 @@ func (c *RPCClient) Query(params *QueryParam) error { case <-c.shutdownCh: c.deregisterHandler(seq) return errClientClosed - case <-time.After(c.timeout): + case <-time.After(timeout): c.deregisterHandler(seq) return errRequestTimeout } From 7f9fbeb36d8ef47af43a63bb118e4907ab11e34c Mon Sep 17 00:00:00 2001 From: justin hartman Date: Tue, 23 Aug 2022 11:05:03 -0400 Subject: [PATCH 09/10] Ok, try something else --- client/rpc_client.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index 55260f3db..ee0213737 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -671,12 +671,10 @@ func (qh *queryHandler) Handle(resp *responseHeader) { // Take the mutex for the remainder of this function to ensure safe access to member variables qh.mtx.Lock() - // Note: not calling "defer qh.mtx.Unlock()" because we need to unlock it before calling - // deregisterHandler below. + defer qh.mtx.Unlock() // If we're closed, dump the response if qh.closed { - qh.mtx.Unlock() qh.client.logger.Printf("[WARN] Dropping query response, handler closed") return } @@ -686,28 +684,26 @@ func (qh *queryHandler) Handle(resp *responseHeader) { case queryRecordAck: select { case qh.ackCh <- rec.From: - qh.mtx.Unlock() default: - qh.mtx.Unlock() qh.client.logger.Printf("[ERR] Dropping query ack, channel full") } case queryRecordResponse: select { case qh.respCh <- NodeResponse{rec.From, rec.Payload}: - qh.mtx.Unlock() default: - qh.mtx.Unlock() qh.client.logger.Printf("[ERR] Dropping query response, channel full") } case queryRecordDone: // No further records coming + // XXX: Need to unlock around this call! qh.mtx.Unlock() qh.client.deregisterHandler(qh.seq) + // XXX: Re-locking so the defer qh.mtx.Unlock() above works correctly + qh.mtx.Lock() default: - qh.mtx.Unlock() qh.client.logger.Printf("[ERR] Unrecognized query record type: %s", rec.Type) } } @@ -897,12 +893,13 @@ func (c *RPCClient) getSeq() uint64 { // deregisterAll is used to deregister all handlers func (c *RPCClient) deregisterAll() { c.dispatchLock.Lock() - defer c.dispatchLock.Unlock() + dispatch := c.dispatch + c.dispatch = make(map[uint64]seqHandler) + c.dispatchLock.Unlock() - for _, seqH := range c.dispatch { + for _, seqH := range dispatch { seqH.Cleanup() } - c.dispatch = make(map[uint64]seqHandler) } // deregisterHandler is used to deregister a handler From c338aef2fdefba901d7edadeca2ae99ca97ae28c Mon Sep 17 00:00:00 2001 From: justin hartman Date: Tue, 23 Aug 2022 20:38:41 -0400 Subject: [PATCH 10/10] Minor refactoring for comments and readability --- client/rpc_client.go | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index ee0213737..1882a310e 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -669,9 +669,26 @@ func (qh *queryHandler) Handle(resp *responseHeader) { return } - // Take the mutex for the remainder of this function to ensure safe access to member variables - qh.mtx.Lock() - defer qh.mtx.Unlock() + // We want to "defer qh.mtx.Unlock()" after locking, but we need to unlock before calling + // deregisterHandler below; so this variable and these helper functions allow us to "unlock" + // multiple times -- one in a defer, and one manually before deregistering the handler. + locked := false + lockSafely := func() { + if !locked { + qh.mtx.Lock() + locked = true + } + } + unlockSafely := func() { + if locked { + qh.mtx.Unlock() + locked = false + } + } + + // Lock the mutex for the remainder of this function to ensure safe access to member variables + lockSafely() + defer unlockSafely() // If we're closed, dump the response if qh.closed { @@ -697,11 +714,10 @@ func (qh *queryHandler) Handle(resp *responseHeader) { case queryRecordDone: // No further records coming - // XXX: Need to unlock around this call! - qh.mtx.Unlock() + // XXX: We need to unlock the mutex before calling deregisterHandler, as it will call Cleanup, + // which wants to lock the mutex! + unlockSafely() qh.client.deregisterHandler(qh.seq) - // XXX: Re-locking so the defer qh.mtx.Unlock() above works correctly - qh.mtx.Lock() default: qh.client.logger.Printf("[ERR] Unrecognized query record type: %s", rec.Type)