package api import ( "bytes" "context" "encoding/base64" "encoding/json" "fmt" // "html/template" "reflect" "regexp" "strconv" "strings" "sync" "text/template" "time" "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs" "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/types" "github.com/davecgh/go-spew/spew" "github.com/jeremywohl/flatten" "github.com/kataras/iris" iriscontext "github.com/kataras/iris/v12/context" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type ParseQueryHandler = func(ctx iriscontext.Context) (err *errs.Error) const ( DefaultAddrConnect = "mongodb://localhost:27017" InsertOne = iota + 1 InsertMany Patch ) var ( QueriesMap = map[string]*template.Template{} ParseQueryHandlers = map[string]ParseQueryHandler{} EMPTYQUERY_REGEX = regexp.MustCompile(`{\s*}`) NextPageTokenMap = map[string]string{} ) func GetSessionContext(ctx iriscontext.Context) mongo.SessionContext { context := ctx.Values().Get("$SessionContext") if context != nil { return context.(mongo.SessionContext) } return nil } func init() { errs.RegisterMapErrorFunction("MONGO_ERROS", func(source error) (err *errs.Error) { message := source.Error() switch { case strings.Contains(message, "E11000"): err = errs.AlreadyExists().Details(&errs.Detail{ Reason: "DUPLICATED_ITEM", Message: message, }) case strings.Contains(message, "cannot decode"): err = errs.DataCaps().Details(&errs.Detail{ Message: message, }) case strings.Contains(message, "no documents in result"): errs.NotFound().Details(&errs.Detail{ Reason: "NOT_FOUND", Message: message, }) } return }) } type Mongo struct { // Sessions map[string]*mgo.Session // Session *mgo.Session // Uri string // Configs //DefaultDB string // User string `json:"user"` // Password string `json:"password"` // DataBase string `json:"database"` // Addrs string `json:"addrs"` FindManyNextPaginationMux sync.Mutex // NextPageTokenMap map[string]string Uniq int64 subject *BehaviorSubjectStruct client *mongo.Client Credential *options.Credential Config string `json:"config"` Clients map[string]*mongo.Client } type execfn func(context.Context, *mongo.Collection) type EntityInterface interface { Patch() *bson.A } // Modelo de comunicação entre o controle da API e a camada de persistencia. type Filter struct { Id primitive.ObjectID UserId primitive.ObjectID // NextPageToken primitive.ObjectID // PageToken PageToken Fields *bson.M Query *bson.M Patchs *bson.A Sort *bson.M Context iriscontext.Context // Sort []string NextPageToken string Collection string QueryType string DB string Format string Insertion int ResultSizeEstimate int64 CheckQuery bool IgnoreNoEntityFound bool MaxResults int Entity interface{} Entities interface{} SessionContext mongo.SessionContext // Aggregate interface{} Pipeline primitive.A Options interface{} // PatchOptions = patch.Patch() } func (f *Filter) Aggregate(ctx context.Context, collection *mongo.Collection, opts *options.AggregateOptions) (cursor *mongo.Cursor, err *errs.Error) { var errl error if opts == nil { opts = options.Aggregate().SetBatchSize(int32(f.MaxResults)).SetCollation(&options.Collation{ Strength: 1, Locale: "en", }) } if cursor, errl = collection.Aggregate(ctx, f.Pipeline, opts); errl != nil { err = errs.FromError(errl) } return } func (t *Filter) Check() (err *errs.Error) { if t.CheckQuery { if t.Query == nil { if !t.Id.IsZero() { t.Query = &bson.M{"_id": t.Id} } else { err = errs.InvalidArgument().Details(&errs.Detail{ LocationType: "query", Location: "q", Message: "emptyValue", }) return } } } if t.Query == nil { t.Query = &bson.M{} } return } func (mongo *Mongo) uniqID() int64 { mongo.Uniq++ return mongo.Uniq } func (t *Mongo) Ready() *BehaviorSubjectStruct { if t.subject == nil { t.subject = BehaviorSubject() } return t.subject } func (t *Mongo) Init() (err error) { defer func() { spew.Dump(err) }() t.FindManyNextPaginationMux = sync.Mutex{} NextPageTokenMap = map[string]string{} fmt.Printf("Connection string '%s'", t.Config) clientOpts := options.Client().ApplyURI(t.Config) if t.Credential != nil { clientOpts.SetAuth(*t.Credential) } connectContext, _ := context.WithTimeout(context.Background(), 10*time.Second) // if t.client, err = mongo.NewClient(setOptions); err != nil { if t.client, err = mongo.Connect( connectContext, clientOpts, ); err != nil { return } // ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // defer cancel() // if err = t.client.Connect(ctx); err != nil { // return // } t.Ready().Next(true) return nil } func (t *Mongo) CommitTransaction(opts *options.SessionOptions) { } func (t *Mongo) StartTransaction( sessionOptions *options.SessionOptions, transactionOptions *options.TransactionOptions, ) ( sessionContext mongo.SessionContext, err *errs.Error, ) { var ( session mongo.Session localError error ) defer func() { if localError != nil { err = errs.Internal().Details(&errs.Detail{ Message: localError.Error(), }) } }() if session, localError = t.client.StartSession(sessionOptions); localError == nil { ctx, _ := context.WithTimeout(context.Background(), 60*time.Second) mongo.WithSession(ctx, session, func(sctx mongo.SessionContext) error { if localError = session.StartTransaction(transactionOptions); localError == nil { sessionContext = sctx } return nil }) } return } func (t *Mongo) Dispatch(f *Filter) { } func (t *Mongo) InsertOne(f *Filter) (res *mongo.InsertOneResult, err *errs.Error) { defer func() { createDebugEvent(f, "models.insert.one", func(event *DebugEvent) { event.Error = err }) }() f.Insertion = InsertOne t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var lerr error if res, lerr = collection.InsertOne(ctx, f.Entity); lerr != nil { err = errs.FromError(lerr) } }, func(e *errs.Error) { err = e }) return } func (t *Mongo) InsertMany(f *Filter) (res *mongo.InsertManyResult, err *errs.Error) { defer func() { createDebugEvent(f, "models.insert.many", func(event *DebugEvent) { event.Error = err }) }() f.Insertion = InsertMany t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var ( lerr error entities []interface{} value = reflect.ValueOf(f.Entities) ) for index := 0; index < value.Len(); index++ { entities = append(entities, value.Index(index).Interface()) } if res, lerr = collection.InsertMany(ctx, entities); lerr != nil { err = errs.FromError(lerr) } }, func(e *errs.Error) { err = e }) return } //Remove os elementos da colecao, selecionados pela query func (t *Mongo) RemoveOne(f *Filter) (res *mongo.DeleteResult, err *errs.Error) { defer func() { createDebugEvent(f, "models.remove.one", func(event *DebugEvent) { event.Error = err }) }() f.CheckQuery = true t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var lerr error fmt.Println("remove.one") spew.Dump(f.Query) if res, lerr = collection.DeleteOne(ctx, f.Query); lerr != nil { err = errs.FromError(lerr) } else if res.DeletedCount == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been deleted", }) } }, func(e *errs.Error) { err = e }) return } //Remove os elementos da colecao, selecionados pela query func (t *Mongo) RemoveMany(f *Filter) (res *mongo.DeleteResult, err *errs.Error) { var lerr error var deleteOptions *options.DeleteOptions defer func() { createDebugEvent(f, "models.remove.many", func(event *DebugEvent) { event.Error = err }) }() f.CheckQuery = true if f.Options != nil { deleteOptions = f.Options.(*options.DeleteOptions) } t.exec(f, func(ctx context.Context, collection *mongo.Collection) { if res, lerr = collection.DeleteMany( ctx, f.Query, deleteOptions, ); lerr != nil { err = errs.FromError(lerr) } // else if res.DeletedCount == 0 { // err = errs.NotFound().Details(&errs.Detail{ // Message: "No entity has been deleted", // }) // } }, func(e *errs.Error) { err = e }) return } func (t *Mongo) UpdateOne(f *Filter) (res *mongo.UpdateResult, err *errs.Error) { var lerr error defer func() { createDebugEvent(f, "models.update.one", func(event *DebugEvent) { event.Error = err }) }() f.Insertion = InsertOne f.CheckQuery = true t.exec(f, func(ctx context.Context, collection *mongo.Collection) { if res, lerr = collection.ReplaceOne(ctx, f.Query, f.Entity); lerr != nil { err = errs.FromError(lerr) } else if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been updated", }) } }, func(e *errs.Error) { err = e }) return } func (t *Mongo) UpdateMany(f *Filter) (res *mongo.UpdateResult, err *errs.Error) { var ( lerr error entity interface{} entities []interface{} value = reflect.ValueOf(f.Entities) ) defer func() { createDebugEvent(f, "models.update.many", func(event *DebugEvent) { event.Error = err }) }() f.Insertion = InsertMany f.CheckQuery = true t.exec(f, func(ctx context.Context, collection *mongo.Collection) { for index := 0; index < value.Len(); index++ { entity = value.Index(index).Interface() entities = append(entities, entity) } if res, lerr = collection.UpdateMany(ctx, f.Query, entities); lerr != nil { err = errs.FromError(lerr) } else if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been updated ", }) } }, func(e *errs.Error) { err = e }) return } func RegisterQuery(id, query string) { QueriesMap[id] = template.Must(template.New(id).Parse(query)) } func BeforeParseQueryHandler(id string, fn ParseQueryHandler) { ParseQueryHandlers[id] = fn } func getQueryHandler(id string) ParseQueryHandler { return ParseQueryHandlers[id] } func ParseQuery(ctx iriscontext.Context, filter *Filter, basequery string, data map[string]interface{}) (err *errs.Error) { var ( found bool templateErr error buf bytes.Buffer baseQueryTemplate *template.Template // baseQeuryDocument *bson.M // errbq *errs.Error nextPageToken string queryDecodedString []byte values = ctx.Values() userQueryString = Q(ctx, "q", "e30=") // default base64 encoded from "{}" includeInTrash, _ = strconv.ParseBool(Q(ctx, "includeInTrash", "false")) // default base64 encoded from "{}" ) // Faz o parse da userQueryString enviada pelo usuario que deve ser um base64 if queryDecodedString, templateErr = base64.StdEncoding.DecodeString(userQueryString); templateErr != nil { err = errs.InvalidArgument().Details(&errs.Detail{ Message: "Query param isn't a base64 json", }) return } if baseQueryTemplate, found = QueriesMap[basequery]; !found { err = errs.Internal().Details(&errs.Detail{ Message: "Invalid query input", }) return } // transclude the query to base query fmt.Println("##########################[", filter.NextPageToken, "]") // spew.Dump(t.NextPageTokenMap) if nextPageToken, found = NextPageTokenMap[filter.NextPageToken]; !found { nextPageToken = "{}" } data["_transclude_nextPageToken"] = nextPageToken data["_transclude_true"] = string(queryDecodedString) data["_deleted_"] = includeInTrash values.Set("eon.query.data", data) values.Set("eon.query.id", basequery) values.Set("eon.query.options", filter) if handler := getQueryHandler(basequery); handler != nil { if err = handler(ctx); err != nil { return } } if templateErr = baseQueryTemplate.Execute(&buf, data); templateErr != nil { err = errs.InvalidArgument().Details(&errs.Detail{ Message: "Failed on interpolate data with template query", }) return } x := buf.Bytes() fmt.Println("buf.Bytes()", string(x)) if templateErr = bson.UnmarshalExtJSON(x, false, &filter.Query); templateErr != nil { err = errs.InvalidArgument().Details(&errs.Detail{ Message: "Failed on interpolate data with template query", }) return } return } func (this *Mongo) PatchOne(actionOptions *Filter) (res *mongo.UpdateResult, err *errs.Error) { uniq := this.uniqID() fmt.Println("mongo.patch.many", uniq) defer func() { createDebugEvent(actionOptions, "models.patch.many", func(event *DebugEvent) { event.Error = err }) fmt.Println("mongo.patch.many.end", uniq) }() actionOptions.Insertion = Patch actionOptions.CheckQuery = true this.exec(actionOptions, func(ctx context.Context, collection *mongo.Collection) { var ( lerr error // client *mongo.Client ) for _, p := range *actionOptions.Patchs { if actionOptions.Options == nil { actionOptions.Options = options.Update() } if res, lerr = collection.UpdateOne(ctx, actionOptions.Query, p, actionOptions.Options.(*options.UpdateOptions)); lerr != nil { // ce := lerr.(mongo.CommandError) // fmt.Println("----------------------------", // ce.Message, // ce.Labels, // ce.Code, // ce.Name, // ) // f.Context.Application().Logger().Errorf(lerr.Error()) err = errs.FromError(lerr) return } if !actionOptions.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been updated", }) return } } // if client, err = t.GetClient(f.DB); err != nil { // return // } // spew.Dump(f.Query) // spew.Dump(f.Patchs) // id := primitive.NewObjectID() // t.client.UseSessionWithOptions( // ctx, // options.Session().SetDefaultReadPreference(readpref.Primary()), // func(sctx mongo.SessionContext) error { // defer func() { // if err != nil { // sctx.AbortTransaction(sctx) // } else { // sctx.CommitTransaction(sctx) // } // sctx.EndSession(sctx) // }() // sctx.StartTransaction(options.Transaction(). // SetReadConcern(readconcern.Snapshot()). // SetWriteConcern(writeconcern.New(writeconcern.WMajority()))) // for _, p := range *f.Patchs { // if f.Options == nil { // f.Options = options.Update() // } // if res, lerr = collection.UpdateOne(ctx, f.Query, p, f.Options.(*options.UpdateOptions)); lerr != nil { // err = errs.FromError(lerr) // return lerr // } // if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 { // err = errs.NotFound().Details(&errs.Detail{ // Message: "No entity has been updated", // }) // return fmt.Errorf("No entity has been updated") // } // } // return nil // }, // ) }, func(e *errs.Error) { err = e }) return } func (this *Mongo) PatchMany(actionOptions *Filter) (res *mongo.UpdateResult, err *errs.Error) { uniq := this.uniqID() fmt.Println("mongo.patch.many", uniq) defer func() { createDebugEvent(actionOptions, "models.patch.many", func(event *DebugEvent) { event.Error = err }) fmt.Println("mongo.patch.many.end", uniq) }() actionOptions.Insertion = Patch actionOptions.CheckQuery = true this.exec(actionOptions, func(ctx context.Context, collection *mongo.Collection) { var ( lerr error // client *mongo.Client ) for _, p := range *actionOptions.Patchs { if res, lerr = collection.UpdateMany(ctx, actionOptions.Query, p); lerr != nil { err = errs.FromError(lerr) return } if !actionOptions.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been updated", }) err.CodeText = "noEntityUpdated" // fmt.Errorf("No entity has been updated") return } } // if client, err = t.GetClient(f.DB); err != nil { // return // } // o, _ := json.MarshalIndent(f.Query, "", " ") // fmt.Println("query", o) // o, _ = json.MarshalIndent(f.Patchs, "", " ") // fmt.Println("patch", o) // t.client.UseSessionWithOptions( // ctx, // options.Session().SetDefaultReadPreference(readpref.Primary()), // func(sctx mongo.SessionContext) error { // defer func() { // if err != nil { // sctx.AbortTransaction(sctx) // } else { // sctx.CommitTransaction(sctx) // } // sctx.EndSession(sctx) // }() // sctx.StartTransaction( // options.Transaction(). // SetReadConcern(readconcern.Snapshot()). // SetWriteConcern(writeconcern.New(writeconcern.WMajority())), // ) // for _, p := range *f.Patchs { // if res, lerr = collection.UpdateMany(ctx, f.Query, p); lerr != nil { // err = errs.FromError(lerr) // return lerr // } // if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 { // err = errs.NotFound().Details(&errs.Detail{ // Message: "No entity has been updated", // }) // err.CodeText = "noEntityUpdated" // return fmt.Errorf("No entity has been updated") // } // } // return nil // }, // ) }, func(e *errs.Error) { err = e }) return } func (mongo *Mongo) CreateIndex(database, collectionString string, index mongo.IndexModel) *errs.Error { collection := mongo.client.Database(database).Collection(collectionString) opts := options.CreateIndexes().SetMaxTime(10 * time.Second) if _, err := collection.Indexes().CreateOne(context.Background(), index, opts); err != nil { return errs.FromError(err) } return nil } func (mongo *Mongo) FindOneRx(options *Filter) *ObservableStruct { return Observable(func(observer *Subscriber) { if res, err := mongo.FindOne(options); err != nil { observer.Err(err) } else { observer.Next(res) } }) } func (t *Mongo) FindOne(f *Filter) (res *mongo.SingleResult, err *errs.Error) { uniq := t.uniqID() fmt.Println("mongo.find.one", uniq) defer func() { createDebugEvent(f, "models.find.one", func(event *DebugEvent) { event.Error = err }) fmt.Println("mongo.find.one.end", uniq, f.Collection) spew.Dump(f.Query) }() f.CheckQuery = (f.QueryType != "aggregate") t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var ( lerr error cursor *mongo.Cursor ) if f.QueryType == "aggregate" { if cursor, err = f.Aggregate(ctx, collection, nil); err != nil { return } if cursor.Next(ctx) { if lerr = cursor.Decode(f.Entity); lerr == nil { return } } else { lerr = fmt.Errorf("No document found") } } else { res = collection.FindOne(ctx, f.Query) if lerr = res.Err(); lerr == nil { if lerr = res.Decode(f.Entity); lerr == nil { return } } } err = errs.FromError(lerr) }, func(e *errs.Error) { err = e }) return } func findIds(filter *Filter) (ids []primitive.ObjectID) { ids = []primitive.ObjectID{} // findIdsFilter = &Filter{} // cursor, err := executeFind(findIdsFilter) return } // if f.QueryType == "aggregate" { // cursor, lerr = f.Aggregate(ctx, collection, nil) // fmt.Println("-------------------------------------------------------------------------------after aggregate") // } else { // findOptions := options.Find() // findOptions.SetLimit(int64(f.MaxResults)).SetCollation(&options.Collation{ // Locale: "en", // Strength: 1, // }) // // fmt.Printf("sort of query '%s'\n", f.Format) // // spew.Dump(f.Sort) // // spew.Dump(f.Fields) // // spew.Dump(f.Query) // if f.Sort != nil { // findOptions.SetSort(f.Sort) // } // if f.Fields != nil { // findOptions.SetProjection(f.Fields) // } // cursor, lerr = collection.Find(ctx, f.Query, findOptions) // } func (t *Mongo) FindMany(f *Filter) (cursor *mongo.Cursor, err *errs.Error) { uniq := t.uniqID() fmt.Println("mongo.find.many", uniq) defer func() { createDebugEvent(f, "models.find.many", func(event *DebugEvent) { event.Error = err }) fmt.Println("mongo.find.many.end", uniq, f.Collection) }() if f.Entities == nil { err = errs.Internal().Details(&errs.Detail{ Message: "Entities can't be nil", }) return } entitiesValue := reflect.ValueOf(f.Entities) if entitiesValue.Kind() != reflect.Ptr || entitiesValue.Elem().Kind() != reflect.Slice { err = errs.Internal().Details(&errs.Detail{ Message: "Entities argument must be a slice address", }) } t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var ( lerr error err *errs.Error elemp reflect.Value ) if f.QueryType != "aggregate" { f.Pipeline = bson.A{} if f.Sort != nil && len(*f.Sort) > 0 { f.Pipeline = append(f.Pipeline, bson.M{"$sort": f.Sort}) } f.Pipeline = append(f.Pipeline, bson.M{"$match": f.Query}) if f.Fields != nil && len(*f.Fields) > 0 { f.Pipeline = append(f.Pipeline, bson.M{"$project": f.Fields}) } if f.MaxResults > 0 { f.Pipeline = append(f.Pipeline, bson.M{"$limit": f.MaxResults}) } f.QueryType = "aggregate" } wg := sync.WaitGroup{} wg.Add(2) out, _ := json.Marshal(f.Query) fmt.Println("))))))))))))))))))))))", string(out)) go func() { var countError error defer wg.Done() if f.ResultSizeEstimate, countError = collection.CountDocuments(nil, f.Query); countError != nil { err = errs.FromError(countError) } }() go func() { defer wg.Done() if cursor, err = f.Aggregate(ctx, collection, nil); err != nil { return } defer cursor.Close(ctx) slicev := entitiesValue.Elem() slicev = slicev.Slice(0, slicev.Cap()) typ := slicev.Type().Elem() for cursor.Next(ctx) { elemp = reflect.New(typ) if lerr = cursor.Decode(elemp.Interface()); lerr != nil { err = errs.FromError(lerr) return } slicev = reflect.Append(slicev, elemp.Elem()) } if elemp.IsValid() { var ( data = map[string]interface{}{} nextPageQuery = map[string]interface{}{} out []byte variablesJson string ) if out, lerr = json.Marshal(elemp.Elem().Interface()); lerr != nil { err = errs.FromError(lerr) return } if variablesJson, lerr = flatten.FlattenString(string(out), ".", flatten.DotStyle); lerr != nil { err = errs.FromError(lerr) return } if lerr = json.Unmarshal([]byte(variablesJson), &data); err != nil { err = errs.FromError(lerr) return } if f.Sort != nil { orderOperator := map[int]string{ 1: "$gte", -1: "$lte", } var firstPropOrder *int for prop, order := range *f.Sort { value := data[fmt.Sprintf(".%s", prop)] if prop == "_id" { value = bson.M{"$oid": value} } nextPageQuery[prop] = bson.M{ orderOperator[order.(int)]: value, } if firstPropOrder == nil { firstPropOrder = types.Int(order.(int)) } } if (*f.Sort)["_id"] == nil { if firstPropOrder == nil { firstPropOrder = types.Int(1) } orderOperator = map[int]string{ 1: "$gt", -1: "$lt", } nextPageQuery["_id"] = bson.M{ orderOperator[*firstPropOrder]: bson.M{"$oid": data["._id"]}, } } } if out, lerr = json.Marshal(nextPageQuery); lerr != nil { err = errs.FromError(lerr) return } f.NextPageToken = primitive.NewObjectID().Hex() t.FindManyNextPaginationMux.Lock() NextPageTokenMap[f.NextPageToken] = string(out) // fmt.Println("=========================================\n") // fmt.Println("string(out):", string(out)) // fmt.Println("=========================================\n\n") t.FindManyNextPaginationMux.Unlock() } entitiesValue.Elem().Set(slicev) }() wg.Wait() return }, func(e *errs.Error) { err = e }, ) return } func (this *Mongo) Exists(actionOptions *Filter) (exists bool, err *errs.Error) { uniq := this.uniqID() fmt.Println("mongo.exists", uniq) defer func() { createDebugEvent(actionOptions, "models.exists", func(event *DebugEvent) { event.Error = err }) fmt.Println("mongo.exists.end", uniq) }() actionOptions.Entity = map[string]interface{}{} actionOptions.Fields = &bson.M{"_id": 1} if _, err = this.FindOne(actionOptions); err != nil { return } exists = true return } func (driver *Mongo) GetContext(options *Filter) (context.Context, *mongo.Collection) { var ctx context.Context if options.SessionContext != nil { ctx = options.SessionContext } else { ctx, _ = context.WithTimeout(context.Background(), 40*time.Second) } collection := driver.client.Database(options.DB).Collection(options.Collection) return ctx, collection } func (t *Mongo) exec(f *Filter, execAction execfn, errorAction func(*errs.Error)) { var ( err *errs.Error errl error ctx context.Context // client *mongo.Client ) defer func() { if err == nil { if errl == nil { return } err = errs.FromError(errl) } // spew.Dump(err) errorAction(err) }() if err = f.Check(); err != nil { return } switch f.Insertion { case InsertOne: if f.Entity == nil { errl = fmt.Errorf("Entity can't be nil") return } case InsertMany: if f.Entities == nil { errl = fmt.Errorf("Entities can't be nil") return } case Patch: if f.Patchs == nil { errl = fmt.Errorf("Patchs can't be nil") return } } // fmt.Println("passei do exec") if f.SessionContext != nil { ctx = f.SessionContext } else { ctx, _ = context.WithTimeout(context.Background(), time.Minute) } fmt.Println("###############", t.client == nil) collection := t.client.Database(f.DB).Collection(f.Collection) execAction(ctx, collection) return } func (this *Mongo) CreateCollection(createOptions *Filter) (exists bool, err *errs.Error) { // createCmd := bson.D{ // {"create", createOptions.Collection}, // } // sess := this.client.StartSession() // sess. return // if sess != nil { // err := mongo.WithSession(mtest.Background, sess, func(sc mongo.SessionContext) error { // return mt.DB.RunCommand(sc, createCmd).Err() // }) // return err // } // return mt.DB.RunCommand(mtest.Background, createCmd).Err() // } } func createDebugEvent(options *Filter, eventType string, fn func(event *DebugEvent)) { // debug := options.Context.Values().Get("#debug") if options.Context != nil { debug, defined := options.Context.Values().Get("#debug").(*DebugTaks) if defined { event := debug.Event(eventType, "") event.Data = iris.Map{ "database": options.DB, "collection": options.Collection, "query": options.Query, "aggregate": options.Pipeline, "sort": options.Sort, "fields": options.Fields, "maxResults": options.MaxResults, "patchs": options.Patchs, } fn(event) } } } func DeletedPatch() *bson.A { return &bson.A{ bson.M{ "$set": bson.M{ "deleted": true, "deletedIn": time.Now().Unix(), }, }, } } // func mergeQueries(filter *Filter) *bson.M { // if baseQeuryDocument != nil { // if filter.Query == nil { // filter.Query = baseQeuryDocument // } else { // filter.Query = &bson.M{ // "$and": bson.A{baseQeuryDocument, filter.Query}, // } // } // } else if filter.Query == nil { // filter.Query = &bson.M{} // } // } // func yieldIndexModel() mongo.IndexModel { // keys := bsonx.Doc{{Key: *key, Value: bsonx.Int32(int32(*value))}} // index := mongo.IndexModel{} // index.Keys = keys // if *unique { // index.Options = bsonx.Doc{{Key: "unique", Value: bsonx.Boolean(true)}} // } // return index // } // func (t *Mongo) GetClient() (*mgo.Session, *errs.Error) { // func (t *Mongo) GetClient(id string) (*mongo.Client, *errs.Error) { // var ( // client *mongo.Client // found bool // ) // if id == "" { // panic("Client id not defined!") // } // if client, found = t.Clients[id]; !found { // return nil, FullError(ERR_SERVICE_UNAVAILABLE, &errs.Detail{ // Message: fmt.Sprintf("Client %s not exists!", id), // }) // } // return client, nil // } // func errorcheck(err error) *errs.Error { // if err != nil { // return FullError(ERR_GENERAL, &errs.Detail{ // Message: err.Error(), // }) // } // return nil // } // func Paginate(collection *mongo.Collection, startValue primitive.ObjectID, nPerPage int64) ([]bson.D, *bson.Value, error) { // // Query range filter using the default indexed _id field. // filter := bson.VC.DocumentFromElements( // bson.EC.SubDocumentFromElements( // "_id", // bson.EC.ObjectID("$gt", startValue), // ), // ) // var opts []findopt.Find // opts = append(opts, findopt.Sort(bson.NewDocument(bson.EC.Int32("_id", -1)))) // opts = append(opts, findopt.Limit(nPerPage)) // cursor, _ := collection.Find(context.Background(), filter, opts...) // var lastValue *bson.Value // var results []bson.Document // for cursor.Next(context.Background()) { // elem := bson.NewDocument() // err := cursor.Decode(elem) // if err != nil { // return results, lastValue, err // } // results = append(results, *elem) // lastValue = elem.Lookup("_id") // } // return results, lastValue, nil // } // func (t *Mongo) Patch(f *Filter) *errs.Error { // var ( // entityinterface EntityInterface // col *mongo.Collection // entity interface{} // ok bool // session *mgo.Session // err error // x *bson.M // ) // if session, err = t.GetClient(f.DB); err == nil { // if !f.Id.IsZero() { // f.Query = &bson.M{"_id": f.Id} // } else if f.Query == nil { // err = fmt.Errorf("Query not defined!") // goto ErrorPatch // } // defer session.Close() // if entityinterface, ok = f.Entity.(EntityInterface); ok { // entity = entityinterface.Update() // x = entity.(*bson.M) // // delete(*x, "$set") // } else { // entity = bson.M{"$set": f.Entity} // } // col = session.DB(f.DB).C(f.Collection) // // entity["$set"] // spew.Dump(entity) // _, err = col.Upsert(f.Query, entity) // } // ErrorPatch: // if err != nil { // return Error(ERR_PERSIST, err.Error()) // } // return nil // } // func (t *Mongo) Upsert(f *Filter) *errs.Error { // var ( // entityinterface EntityInterface // col *mongo.Collection // entity interface{} // ok bool // ) // session, err := t.GetClient(f.DB) // if err == nil { // if !f.Id.IsZero() { // f.Query = &bson.M{"_id": f.Id} // } else if f.Query == nil { // err = fmt.Errorf("Query not defined!") // goto ErrorUp // } // defer session.Close() // col = session.DB(f.DB).C(f.Collection) // // update = bson.M{"$set": f.Entity} // // if data, ok = f.Entity.Push(); ok { // // update["$push"] = data // // } // // if data, ok = f.Entity.Pull(); ok { // // update["$pull"] = data // // } // // spew.Dump(f.Entity) // if entityinterface, ok = f.Entity.(EntityInterface); ok { // // fmt.Println("Implement interface") // entity = entityinterface.Update() // } else { // entity = f.Entity // } // // spew.Dump(entity) // // _, err = col.Upsert(f.Query, entity) // err = col.Update(f.Query, entity) // } // ErrorUp: // if err != nil { // return Error(ERR_PERSIST, err.Error()) // } // return nil // } //----------------------------------------------------------------------- // func (t *Mongo) Find(f *Filter, one bool) (*errs.Error, int) { // var cursor string // pageToken := &f.PageToken // session, err := t.GetClient(f.DB) // if err == nil { // defer session.Close() // if f.Query == nil { // if one { // if !f.Id.IsZero() { // f.Query = &bson.M{"_id": f.Id} // } else { // return Error(ERR_INVALID_PARAM_VALUE, "Param id not valid."), 0 // } // } else if f.Query == nil { // f.Query = &bson.M{} // } // } // query := minquery.New(session.DB(f.DB), f.Collection, f.Query) // // Se tem um token de paginacao // hasToken := pageToken.HasToken() // if hasToken { // fmt.Println("consultando com token", pageToken.Cursor) // query = query.Cursor(pageToken.Cursor) // } // // cursorFields := []string{"_id"} // cursorFields := []string{} // if len(f.Sort) > 0 { // query.Sort(f.Sort...) // for _, key := range f.Sort { // if !strings.Contains(key, ".") { // cursorFields = append(cursorFields, key) // } // } // } else { // cursorFields = append(cursorFields, "_id") // } // // Seleciona os campos se forem especificados // if f.Fields != nil { // query.Select(f.Fields) // } // // Determina o numero de itens a ser retornado // if f.MaxResults > 0 { // query.Limit(f.MaxResults) // } // if one { // err = query.One(f.Entity) // pageToken.Count = 1 // Filter one // } else { // // spew.Dump(f.Entity, cursorFields) // if cursor, err = query.All(f.Entity, cursorFields...); err != nil { // goto ErroFind // } // // Numero total de documentos // // Se tem um token de paginacao o valor total esta no token // // Caso contrario consulta no banco // if !hasToken { // c := session.DB(f.DB).C(f.Collection) // if pageToken.Count, err = c.Find(f.Query).Select(&bson.M{"_id": 1}).Count(); err != nil { // goto ErroFind // } // } // // if () // fmt.Println("Cursor return", cursor, "-") // pageToken.NewCursor = cursor // } // } // ErroFind: // if err != nil { // return Error(ERR_PERSIST, err.Error()), 0 // } // return nil, pageToken.Count // } // func (t *Mongo) FindOne(f *Filter) (*errs.Error, int) { // return t.Find(f, true) // } // func (t *Mongo) FindAll(f *Filter) (*errs.Error, int) { // return t.Find(f, false) // } // func (t *Mongo) Count(f *Filter) (*errs.Error, int) { // session, err := t.GetClient(f.DB) // if err == nil { // defer session.Close() // if f.Query == nil { // f.Query = &bson.M{} // } // query := session.DB(f.DB).C(f.Collection).Find(f.Query) // // Seleciona os campos se forem especificados // f.PageToken.Count, err = query.Select(&bson.M{"_id": 1}).Count() // } // if err != nil { // return Error(ERR_PERSIST, err.Error()), 0 // } // return nil, f.PageToken.Count // } // func (t *Mongo) Update(f *Filter) *errs.Error { // var ( // entityinterface EntityInterface // entity interface{} // ) // one := false // ok := false // session, err := t.GetClient(f.DB) // if err == nil { // defer session.Close() // if !f.Id.IsZero() { // one = true // f.Query = &bson.M{"_id": f.Id} // } else if f.Query == nil { // err = fmt.Errorf("Query not defined!") // } // if entityinterface, ok = f.Entity.(EntityInterface); ok { // entity = entityinterface.Update() // } else { // entity = f.Entity // } // col := session.DB(f.DB).C(f.Collection) // if one { // err = col.Update(f.Query, entity) // } else { // _, err = col.UpdateAll(f.Query, entity) // } // } // if err != nil { // return Error(ERR_PERSIST, err.Error()) // } // return nil // } //----------------------------------------------------------------------- // func (t *Mongo) Aggregation(f *Filter, q []bson.M, one bool) *errs.Error { // session, err := t.GetClient(f.DB) // if err == nil { // defer session.Close() // pipe := session.DB(f.DB).C(f.Collection).Pipe(q) // if one { // err = pipe.One(&f.Entity) // } else { // err = pipe.All(&f.Entity) // } // } // if err != nil { // return Error(ERR_PERSIST, err.Error()) // } // return nil // } // all := true // session, err := t.GetClient(f.DB) // if err == nil { // defer session.Close() // if !f.Id.IsZero() { // all = false // f.Query = &bson.M{"_id": f.Id} // } else if f.Query == nil { // err = fmt.Errorf("Query not defined!") // } // C := session.DB(f.DB).C(f.Collection) // if all { // _, err = C.RemoveAll(f.Query) // } else { // err = C.Remove(f.Query) // } // } // if err != nil { // return Error(ERR_PERSIST, err.Error()) // } // return nil // VERIFICAR A NECESSIDADE E CORRIGIR O ERRO // func (t *Mongo) GetArrayById(f *Filter, ids []primitive.ObjectID) error { // session, err := t.GetClient(f.DB) // if err != nil { // return err // } // defer session.Close() // q := bson.M{"_id": bson.M{"$in": ids}} // err = session.DB(f.DB).C(f.Collection).Find(q).One(f.Entity) // return err // } // func (t *Mongo) GetEmbedFromArray(f *Filter, embed string, id primitive.ObjectID, sid primitive.ObjectID) (errs error) { // session, err := t.GetClient(f.DB) // if err != nil { // return err // } // defer session.Close() // field := "$" + embed // match := bson.M{"_id": id} // match[embed] = bson.M{"$exists": true} // aggregations := []bson.M{ // bson.M{"$unwind": field}, // bson.M{"$match": match}, // bson.M{"$replaceRoot": bson.M{"newRoot": field}}, // bson.M{"$match": bson.M{"_id": sid}}, // } // C := session.DB(f.DB).C(f.Collection) // return C.Pipe(aggregations).One(&f.Entity) // } // func (t *Mongo) UpdateEmbedFromArray(f *Filter, embed string, id primitive.ObjectID, sid primitive.ObjectID) error { // session, err := t.GetClient(f.DB) // if err != nil { // return err // } // defer session.Close() // col := session.DB(f.DB).C(f.Collection) // //Define o elemento que sera atualizado // element := bson.M{} // element[embed+".$"] = &f.Entity // //Define o seletor do elemento que sera atualizado // selector := bson.M{"_id": id} // selector[embed+"._id"] = sid // return col.Update(selector, bson.M{"$set": element}) // } // func (t *Mongo) AddEmbedFromArray(f *Filter, embed string, id primitive.ObjectID) error { // session, errs := t.GetClient(f.DB) // if errs != nil { // return errs // } // defer session.Close() // col := session.DB(f.DB).C(f.Collection) // //Define o elemento que sera atualizado // element := bson.M{} // element[embed] = &f.Entity // //Define o seletor do elemento que sera atualizado // selector := bson.M{"_id": id} // return col.Update(selector, bson.M{"$push": element}) // } // 0000000000000000000000000000000000000000000000000000000000000000 // func (t *Mongo) NewTransaction(dbname string) (*Transaction, error) { // session, e := t.GetClient(dbname) // if e != nil { // return nil, e // } // collection := session.DB(dbname).C("txns") // return &Transaction{R: txn.NewRunner(collection)}, nil // } // Transactions // type Transaction struct { // R *txn.Runner // Ops []txn.Op // } // func (t *Transaction) Insert(collection string, id primitive.ObjectID, entity interface{}) *Transaction { // t.Ops = append(t.Ops, txn.Op{ // C: collection, // Id: id, // Insert: entity, // }) // return t // } // func (t *Transaction) Update(collection string, id primitive.ObjectID, assert *bson.M, entity interface{}) *Transaction { // op := txn.Op{ // C: collection, // Id: id, // Update: entity, // } // if assert != nil { // op.Assert = assert // } // t.Ops = append(t.Ops, op) // return t // } // func (t *Transaction) Remove(collection string, id primitive.ObjectID) *Transaction { // t.Ops = append(t.Ops, txn.Op{ // C: collection, // Id: id, // Remove: true, // }) // return t // } // func (t *Transaction) Run() error { // return t.R.Run(t.Ops, primitive.NewObjectID(), nil) // } // func (t *Mongo) Find(f *Filter, one bool) (*errs.Error, int) { // session, err := t.GetClient() // if err == nil { // defer session.Close() // if !f.Id.IsZero() { // f.Query = &bson.M{"_id": f.Id} // } else if f.Query == nil { // f.Query = &bson.M{} // } // pageToken := &f.PageToken // aggregations := []*bson.M{} // // Se tem um token de paginacao // hasToken := pageToken.HasToken() // if hasToken { // fmt.Println("consultando com token", pageToken.CurrentID) // // {$or: // // [ // // {$and : // // [ // // {:{$gte: }}, // // {"_id" : {$gt : }} // // ] // // }, // // ou // // { :{$gt: }} // // ] // // }. // // f.Query = &bson.M{"$and": []bson.M{ // // *f.Query, // // bson.M{"_id": bson.M{"$gt": f.NextPageToken}}, // // }} // f.Query = &bson.M{"_id": bson.M{ // // "$gt": primitive.ObjectIDHex(pageToken.CurrentID), // "$gt": pageToken.CurrentID, // }} // } else { // } // // Adiciona o primeiro estagio da agregacao // aggregations = append(aggregations, &bson.M{"$match": f.Query}) // if len(f.Sort) > 0 { // f.Sort = append(f.Sort, "_id") // aggregations = append(aggregations, MgoSortBson(f.Sort)) // // query.Sort(f.Sort...) // } // // aggregations = append(aggregations, &bson.M{ // // "$addFields": bson.M{"position":}, // // }) // // Seleciona os campos se forem especificados // if f.Fields != nil { // // aggregations = append(aggregations, bson.M{"$limit": f.MaxResults}) // } // // countQuery := session.DB(f.DB).C(f.Collection).Pipe(aggregations) // // Determina o numero de itens a ser retornado // if f.MaxResults > 0 { // aggregations = append(aggregations, &bson.M{"$limit": f.MaxResults}) // } // spew.Dump(aggregations) // // query := session.DB(f.DB).C(f.Collection).Find(f.Query) // collection := session.DB(f.DB).C(f.Collection) // query := collection.Pipe(aggregations) // if one { // err = query.One(f.Entity) // f.Count = 1 // Filter one // } else { // err = query.All(f.Entity) // // Numero total de documentos // // Se tem um token de paginacao o valor total esta no token // // Caso contrario consulta no banco // if hasToken { // f.Count = pageToken.Count // } else { // f.Count, _ = collection.Find(f.Query).Select(&bson.M{"_id": 1}).Count() // } // } // } // if err != nil { // return Error(ERR_PERSIST, err.Error()), 0 // } // return nil, f.Count // } // v := reflect.ValueOf(f.Entity) // fmt.Println("UpdateCursorResponse") // // spew.Dump(f.PageToken) // // fmt.Println(v.Kind()) // if v.Kind() == reflect.Ptr { // // Atualiza v para o elemento apontado // v = v.Elem() // if v.Kind() == reflect.Slice || v.Kind() == reflect.Array { // // Acessa o atributo ID da ultima entidade do array // field := v.Index(v.Len() - 1).Elem().FieldByName("ID") // // Converte o atributo ID para objectId // last := field.Interface().(primitive.ObjectID) // // Cria a consulta que verifica se existe outros registros alem do ultimo // filter := &Filter{ // Collection: f.Collection, // Query: &bson.M{"_id": bson.M{ // "$lt": last, // }}, // } // // Se existirem elementos adiciona o nextTOKEN // if _, count := models.Count(filter); count > 0 { // // atualiza os valores do token // f.PageToken.StartID = last.Hex() // f.PageToken.Count = f.Count // resp.NextPageToken = f.PageToken.Encode() // } // } // } // func NewMongo() (*Mongo, error) { // // session, err := mgo.Dial(addrs) // // session, err := mgo.Dial("mongodb://localhost:27017") // session, err := mgo.Dial("mongodb://localhost:27017") // if err != nil { // return nil, err // } // session.SetMode(mgo.Monotonic, true) // // "addrs": "mongodb://localhost:27017", // // "user": "guest", // // "password": "welcome", // // "database": "financeiro" // m := &Mongo{ // Session: session, // DataBase: "accounts", // Addrs: , // // Config: cfg, // } // return m, err // }