Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,54 +168,64 @@ func (bw *bulkWrite) runBatch(ctx context.Context, batch bulkWriteBatch) (BulkWr
return batchRes, batchErr, nil
}

func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (operation.InsertResult, error) {
func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (insertResult, error) {
docs := make([]bsoncore.Document, len(batch.models))
for i, model := range batch.models {
converted := model.(*InsertOneModel)
doc, err := marshal(converted.Document, bw.collection.bsonOpts, bw.collection.registry)
if err != nil {
return operation.InsertResult{}, err
return insertResult{}, err
}
doc, _, err = ensureID(doc, bson.NilObjectID, bw.collection.bsonOpts, bw.collection.registry)
if err != nil {
return operation.InsertResult{}, err
return insertResult{}, err
}

docs[i] = doc
}

op := operation.NewInsert(docs...).
Session(bw.session).WriteConcern(bw.writeConcern).CommandMonitor(bw.collection.client.monitor).
ServerSelector(bw.selector).ClusterClock(bw.collection.client.clock).
Database(bw.collection.db.name).Collection(bw.collection.name).
Deployment(bw.collection.client.deployment).Crypt(bw.collection.client.cryptFLE).
ServerAPI(bw.collection.client.serverAPI).Timeout(bw.collection.client.timeout).
Logger(bw.collection.client.logger).Authenticator(bw.collection.client.authenticator)
op := insert{
documents: docs,
session: bw.session,
writeConcern: bw.writeConcern,
monitor: bw.collection.client.monitor,
selector: bw.selector,
clock: bw.collection.client.clock,
database: bw.collection.db.name,
collection: bw.collection.name,
deployment: bw.collection.client.deployment,
crypt: bw.collection.client.cryptFLE,
serverAPI: bw.collection.client.serverAPI,
timeout: bw.collection.client.timeout,
logger: bw.collection.client.logger,
authenticator: bw.collection.client.authenticator,
}

if bw.comment != nil {
comment, err := marshalValue(bw.comment, bw.collection.bsonOpts, bw.collection.registry)
if err != nil {
return op.Result(), err
}
op.Comment(comment)
op.comment = comment
}
if bw.bypassDocumentValidation != nil && *bw.bypassDocumentValidation {
op = op.BypassDocumentValidation(*bw.bypassDocumentValidation)
op.bypassDocumentValidation = bw.bypassDocumentValidation
}
if bw.ordered != nil {
op = op.Ordered(*bw.ordered)
op.ordered = bw.ordered
}

retry := driver.RetryNone
if bw.collection.client.retryWrites && batch.canRetry {
retry = driver.RetryOncePerCommand
}
op = op.Retry(retry)
op.retry = &retry

if bw.rawData != nil {
op.RawData(*bw.rawData)
op.rawData = bw.rawData
}
if len(bw.additionalCmd) > 0 {
op.AdditionalCmd(bw.additionalCmd)
op.additionalCmd = bw.additionalCmd
}

err := op.Execute(ctx)
Expand Down
36 changes: 24 additions & 12 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.mongodb.org/mongo-driver/v2/internal/csfle"
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
"go.mongodb.org/mongo-driver/v2/internal/optionsutil"
"go.mongodb.org/mongo-driver/v2/internal/ptrutil"
"go.mongodb.org/mongo-driver/v2/internal/serverselector"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo/readconcern"
Expand Down Expand Up @@ -306,42 +307,53 @@ func (coll *Collection) insert(

selector := makePinnedSelector(sess, coll.writeSelector)

op := operation.NewInsert(docs...).
Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor).
ServerSelector(selector).ClusterClock(coll.client.clock).
Database(coll.db.name).Collection(coll.name).
Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).Ordered(true).
ServerAPI(coll.client.serverAPI).Timeout(coll.client.timeout).Logger(coll.client.logger).Authenticator(coll.client.authenticator)
op := insert{
documents: docs,
session: sess,
writeConcern: wc,
monitor: coll.client.monitor,
selector: selector,
clock: coll.client.clock,
database: coll.db.name,
collection: coll.name,
deployment: coll.client.deployment,
crypt: coll.client.cryptFLE,
ordered: ptrutil.Ptr(true),
serverAPI: coll.client.serverAPI,
timeout: coll.client.timeout,
logger: coll.client.logger,
authenticator: coll.client.authenticator,
}

args, err := mongoutil.NewOptions[options.InsertManyOptions](opts...)
if err != nil {
return nil, fmt.Errorf("failed to construct options from builder: %w", err)
}

if args.BypassDocumentValidation != nil && *args.BypassDocumentValidation {
op = op.BypassDocumentValidation(*args.BypassDocumentValidation)
op.bypassDocumentValidation = args.BypassDocumentValidation
}
if args.Comment != nil {
comment, err := marshalValue(args.Comment, coll.bsonOpts, coll.registry)
if err != nil {
return nil, err
}
op = op.Comment(comment)
op.comment = comment
}
if args.Ordered != nil {
op = op.Ordered(*args.Ordered)
op.ordered = args.Ordered
}
if rawData, ok := optionsutil.Value(args.Internal, "rawData").(bool); ok {
op = op.RawData(rawData)
op.rawData = &rawData
}
if additionalCmd, ok := optionsutil.Value(args.Internal, "addCommandFields").(bson.D); ok {
op = op.AdditionalCmd(additionalCmd)
op.additionalCmd = additionalCmd
}
retry := driver.RetryNone
if coll.client.retryWrites {
retry = driver.RetryOncePerCommand
}
op = op.Retry(retry)
op.retry = &retry

err = op.Execute(ctx)
var wce driver.WriteCommandError
Expand Down
143 changes: 143 additions & 0 deletions mongo/insert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (C) MongoDB, Inc. 2019-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongo

import (
"context"
"errors"
"fmt"
"time"

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/event"
"go.mongodb.org/mongo-driver/v2/internal/driverutil"
"go.mongodb.org/mongo-driver/v2/internal/logger"
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/session"
)

// insert performs an insert operation.
type insert struct {
authenticator driver.Authenticator
bypassDocumentValidation *bool
comment bsoncore.Value
documents []bsoncore.Document
ordered *bool
session *session.Client
clock *session.ClusterClock
collection string
monitor *event.CommandMonitor
crypt driver.Crypt
database string
deployment driver.Deployment
selector description.ServerSelector
writeConcern *writeconcern.WriteConcern
retry *driver.RetryMode
result insertResult
serverAPI *driver.ServerAPIOptions
timeout *time.Duration
rawData *bool
additionalCmd bson.D
logger *logger.Logger
}

// insertResult represents an insert result returned by the server.
type insertResult struct {
// Number of documents successfully inserted.
N int64
}

func buildInsertResult(response bsoncore.Document) (insertResult, error) {
elements, err := response.Elements()
if err != nil {
return insertResult{}, err
}
ir := insertResult{}
for _, element := range elements {
if element.Key() == "n" {
var ok bool
ir.N, ok = element.Value().AsInt64OK()
if !ok {
return ir, fmt.Errorf("response field 'n' is type int32 or int64, but received BSON type %s", element.Value().Type)
}
}
}
return ir, nil
}

// Result returns the result of executing this operation.
func (i *insert) Result() insertResult { return i.result }

func (i *insert) processResponse(_ context.Context, resp bsoncore.Document, _ driver.ResponseInfo) error {
ir, err := buildInsertResult(resp)
i.result.N += ir.N
return err
}

// Execute runs this operations and returns an error if the operation did not execute successfully.
func (i *insert) Execute(ctx context.Context) error {
if i.deployment == nil {
return errors.New("the Insert operation must have a Deployment set before Execute can be called")
}
batches := &driver.Batches{
Identifier: "documents",
Documents: i.documents,
Ordered: i.ordered,
}

return driver.Operation{
CommandFn: i.command,
ProcessResponseFn: i.processResponse,
Batches: batches,
RetryMode: i.retry,
Type: driver.Write,
Client: i.session,
Clock: i.clock,
CommandMonitor: i.monitor,
Crypt: i.crypt,
Database: i.database,
Deployment: i.deployment,
Selector: i.selector,
WriteConcern: i.writeConcern,
ServerAPI: i.serverAPI,
Timeout: i.timeout,
Logger: i.logger,
Name: driverutil.InsertOp,
Authenticator: i.authenticator,
}.Execute(ctx)

}

func (i *insert) command(dst []byte, desc description.SelectedServer) ([]byte, error) {
dst = bsoncore.AppendStringElement(dst, "insert", i.collection)
if i.bypassDocumentValidation != nil && (desc.WireVersion != nil &&
driverutil.VersionRangeIncludes(*desc.WireVersion, 4)) {

dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *i.bypassDocumentValidation)
}
if i.comment.Type != bsoncore.Type(0) {
dst = bsoncore.AppendValueElement(dst, "comment", i.comment)
}
if i.ordered != nil {
dst = bsoncore.AppendBooleanElement(dst, "ordered", *i.ordered)
}
// Set rawData for 8.2+ servers.
if i.rawData != nil && desc.WireVersion != nil && driverutil.VersionRangeIncludes(*desc.WireVersion, 27) {
dst = bsoncore.AppendBooleanElement(dst, "rawData", *i.rawData)
}
if len(i.additionalCmd) > 0 {
doc, err := bson.Marshal(i.additionalCmd)
if err != nil {
return nil, err
}
dst = append(dst, doc[4:len(doc)-1]...)
}
return dst, nil
}
56 changes: 0 additions & 56 deletions x/mongo/driver/integration/insert_test.go

This file was deleted.

24 changes: 17 additions & 7 deletions x/mongo/driver/integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"go.mongodb.org/mongo-driver/v2/internal/integtest"
"go.mongodb.org/mongo-driver/v2/internal/require"
"go.mongodb.org/mongo-driver/v2/internal/serverselector"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
Expand Down Expand Up @@ -145,12 +147,20 @@ func autoInsertDocs(t *testing.T, writeConcern *writeconcern.WriteConcern, docs

// insertDocs inserts the docs into the test cluster.
func insertDocs(t *testing.T, dbname, colname string, writeConcern *writeconcern.WriteConcern, docs ...bsoncore.Document) {
err := operation.NewInsert(docs...).
Collection(colname).
Database(dbname).
Deployment(integtest.Topology(t)).
ServerSelector(&serverselector.Write{}).
WriteConcern(writeConcern).
Execute(context.Background())
t.Helper()

// The initial call to integtest.Topology drops the database used by the
// tests, so we have to call it here first to prevent the existing test code
// from dropping the database after we've inserted data.
integtest.Topology(t)

client, err := mongo.Connect(options.Client().ApplyURI(connectionString.Original).SetWriteConcern(writeConcern))
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mongo.Connect call does not specify a context parameter. While this may work with the current API, it's better practice to pass context.Background() or another appropriate context to ensure proper cancellation and timeout behavior.

Copilot uses AI. Check for mistakes.
require.NoError(t, err)
defer func() {
_ = client.Disconnect(context.Background())
}()

coll := client.Database(dbname).Collection(colname)
_, err = coll.InsertMany(context.Background(), docs)
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InsertMany expects a slice of interface{} but docs is []bsoncore.Document. This type mismatch will cause a compilation error. The documents should be converted to []interface{} or the function signature should be adjusted to accept the correct type.

Suggested change
_, err = coll.InsertMany(context.Background(), docs)
// Convert []bsoncore.Document to []interface{} for InsertMany
docsInterface := make([]interface{}, len(docs))
for i, doc := range docs {
docsInterface[i] = doc
}
_, err = coll.InsertMany(context.Background(), docsInterface)

Copilot uses AI. Check for mistakes.
require.NoError(t, err)
}
Loading
Loading