Skip to content

Commit

Permalink
Temporarily serialize requests in the LSP, and fix the bugs that doin…
Browse files Browse the repository at this point in the history
…g so revealed (#3370)
  • Loading branch information
mcy authored Oct 11, 2024
1 parent f19fcb3 commit 71e170e
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 380 deletions.
4 changes: 2 additions & 2 deletions private/buf/bufcli/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ func CreateWasmRuntimeCacheDir(container appext.Container) (string, error) {
return fullCacheDirPath, nil
}

// newWKTStore returns a new bufwktstore.Store while creating the required cache directories.
func newWKTStore(container appext.Container) (bufwktstore.Store, error) {
// NewWKTStore returns a new bufwktstore.Store while creating the required cache directories.
func NewWKTStore(container appext.Container) (bufwktstore.Store, error) {
if err := createCacheDir(container.CacheDirPath(), v3CacheWKTRelDirPath); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion private/buf/bufcli/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewController(
if err != nil {
return nil, err
}
wktStore, err := newWKTStore(container)
wktStore, err := NewWKTStore(container)
if err != nil {
return nil, err
}
Expand Down
39 changes: 10 additions & 29 deletions private/buf/buflsp/buflsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"context"
"fmt"
"log/slog"
"sync"
"sync/atomic"

"github.com/bufbuild/buf/private/buf/bufctl"
"github.com/bufbuild/buf/private/bufpkg/bufcheck"
"github.com/bufbuild/buf/private/bufpkg/bufimage"
"github.com/bufbuild/buf/private/pkg/app/appext"
"github.com/bufbuild/buf/private/pkg/slogext"
"github.com/bufbuild/buf/private/pkg/storage"
Expand All @@ -40,6 +40,7 @@ import (
// Returns a context for managing the server.
func Serve(
ctx context.Context,
wktBucket storage.ReadBucket,
container appext.Container,
controller bufctl.Controller,
checkClient bufcheck.Client,
Expand Down Expand Up @@ -68,6 +69,7 @@ func Serve(
controller: controller,
checkClient: checkClient,
rootBucket: bucket,
wktBucket: wktBucket,
}
lsp.fileManager = newFileManager(lsp)
off := protocol.TraceOff
Expand Down Expand Up @@ -96,6 +98,10 @@ type lsp struct {
rootBucket storage.ReadBucket
fileManager *fileManager

wktBucket storage.ReadBucket

lock sync.Mutex

// These are atomics, because they are read often and written to
// almost never, but potentially concurrently. Having them side-by-side
// is fine; they are almost never written to so false sharing is not a
Expand All @@ -107,7 +113,7 @@ type lsp struct {
// init performs *actual* initialization of the server. This is called by Initialize().
//
// It may only be called once for a given server.
func (l *lsp) init(params *protocol.InitializeParams) error {
func (l *lsp) init(_ context.Context, params *protocol.InitializeParams) error {
if l.initParams.Load() != nil {
return fmt.Errorf("called the %q method more than once", protocol.MethodInitialize)
}
Expand All @@ -120,40 +126,13 @@ func (l *lsp) init(params *protocol.InitializeParams) error {
return nil
}

// findImportable finds all files that can potentially be imported by the proto file at
// uri. This returns a map from potential Protobuf import path to the URI of the file it would import.
//
// Note that this performs no validation on these files, because those files might be open in the
// editor and might contain invalid syntax at the moment. We only want to get their paths and nothing
// more.
func (l *lsp) findImportable(
ctx context.Context,
uri protocol.URI,
) (map[string]bufimage.ImageFileInfo, error) {
fileInfos, err := l.controller.GetImportableImageFileInfos(ctx, uri.Filename())
if err != nil {
return nil, err
}

imports := make(map[string]bufimage.ImageFileInfo)
for _, fileInfo := range fileInfos {
imports[fileInfo.Path()] = fileInfo
}

l.logger.DebugContext(ctx, fmt.Sprintf("found imports for %q: %#v", uri, imports))

return imports, nil
}

// newHandler constructs an RPC handler that wraps the default one from jsonrpc2. This allows us
// to inject debug logging, tracing, and timeouts to requests.
func (l *lsp) newHandler() jsonrpc2.Handler {
actual := protocol.ServerHandler(newServer(l), nil)
return func(ctx context.Context, reply jsonrpc2.Replier, req jsonrpc2.Request) (retErr error) {
defer slogext.DebugProfile(l.logger, slog.String("method", req.Method()), slog.Any("params", req.Params()))()

ctx = withRequestID(ctx)

replier := l.wrapReplier(reply, req)

// Verify that the server has been initialized if this isn't the initialization
Expand All @@ -162,6 +141,8 @@ func (l *lsp) newHandler() jsonrpc2.Handler {
return replier(ctx, nil, fmt.Errorf("the first call to the server must be the %q method", protocol.MethodInitialize))
}

l.lock.Lock()
defer l.lock.Unlock()
return actual(ctx, replier, req)
}
}
Loading

0 comments on commit 71e170e

Please sign in to comment.