diff --git a/mongo/bulk_write.go b/mongo/bulk_write.go index ca683c1dc7..d87e2cb156 100644 --- a/mongo/bulk_write.go +++ b/mongo/bulk_write.go @@ -168,54 +168,64 @@ func (bw *bulkWrite) runBatch(ctx context.Context, batch bulkWriteBatch) (BulkWr return batchRes, batchErr, nil } -func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (operation.InsertResult, error) { +func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (insertResult, error) { docs := make([]bsoncore.Document, len(batch.models)) for i, model := range batch.models { converted := model.(*InsertOneModel) doc, err := marshal(converted.Document, bw.collection.bsonOpts, bw.collection.registry) if err != nil { - return operation.InsertResult{}, err + return insertResult{}, err } doc, _, err = ensureID(doc, bson.NilObjectID, bw.collection.bsonOpts, bw.collection.registry) if err != nil { - return operation.InsertResult{}, err + return insertResult{}, err } docs[i] = doc } - op := operation.NewInsert(docs...). - Session(bw.session).WriteConcern(bw.writeConcern).CommandMonitor(bw.collection.client.monitor). - ServerSelector(bw.selector).ClusterClock(bw.collection.client.clock). - Database(bw.collection.db.name).Collection(bw.collection.name). - Deployment(bw.collection.client.deployment).Crypt(bw.collection.client.cryptFLE). - ServerAPI(bw.collection.client.serverAPI).Timeout(bw.collection.client.timeout). - Logger(bw.collection.client.logger).Authenticator(bw.collection.client.authenticator) + op := insert{ + documents: docs, + session: bw.session, + writeConcern: bw.writeConcern, + monitor: bw.collection.client.monitor, + selector: bw.selector, + clock: bw.collection.client.clock, + database: bw.collection.db.name, + collection: bw.collection.name, + deployment: bw.collection.client.deployment, + crypt: bw.collection.client.cryptFLE, + serverAPI: bw.collection.client.serverAPI, + timeout: bw.collection.client.timeout, + logger: bw.collection.client.logger, + authenticator: bw.collection.client.authenticator, + } + if bw.comment != nil { comment, err := marshalValue(bw.comment, bw.collection.bsonOpts, bw.collection.registry) if err != nil { return op.Result(), err } - op.Comment(comment) + op.comment = comment } if bw.bypassDocumentValidation != nil && *bw.bypassDocumentValidation { - op = op.BypassDocumentValidation(*bw.bypassDocumentValidation) + op.bypassDocumentValidation = bw.bypassDocumentValidation } if bw.ordered != nil { - op = op.Ordered(*bw.ordered) + op.ordered = bw.ordered } retry := driver.RetryNone if bw.collection.client.retryWrites && batch.canRetry { retry = driver.RetryOncePerCommand } - op = op.Retry(retry) + op.retry = &retry if bw.rawData != nil { - op.RawData(*bw.rawData) + op.rawData = bw.rawData } if len(bw.additionalCmd) > 0 { - op.AdditionalCmd(bw.additionalCmd) + op.additionalCmd = bw.additionalCmd } err := op.Execute(ctx) diff --git a/mongo/collection.go b/mongo/collection.go index 18cb0f475d..b8ed5bf568 100644 --- a/mongo/collection.go +++ b/mongo/collection.go @@ -18,6 +18,7 @@ import ( "go.mongodb.org/mongo-driver/v2/internal/csfle" "go.mongodb.org/mongo-driver/v2/internal/mongoutil" "go.mongodb.org/mongo-driver/v2/internal/optionsutil" + "go.mongodb.org/mongo-driver/v2/internal/ptrutil" "go.mongodb.org/mongo-driver/v2/internal/serverselector" "go.mongodb.org/mongo-driver/v2/mongo/options" "go.mongodb.org/mongo-driver/v2/mongo/readconcern" @@ -306,12 +307,23 @@ func (coll *Collection) insert( selector := makePinnedSelector(sess, coll.writeSelector) - op := operation.NewInsert(docs...). - Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor). - ServerSelector(selector).ClusterClock(coll.client.clock). - Database(coll.db.name).Collection(coll.name). - Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).Ordered(true). - ServerAPI(coll.client.serverAPI).Timeout(coll.client.timeout).Logger(coll.client.logger).Authenticator(coll.client.authenticator) + op := insert{ + documents: docs, + session: sess, + writeConcern: wc, + monitor: coll.client.monitor, + selector: selector, + clock: coll.client.clock, + database: coll.db.name, + collection: coll.name, + deployment: coll.client.deployment, + crypt: coll.client.cryptFLE, + ordered: ptrutil.Ptr(true), + serverAPI: coll.client.serverAPI, + timeout: coll.client.timeout, + logger: coll.client.logger, + authenticator: coll.client.authenticator, + } args, err := mongoutil.NewOptions[options.InsertManyOptions](opts...) if err != nil { @@ -319,29 +331,29 @@ func (coll *Collection) insert( } if args.BypassDocumentValidation != nil && *args.BypassDocumentValidation { - op = op.BypassDocumentValidation(*args.BypassDocumentValidation) + op.bypassDocumentValidation = args.BypassDocumentValidation } if args.Comment != nil { comment, err := marshalValue(args.Comment, coll.bsonOpts, coll.registry) if err != nil { return nil, err } - op = op.Comment(comment) + op.comment = comment } if args.Ordered != nil { - op = op.Ordered(*args.Ordered) + op.ordered = args.Ordered } if rawData, ok := optionsutil.Value(args.Internal, "rawData").(bool); ok { - op = op.RawData(rawData) + op.rawData = &rawData } if additionalCmd, ok := optionsutil.Value(args.Internal, "addCommandFields").(bson.D); ok { - op = op.AdditionalCmd(additionalCmd) + op.additionalCmd = additionalCmd } retry := driver.RetryNone if coll.client.retryWrites { retry = driver.RetryOncePerCommand } - op = op.Retry(retry) + op.retry = &retry err = op.Execute(ctx) var wce driver.WriteCommandError diff --git a/mongo/insert.go b/mongo/insert.go new file mode 100644 index 0000000000..f72440a7a4 --- /dev/null +++ b/mongo/insert.go @@ -0,0 +1,143 @@ +// Copyright (C) MongoDB, Inc. 2019-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package mongo + +import ( + "context" + "errors" + "fmt" + "time" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/internal/driverutil" + "go.mongodb.org/mongo-driver/v2/internal/logger" + "go.mongodb.org/mongo-driver/v2/mongo/writeconcern" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" +) + +// insert performs an insert operation. +type insert struct { + authenticator driver.Authenticator + bypassDocumentValidation *bool + comment bsoncore.Value + documents []bsoncore.Document + ordered *bool + session *session.Client + clock *session.ClusterClock + collection string + monitor *event.CommandMonitor + crypt driver.Crypt + database string + deployment driver.Deployment + selector description.ServerSelector + writeConcern *writeconcern.WriteConcern + retry *driver.RetryMode + result insertResult + serverAPI *driver.ServerAPIOptions + timeout *time.Duration + rawData *bool + additionalCmd bson.D + logger *logger.Logger +} + +// insertResult represents an insert result returned by the server. +type insertResult struct { + // Number of documents successfully inserted. + N int64 +} + +func buildInsertResult(response bsoncore.Document) (insertResult, error) { + elements, err := response.Elements() + if err != nil { + return insertResult{}, err + } + ir := insertResult{} + for _, element := range elements { + if element.Key() == "n" { + var ok bool + ir.N, ok = element.Value().AsInt64OK() + if !ok { + return ir, fmt.Errorf("response field 'n' is type int32 or int64, but received BSON type %s", element.Value().Type) + } + } + } + return ir, nil +} + +// Result returns the result of executing this operation. +func (i *insert) Result() insertResult { return i.result } + +func (i *insert) processResponse(_ context.Context, resp bsoncore.Document, _ driver.ResponseInfo) error { + ir, err := buildInsertResult(resp) + i.result.N += ir.N + return err +} + +// Execute runs this operations and returns an error if the operation did not execute successfully. +func (i *insert) Execute(ctx context.Context) error { + if i.deployment == nil { + return errors.New("the Insert operation must have a Deployment set before Execute can be called") + } + batches := &driver.Batches{ + Identifier: "documents", + Documents: i.documents, + Ordered: i.ordered, + } + + return driver.Operation{ + CommandFn: i.command, + ProcessResponseFn: i.processResponse, + Batches: batches, + RetryMode: i.retry, + Type: driver.Write, + Client: i.session, + Clock: i.clock, + CommandMonitor: i.monitor, + Crypt: i.crypt, + Database: i.database, + Deployment: i.deployment, + Selector: i.selector, + WriteConcern: i.writeConcern, + ServerAPI: i.serverAPI, + Timeout: i.timeout, + Logger: i.logger, + Name: driverutil.InsertOp, + Authenticator: i.authenticator, + }.Execute(ctx) + +} + +func (i *insert) command(dst []byte, desc description.SelectedServer) ([]byte, error) { + dst = bsoncore.AppendStringElement(dst, "insert", i.collection) + if i.bypassDocumentValidation != nil && (desc.WireVersion != nil && + driverutil.VersionRangeIncludes(*desc.WireVersion, 4)) { + + dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *i.bypassDocumentValidation) + } + if i.comment.Type != bsoncore.Type(0) { + dst = bsoncore.AppendValueElement(dst, "comment", i.comment) + } + if i.ordered != nil { + dst = bsoncore.AppendBooleanElement(dst, "ordered", *i.ordered) + } + // Set rawData for 8.2+ servers. + if i.rawData != nil && desc.WireVersion != nil && driverutil.VersionRangeIncludes(*desc.WireVersion, 27) { + dst = bsoncore.AppendBooleanElement(dst, "rawData", *i.rawData) + } + if len(i.additionalCmd) > 0 { + doc, err := bson.Marshal(i.additionalCmd) + if err != nil { + return nil, err + } + dst = append(dst, doc[4:len(doc)-1]...) + } + return dst, nil +} diff --git a/x/mongo/driver/integration/insert_test.go b/x/mongo/driver/integration/insert_test.go deleted file mode 100644 index a2687e8dfe..0000000000 --- a/x/mongo/driver/integration/insert_test.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (C) MongoDB, Inc. 2022-present. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - -package integration - -import ( - "context" - "testing" - - "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" - "go.mongodb.org/mongo-driver/v2/x/mongo/driver" - "go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation" - "go.mongodb.org/mongo-driver/v2/x/mongo/driver/topology" -) - -func TestInsert(t *testing.T) { - t.Skip() - topo, err := topology.New(nil) - if err != nil { - t.Fatalf("Couldn't connect topology: %v", err) - } - _ = topo.Connect() - - doc := bsoncore.BuildDocument(nil, bsoncore.AppendDoubleElement(nil, "pi", 3.14159)) - - iop := operation.NewInsert(doc).Database("foo").Collection("bar").Deployment(topo) - err = iop.Execute(context.Background()) - if err != nil { - t.Fatalf("Couldn't execute insert operation: %v", err) - } - t.Log(iop.Result()) - - fop := operation.NewFind(bsoncore.BuildDocument(nil, bsoncore.AppendDoubleElement(nil, "pi", 3.14159))). - Database("foo").Collection("bar").Deployment(topo).BatchSize(1) - err = fop.Execute(context.Background()) - if err != nil { - t.Fatalf("Couldn't execute find operation: %v", err) - } - cur, err := fop.Result(driver.CursorOptions{BatchSize: 2}) - if err != nil { - t.Fatalf("Couldn't get cursor result from find operation: %v", err) - } - for cur.Next(context.Background()) { - batch := cur.Batch() - docs, err := batch.Documents() - if err != nil { - t.Fatalf("Couldn't iterate batch: %v", err) - } - for i, doc := range docs { - t.Log(i, doc) - } - } -} diff --git a/x/mongo/driver/integration/main_test.go b/x/mongo/driver/integration/main_test.go index 97f85de244..7b020ae936 100644 --- a/x/mongo/driver/integration/main_test.go +++ b/x/mongo/driver/integration/main_test.go @@ -17,6 +17,8 @@ import ( "go.mongodb.org/mongo-driver/v2/internal/integtest" "go.mongodb.org/mongo-driver/v2/internal/require" "go.mongodb.org/mongo-driver/v2/internal/serverselector" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" "go.mongodb.org/mongo-driver/v2/mongo/writeconcern" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" "go.mongodb.org/mongo-driver/v2/x/mongo/driver" @@ -145,12 +147,20 @@ func autoInsertDocs(t *testing.T, writeConcern *writeconcern.WriteConcern, docs // insertDocs inserts the docs into the test cluster. func insertDocs(t *testing.T, dbname, colname string, writeConcern *writeconcern.WriteConcern, docs ...bsoncore.Document) { - err := operation.NewInsert(docs...). - Collection(colname). - Database(dbname). - Deployment(integtest.Topology(t)). - ServerSelector(&serverselector.Write{}). - WriteConcern(writeConcern). - Execute(context.Background()) + t.Helper() + + // The initial call to integtest.Topology drops the database used by the + // tests, so we have to call it here first to prevent the existing test code + // from dropping the database after we've inserted data. + integtest.Topology(t) + + client, err := mongo.Connect(options.Client().ApplyURI(connectionString.Original).SetWriteConcern(writeConcern)) + require.NoError(t, err) + defer func() { + _ = client.Disconnect(context.Background()) + }() + + coll := client.Database(dbname).Collection(colname) + _, err = coll.InsertMany(context.Background(), docs) require.NoError(t, err) } diff --git a/x/mongo/driver/operation/insert.go b/x/mongo/driver/operation/insert.go deleted file mode 100644 index d4f01e6b92..0000000000 --- a/x/mongo/driver/operation/insert.go +++ /dev/null @@ -1,354 +0,0 @@ -// Copyright (C) MongoDB, Inc. 2019-present. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - -package operation - -import ( - "context" - "errors" - "fmt" - "time" - - "go.mongodb.org/mongo-driver/v2/bson" - "go.mongodb.org/mongo-driver/v2/event" - "go.mongodb.org/mongo-driver/v2/internal/driverutil" - "go.mongodb.org/mongo-driver/v2/internal/logger" - "go.mongodb.org/mongo-driver/v2/mongo/writeconcern" - "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" - "go.mongodb.org/mongo-driver/v2/x/mongo/driver" - "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" - "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" -) - -// Insert performs an insert operation. -type Insert struct { - authenticator driver.Authenticator - bypassDocumentValidation *bool - comment bsoncore.Value - documents []bsoncore.Document - ordered *bool - session *session.Client - clock *session.ClusterClock - collection string - monitor *event.CommandMonitor - crypt driver.Crypt - database string - deployment driver.Deployment - selector description.ServerSelector - writeConcern *writeconcern.WriteConcern - retry *driver.RetryMode - result InsertResult - serverAPI *driver.ServerAPIOptions - timeout *time.Duration - rawData *bool - additionalCmd bson.D - logger *logger.Logger -} - -// InsertResult represents an insert result returned by the server. -type InsertResult struct { - // Number of documents successfully inserted. - N int64 -} - -func buildInsertResult(response bsoncore.Document) (InsertResult, error) { - elements, err := response.Elements() - if err != nil { - return InsertResult{}, err - } - ir := InsertResult{} - for _, element := range elements { - if element.Key() == "n" { - var ok bool - ir.N, ok = element.Value().AsInt64OK() - if !ok { - return ir, fmt.Errorf("response field 'n' is type int32 or int64, but received BSON type %s", element.Value().Type) - } - } - } - return ir, nil -} - -// NewInsert constructs and returns a new Insert. -func NewInsert(documents ...bsoncore.Document) *Insert { - return &Insert{ - documents: documents, - } -} - -// Result returns the result of executing this operation. -func (i *Insert) Result() InsertResult { return i.result } - -func (i *Insert) processResponse(_ context.Context, resp bsoncore.Document, _ driver.ResponseInfo) error { - ir, err := buildInsertResult(resp) - i.result.N += ir.N - return err -} - -// Execute runs this operations and returns an error if the operation did not execute successfully. -func (i *Insert) Execute(ctx context.Context) error { - if i.deployment == nil { - return errors.New("the Insert operation must have a Deployment set before Execute can be called") - } - batches := &driver.Batches{ - Identifier: "documents", - Documents: i.documents, - Ordered: i.ordered, - } - - return driver.Operation{ - CommandFn: i.command, - ProcessResponseFn: i.processResponse, - Batches: batches, - RetryMode: i.retry, - Type: driver.Write, - Client: i.session, - Clock: i.clock, - CommandMonitor: i.monitor, - Crypt: i.crypt, - Database: i.database, - Deployment: i.deployment, - Selector: i.selector, - WriteConcern: i.writeConcern, - ServerAPI: i.serverAPI, - Timeout: i.timeout, - Logger: i.logger, - Name: driverutil.InsertOp, - Authenticator: i.authenticator, - }.Execute(ctx) - -} - -func (i *Insert) command(dst []byte, desc description.SelectedServer) ([]byte, error) { - dst = bsoncore.AppendStringElement(dst, "insert", i.collection) - if i.bypassDocumentValidation != nil && (desc.WireVersion != nil && - driverutil.VersionRangeIncludes(*desc.WireVersion, 4)) { - - dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *i.bypassDocumentValidation) - } - if i.comment.Type != bsoncore.Type(0) { - dst = bsoncore.AppendValueElement(dst, "comment", i.comment) - } - if i.ordered != nil { - dst = bsoncore.AppendBooleanElement(dst, "ordered", *i.ordered) - } - // Set rawData for 8.2+ servers. - if i.rawData != nil && desc.WireVersion != nil && driverutil.VersionRangeIncludes(*desc.WireVersion, 27) { - dst = bsoncore.AppendBooleanElement(dst, "rawData", *i.rawData) - } - if len(i.additionalCmd) > 0 { - doc, err := bson.Marshal(i.additionalCmd) - if err != nil { - return nil, err - } - dst = append(dst, doc[4:len(doc)-1]...) - } - return dst, nil -} - -// BypassDocumentValidation allows the operation to opt-out of document level validation. Valid -// for server versions >= 3.2. For servers < 3.2, this setting is ignored. -func (i *Insert) BypassDocumentValidation(bypassDocumentValidation bool) *Insert { - if i == nil { - i = new(Insert) - } - - i.bypassDocumentValidation = &bypassDocumentValidation - return i -} - -// Comment sets a value to help trace an operation. -func (i *Insert) Comment(comment bsoncore.Value) *Insert { - if i == nil { - i = new(Insert) - } - - i.comment = comment - return i -} - -// Documents adds documents to this operation that will be inserted when this operation is -// executed. -func (i *Insert) Documents(documents ...bsoncore.Document) *Insert { - if i == nil { - i = new(Insert) - } - - i.documents = documents - return i -} - -// Ordered sets ordered. If true, when a write fails, the operation will return the error, when -// false write failures do not stop execution of the operation. -func (i *Insert) Ordered(ordered bool) *Insert { - if i == nil { - i = new(Insert) - } - - i.ordered = &ordered - return i -} - -// Session sets the session for this operation. -func (i *Insert) Session(session *session.Client) *Insert { - if i == nil { - i = new(Insert) - } - - i.session = session - return i -} - -// ClusterClock sets the cluster clock for this operation. -func (i *Insert) ClusterClock(clock *session.ClusterClock) *Insert { - if i == nil { - i = new(Insert) - } - - i.clock = clock - return i -} - -// Collection sets the collection that this command will run against. -func (i *Insert) Collection(collection string) *Insert { - if i == nil { - i = new(Insert) - } - - i.collection = collection - return i -} - -// CommandMonitor sets the monitor to use for APM events. -func (i *Insert) CommandMonitor(monitor *event.CommandMonitor) *Insert { - if i == nil { - i = new(Insert) - } - - i.monitor = monitor - return i -} - -// Crypt sets the Crypt object to use for automatic encryption and decryption. -func (i *Insert) Crypt(crypt driver.Crypt) *Insert { - if i == nil { - i = new(Insert) - } - - i.crypt = crypt - return i -} - -// Database sets the database to run this operation against. -func (i *Insert) Database(database string) *Insert { - if i == nil { - i = new(Insert) - } - - i.database = database - return i -} - -// Deployment sets the deployment to use for this operation. -func (i *Insert) Deployment(deployment driver.Deployment) *Insert { - if i == nil { - i = new(Insert) - } - - i.deployment = deployment - return i -} - -// ServerSelector sets the selector used to retrieve a server. -func (i *Insert) ServerSelector(selector description.ServerSelector) *Insert { - if i == nil { - i = new(Insert) - } - - i.selector = selector - return i -} - -// WriteConcern sets the write concern for this operation. -func (i *Insert) WriteConcern(writeConcern *writeconcern.WriteConcern) *Insert { - if i == nil { - i = new(Insert) - } - - i.writeConcern = writeConcern - return i -} - -// Retry enables retryable mode for this operation. Retries are handled automatically in driver.Operation.Execute based -// on how the operation is set. -func (i *Insert) Retry(retry driver.RetryMode) *Insert { - if i == nil { - i = new(Insert) - } - - i.retry = &retry - return i -} - -// ServerAPI sets the server API version for this operation. -func (i *Insert) ServerAPI(serverAPI *driver.ServerAPIOptions) *Insert { - if i == nil { - i = new(Insert) - } - - i.serverAPI = serverAPI - return i -} - -// Timeout sets the timeout for this operation. -func (i *Insert) Timeout(timeout *time.Duration) *Insert { - if i == nil { - i = new(Insert) - } - - i.timeout = timeout - return i -} - -// Logger sets the logger for this operation. -func (i *Insert) Logger(logger *logger.Logger) *Insert { - if i == nil { - i = new(Insert) - } - - i.logger = logger - return i -} - -// Authenticator sets the authenticator to use for this operation. -func (i *Insert) Authenticator(authenticator driver.Authenticator) *Insert { - if i == nil { - i = new(Insert) - } - - i.authenticator = authenticator - return i -} - -// RawData sets the rawData to access timeseries data in the compressed format. -func (i *Insert) RawData(rawData bool) *Insert { - if i == nil { - i = new(Insert) - } - - i.rawData = &rawData - return i -} - -// AdditionalCmd sets additional command fields to be attached. -func (i *Insert) AdditionalCmd(d bson.D) *Insert { - if i == nil { - i = new(Insert) - } - - i.additionalCmd = d - return i -}