package api import ( "bytes" "context" "encoding/base64" "fmt" // "html/template" "reflect" "regexp" "strconv" "strings" "text/template" "time" "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs" "github.com/davecgh/go-spew/spew" iriscontext "github.com/kataras/iris/v12/context" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readconcern" "go.mongodb.org/mongo-driver/mongo/readpref" "go.mongodb.org/mongo-driver/mongo/writeconcern" ) const ( DefaultAddrConnect = "mongodb://localhost:27017" InsertOne = iota + 1 InsertMany Patch ) var ( QueriesMap = map[string]*template.Template{} EMPTYQUERY_REGEX = regexp.MustCompile(`{\s*}`) ) func GetSessionContext(ctx iriscontext.Context) mongo.SessionContext { context := ctx.Values().Get("$SessionContext") if context != nil { return context.(mongo.SessionContext) } return nil } func init() { errs.RegisterMapErrorFunction("MONGO_ERROS", func(source error) (err *errs.Error) { message := source.Error() switch { case strings.Contains(message, "E11000"): err = errs.AlreadyExists().Details(&errs.Detail{ Message: "DUPLICATED_ITEM", }) case strings.Contains(message, "cannot decode"): err = errs.DataCaps().Details(&errs.Detail{ Message: message, }) case strings.Contains(message, "no documents in result"): errs.NotFound().Details(&errs.Detail{ Message: "NOT_FOUND", }) } return }) } type Mongo struct { // Sessions map[string]*mgo.Session // Session *mgo.Session // Uri string // Configs //DefaultDB string // User string `json:"user"` // Password string `json:"password"` // DataBase string `json:"database"` // Addrs string `json:"addrs"` subject *BehaviorSubjectStruct client *mongo.Client Credential *options.Credential Config string `json:"config"` Clients map[string]*mongo.Client } type execfn func(context.Context, *mongo.Collection) type EntityInterface interface { Patch() *bson.A } // Modelo de comunicação entre o controle da API e a camada de persistencia. type Filter struct { Id primitive.ObjectID UserId primitive.ObjectID // NextPageToken primitive.ObjectID // PageToken PageToken Fields *bson.M Query *bson.M Patchs *bson.A Sort *bson.M // Sort []string Collection string QueryType string DB string Format string Insertion int CheckQuery bool IgnoreNoEntityFound bool MaxResults int Entity interface{} Entities interface{} SessionContext mongo.SessionContext // Aggregate interface{} Pipeline primitive.A Options interface{} } func (f *Filter) Aggregate(ctx context.Context, collection *mongo.Collection, opts *options.AggregateOptions) (cursor *mongo.Cursor, err *errs.Error) { var errl error if opts == nil { opts = options.Aggregate().SetBatchSize(int32(f.MaxResults)) } // cmd := command.Aggregate{ // NS: ns, // Pipeline: pipeline, // ReadPref: rp, // } if cursor, errl = collection.Aggregate(ctx, f.Pipeline, opts); errl != nil { err = errs.FromError(errl) } return } func (t *Filter) Check() (err *errs.Error) { if t.CheckQuery { if t.Query == nil { if !t.Id.IsZero() { t.Query = &bson.M{"_id": t.Id} } else { err = errs.InvalidArgument().Details(&errs.Detail{ LocationType: "query", Location: "q", Message: "emptyValue", }) return } } } if t.Query == nil { t.Query = &bson.M{} } return } func (t *Mongo) Ready() *BehaviorSubjectStruct { if t.subject == nil { t.subject = BehaviorSubject() } return t.subject } func (t *Mongo) Init() (err error) { defer func(){ spew.Dump(err) }() fmt.Printf("Connection string '%s'", t.Config) clientOpts := options.Client().ApplyURI(t.Config) if t.Credential != nil { clientOpts.SetAuth(*t.Credential) } connectContext, _ := context.WithTimeout(context.Background(), 10*time.Second) // if t.client, err = mongo.NewClient(setOptions); err != nil { if t.client, err = mongo.Connect( connectContext, clientOpts, ); err != nil { return } // ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // defer cancel() // if err = t.client.Connect(ctx); err != nil { // return // } t.Ready().Next(true) return nil } func (t *Mongo) CommitTransaction(opts *options.SessionOptions) { } func (t *Mongo) StartTransaction( sessionOptions *options.SessionOptions, transactionOptions *options.TransactionOptions, ) ( sessionContext mongo.SessionContext, err *errs.Error, ) { var ( session mongo.Session localError error ) defer func() { if localError != nil { err = errs.Internal().Details(&errs.Detail{ Message: localError.Error(), }) } }() if session, localError = t.client.StartSession(sessionOptions); localError == nil { ctx, _ := context.WithTimeout(context.Background(), 60*time.Second) mongo.WithSession(ctx, session, func(sctx mongo.SessionContext) error { if localError = session.StartTransaction(transactionOptions); localError == nil { sessionContext = sctx } return nil }) } return } func (t *Mongo) Dispatch(f *Filter) { } func (t *Mongo) InsertOne(f *Filter) (res *mongo.InsertOneResult, err *errs.Error) { f.Insertion = InsertOne t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var lerr error if res, lerr = collection.InsertOne(ctx, f.Entity); lerr != nil { err = errs.FromError(lerr) } }, func(e *errs.Error) { err = e }) return } func (t *Mongo) InsertMany(f *Filter) (res *mongo.InsertManyResult, err *errs.Error) { f.Insertion = InsertMany t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var ( lerr error entities []interface{} value = reflect.ValueOf(f.Entities) ) for index := 0; index < value.Len(); index++ { entities = append(entities, value.Index(index).Interface()) } if res, lerr = collection.InsertMany(ctx, entities); lerr != nil { err = errs.FromError(lerr) } }, func(e *errs.Error) { err = e }) return } //Remove os elementos da colecao, selecionados pela query func (t *Mongo) RemoveOne(f *Filter) (res *mongo.DeleteResult, err *errs.Error) { f.CheckQuery = true t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var lerr error if res, lerr = collection.DeleteOne(ctx, f.Query); lerr != nil { err = errs.FromError(lerr) } else if res.DeletedCount == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been deleted", }) } }, func(e *errs.Error) { err = e }) return } //Remove os elementos da colecao, selecionados pela query func (t *Mongo) RemoveMany(f *Filter) (res *mongo.DeleteResult, err *errs.Error) { var lerr error f.CheckQuery = true t.exec(f, func(ctx context.Context, collection *mongo.Collection) { if res, lerr = collection.DeleteMany(ctx, f.Entities); lerr != nil { err = errs.FromError(lerr) } else if res.DeletedCount == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been deleted", }) } }, func(e *errs.Error) { err = e }) return } func (t *Mongo) UpdateOne(f *Filter) (res *mongo.UpdateResult, err *errs.Error) { var lerr error f.Insertion = InsertOne f.CheckQuery = true t.exec(f, func(ctx context.Context, collection *mongo.Collection) { if res, lerr = collection.ReplaceOne(ctx, f.Query, f.Entity); lerr != nil { err = errs.FromError(lerr) } else if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been updated", }) } }, func(e *errs.Error) { err = e }) return } func (t *Mongo) UpdateMany(f *Filter) (res *mongo.UpdateResult, err *errs.Error) { var ( lerr error entity interface{} entities []interface{} value = reflect.ValueOf(f.Entities) ) f.Insertion = InsertMany f.CheckQuery = true t.exec(f, func(ctx context.Context, collection *mongo.Collection) { for index := 0; index < value.Len(); index++ { entity = value.Index(index).Interface() entities = append(entities, entity) } if res, lerr = collection.UpdateMany(ctx, f.Query, entities); lerr != nil { err = errs.FromError(lerr) } else if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been updated ", }) } }, func(e *errs.Error) { err = e }) return } func RegisterQuery(id, query string) { QueriesMap[id] = template.Must(template.New(id).Parse(query)) } func ParseQuery(ctx iriscontext.Context, filter *Filter, basequery string, data map[string]interface{}) (err *errs.Error) { var ( found bool templateErr error buf bytes.Buffer baseQueryTemplate *template.Template // baseQeuryDocument *bson.M // errbq *errs.Error queryDecodedString []byte userQueryString = Q(ctx, "q", "e30=") // default base64 encoded from "{}" includeInTrash, _ = strconv.ParseBool(Q(ctx, "includeInTrash", "false")) // default base64 encoded from "{}" ) // defer func() { // spew.Dump(filter.Query) // }() // Faz o parse da userQueryString enviada pelo usuario que deve ser um base64 if queryDecodedString, templateErr = base64.StdEncoding.DecodeString(userQueryString); templateErr != nil { err = errs.InvalidArgument().Details(&errs.Detail{ Message: "Query param isn't a base64 json", }) return } if baseQueryTemplate, found = QueriesMap[basequery]; !found { err = errs.Internal().Details(&errs.Detail{ Message: "Invalid query input", }) return } // transclude the query to base query data["_transclude_"] = string(queryDecodedString) data["_deleted_"] = includeInTrash fmt.Println( basequery, string(queryDecodedString), data, ) if templateErr = baseQueryTemplate.Execute(&buf, data); templateErr != nil { err = errs.InvalidArgument().Details(&errs.Detail{ Message: "Failed on interpolate data with template query", }) return } x := buf.Bytes() fmt.Println("buf.Bytes()", string(x)) if templateErr = bson.UnmarshalExtJSON(x, false, &filter.Query); templateErr != nil { err = errs.InvalidArgument().Details(&errs.Detail{ Message: "Failed on interpolate data with template query", }) return } return } func (t *Mongo) PatchOne(f *Filter) (res *mongo.UpdateResult, err *errs.Error) { f.Insertion = Patch f.CheckQuery = true // out, _ := json.Marshal() // fmt.Println(string(out)) // spew.Dump(f.Patchs) t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var ( lerr error // client *mongo.Client ) // if client, err = t.GetClient(f.DB); err != nil { // return // } // spew.Dump(f.Query) // spew.Dump(f.Patchs) // id := primitive.NewObjectID() t.client.UseSessionWithOptions( ctx, options.Session().SetDefaultReadPreference(readpref.Primary()), func(sctx mongo.SessionContext) error { defer func() { if err != nil { // fmt.Println("abort patch ", id.Hex()) sctx.AbortTransaction(sctx) } else { // fmt.Println("commit patch ", id.Hex()) sctx.CommitTransaction(sctx) } sctx.EndSession(sctx) }() sctx.StartTransaction(options.Transaction(). SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.New(writeconcern.WMajority()))) for _, p := range *f.Patchs { if f.Options == nil { f.Options = options.Update() } if res, lerr = collection.UpdateOne(ctx, f.Query, p, f.Options.(*options.UpdateOptions)); lerr != nil { err = errs.FromError(lerr) return lerr } if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been updated", }) return fmt.Errorf("No entity has been updated") } } return nil }, ) }, func(e *errs.Error) { err = e }) return } func (t *Mongo) PatchMany(f *Filter) (res *mongo.UpdateResult, err *errs.Error) { f.Insertion = Patch f.CheckQuery = true t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var ( lerr error // client *mongo.Client ) // if client, err = t.GetClient(f.DB); err != nil { // return // } // o, _ := json.MarshalIndent(f.Query, "", " ") // fmt.Println("query", o) // o, _ = json.MarshalIndent(f.Patchs, "", " ") // fmt.Println("patch", o) t.client.UseSessionWithOptions( ctx, options.Session().SetDefaultReadPreference(readpref.Primary()), func(sctx mongo.SessionContext) error { defer func() { if err != nil { sctx.AbortTransaction(sctx) } else { sctx.CommitTransaction(sctx) } sctx.EndSession(sctx) }() sctx.StartTransaction( options.Transaction(). SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.New(writeconcern.WMajority())), ) for _, p := range *f.Patchs { if res, lerr = collection.UpdateMany(ctx, f.Query, p); lerr != nil { err = errs.FromError(lerr) return lerr } if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 { err = errs.NotFound().Details(&errs.Detail{ Message: "No entity has been updated", }) err.CodeText = "noEntityUpdated" return fmt.Errorf("No entity has been updated") } } return nil }, ) }, func(e *errs.Error) { err = e }) return } func (mongo *Mongo) CreateIndex(database, collectionString string, index mongo.IndexModel) *errs.Error { collection := mongo.client.Database(database).Collection(collectionString) opts := options.CreateIndexes().SetMaxTime(10 * time.Second) if _, err := collection.Indexes().CreateOne(context.Background(), index, opts); err != nil { return errs.FromError(err) } return nil } func (mongo *Mongo) FindOneRx(options *Filter) *ObservableStruct { return Observable(func(observer *Subscriber) { if res, err := mongo.FindOne(options); err != nil { observer.Err(err) } else { observer.Next(res) } }) } func (t *Mongo) FindOne(f *Filter) (res *mongo.SingleResult, err *errs.Error) { f.CheckQuery = (f.QueryType != "aggregate") t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var ( lerr error cursor *mongo.Cursor ) if f.QueryType == "aggregate" { if cursor, err = f.Aggregate(ctx, collection, nil); err != nil { return } if cursor.Next(ctx) { if lerr = cursor.Decode(f.Entity); lerr == nil { return } } else { lerr = fmt.Errorf("No document found") } } else { res = collection.FindOne(ctx, f.Query) if lerr = res.Err(); lerr == nil { if lerr = res.Decode(f.Entity); lerr == nil { return } } } err = errs.FromError(lerr) }, func(e *errs.Error) { err = e }) return } func findIds(filter *Filter) (ids []primitive.ObjectID) { ids = []primitive.ObjectID{} // findIdsFilter = &Filter{} // cursor, err := executeFind(findIdsFilter) return } func (t *Mongo) FindMany(f *Filter) (cursor *mongo.Cursor, err *errs.Error) { // f.CheckQuery = true t.exec(f, func(ctx context.Context, collection *mongo.Collection) { var ( lerr error elemp reflect.Value // ids = []primitive.ObjectID{} // limit = int64(f.MaxResults) // fields = f.Fields ) // defer func() { // if errd := recover(); errd != nil { // // fmt.Println(errd) // // spew.Dump(Trace()) // // fmt.Println(errors.WithStack(errd.(error)) ) // LogError(0, fmt.Sprintf("FindMany - %v", errd)) // } // }() // Carrega os ids e verifica o intervalo de paginacao da consulta. // f.Fields = &bson.M{"_id": 1} // ids = findIds(f) // Atualiza a consulta para o conjunto de ids da paginacao // f.Fields = fields // f.Query = &bson.M{"_id": bson.M{"$in": ids}} // fmt.Println("Query Type ", f.QueryType) if f.QueryType == "aggregate" { cursor, lerr = f.Aggregate(ctx, collection, nil) } else { findOptions := options.Find() findOptions.SetLimit(int64(f.MaxResults)) // fmt.Printf("sort of query '%s'\n", f.Format) // spew.Dump(f.Sort) // spew.Dump(f.Fields) // spew.Dump(f.Query) if f.Sort != nil { findOptions.SetSort(f.Sort) } if f.Fields != nil { findOptions.SetProjection(f.Fields) } cursor, lerr = collection.Find(ctx, f.Query, findOptions) } if lerr == nil { defer cursor.Close(ctx) // spew.Dump(cursor) if f.Entities == nil { lerr = fmt.Errorf("Entities can't be nil") goto FindManyError } entitiesValue := reflect.ValueOf(f.Entities) if entitiesValue.Kind() != reflect.Ptr || entitiesValue.Elem().Kind() != reflect.Slice { lerr = fmt.Errorf("Entities argument must be a slice address") goto FindManyError } slicev := entitiesValue.Elem() slicev = slicev.Slice(0, slicev.Cap()) typ := slicev.Type().Elem() for cursor.Next(ctx) { elemp = reflect.New(typ) // fmt.Println("aqui") if lerr = cursor.Decode(elemp.Interface()); lerr != nil { // spew.Dump(elemp) goto FindManyError } slicev = reflect.Append(slicev, elemp.Elem()) } // spew.Dump(slicev) entitiesValue.Elem().Set(slicev) return } FindManyError: err = errs.FromError(lerr) }, func(e *errs.Error) { err = e }) return } func (models *Mongo) Exists(options *Filter) (exists bool, err *errs.Error) { options.Fields = &bson.M{"_id": 1} if _, err = models.FindOne(options); err != nil { return } exists = true return } func (driver *Mongo) GetContext(options *Filter) (context.Context, *mongo.Collection) { var ctx context.Context if options.SessionContext != nil { ctx = options.SessionContext } else { ctx, _ = context.WithTimeout(context.Background(), 40*time.Second) } collection := driver.client.Database(options.DB).Collection(options.Collection) return ctx, collection } func (t *Mongo) exec(f *Filter, execAction execfn, errorAction func(*errs.Error)) { var ( err *errs.Error errl error ctx context.Context // client *mongo.Client ) defer func() { if err == nil { if errl == nil { return } err = errs.FromError(errl) } // spew.Dump(err) errorAction(err) }() if err = f.Check(); err != nil { return } switch f.Insertion { case InsertOne: if f.Entity == nil { errl = fmt.Errorf("Entity can't be nil") return } case InsertMany: if f.Entities == nil { errl = fmt.Errorf("Entities can't be nil") return } case Patch: if f.Patchs == nil { errl = fmt.Errorf("Patchs can't be nil") return } } // fmt.Println("passei do exec") if f.SessionContext != nil { ctx = f.SessionContext } else { ctx, _ = context.WithTimeout(context.Background(), time.Minute) } collection := t.client.Database(f.DB).Collection(f.Collection) execAction(ctx, collection) return } func DeletedPatch() *bson.A { return &bson.A{ bson.M{ "$set": bson.M{ "deleted": true, "deletedIn": time.Now().Unix(), }, }, } } // func mergeQueries(filter *Filter) *bson.M { // if baseQeuryDocument != nil { // if filter.Query == nil { // filter.Query = baseQeuryDocument // } else { // filter.Query = &bson.M{ // "$and": bson.A{baseQeuryDocument, filter.Query}, // } // } // } else if filter.Query == nil { // filter.Query = &bson.M{} // } // } // func yieldIndexModel() mongo.IndexModel { // keys := bsonx.Doc{{Key: *key, Value: bsonx.Int32(int32(*value))}} // index := mongo.IndexModel{} // index.Keys = keys // if *unique { // index.Options = bsonx.Doc{{Key: "unique", Value: bsonx.Boolean(true)}} // } // return index // } // func (t *Mongo) GetClient() (*mgo.Session, *errs.Error) { // func (t *Mongo) GetClient(id string) (*mongo.Client, *errs.Error) { // var ( // client *mongo.Client // found bool // ) // if id == "" { // panic("Client id not defined!") // } // if client, found = t.Clients[id]; !found { // return nil, FullError(ERR_SERVICE_UNAVAILABLE, &errs.Detail{ // Message: fmt.Sprintf("Client %s not exists!", id), // }) // } // return client, nil // } // func errorcheck(err error) *errs.Error { // if err != nil { // return FullError(ERR_GENERAL, &errs.Detail{ // Message: err.Error(), // }) // } // return nil // } // func Paginate(collection *mongo.Collection, startValue primitive.ObjectID, nPerPage int64) ([]bson.D, *bson.Value, error) { // // Query range filter using the default indexed _id field. // filter := bson.VC.DocumentFromElements( // bson.EC.SubDocumentFromElements( // "_id", // bson.EC.ObjectID("$gt", startValue), // ), // ) // var opts []findopt.Find // opts = append(opts, findopt.Sort(bson.NewDocument(bson.EC.Int32("_id", -1)))) // opts = append(opts, findopt.Limit(nPerPage)) // cursor, _ := collection.Find(context.Background(), filter, opts...) // var lastValue *bson.Value // var results []bson.Document // for cursor.Next(context.Background()) { // elem := bson.NewDocument() // err := cursor.Decode(elem) // if err != nil { // return results, lastValue, err // } // results = append(results, *elem) // lastValue = elem.Lookup("_id") // } // return results, lastValue, nil // } // func (t *Mongo) Patch(f *Filter) *errs.Error { // var ( // entityinterface EntityInterface // col *mongo.Collection // entity interface{} // ok bool // session *mgo.Session // err error // x *bson.M // ) // if session, err = t.GetClient(f.DB); err == nil { // if !f.Id.IsZero() { // f.Query = &bson.M{"_id": f.Id} // } else if f.Query == nil { // err = fmt.Errorf("Query not defined!") // goto ErrorPatch // } // defer session.Close() // if entityinterface, ok = f.Entity.(EntityInterface); ok { // entity = entityinterface.Update() // x = entity.(*bson.M) // // delete(*x, "$set") // } else { // entity = bson.M{"$set": f.Entity} // } // col = session.DB(f.DB).C(f.Collection) // // entity["$set"] // spew.Dump(entity) // _, err = col.Upsert(f.Query, entity) // } // ErrorPatch: // if err != nil { // return Error(ERR_PERSIST, err.Error()) // } // return nil // } // func (t *Mongo) Upsert(f *Filter) *errs.Error { // var ( // entityinterface EntityInterface // col *mongo.Collection // entity interface{} // ok bool // ) // session, err := t.GetClient(f.DB) // if err == nil { // if !f.Id.IsZero() { // f.Query = &bson.M{"_id": f.Id} // } else if f.Query == nil { // err = fmt.Errorf("Query not defined!") // goto ErrorUp // } // defer session.Close() // col = session.DB(f.DB).C(f.Collection) // // update = bson.M{"$set": f.Entity} // // if data, ok = f.Entity.Push(); ok { // // update["$push"] = data // // } // // if data, ok = f.Entity.Pull(); ok { // // update["$pull"] = data // // } // // spew.Dump(f.Entity) // if entityinterface, ok = f.Entity.(EntityInterface); ok { // // fmt.Println("Implement interface") // entity = entityinterface.Update() // } else { // entity = f.Entity // } // // spew.Dump(entity) // // _, err = col.Upsert(f.Query, entity) // err = col.Update(f.Query, entity) // } // ErrorUp: // if err != nil { // return Error(ERR_PERSIST, err.Error()) // } // return nil // } //----------------------------------------------------------------------- // func (t *Mongo) Find(f *Filter, one bool) (*errs.Error, int) { // var cursor string // pageToken := &f.PageToken // session, err := t.GetClient(f.DB) // if err == nil { // defer session.Close() // if f.Query == nil { // if one { // if !f.Id.IsZero() { // f.Query = &bson.M{"_id": f.Id} // } else { // return Error(ERR_INVALID_PARAM_VALUE, "Param id not valid."), 0 // } // } else if f.Query == nil { // f.Query = &bson.M{} // } // } // query := minquery.New(session.DB(f.DB), f.Collection, f.Query) // // Se tem um token de paginacao // hasToken := pageToken.HasToken() // if hasToken { // fmt.Println("consultando com token", pageToken.Cursor) // query = query.Cursor(pageToken.Cursor) // } // // cursorFields := []string{"_id"} // cursorFields := []string{} // if len(f.Sort) > 0 { // query.Sort(f.Sort...) // for _, key := range f.Sort { // if !strings.Contains(key, ".") { // cursorFields = append(cursorFields, key) // } // } // } else { // cursorFields = append(cursorFields, "_id") // } // // Seleciona os campos se forem especificados // if f.Fields != nil { // query.Select(f.Fields) // } // // Determina o numero de itens a ser retornado // if f.MaxResults > 0 { // query.Limit(f.MaxResults) // } // if one { // err = query.One(f.Entity) // pageToken.Count = 1 // Filter one // } else { // // spew.Dump(f.Entity, cursorFields) // if cursor, err = query.All(f.Entity, cursorFields...); err != nil { // goto ErroFind // } // // Numero total de documentos // // Se tem um token de paginacao o valor total esta no token // // Caso contrario consulta no banco // if !hasToken { // c := session.DB(f.DB).C(f.Collection) // if pageToken.Count, err = c.Find(f.Query).Select(&bson.M{"_id": 1}).Count(); err != nil { // goto ErroFind // } // } // // if () // fmt.Println("Cursor return", cursor, "-") // pageToken.NewCursor = cursor // } // } // ErroFind: // if err != nil { // return Error(ERR_PERSIST, err.Error()), 0 // } // return nil, pageToken.Count // } // func (t *Mongo) FindOne(f *Filter) (*errs.Error, int) { // return t.Find(f, true) // } // func (t *Mongo) FindAll(f *Filter) (*errs.Error, int) { // return t.Find(f, false) // } // func (t *Mongo) Count(f *Filter) (*errs.Error, int) { // session, err := t.GetClient(f.DB) // if err == nil { // defer session.Close() // if f.Query == nil { // f.Query = &bson.M{} // } // query := session.DB(f.DB).C(f.Collection).Find(f.Query) // // Seleciona os campos se forem especificados // f.PageToken.Count, err = query.Select(&bson.M{"_id": 1}).Count() // } // if err != nil { // return Error(ERR_PERSIST, err.Error()), 0 // } // return nil, f.PageToken.Count // } // func (t *Mongo) Update(f *Filter) *errs.Error { // var ( // entityinterface EntityInterface // entity interface{} // ) // one := false // ok := false // session, err := t.GetClient(f.DB) // if err == nil { // defer session.Close() // if !f.Id.IsZero() { // one = true // f.Query = &bson.M{"_id": f.Id} // } else if f.Query == nil { // err = fmt.Errorf("Query not defined!") // } // if entityinterface, ok = f.Entity.(EntityInterface); ok { // entity = entityinterface.Update() // } else { // entity = f.Entity // } // col := session.DB(f.DB).C(f.Collection) // if one { // err = col.Update(f.Query, entity) // } else { // _, err = col.UpdateAll(f.Query, entity) // } // } // if err != nil { // return Error(ERR_PERSIST, err.Error()) // } // return nil // } //----------------------------------------------------------------------- // func (t *Mongo) Aggregation(f *Filter, q []bson.M, one bool) *errs.Error { // session, err := t.GetClient(f.DB) // if err == nil { // defer session.Close() // pipe := session.DB(f.DB).C(f.Collection).Pipe(q) // if one { // err = pipe.One(&f.Entity) // } else { // err = pipe.All(&f.Entity) // } // } // if err != nil { // return Error(ERR_PERSIST, err.Error()) // } // return nil // } // all := true // session, err := t.GetClient(f.DB) // if err == nil { // defer session.Close() // if !f.Id.IsZero() { // all = false // f.Query = &bson.M{"_id": f.Id} // } else if f.Query == nil { // err = fmt.Errorf("Query not defined!") // } // C := session.DB(f.DB).C(f.Collection) // if all { // _, err = C.RemoveAll(f.Query) // } else { // err = C.Remove(f.Query) // } // } // if err != nil { // return Error(ERR_PERSIST, err.Error()) // } // return nil // VERIFICAR A NECESSIDADE E CORRIGIR O ERRO // func (t *Mongo) GetArrayById(f *Filter, ids []primitive.ObjectID) error { // session, err := t.GetClient(f.DB) // if err != nil { // return err // } // defer session.Close() // q := bson.M{"_id": bson.M{"$in": ids}} // err = session.DB(f.DB).C(f.Collection).Find(q).One(f.Entity) // return err // } // func (t *Mongo) GetEmbedFromArray(f *Filter, embed string, id primitive.ObjectID, sid primitive.ObjectID) (errs error) { // session, err := t.GetClient(f.DB) // if err != nil { // return err // } // defer session.Close() // field := "$" + embed // match := bson.M{"_id": id} // match[embed] = bson.M{"$exists": true} // aggregations := []bson.M{ // bson.M{"$unwind": field}, // bson.M{"$match": match}, // bson.M{"$replaceRoot": bson.M{"newRoot": field}}, // bson.M{"$match": bson.M{"_id": sid}}, // } // C := session.DB(f.DB).C(f.Collection) // return C.Pipe(aggregations).One(&f.Entity) // } // func (t *Mongo) UpdateEmbedFromArray(f *Filter, embed string, id primitive.ObjectID, sid primitive.ObjectID) error { // session, err := t.GetClient(f.DB) // if err != nil { // return err // } // defer session.Close() // col := session.DB(f.DB).C(f.Collection) // //Define o elemento que sera atualizado // element := bson.M{} // element[embed+".$"] = &f.Entity // //Define o seletor do elemento que sera atualizado // selector := bson.M{"_id": id} // selector[embed+"._id"] = sid // return col.Update(selector, bson.M{"$set": element}) // } // func (t *Mongo) AddEmbedFromArray(f *Filter, embed string, id primitive.ObjectID) error { // session, errs := t.GetClient(f.DB) // if errs != nil { // return errs // } // defer session.Close() // col := session.DB(f.DB).C(f.Collection) // //Define o elemento que sera atualizado // element := bson.M{} // element[embed] = &f.Entity // //Define o seletor do elemento que sera atualizado // selector := bson.M{"_id": id} // return col.Update(selector, bson.M{"$push": element}) // } // 0000000000000000000000000000000000000000000000000000000000000000 // func (t *Mongo) NewTransaction(dbname string) (*Transaction, error) { // session, e := t.GetClient(dbname) // if e != nil { // return nil, e // } // collection := session.DB(dbname).C("txns") // return &Transaction{R: txn.NewRunner(collection)}, nil // } // Transactions // type Transaction struct { // R *txn.Runner // Ops []txn.Op // } // func (t *Transaction) Insert(collection string, id primitive.ObjectID, entity interface{}) *Transaction { // t.Ops = append(t.Ops, txn.Op{ // C: collection, // Id: id, // Insert: entity, // }) // return t // } // func (t *Transaction) Update(collection string, id primitive.ObjectID, assert *bson.M, entity interface{}) *Transaction { // op := txn.Op{ // C: collection, // Id: id, // Update: entity, // } // if assert != nil { // op.Assert = assert // } // t.Ops = append(t.Ops, op) // return t // } // func (t *Transaction) Remove(collection string, id primitive.ObjectID) *Transaction { // t.Ops = append(t.Ops, txn.Op{ // C: collection, // Id: id, // Remove: true, // }) // return t // } // func (t *Transaction) Run() error { // return t.R.Run(t.Ops, primitive.NewObjectID(), nil) // } // func (t *Mongo) Find(f *Filter, one bool) (*errs.Error, int) { // session, err := t.GetClient() // if err == nil { // defer session.Close() // if !f.Id.IsZero() { // f.Query = &bson.M{"_id": f.Id} // } else if f.Query == nil { // f.Query = &bson.M{} // } // pageToken := &f.PageToken // aggregations := []*bson.M{} // // Se tem um token de paginacao // hasToken := pageToken.HasToken() // if hasToken { // fmt.Println("consultando com token", pageToken.CurrentID) // // {$or: // // [ // // {$and : // // [ // // {:{$gte: }}, // // {"_id" : {$gt : }} // // ] // // }, // // ou // // { :{$gt: }} // // ] // // }. // // f.Query = &bson.M{"$and": []bson.M{ // // *f.Query, // // bson.M{"_id": bson.M{"$gt": f.NextPageToken}}, // // }} // f.Query = &bson.M{"_id": bson.M{ // // "$gt": primitive.ObjectIDHex(pageToken.CurrentID), // "$gt": pageToken.CurrentID, // }} // } else { // } // // Adiciona o primeiro estagio da agregacao // aggregations = append(aggregations, &bson.M{"$match": f.Query}) // if len(f.Sort) > 0 { // f.Sort = append(f.Sort, "_id") // aggregations = append(aggregations, MgoSortBson(f.Sort)) // // query.Sort(f.Sort...) // } // // aggregations = append(aggregations, &bson.M{ // // "$addFields": bson.M{"position":}, // // }) // // Seleciona os campos se forem especificados // if f.Fields != nil { // // aggregations = append(aggregations, bson.M{"$limit": f.MaxResults}) // } // // countQuery := session.DB(f.DB).C(f.Collection).Pipe(aggregations) // // Determina o numero de itens a ser retornado // if f.MaxResults > 0 { // aggregations = append(aggregations, &bson.M{"$limit": f.MaxResults}) // } // spew.Dump(aggregations) // // query := session.DB(f.DB).C(f.Collection).Find(f.Query) // collection := session.DB(f.DB).C(f.Collection) // query := collection.Pipe(aggregations) // if one { // err = query.One(f.Entity) // f.Count = 1 // Filter one // } else { // err = query.All(f.Entity) // // Numero total de documentos // // Se tem um token de paginacao o valor total esta no token // // Caso contrario consulta no banco // if hasToken { // f.Count = pageToken.Count // } else { // f.Count, _ = collection.Find(f.Query).Select(&bson.M{"_id": 1}).Count() // } // } // } // if err != nil { // return Error(ERR_PERSIST, err.Error()), 0 // } // return nil, f.Count // } // v := reflect.ValueOf(f.Entity) // fmt.Println("UpdateCursorResponse") // // spew.Dump(f.PageToken) // // fmt.Println(v.Kind()) // if v.Kind() == reflect.Ptr { // // Atualiza v para o elemento apontado // v = v.Elem() // if v.Kind() == reflect.Slice || v.Kind() == reflect.Array { // // Acessa o atributo ID da ultima entidade do array // field := v.Index(v.Len() - 1).Elem().FieldByName("ID") // // Converte o atributo ID para objectId // last := field.Interface().(primitive.ObjectID) // // Cria a consulta que verifica se existe outros registros alem do ultimo // filter := &Filter{ // Collection: f.Collection, // Query: &bson.M{"_id": bson.M{ // "$lt": last, // }}, // } // // Se existirem elementos adiciona o nextTOKEN // if _, count := models.Count(filter); count > 0 { // // atualiza os valores do token // f.PageToken.StartID = last.Hex() // f.PageToken.Count = f.Count // resp.NextPageToken = f.PageToken.Encode() // } // } // } // func NewMongo() (*Mongo, error) { // // session, err := mgo.Dial(addrs) // // session, err := mgo.Dial("mongodb://localhost:27017") // session, err := mgo.Dial("mongodb://localhost:27017") // if err != nil { // return nil, err // } // session.SetMode(mgo.Monotonic, true) // // "addrs": "mongodb://localhost:27017", // // "user": "guest", // // "password": "welcome", // // "database": "financeiro" // m := &Mongo{ // Session: session, // DataBase: "accounts", // Addrs: , // // Config: cfg, // } // return m, err // }