|
@@ -4,27 +4,29 @@ import (
|
|
|
"bytes"
|
|
|
"context"
|
|
|
"encoding/base64"
|
|
|
+ "encoding/json"
|
|
|
"fmt" // "html/template"
|
|
|
"reflect"
|
|
|
"regexp"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
+ "sync"
|
|
|
"text/template"
|
|
|
"time"
|
|
|
|
|
|
"git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
|
|
|
"github.com/davecgh/go-spew/spew"
|
|
|
+ "github.com/jeremywohl/flatten"
|
|
|
"github.com/kataras/iris"
|
|
|
iriscontext "github.com/kataras/iris/v12/context"
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
|
- "go.mongodb.org/mongo-driver/mongo/readconcern"
|
|
|
- "go.mongodb.org/mongo-driver/mongo/readpref"
|
|
|
- "go.mongodb.org/mongo-driver/mongo/writeconcern"
|
|
|
)
|
|
|
|
|
|
+type ParseQueryHandler = func(ctx iriscontext.Context) (err *errs.Error)
|
|
|
+
|
|
|
const (
|
|
|
DefaultAddrConnect = "mongodb://localhost:27017"
|
|
|
|
|
@@ -34,8 +36,10 @@ const (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- QueriesMap = map[string]*template.Template{}
|
|
|
- EMPTYQUERY_REGEX = regexp.MustCompile(`{\s*}`)
|
|
|
+ QueriesMap = map[string]*template.Template{}
|
|
|
+ ParseQueryHandlers = map[string]ParseQueryHandler{}
|
|
|
+ EMPTYQUERY_REGEX = regexp.MustCompile(`{\s*}`)
|
|
|
+ NextPageTokenMap = map[string]string{}
|
|
|
)
|
|
|
|
|
|
func GetSessionContext(ctx iriscontext.Context) mongo.SessionContext {
|
|
@@ -54,7 +58,8 @@ func init() {
|
|
|
switch {
|
|
|
case strings.Contains(message, "E11000"):
|
|
|
err = errs.AlreadyExists().Details(&errs.Detail{
|
|
|
- Message: "DUPLICATED_ITEM",
|
|
|
+ Reason: "DUPLICATED_ITEM",
|
|
|
+ Message: message,
|
|
|
})
|
|
|
case strings.Contains(message, "cannot decode"):
|
|
|
err = errs.DataCaps().Details(&errs.Detail{
|
|
@@ -63,7 +68,8 @@ func init() {
|
|
|
|
|
|
case strings.Contains(message, "no documents in result"):
|
|
|
errs.NotFound().Details(&errs.Detail{
|
|
|
- Message: "NOT_FOUND",
|
|
|
+ Reason: "NOT_FOUND",
|
|
|
+ Message: message,
|
|
|
})
|
|
|
}
|
|
|
return
|
|
@@ -80,6 +86,9 @@ type Mongo struct {
|
|
|
// Password string `json:"password"`
|
|
|
// DataBase string `json:"database"`
|
|
|
// Addrs string `json:"addrs"`
|
|
|
+ FindManyNextPaginationMux sync.Mutex
|
|
|
+ // NextPageTokenMap map[string]string
|
|
|
+ Uniq int64
|
|
|
subject *BehaviorSubjectStruct
|
|
|
client *mongo.Client
|
|
|
Credential *options.Credential
|
|
@@ -104,11 +113,13 @@ type Filter struct {
|
|
|
Sort *bson.M
|
|
|
Context iriscontext.Context
|
|
|
// Sort []string
|
|
|
+ NextPageToken string
|
|
|
Collection string
|
|
|
QueryType string
|
|
|
DB string
|
|
|
Format string
|
|
|
Insertion int
|
|
|
+ ResultSizeEstimate int64
|
|
|
CheckQuery bool
|
|
|
IgnoreNoEntityFound bool
|
|
|
MaxResults int
|
|
@@ -118,21 +129,19 @@ type Filter struct {
|
|
|
// Aggregate interface{}
|
|
|
Pipeline primitive.A
|
|
|
Options interface{}
|
|
|
+ // PatchOptions = patch.Patch()
|
|
|
}
|
|
|
|
|
|
func (f *Filter) Aggregate(ctx context.Context, collection *mongo.Collection, opts *options.AggregateOptions) (cursor *mongo.Cursor, err *errs.Error) {
|
|
|
var errl error
|
|
|
|
|
|
if opts == nil {
|
|
|
- opts = options.Aggregate().SetBatchSize(int32(f.MaxResults))
|
|
|
+ opts = options.Aggregate().SetBatchSize(int32(f.MaxResults)).SetCollation(&options.Collation{
|
|
|
+ Strength: 1,
|
|
|
+ Locale: "en",
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
- // cmd := command.Aggregate{
|
|
|
- // NS: ns,
|
|
|
- // Pipeline: pipeline,
|
|
|
- // ReadPref: rp,
|
|
|
- // }
|
|
|
-
|
|
|
if cursor, errl = collection.Aggregate(ctx, f.Pipeline, opts); errl != nil {
|
|
|
err = errs.FromError(errl)
|
|
|
}
|
|
@@ -163,6 +172,11 @@ func (t *Filter) Check() (err *errs.Error) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+func (mongo *Mongo) uniqID() int64 {
|
|
|
+ mongo.Uniq++
|
|
|
+ return mongo.Uniq
|
|
|
+}
|
|
|
+
|
|
|
func (t *Mongo) Ready() *BehaviorSubjectStruct {
|
|
|
|
|
|
if t.subject == nil {
|
|
@@ -177,6 +191,9 @@ func (t *Mongo) Init() (err error) {
|
|
|
spew.Dump(err)
|
|
|
}()
|
|
|
|
|
|
+ t.FindManyNextPaginationMux = sync.Mutex{}
|
|
|
+ NextPageTokenMap = map[string]string{}
|
|
|
+
|
|
|
fmt.Printf("Connection string '%s'", t.Config)
|
|
|
clientOpts := options.Client().ApplyURI(t.Config)
|
|
|
|
|
@@ -253,14 +270,10 @@ func (t *Mongo) InsertOne(f *Filter) (res *mongo.InsertOneResult, err *errs.Erro
|
|
|
})
|
|
|
}()
|
|
|
f.Insertion = InsertOne
|
|
|
-
|
|
|
t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
|
|
|
var lerr error
|
|
|
-
|
|
|
if res, lerr = collection.InsertOne(ctx, f.Entity); lerr != nil {
|
|
|
-
|
|
|
err = errs.FromError(lerr)
|
|
|
-
|
|
|
}
|
|
|
}, func(e *errs.Error) { err = e })
|
|
|
|
|
@@ -307,6 +320,9 @@ func (t *Mongo) RemoveOne(f *Filter) (res *mongo.DeleteResult, err *errs.Error)
|
|
|
t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
|
|
|
var lerr error
|
|
|
|
|
|
+ fmt.Println("remove.one")
|
|
|
+ spew.Dump(f.Query)
|
|
|
+
|
|
|
if res, lerr = collection.DeleteOne(ctx, f.Query); lerr != nil {
|
|
|
err = errs.FromError(lerr)
|
|
|
|
|
@@ -314,7 +330,6 @@ func (t *Mongo) RemoveOne(f *Filter) (res *mongo.DeleteResult, err *errs.Error)
|
|
|
err = errs.NotFound().Details(&errs.Detail{
|
|
|
Message: "No entity has been deleted",
|
|
|
})
|
|
|
-
|
|
|
}
|
|
|
|
|
|
}, func(e *errs.Error) { err = e })
|
|
@@ -324,6 +339,7 @@ func (t *Mongo) RemoveOne(f *Filter) (res *mongo.DeleteResult, err *errs.Error)
|
|
|
//Remove os elementos da colecao, selecionados pela query
|
|
|
func (t *Mongo) RemoveMany(f *Filter) (res *mongo.DeleteResult, err *errs.Error) {
|
|
|
var lerr error
|
|
|
+ var deleteOptions *options.DeleteOptions
|
|
|
|
|
|
defer func() {
|
|
|
createDebugEvent(f, "models.remove.many", func(event *DebugEvent) {
|
|
@@ -332,17 +348,25 @@ func (t *Mongo) RemoveMany(f *Filter) (res *mongo.DeleteResult, err *errs.Error)
|
|
|
}()
|
|
|
|
|
|
f.CheckQuery = true
|
|
|
+ if (f.Options != nil ) {
|
|
|
+ deleteOptions = f.Options.(*options.DeleteOptions)
|
|
|
+ }
|
|
|
|
|
|
t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
|
|
|
|
|
|
- if res, lerr = collection.DeleteMany(ctx, f.Entities); lerr != nil {
|
|
|
+ if res, lerr = collection.DeleteMany(
|
|
|
+ ctx,
|
|
|
+ f.Query,
|
|
|
+ deleteOptions,
|
|
|
+ ); lerr != nil {
|
|
|
err = errs.FromError(lerr)
|
|
|
|
|
|
- } else if res.DeletedCount == 0 {
|
|
|
- err = errs.NotFound().Details(&errs.Detail{
|
|
|
- Message: "No entity has been deleted",
|
|
|
- })
|
|
|
- }
|
|
|
+ }
|
|
|
+ // else if res.DeletedCount == 0 {
|
|
|
+ // err = errs.NotFound().Details(&errs.Detail{
|
|
|
+ // Message: "No entity has been deleted",
|
|
|
+ // })
|
|
|
+ // }
|
|
|
|
|
|
}, func(e *errs.Error) { err = e })
|
|
|
|
|
@@ -417,6 +441,14 @@ func RegisterQuery(id, query string) {
|
|
|
QueriesMap[id] = template.Must(template.New(id).Parse(query))
|
|
|
}
|
|
|
|
|
|
+func BeforeParseQueryHandler(id string, fn ParseQueryHandler) {
|
|
|
+ ParseQueryHandlers[id] = fn
|
|
|
+}
|
|
|
+
|
|
|
+func getQueryHandler(id string) ParseQueryHandler {
|
|
|
+ return ParseQueryHandlers[id]
|
|
|
+}
|
|
|
+
|
|
|
func ParseQuery(ctx iriscontext.Context, filter *Filter, basequery string, data map[string]interface{}) (err *errs.Error) {
|
|
|
var (
|
|
|
found bool
|
|
@@ -425,15 +457,13 @@ func ParseQuery(ctx iriscontext.Context, filter *Filter, basequery string, data
|
|
|
baseQueryTemplate *template.Template
|
|
|
// baseQeuryDocument *bson.M
|
|
|
// errbq *errs.Error
|
|
|
+ nextPageToken string
|
|
|
queryDecodedString []byte
|
|
|
+ values = ctx.Values()
|
|
|
userQueryString = Q(ctx, "q", "e30=") // default base64 encoded from "{}"
|
|
|
includeInTrash, _ = strconv.ParseBool(Q(ctx, "includeInTrash", "false")) // default base64 encoded from "{}"
|
|
|
)
|
|
|
|
|
|
- // defer func() {
|
|
|
- // spew.Dump(filter.Query)
|
|
|
- // }()
|
|
|
-
|
|
|
// Faz o parse da userQueryString enviada pelo usuario que deve ser um base64
|
|
|
if queryDecodedString, templateErr = base64.StdEncoding.DecodeString(userQueryString); templateErr != nil {
|
|
|
err = errs.InvalidArgument().Details(&errs.Detail{
|
|
@@ -450,14 +480,25 @@ func ParseQuery(ctx iriscontext.Context, filter *Filter, basequery string, data
|
|
|
}
|
|
|
|
|
|
// transclude the query to base query
|
|
|
- data["_transclude_"] = string(queryDecodedString)
|
|
|
+ fmt.Println("##########################[", filter.NextPageToken, "]")
|
|
|
+ // spew.Dump(t.NextPageTokenMap)
|
|
|
+ if nextPageToken, found = NextPageTokenMap[filter.NextPageToken]; !found {
|
|
|
+ nextPageToken = "{}"
|
|
|
+ }
|
|
|
+
|
|
|
+ data["_transclude_nextPageToken"] = nextPageToken
|
|
|
+ data["_transclude_true"] = string(queryDecodedString)
|
|
|
data["_deleted_"] = includeInTrash
|
|
|
|
|
|
- fmt.Println(
|
|
|
- basequery,
|
|
|
- string(queryDecodedString),
|
|
|
- data,
|
|
|
- )
|
|
|
+ values.Set("eon.query.data", data)
|
|
|
+ values.Set("eon.query.id", basequery)
|
|
|
+ values.Set("eon.query.options", filter)
|
|
|
+
|
|
|
+ if handler := getQueryHandler(basequery); handler != nil {
|
|
|
+ if err = handler(ctx); err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
if templateErr = baseQueryTemplate.Execute(&buf, data); templateErr != nil {
|
|
|
err = errs.InvalidArgument().Details(&errs.Detail{
|
|
@@ -479,97 +520,141 @@ func ParseQuery(ctx iriscontext.Context, filter *Filter, basequery string, data
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func (t *Mongo) PatchOne(f *Filter) (res *mongo.UpdateResult, err *errs.Error) {
|
|
|
+func (this *Mongo) PatchOne(actionOptions *Filter) (res *mongo.UpdateResult, err *errs.Error) {
|
|
|
+ uniq := this.uniqID()
|
|
|
+ fmt.Println("mongo.patch.many", uniq)
|
|
|
defer func() {
|
|
|
- createDebugEvent(f, "models.patch.one", func(event *DebugEvent) {
|
|
|
+ createDebugEvent(actionOptions, "models.patch.many", func(event *DebugEvent) {
|
|
|
event.Error = err
|
|
|
})
|
|
|
+ fmt.Println("mongo.patch.many.end", uniq)
|
|
|
}()
|
|
|
- f.Insertion = Patch
|
|
|
- f.CheckQuery = true
|
|
|
+ actionOptions.Insertion = Patch
|
|
|
+ actionOptions.CheckQuery = true
|
|
|
|
|
|
- // out, _ := json.Marshal()
|
|
|
- // fmt.Println(string(out))
|
|
|
- // spew.Dump(f.Patchs)
|
|
|
-
|
|
|
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
|
|
|
+ this.exec(actionOptions, func(ctx context.Context, collection *mongo.Collection) {
|
|
|
|
|
|
var (
|
|
|
lerr error
|
|
|
// client *mongo.Client
|
|
|
)
|
|
|
|
|
|
- // if client, err = t.GetClient(f.DB); err != nil {
|
|
|
- // return
|
|
|
- // }
|
|
|
- // spew.Dump(f.Query)
|
|
|
+ for _, p := range *actionOptions.Patchs {
|
|
|
|
|
|
- // spew.Dump(f.Patchs)
|
|
|
- // id := primitive.NewObjectID()
|
|
|
+ if actionOptions.Options == nil {
|
|
|
+ actionOptions.Options = options.Update()
|
|
|
+ }
|
|
|
|
|
|
- t.client.UseSessionWithOptions(
|
|
|
- ctx,
|
|
|
- options.Session().SetDefaultReadPreference(readpref.Primary()),
|
|
|
- func(sctx mongo.SessionContext) error {
|
|
|
-
|
|
|
- defer func() {
|
|
|
- if err != nil {
|
|
|
- // fmt.Println("abort patch ", id.Hex())
|
|
|
- sctx.AbortTransaction(sctx)
|
|
|
- } else {
|
|
|
- // fmt.Println("commit patch ", id.Hex())
|
|
|
- sctx.CommitTransaction(sctx)
|
|
|
- }
|
|
|
- sctx.EndSession(sctx)
|
|
|
- }()
|
|
|
+ if res, lerr = collection.UpdateOne(ctx, actionOptions.Query, p, actionOptions.Options.(*options.UpdateOptions)); lerr != nil {
|
|
|
+ // ce := lerr.(mongo.CommandError)
|
|
|
+ // fmt.Println("----------------------------",
|
|
|
+ // ce.Message,
|
|
|
+ // ce.Labels,
|
|
|
+ // ce.Code,
|
|
|
+ // ce.Name,
|
|
|
+ // )
|
|
|
+ // f.Context.Application().Logger().Errorf(lerr.Error())
|
|
|
+ err = errs.FromError(lerr)
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- sctx.StartTransaction(options.Transaction().
|
|
|
- SetReadConcern(readconcern.Snapshot()).
|
|
|
- SetWriteConcern(writeconcern.New(writeconcern.WMajority())))
|
|
|
+ if !actionOptions.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
|
|
|
|
|
|
- for _, p := range *f.Patchs {
|
|
|
+ err = errs.NotFound().Details(&errs.Detail{
|
|
|
|
|
|
- if f.Options == nil {
|
|
|
- f.Options = options.Update()
|
|
|
- }
|
|
|
+ Message: "No entity has been updated",
|
|
|
+ })
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- if res, lerr = collection.UpdateOne(ctx, f.Query, p, f.Options.(*options.UpdateOptions)); lerr != nil {
|
|
|
- err = errs.FromError(lerr)
|
|
|
- return lerr
|
|
|
- }
|
|
|
+ // if client, err = t.GetClient(f.DB); err != nil {
|
|
|
+ // return
|
|
|
+ // }
|
|
|
+ // spew.Dump(f.Query)
|
|
|
|
|
|
- if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
|
|
|
+ // spew.Dump(f.Patchs)
|
|
|
+ // id := primitive.NewObjectID()
|
|
|
|
|
|
- err = errs.NotFound().Details(&errs.Detail{
|
|
|
- Message: "No entity has been updated",
|
|
|
- })
|
|
|
- return fmt.Errorf("No entity has been updated")
|
|
|
- }
|
|
|
- }
|
|
|
- return nil
|
|
|
- },
|
|
|
- )
|
|
|
+ // t.client.UseSessionWithOptions(
|
|
|
+ // ctx,
|
|
|
+ // options.Session().SetDefaultReadPreference(readpref.Primary()),
|
|
|
+ // func(sctx mongo.SessionContext) error {
|
|
|
+
|
|
|
+ // defer func() {
|
|
|
+ // if err != nil {
|
|
|
+ // sctx.AbortTransaction(sctx)
|
|
|
+ // } else {
|
|
|
+ // sctx.CommitTransaction(sctx)
|
|
|
+ // }
|
|
|
+ // sctx.EndSession(sctx)
|
|
|
+ // }()
|
|
|
+
|
|
|
+ // sctx.StartTransaction(options.Transaction().
|
|
|
+ // SetReadConcern(readconcern.Snapshot()).
|
|
|
+ // SetWriteConcern(writeconcern.New(writeconcern.WMajority())))
|
|
|
+
|
|
|
+ // for _, p := range *f.Patchs {
|
|
|
+
|
|
|
+ // if f.Options == nil {
|
|
|
+ // f.Options = options.Update()
|
|
|
+ // }
|
|
|
+
|
|
|
+ // if res, lerr = collection.UpdateOne(ctx, f.Query, p, f.Options.(*options.UpdateOptions)); lerr != nil {
|
|
|
+ // err = errs.FromError(lerr)
|
|
|
+ // return lerr
|
|
|
+ // }
|
|
|
+
|
|
|
+ // if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
|
|
|
+
|
|
|
+ // err = errs.NotFound().Details(&errs.Detail{
|
|
|
+ // Message: "No entity has been updated",
|
|
|
+ // })
|
|
|
+ // return fmt.Errorf("No entity has been updated")
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // return nil
|
|
|
+ // },
|
|
|
+ // )
|
|
|
}, func(e *errs.Error) { err = e })
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func (t *Mongo) PatchMany(f *Filter) (res *mongo.UpdateResult, err *errs.Error) {
|
|
|
+func (this *Mongo) PatchMany(actionOptions *Filter) (res *mongo.UpdateResult, err *errs.Error) {
|
|
|
+ uniq := this.uniqID()
|
|
|
+ fmt.Println("mongo.patch.many", uniq)
|
|
|
defer func() {
|
|
|
- createDebugEvent(f, "models.patch.many", func(event *DebugEvent) {
|
|
|
+ createDebugEvent(actionOptions, "models.patch.many", func(event *DebugEvent) {
|
|
|
event.Error = err
|
|
|
})
|
|
|
+ fmt.Println("mongo.patch.many.end", uniq)
|
|
|
}()
|
|
|
|
|
|
- f.Insertion = Patch
|
|
|
- f.CheckQuery = true
|
|
|
+ actionOptions.Insertion = Patch
|
|
|
+ actionOptions.CheckQuery = true
|
|
|
|
|
|
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
|
|
|
+ this.exec(actionOptions, func(ctx context.Context, collection *mongo.Collection) {
|
|
|
|
|
|
var (
|
|
|
lerr error
|
|
|
// client *mongo.Client
|
|
|
)
|
|
|
+ for _, p := range *actionOptions.Patchs {
|
|
|
+
|
|
|
+ if res, lerr = collection.UpdateMany(ctx, actionOptions.Query, p); lerr != nil {
|
|
|
+ err = errs.FromError(lerr)
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
+ if !actionOptions.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
|
|
|
+ err = errs.NotFound().Details(&errs.Detail{
|
|
|
+ Message: "No entity has been updated",
|
|
|
+ })
|
|
|
+ err.CodeText = "noEntityUpdated"
|
|
|
+ // fmt.Errorf("No entity has been updated")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
// if client, err = t.GetClient(f.DB); err != nil {
|
|
|
// return
|
|
|
// }
|
|
@@ -579,44 +664,44 @@ func (t *Mongo) PatchMany(f *Filter) (res *mongo.UpdateResult, err *errs.Error)
|
|
|
// o, _ = json.MarshalIndent(f.Patchs, "", " ")
|
|
|
// fmt.Println("patch", o)
|
|
|
|
|
|
- t.client.UseSessionWithOptions(
|
|
|
- ctx,
|
|
|
- options.Session().SetDefaultReadPreference(readpref.Primary()),
|
|
|
- func(sctx mongo.SessionContext) error {
|
|
|
-
|
|
|
- defer func() {
|
|
|
- if err != nil {
|
|
|
- sctx.AbortTransaction(sctx)
|
|
|
- } else {
|
|
|
- sctx.CommitTransaction(sctx)
|
|
|
- }
|
|
|
- sctx.EndSession(sctx)
|
|
|
- }()
|
|
|
-
|
|
|
- sctx.StartTransaction(
|
|
|
- options.Transaction().
|
|
|
- SetReadConcern(readconcern.Snapshot()).
|
|
|
- SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
|
|
|
- )
|
|
|
-
|
|
|
- for _, p := range *f.Patchs {
|
|
|
-
|
|
|
- if res, lerr = collection.UpdateMany(ctx, f.Query, p); lerr != nil {
|
|
|
- err = errs.FromError(lerr)
|
|
|
- return lerr
|
|
|
- }
|
|
|
-
|
|
|
- if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
|
|
|
- err = errs.NotFound().Details(&errs.Detail{
|
|
|
- Message: "No entity has been updated",
|
|
|
- })
|
|
|
- err.CodeText = "noEntityUpdated"
|
|
|
- return fmt.Errorf("No entity has been updated")
|
|
|
- }
|
|
|
- }
|
|
|
- return nil
|
|
|
- },
|
|
|
- )
|
|
|
+ // t.client.UseSessionWithOptions(
|
|
|
+ // ctx,
|
|
|
+ // options.Session().SetDefaultReadPreference(readpref.Primary()),
|
|
|
+ // func(sctx mongo.SessionContext) error {
|
|
|
+
|
|
|
+ // defer func() {
|
|
|
+ // if err != nil {
|
|
|
+ // sctx.AbortTransaction(sctx)
|
|
|
+ // } else {
|
|
|
+ // sctx.CommitTransaction(sctx)
|
|
|
+ // }
|
|
|
+ // sctx.EndSession(sctx)
|
|
|
+ // }()
|
|
|
+
|
|
|
+ // sctx.StartTransaction(
|
|
|
+ // options.Transaction().
|
|
|
+ // SetReadConcern(readconcern.Snapshot()).
|
|
|
+ // SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
|
|
|
+ // )
|
|
|
+
|
|
|
+ // for _, p := range *f.Patchs {
|
|
|
+
|
|
|
+ // if res, lerr = collection.UpdateMany(ctx, f.Query, p); lerr != nil {
|
|
|
+ // err = errs.FromError(lerr)
|
|
|
+ // return lerr
|
|
|
+ // }
|
|
|
+
|
|
|
+ // if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
|
|
|
+ // err = errs.NotFound().Details(&errs.Detail{
|
|
|
+ // Message: "No entity has been updated",
|
|
|
+ // })
|
|
|
+ // err.CodeText = "noEntityUpdated"
|
|
|
+ // return fmt.Errorf("No entity has been updated")
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // return nil
|
|
|
+ // },
|
|
|
+ // )
|
|
|
}, func(e *errs.Error) { err = e })
|
|
|
return
|
|
|
}
|
|
@@ -646,14 +731,17 @@ func (mongo *Mongo) FindOneRx(options *Filter) *ObservableStruct {
|
|
|
}
|
|
|
|
|
|
func (t *Mongo) FindOne(f *Filter) (res *mongo.SingleResult, err *errs.Error) {
|
|
|
+ uniq := t.uniqID()
|
|
|
+ fmt.Println("mongo.find.one", uniq)
|
|
|
defer func() {
|
|
|
createDebugEvent(f, "models.find.one", func(event *DebugEvent) {
|
|
|
event.Error = err
|
|
|
})
|
|
|
+ fmt.Println("mongo.find.one.end", uniq, f.Collection)
|
|
|
+ spew.Dump(f.Query)
|
|
|
}()
|
|
|
|
|
|
f.CheckQuery = (f.QueryType != "aggregate")
|
|
|
-
|
|
|
t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
|
|
|
var (
|
|
|
lerr error
|
|
@@ -697,133 +785,192 @@ func findIds(filter *Filter) (ids []primitive.ObjectID) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+// if f.QueryType == "aggregate" {
|
|
|
+// cursor, lerr = f.Aggregate(ctx, collection, nil)
|
|
|
+// fmt.Println("-------------------------------------------------------------------------------after aggregate")
|
|
|
+// } else {
|
|
|
+// findOptions := options.Find()
|
|
|
+// findOptions.SetLimit(int64(f.MaxResults)).SetCollation(&options.Collation{
|
|
|
+// Locale: "en",
|
|
|
+// Strength: 1,
|
|
|
+// })
|
|
|
+
|
|
|
+// // fmt.Printf("sort of query '%s'\n", f.Format)
|
|
|
+// // spew.Dump(f.Sort)
|
|
|
+// // spew.Dump(f.Fields)
|
|
|
+// // spew.Dump(f.Query)
|
|
|
+
|
|
|
+// if f.Sort != nil {
|
|
|
+// findOptions.SetSort(f.Sort)
|
|
|
+// }
|
|
|
+
|
|
|
+// if f.Fields != nil {
|
|
|
+// findOptions.SetProjection(f.Fields)
|
|
|
+// }
|
|
|
+
|
|
|
+// cursor, lerr = collection.Find(ctx, f.Query, findOptions)
|
|
|
+// }
|
|
|
+
|
|
|
func (t *Mongo) FindMany(f *Filter) (cursor *mongo.Cursor, err *errs.Error) {
|
|
|
|
|
|
- // f.CheckQuery = true
|
|
|
+ uniq := t.uniqID()
|
|
|
+ fmt.Println("mongo.find.many", uniq)
|
|
|
defer func() {
|
|
|
createDebugEvent(f, "models.find.many", func(event *DebugEvent) {
|
|
|
event.Error = err
|
|
|
})
|
|
|
+ fmt.Println("mongo.find.many.end", uniq, f.Collection)
|
|
|
}()
|
|
|
|
|
|
+ if f.Entities == nil {
|
|
|
+ err = errs.Internal().Details(&errs.Detail{
|
|
|
+ Message: "Entities can't be nil",
|
|
|
+ })
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ entitiesValue := reflect.ValueOf(f.Entities)
|
|
|
+ if entitiesValue.Kind() != reflect.Ptr || entitiesValue.Elem().Kind() != reflect.Slice {
|
|
|
+ err = errs.Internal().Details(&errs.Detail{
|
|
|
+ Message: "Entities argument must be a slice address",
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
|
|
|
var (
|
|
|
lerr error
|
|
|
+ err *errs.Error
|
|
|
elemp reflect.Value
|
|
|
- // ids = []primitive.ObjectID{}
|
|
|
- // limit = int64(f.MaxResults)
|
|
|
- // fields = f.Fields
|
|
|
)
|
|
|
|
|
|
- // defer func() {
|
|
|
- // if errd := recover(); errd != nil {
|
|
|
- // // fmt.Println(errd)
|
|
|
- // // spew.Dump(Trace())
|
|
|
- // // fmt.Println(errors.WithStack(errd.(error)) )
|
|
|
- // LogError(0, fmt.Sprintf("FindMany - %v", errd))
|
|
|
- // }
|
|
|
- // }()
|
|
|
- // Carrega os ids e verifica o intervalo de paginacao da consulta.
|
|
|
- // f.Fields = &bson.M{"_id": 1}
|
|
|
-
|
|
|
- // ids = findIds(f)
|
|
|
-
|
|
|
- // Atualiza a consulta para o conjunto de ids da paginacao
|
|
|
- // f.Fields = fields
|
|
|
- // f.Query = &bson.M{"_id": bson.M{"$in": ids}}
|
|
|
+ if f.QueryType != "aggregate" {
|
|
|
+ f.Pipeline = bson.A{}
|
|
|
|
|
|
- // fmt.Println("Query Type ", f.QueryType)
|
|
|
-
|
|
|
- if f.QueryType == "aggregate" {
|
|
|
- cursor, lerr = f.Aggregate(ctx, collection, nil)
|
|
|
- } else {
|
|
|
- findOptions := options.Find()
|
|
|
- findOptions.SetLimit(int64(f.MaxResults))
|
|
|
+ if f.Sort != nil && len(*f.Sort) > 0 {
|
|
|
+ f.Pipeline = append(f.Pipeline, bson.M{"$sort": f.Sort})
|
|
|
+ }
|
|
|
|
|
|
- // fmt.Printf("sort of query '%s'\n", f.Format)
|
|
|
- // spew.Dump(f.Sort)
|
|
|
- // spew.Dump(f.Fields)
|
|
|
- // spew.Dump(f.Query)
|
|
|
+ f.Pipeline = append(f.Pipeline, bson.M{"$match": f.Query})
|
|
|
|
|
|
- if f.Sort != nil {
|
|
|
- findOptions.SetSort(f.Sort)
|
|
|
+ if f.Fields != nil && len(*f.Fields) > 0 {
|
|
|
+ f.Pipeline = append(f.Pipeline, bson.M{"$project": f.Fields})
|
|
|
}
|
|
|
|
|
|
- if f.Fields != nil {
|
|
|
- findOptions.SetProjection(f.Fields)
|
|
|
+ if f.MaxResults > 0 {
|
|
|
+ f.Pipeline = append(f.Pipeline, bson.M{"$limit": f.MaxResults})
|
|
|
}
|
|
|
-
|
|
|
- cursor, lerr = collection.Find(ctx, f.Query, findOptions)
|
|
|
+ f.QueryType = "aggregate"
|
|
|
}
|
|
|
|
|
|
- if lerr == nil {
|
|
|
- defer cursor.Close(ctx)
|
|
|
-
|
|
|
- // spew.Dump(cursor)
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ wg.Add(2)
|
|
|
|
|
|
- if f.Entities == nil {
|
|
|
- lerr = fmt.Errorf("Entities can't be nil")
|
|
|
- goto FindManyError
|
|
|
+ go func() {
|
|
|
+ var countError error
|
|
|
+ defer wg.Done()
|
|
|
+ if f.ResultSizeEstimate, countError = collection.CountDocuments(nil, f.Query); countError != nil {
|
|
|
+ err = errs.FromError(countError)
|
|
|
}
|
|
|
+ }()
|
|
|
|
|
|
- entitiesValue := reflect.ValueOf(f.Entities)
|
|
|
- if entitiesValue.Kind() != reflect.Ptr || entitiesValue.Elem().Kind() != reflect.Slice {
|
|
|
- lerr = fmt.Errorf("Entities argument must be a slice address")
|
|
|
- goto FindManyError
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ if cursor, err = f.Aggregate(ctx, collection, nil); err != nil {
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
+ defer cursor.Close(ctx)
|
|
|
+
|
|
|
slicev := entitiesValue.Elem()
|
|
|
slicev = slicev.Slice(0, slicev.Cap())
|
|
|
typ := slicev.Type().Elem()
|
|
|
|
|
|
for cursor.Next(ctx) {
|
|
|
-
|
|
|
elemp = reflect.New(typ)
|
|
|
-
|
|
|
- // fmt.Println("aqui")
|
|
|
if lerr = cursor.Decode(elemp.Interface()); lerr != nil {
|
|
|
-
|
|
|
- // spew.Dump(elemp)
|
|
|
- goto FindManyError
|
|
|
+ err = errs.FromError(lerr)
|
|
|
+ return
|
|
|
}
|
|
|
slicev = reflect.Append(slicev, elemp.Elem())
|
|
|
}
|
|
|
|
|
|
- // spew.Dump(slicev)
|
|
|
- entitiesValue.Elem().Set(slicev)
|
|
|
- return
|
|
|
- }
|
|
|
+ if elemp.IsValid() {
|
|
|
+ var (
|
|
|
+ data = map[string]interface{}{}
|
|
|
+ nextPageQuery = map[string]interface{}{}
|
|
|
+ out []byte
|
|
|
+ variablesJson string
|
|
|
+ )
|
|
|
+ if out, lerr = json.Marshal(elemp.Elem().Interface()); lerr != nil {
|
|
|
+ err = errs.FromError(lerr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if variablesJson, lerr = flatten.FlattenString(string(out), ".", flatten.DotStyle); lerr != nil {
|
|
|
+ err = errs.FromError(lerr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if lerr = json.Unmarshal([]byte(variablesJson), &data); err != nil {
|
|
|
+ err = errs.FromError(lerr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ orderOperator := map[int]string{
|
|
|
+ 1: "$gt",
|
|
|
+ -1: "$lt",
|
|
|
+ }
|
|
|
|
|
|
- FindManyError:
|
|
|
- err = errs.FromError(lerr)
|
|
|
- }, func(e *errs.Error) { err = e })
|
|
|
- return
|
|
|
-}
|
|
|
+ if f.Sort == nil {
|
|
|
+ f.Sort = &bson.M{}
|
|
|
+ }
|
|
|
|
|
|
-func createDebugEvent(options *Filter, eventType string, fn func(event *DebugEvent)) {
|
|
|
- // debug := options.Context.Values().Get("#debug")
|
|
|
- if options.Context != nil {
|
|
|
+ if (*f.Sort)["_id"] == nil {
|
|
|
+ (*f.Sort)["_id"] = 1
|
|
|
+ }
|
|
|
|
|
|
- debug, defined := options.Context.Values().Get("#debug").(*DebugTaks)
|
|
|
- if defined {
|
|
|
- event := debug.Event(eventType, "")
|
|
|
- event.Data = iris.Map{}
|
|
|
- fn(event)
|
|
|
- }
|
|
|
- }
|
|
|
+ for prop, order := range *f.Sort {
|
|
|
+ value := data[fmt.Sprintf(".%s", prop)]
|
|
|
+ if prop == "_id" {
|
|
|
+ value = bson.M{"$oid": value}
|
|
|
+ }
|
|
|
+ nextPageQuery[prop] = bson.M{
|
|
|
+ orderOperator[order.(int)]: value,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if out, lerr = json.Marshal(nextPageQuery); lerr != nil {
|
|
|
+ err = errs.FromError(lerr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ f.NextPageToken = primitive.NewObjectID().Hex()
|
|
|
+ t.FindManyNextPaginationMux.Lock()
|
|
|
+ NextPageTokenMap[f.NextPageToken] = string(out)
|
|
|
+ t.FindManyNextPaginationMux.Unlock()
|
|
|
+ }
|
|
|
+ entitiesValue.Elem().Set(slicev)
|
|
|
+ }()
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+
|
|
|
+ return
|
|
|
+ },
|
|
|
+ func(e *errs.Error) { err = e },
|
|
|
+ )
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
-func (models *Mongo) Exists(options *Filter) (exists bool, err *errs.Error) {
|
|
|
+func (this *Mongo) Exists(actionOptions *Filter) (exists bool, err *errs.Error) {
|
|
|
+ uniq := this.uniqID()
|
|
|
+ fmt.Println("mongo.exists", uniq)
|
|
|
defer func() {
|
|
|
- createDebugEvent(options, "models.exists", func(event *DebugEvent) {
|
|
|
- event.Data = iris.Map{"response": exists}
|
|
|
+ createDebugEvent(actionOptions, "models.exists", func(event *DebugEvent) {
|
|
|
event.Error = err
|
|
|
})
|
|
|
+ fmt.Println("mongo.exists.end", uniq)
|
|
|
}()
|
|
|
|
|
|
- options.Fields = &bson.M{"_id": 1}
|
|
|
+ actionOptions.Entity = map[string]interface{}{}
|
|
|
+ actionOptions.Fields = &bson.M{"_id": 1}
|
|
|
|
|
|
- if _, err = models.FindOne(options); err != nil {
|
|
|
+ if _, err = this.FindOne(actionOptions); err != nil {
|
|
|
|
|
|
return
|
|
|
}
|
|
@@ -897,12 +1044,36 @@ func (t *Mongo) exec(f *Filter, execAction execfn, errorAction func(*errs.Error)
|
|
|
ctx, _ = context.WithTimeout(context.Background(), time.Minute)
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ fmt.Println("###############", t.client == nil)
|
|
|
+
|
|
|
collection := t.client.Database(f.DB).Collection(f.Collection)
|
|
|
|
|
|
execAction(ctx, collection)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+func createDebugEvent(options *Filter, eventType string, fn func(event *DebugEvent)) {
|
|
|
+ // debug := options.Context.Values().Get("#debug")
|
|
|
+ if options.Context != nil {
|
|
|
+ debug, defined := options.Context.Values().Get("#debug").(*DebugTaks)
|
|
|
+ if defined {
|
|
|
+ event := debug.Event(eventType, "")
|
|
|
+ event.Data = iris.Map{
|
|
|
+ "database": options.DB,
|
|
|
+ "collection": options.Collection,
|
|
|
+ "query": options.Query,
|
|
|
+ "aggregate": options.Pipeline,
|
|
|
+ "sort": options.Sort,
|
|
|
+ "fields": options.Fields,
|
|
|
+ "maxResults": options.MaxResults,
|
|
|
+ "patchs": options.Patchs,
|
|
|
+ }
|
|
|
+ fn(event)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func DeletedPatch() *bson.A {
|
|
|
return &bson.A{
|
|
|
bson.M{
|