Skip to content

Commit

Permalink
feat: bulk insert
Browse files Browse the repository at this point in the history
  • Loading branch information
yzqzss committed Jun 16, 2024
1 parent 72ee938 commit 625250b
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 0 deletions.
123 changes: 123 additions & 0 deletions api_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -194,6 +196,127 @@ func v1_update_task(c *gin.Context) {
})
}

type Item struct {
Item_id string `json:"item_id"`
Item_id_type string `json:"item_id_type"`
Item_status string `json:"item_status"`
Item_status_type string `json:"item_status_type"`
Payload string `json:"payload"`
}

func v1_insert_many(c *gin.Context) {
identifier := c.Param("identifier")
client_version := c.Param("client_version")
archivist := c.Param("archivist")
top_items_rawString := c.PostForm("items")

if is_safe_sting(identifier) && is_safe_sting(archivist) {
// OK
} else {
c.JSON(400, gin.H{"error": "Invalid parameter or query string"})
return
}

project := GetProject(identifier)
if project == nil {
c.JSON(404, gin.H{
"error": fmt.Sprintf("Project %s not found", identifier),
})
return
}
if client_version != project.Client.Version {
c.JSON(400, gin.H{
"error": "Client version not supported",
"msg": fmt.Sprintf("Please update to version %s", project.Client.Version),
})
return
}

db := mongoClient.Database(project.Mongodb.DbName)
item_collection := db.Collection(project.Mongodb.ItemCollection)

topItems := []Item{}
err := json.Unmarshal([]byte(top_items_rawString), &topItems)
if err != nil {
c.JSON(400, gin.H{"error": "Invalid JSON items"})
panic(err)
}

var doc_id_name string
if project.Mongodb.CustomDocIDName != "" {
doc_id_name = project.Mongodb.CustomDocIDName
} else {
doc_id_name = DEFAULT_DOC_ID_NAME
}

documents := []interface{}{}

for _, item := range topItems {
document := bson.M{}
// id
if item.Item_id_type == "str" {
document[doc_id_name] = item.Item_id
} else if item.Item_id_type == "int" {
item_id_int, err := strconv.ParseInt(item.Item_id, 10, 64)
if err != nil {
c.JSON(400, gin.H{"error": "Invalid item_id"})
return
}
document[doc_id_name] = item_id_int
} else {
c.JSON(400, gin.H{"error": "Invalid task_id_type"})
return
}
// status
if item.Item_status_type == "str" {
document["status"] = item.Item_status
} else if item.Item_status_type == "int" {
status, err := strconv.ParseInt(item.Item_status, 10, 64)
if err != nil {
c.JSON(400, gin.H{"error": "Invalid item_status"})
return
}
document["status"] = status
} else if item.Item_status_type == "None" {
document["status"] = nil
} else {
c.JSON(400, gin.H{"error": "Invalid status_type"})
return
}
// payload
var payload_BSON primitive.M
err := bson.UnmarshalExtJSON([]byte(item.Payload), true, &payload_BSON)
if err != nil {
c.JSON(400, gin.H{"error": "Invalid JSON payload"})
panic(err)
}
document["payload"] = payload_BSON

documents = append(documents, document)
}

// do insert, sorted=false
opt := options.InsertMany().SetOrdered(false)
result, err := item_collection.InsertMany(context.TODO(), documents, opt)
// if err is BulkWriteException
if err != nil {
if !errors.As(err, &mongo.BulkWriteException{}) {
c.JSON(500, gin.H{"error": "Failed to insert items"})
panic(err)
}
// BulkWriteException is expected
}

bulkWriteException, _ := err.(mongo.BulkWriteException)
c.JSON(200, gin.H{
"InsertedIDs": result.InsertedIDs,
"msg": "Items bulk insert actions done successfully",
"WriteErrors": len(bulkWriteException.WriteErrors),
"Labels": len(bulkWriteException.Labels),
"WriteConcernError": bulkWriteException.WriteConcernError,
})
}

func v1_insert_item(c *gin.Context) {
identifier := c.Param("identifier")
client_version := c.Param("client_version")
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func main() {
v1_tracker.POST("/project/:identifier/:client_version/:archivist/claim_task", v1_claim_task)
v1_tracker.POST("/project/:identifier/:client_version/:archivist/update_task/:task_id", v1_update_task)
v1_tracker.POST("/project/:identifier/:client_version/:archivist/insert_item/:item_id", v1_insert_item)
v1_tracker.POST("/project/:identifier/:client_version/:archivist/insert_many/:size", v1_insert_many) // :size unused
}
r.Run()
}

0 comments on commit 625250b

Please sign in to comment.