Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Sep 26, 2024
1 parent 4aed025 commit 7b71cc1
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 13 deletions.
19 changes: 9 additions & 10 deletions mongo/client_bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,14 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
getNsIndex := func(namespace string) int {
if v, ok := nsMap[namespace]; ok {
return v
} else {
nsIdx := len(nsList)
nsMap[namespace] = nsIdx
nsList = append(nsList, namespace)
return nsIdx
}
nsIdx := len(nsList)
nsMap[namespace] = nsIdx
nsList = append(nsList, namespace)
return nsIdx
}
resMap := make([]interface{}, len(bw.models))
insIdMap := make(map[int]interface{})
insIDMap := make(map[int]interface{})
for i, v := range bw.models {
var doc bsoncore.Document
var err error
Expand All @@ -71,7 +70,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
if err != nil {
break
}
insIdMap[i] = id
insIDMap[i] = id
case *ClientUpdateOneModel:
nsIdx = getNsIndex(model.Namespace)
if bw.result.UpdateResults == nil {
Expand Down Expand Up @@ -203,7 +202,7 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
return err
}
case map[int64]ClientInsertResult:
if err = appendInsertResult(cur, res, insIdMap); err != nil {
if err = appendInsertResult(cur, res, insIDMap); err != nil {
return err
}
case map[int64]ClientUpdateResult:
Expand Down Expand Up @@ -413,7 +412,7 @@ func appendUpdateResult(cur bson.Raw, m map[int64]ClientUpdateResult) error {
N int32
NModified int32
Upserted struct {
Id interface{} `bson:"_id"`
ID interface{} `bson:"_id"`
}
}
if err := bson.Unmarshal(cur, &res); err != nil {
Expand All @@ -422,7 +421,7 @@ func appendUpdateResult(cur bson.Raw, m map[int64]ClientUpdateResult) error {
m[int64(res.Idx)] = ClientUpdateResult{
MatchedCount: int64(res.N),
ModifiedCount: int64(res.NModified),
UpsertedID: res.Upserted.Id,
UpsertedID: res.Upserted.ID,
}
return nil
}
6 changes: 6 additions & 0 deletions mongo/client_bulk_write_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type ClientWriteModels struct {
models []interface{}
}

// AppendInsertOne appends ClientInsertOneModels.
func (m *ClientWriteModels) AppendInsertOne(models ...*ClientInsertOneModel) *ClientWriteModels {
if m == nil {
m = &ClientWriteModels{}
Expand All @@ -25,6 +26,7 @@ func (m *ClientWriteModels) AppendInsertOne(models ...*ClientInsertOneModel) *Cl
return m
}

// AppendUpdateOne appends ClientUpdateOneModels.
func (m *ClientWriteModels) AppendUpdateOne(models ...*ClientUpdateOneModel) *ClientWriteModels {
if m == nil {
m = &ClientWriteModels{}
Expand All @@ -35,6 +37,7 @@ func (m *ClientWriteModels) AppendUpdateOne(models ...*ClientUpdateOneModel) *Cl
return m
}

// AppendUpdateMany appends ClientUpdateManyModels.
func (m *ClientWriteModels) AppendUpdateMany(models ...*ClientUpdateManyModel) *ClientWriteModels {
if m == nil {
m = &ClientWriteModels{}
Expand All @@ -45,6 +48,7 @@ func (m *ClientWriteModels) AppendUpdateMany(models ...*ClientUpdateManyModel) *
return m
}

// AppendReplaceOne appends ClientReplaceOneModels.
func (m *ClientWriteModels) AppendReplaceOne(models ...*ClientReplaceOneModel) *ClientWriteModels {
if m == nil {
m = &ClientWriteModels{}
Expand All @@ -55,6 +59,7 @@ func (m *ClientWriteModels) AppendReplaceOne(models ...*ClientReplaceOneModel) *
return m
}

// AppendDeleteOne appends ClientDeleteOneModels.
func (m *ClientWriteModels) AppendDeleteOne(models ...*ClientDeleteOneModel) *ClientWriteModels {
if m == nil {
m = &ClientWriteModels{}
Expand All @@ -65,6 +70,7 @@ func (m *ClientWriteModels) AppendDeleteOne(models ...*ClientDeleteOneModel) *Cl
return m
}

// AppendDeleteMany appends ClientDeleteManyModels.
func (m *ClientWriteModels) AppendDeleteMany(models ...*ClientDeleteManyModel) *ClientWriteModels {
if m == nil {
m = &ClientWriteModels{}
Expand Down
2 changes: 1 addition & 1 deletion mongo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (bwe ClientBulkWriteException) Error() string {
if len(bwe.WriteErrors) > 0 {
errs := make([]error, 0, len(bwe.WriteErrors))
for _, v := range bwe.WriteErrors {
errs = append(errs, &v)
errs = append(errs, v)
}
causes = append(causes, "write errors: "+joinBatchErrors(errs))
}
Expand Down
5 changes: 4 additions & 1 deletion mongo/integration/unified/client_operation_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ func executeClientBulkWrite(ctx context.Context, operation *operation) (*operati
opts.SetVerboseResults(val.Boolean())
case "writeConcern":
var wc writeConcern
bson.Unmarshal(val.Value, &wc)
err := bson.Unmarshal(val.Value, &wc)
if err != nil {
return nil, err
}
c, err := wc.toWriteConcernOption()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion mongo/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
)

// BulkWriteResult is the result type returned by a client-level BulkWrite operation.
// ClientBulkWriteResult is the result type returned by a client-level BulkWrite operation.
type ClientBulkWriteResult struct {
// The number of documents inserted.
InsertedCount int64
Expand Down
1 change: 1 addition & 0 deletions x/mongo/driver/operation/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewCommand(command bsoncore.Document) *Command {
}
}

// NewCommandFn constructs and returns a new Command.
func NewCommandFn(commandFn func([]byte, description.SelectedServer) ([]byte, error)) *Command {
return &Command{
commandFn: commandFn,
Expand Down

0 comments on commit 7b71cc1

Please sign in to comment.