123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509 |
- package api
- import (
- "bytes"
- "context"
- "encoding/base64"
- "fmt" // "html/template"
- "reflect"
- "regexp"
- "strconv"
- "strings"
- "text/template"
- "time"
- "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
- "github.com/davecgh/go-spew/spew"
- iriscontext "github.com/kataras/iris/v12/context"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- "go.mongodb.org/mongo-driver/mongo/readconcern"
- "go.mongodb.org/mongo-driver/mongo/readpref"
- "go.mongodb.org/mongo-driver/mongo/writeconcern"
- )
- const (
- DefaultAddrConnect = "mongodb://localhost:27017"
- InsertOne = iota + 1
- InsertMany
- Patch
- )
- var (
- QueriesMap = map[string]*template.Template{}
- EMPTYQUERY_REGEX = regexp.MustCompile(`{\s*}`)
- )
- func GetSessionContext(ctx iriscontext.Context) mongo.SessionContext {
- context := ctx.Values().Get("$SessionContext")
- if context != nil {
- return context.(mongo.SessionContext)
- }
- return nil
- }
- func init() {
- errs.RegisterMapErrorFunction("MONGO_ERROS", func(source error) (err *errs.Error) {
- message := source.Error()
- switch {
- case strings.Contains(message, "E11000"):
- err = errs.AlreadyExists().Details(&errs.Detail{
- Message: "DUPLICATED_ITEM",
- })
- case strings.Contains(message, "cannot decode"):
- err = errs.DataCaps().Details(&errs.Detail{
- Message: message,
- })
- case strings.Contains(message, "no documents in result"):
- errs.NotFound().Details(&errs.Detail{
- Message: "NOT_FOUND",
- })
- }
- return
- })
- }
- type Mongo struct {
- // Sessions map[string]*mgo.Session
- // Session *mgo.Session
- // Uri string
- // Configs
- //DefaultDB string
- // User string `json:"user"`
- // Password string `json:"password"`
- // DataBase string `json:"database"`
- // Addrs string `json:"addrs"`
- subject *BehaviorSubjectStruct
- client *mongo.Client
- Credential *options.Credential
- Config string `json:"config"`
- Clients map[string]*mongo.Client
- }
- type execfn func(context.Context, *mongo.Collection)
- type EntityInterface interface {
- Patch() *bson.A
- }
- // Modelo de comunicação entre o controle da API e a camada de persistencia.
- type Filter struct {
- Id primitive.ObjectID
- UserId primitive.ObjectID
- // NextPageToken primitive.ObjectID
- // PageToken PageToken
- Fields *bson.M
- Query *bson.M
- Patchs *bson.A
- Sort *bson.M
- // Sort []string
- Collection string
- QueryType string
- DB string
- Format string
- Insertion int
- CheckQuery bool
- IgnoreNoEntityFound bool
- MaxResults int
- Entity interface{}
- Entities interface{}
- SessionContext mongo.SessionContext
- // Aggregate interface{}
- Pipeline primitive.A
- Options interface{}
- }
- func (f *Filter) Aggregate(ctx context.Context, collection *mongo.Collection, opts *options.AggregateOptions) (cursor *mongo.Cursor, err *errs.Error) {
- var errl error
- if opts == nil {
- opts = options.Aggregate().SetBatchSize(int32(f.MaxResults))
- }
- // cmd := command.Aggregate{
- // NS: ns,
- // Pipeline: pipeline,
- // ReadPref: rp,
- // }
- if cursor, errl = collection.Aggregate(ctx, f.Pipeline, opts); errl != nil {
- err = errs.FromError(errl)
- }
- return
- }
- func (t *Filter) Check() (err *errs.Error) {
- if t.CheckQuery {
- if t.Query == nil {
- if !t.Id.IsZero() {
- t.Query = &bson.M{"_id": t.Id}
- } else {
- err = errs.InvalidArgument().Details(&errs.Detail{
- LocationType: "query",
- Location: "q",
- Message: "emptyValue",
- })
- return
- }
- }
- }
- if t.Query == nil {
- t.Query = &bson.M{}
- }
- return
- }
- func (t *Mongo) Ready() *BehaviorSubjectStruct {
- if t.subject == nil {
- t.subject = BehaviorSubject()
- }
- return t.subject
- }
- func (t *Mongo) Init() (err error) {
- defer func(){
- spew.Dump(err)
- }()
- fmt.Printf("Connection string '%s'", t.Config)
- clientOpts := options.Client().ApplyURI(t.Config)
- if t.Credential != nil {
- clientOpts.SetAuth(*t.Credential)
- }
- connectContext, _ := context.WithTimeout(context.Background(), 10*time.Second)
- // if t.client, err = mongo.NewClient(setOptions); err != nil {
- if t.client, err = mongo.Connect(
- connectContext,
- clientOpts,
- ); err != nil {
- return
- }
- // ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
- // defer cancel()
- // if err = t.client.Connect(ctx); err != nil {
- // return
- // }
- t.Ready().Next(true)
- return nil
- }
- func (t *Mongo) CommitTransaction(opts *options.SessionOptions) {
- }
- func (t *Mongo) StartTransaction(
- sessionOptions *options.SessionOptions,
- transactionOptions *options.TransactionOptions,
- ) (
- sessionContext mongo.SessionContext,
- err *errs.Error,
- ) {
- var (
- session mongo.Session
- localError error
- )
- defer func() {
- if localError != nil {
- err = errs.Internal().Details(&errs.Detail{
- Message: localError.Error(),
- })
- }
- }()
- if session, localError = t.client.StartSession(sessionOptions); localError == nil {
- ctx, _ := context.WithTimeout(context.Background(), 60*time.Second)
- mongo.WithSession(ctx, session, func(sctx mongo.SessionContext) error {
- if localError = session.StartTransaction(transactionOptions); localError == nil {
- sessionContext = sctx
- }
- return nil
- })
- }
- return
- }
- func (t *Mongo) Dispatch(f *Filter) {
- }
- func (t *Mongo) InsertOne(f *Filter) (res *mongo.InsertOneResult, err *errs.Error) {
- f.Insertion = InsertOne
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
- var lerr error
- if res, lerr = collection.InsertOne(ctx, f.Entity); lerr != nil {
- err = errs.FromError(lerr)
- }
- }, func(e *errs.Error) { err = e })
- return
- }
- func (t *Mongo) InsertMany(f *Filter) (res *mongo.InsertManyResult, err *errs.Error) {
- f.Insertion = InsertMany
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
- var (
- lerr error
- entities []interface{}
- value = reflect.ValueOf(f.Entities)
- )
- for index := 0; index < value.Len(); index++ {
- entities = append(entities, value.Index(index).Interface())
- }
- if res, lerr = collection.InsertMany(ctx, entities); lerr != nil {
- err = errs.FromError(lerr)
- }
- }, func(e *errs.Error) { err = e })
- return
- }
- //Remove os elementos da colecao, selecionados pela query
- func (t *Mongo) RemoveOne(f *Filter) (res *mongo.DeleteResult, err *errs.Error) {
- f.CheckQuery = true
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
- var lerr error
- if res, lerr = collection.DeleteOne(ctx, f.Query); lerr != nil {
- err = errs.FromError(lerr)
- } else if res.DeletedCount == 0 {
- err = errs.NotFound().Details(&errs.Detail{
- Message: "No entity has been deleted",
- })
- }
- }, func(e *errs.Error) { err = e })
- return
- }
- //Remove os elementos da colecao, selecionados pela query
- func (t *Mongo) RemoveMany(f *Filter) (res *mongo.DeleteResult, err *errs.Error) {
- var lerr error
- f.CheckQuery = true
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
- if res, lerr = collection.DeleteMany(ctx, f.Entities); lerr != nil {
- err = errs.FromError(lerr)
- } else if res.DeletedCount == 0 {
- err = errs.NotFound().Details(&errs.Detail{
- Message: "No entity has been deleted",
- })
- }
- }, func(e *errs.Error) { err = e })
- return
- }
- func (t *Mongo) UpdateOne(f *Filter) (res *mongo.UpdateResult, err *errs.Error) {
- var lerr error
- f.Insertion = InsertOne
- f.CheckQuery = true
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
- if res, lerr = collection.ReplaceOne(ctx, f.Query, f.Entity); lerr != nil {
- err = errs.FromError(lerr)
- } else if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
- err = errs.NotFound().Details(&errs.Detail{
- Message: "No entity has been updated",
- })
- }
- }, func(e *errs.Error) { err = e })
- return
- }
- func (t *Mongo) UpdateMany(f *Filter) (res *mongo.UpdateResult, err *errs.Error) {
- var (
- lerr error
- entity interface{}
- entities []interface{}
- value = reflect.ValueOf(f.Entities)
- )
- f.Insertion = InsertMany
- f.CheckQuery = true
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
- for index := 0; index < value.Len(); index++ {
- entity = value.Index(index).Interface()
- entities = append(entities, entity)
- }
- if res, lerr = collection.UpdateMany(ctx, f.Query, entities); lerr != nil {
- err = errs.FromError(lerr)
- } else if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
- err = errs.NotFound().Details(&errs.Detail{
- Message: "No entity has been updated ",
- })
- }
- }, func(e *errs.Error) { err = e })
- return
- }
- func RegisterQuery(id, query string) {
- QueriesMap[id] = template.Must(template.New(id).Parse(query))
- }
- func ParseQuery(ctx iriscontext.Context, filter *Filter, basequery string, data map[string]interface{}) (err *errs.Error) {
- var (
- found bool
- templateErr error
- buf bytes.Buffer
- baseQueryTemplate *template.Template
- // baseQeuryDocument *bson.M
- // errbq *errs.Error
- queryDecodedString []byte
- userQueryString = Q(ctx, "q", "e30=") // default base64 encoded from "{}"
- includeInTrash, _ = strconv.ParseBool(Q(ctx, "includeInTrash", "false")) // default base64 encoded from "{}"
- )
- // defer func() {
- // spew.Dump(filter.Query)
- // }()
- // Faz o parse da userQueryString enviada pelo usuario que deve ser um base64
- if queryDecodedString, templateErr = base64.StdEncoding.DecodeString(userQueryString); templateErr != nil {
- err = errs.InvalidArgument().Details(&errs.Detail{
- Message: "Query param isn't a base64 json",
- })
- return
- }
- if baseQueryTemplate, found = QueriesMap[basequery]; !found {
- err = errs.Internal().Details(&errs.Detail{
- Message: "Invalid query input",
- })
- return
- }
- // transclude the query to base query
- data["_transclude_"] = string(queryDecodedString)
- data["_deleted_"] = includeInTrash
- fmt.Println(
- basequery,
- string(queryDecodedString),
- data,
- )
- if templateErr = baseQueryTemplate.Execute(&buf, data); templateErr != nil {
- err = errs.InvalidArgument().Details(&errs.Detail{
- Message: "Failed on interpolate data with template query",
- })
- return
- }
-
- x := buf.Bytes()
-
- fmt.Println("buf.Bytes()", string(x))
- if templateErr = bson.UnmarshalExtJSON(x, false, &filter.Query); templateErr != nil {
- err = errs.InvalidArgument().Details(&errs.Detail{
- Message: "Failed on interpolate data with template query",
- })
- return
- }
- return
- }
- func (t *Mongo) PatchOne(f *Filter) (res *mongo.UpdateResult, err *errs.Error) {
- f.Insertion = Patch
- f.CheckQuery = true
- // out, _ := json.Marshal()
- // fmt.Println(string(out))
- // spew.Dump(f.Patchs)
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
- var (
- lerr error
- // client *mongo.Client
- )
- // if client, err = t.GetClient(f.DB); err != nil {
- // return
- // }
- // spew.Dump(f.Query)
- // spew.Dump(f.Patchs)
- // id := primitive.NewObjectID()
- t.client.UseSessionWithOptions(
- ctx,
- options.Session().SetDefaultReadPreference(readpref.Primary()),
- func(sctx mongo.SessionContext) error {
- defer func() {
- if err != nil {
- // fmt.Println("abort patch ", id.Hex())
- sctx.AbortTransaction(sctx)
- } else {
- // fmt.Println("commit patch ", id.Hex())
- sctx.CommitTransaction(sctx)
- }
- sctx.EndSession(sctx)
- }()
- sctx.StartTransaction(options.Transaction().
- SetReadConcern(readconcern.Snapshot()).
- SetWriteConcern(writeconcern.New(writeconcern.WMajority())))
- for _, p := range *f.Patchs {
- if f.Options == nil {
- f.Options = options.Update()
- }
- if res, lerr = collection.UpdateOne(ctx, f.Query, p, f.Options.(*options.UpdateOptions)); lerr != nil {
- err = errs.FromError(lerr)
- return lerr
- }
- if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
- err = errs.NotFound().Details(&errs.Detail{
- Message: "No entity has been updated",
- })
- return fmt.Errorf("No entity has been updated")
- }
- }
- return nil
- },
- )
- }, func(e *errs.Error) { err = e })
- return
- }
- func (t *Mongo) PatchMany(f *Filter) (res *mongo.UpdateResult, err *errs.Error) {
- f.Insertion = Patch
- f.CheckQuery = true
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
- var (
- lerr error
- // client *mongo.Client
- )
- // if client, err = t.GetClient(f.DB); err != nil {
- // return
- // }
- // o, _ := json.MarshalIndent(f.Query, "", " ")
- // fmt.Println("query", o)
- // o, _ = json.MarshalIndent(f.Patchs, "", " ")
- // fmt.Println("patch", o)
- t.client.UseSessionWithOptions(
- ctx,
- options.Session().SetDefaultReadPreference(readpref.Primary()),
- func(sctx mongo.SessionContext) error {
- defer func() {
- if err != nil {
- sctx.AbortTransaction(sctx)
- } else {
- sctx.CommitTransaction(sctx)
- }
- sctx.EndSession(sctx)
- }()
- sctx.StartTransaction(
- options.Transaction().
- SetReadConcern(readconcern.Snapshot()).
- SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
- )
- for _, p := range *f.Patchs {
- if res, lerr = collection.UpdateMany(ctx, f.Query, p); lerr != nil {
- err = errs.FromError(lerr)
- return lerr
- }
- if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
- err = errs.NotFound().Details(&errs.Detail{
- Message: "No entity has been updated",
- })
- err.CodeText = "noEntityUpdated"
- return fmt.Errorf("No entity has been updated")
- }
- }
- return nil
- },
- )
- }, func(e *errs.Error) { err = e })
- return
- }
- func (mongo *Mongo) CreateIndex(database, collectionString string, index mongo.IndexModel) *errs.Error {
- collection := mongo.client.Database(database).Collection(collectionString)
- opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
- if _, err := collection.Indexes().CreateOne(context.Background(), index, opts); err != nil {
- return errs.FromError(err)
- }
- return nil
- }
- func (mongo *Mongo) FindOneRx(options *Filter) *ObservableStruct {
- return Observable(func(observer *Subscriber) {
- if res, err := mongo.FindOne(options); err != nil {
- observer.Err(err)
- } else {
- observer.Next(res)
- }
- })
- }
- func (t *Mongo) FindOne(f *Filter) (res *mongo.SingleResult, err *errs.Error) {
- f.CheckQuery = (f.QueryType != "aggregate")
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
- var (
- lerr error
- cursor *mongo.Cursor
- )
- if f.QueryType == "aggregate" {
- if cursor, err = f.Aggregate(ctx, collection, nil); err != nil {
- return
- }
- if cursor.Next(ctx) {
- if lerr = cursor.Decode(f.Entity); lerr == nil {
- return
- }
- } else {
- lerr = fmt.Errorf("No document found")
- }
- } else {
- res = collection.FindOne(ctx, f.Query)
- if lerr = res.Err(); lerr == nil {
- if lerr = res.Decode(f.Entity); lerr == nil {
- return
- }
- }
- }
- err = errs.FromError(lerr)
- }, func(e *errs.Error) {
- err = e
- })
- return
- }
- func findIds(filter *Filter) (ids []primitive.ObjectID) {
- ids = []primitive.ObjectID{}
- // findIdsFilter = &Filter{}
- // cursor, err := executeFind(findIdsFilter)
- return
- }
- func (t *Mongo) FindMany(f *Filter) (cursor *mongo.Cursor, err *errs.Error) {
- // f.CheckQuery = true
- t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
- var (
- lerr error
- elemp reflect.Value
- // ids = []primitive.ObjectID{}
- // limit = int64(f.MaxResults)
- // fields = f.Fields
- )
- // defer func() {
- // if errd := recover(); errd != nil {
- // // fmt.Println(errd)
- // // spew.Dump(Trace())
- // // fmt.Println(errors.WithStack(errd.(error)) )
- // LogError(0, fmt.Sprintf("FindMany - %v", errd))
- // }
- // }()
- // Carrega os ids e verifica o intervalo de paginacao da consulta.
- // f.Fields = &bson.M{"_id": 1}
- // ids = findIds(f)
- // Atualiza a consulta para o conjunto de ids da paginacao
- // f.Fields = fields
- // f.Query = &bson.M{"_id": bson.M{"$in": ids}}
- // fmt.Println("Query Type ", f.QueryType)
- if f.QueryType == "aggregate" {
- cursor, lerr = f.Aggregate(ctx, collection, nil)
- } else {
- findOptions := options.Find()
- findOptions.SetLimit(int64(f.MaxResults))
- // fmt.Printf("sort of query '%s'\n", f.Format)
- // spew.Dump(f.Sort)
- // spew.Dump(f.Fields)
- // spew.Dump(f.Query)
- if f.Sort != nil {
- findOptions.SetSort(f.Sort)
- }
- if f.Fields != nil {
- findOptions.SetProjection(f.Fields)
- }
- cursor, lerr = collection.Find(ctx, f.Query, findOptions)
- }
- if lerr == nil {
- defer cursor.Close(ctx)
- // spew.Dump(cursor)
- if f.Entities == nil {
- lerr = fmt.Errorf("Entities can't be nil")
- goto FindManyError
- }
- entitiesValue := reflect.ValueOf(f.Entities)
- if entitiesValue.Kind() != reflect.Ptr || entitiesValue.Elem().Kind() != reflect.Slice {
- lerr = fmt.Errorf("Entities argument must be a slice address")
- goto FindManyError
- }
- slicev := entitiesValue.Elem()
- slicev = slicev.Slice(0, slicev.Cap())
- typ := slicev.Type().Elem()
- for cursor.Next(ctx) {
- elemp = reflect.New(typ)
- // fmt.Println("aqui")
- if lerr = cursor.Decode(elemp.Interface()); lerr != nil {
- // spew.Dump(elemp)
- goto FindManyError
- }
- slicev = reflect.Append(slicev, elemp.Elem())
- }
- // spew.Dump(slicev)
- entitiesValue.Elem().Set(slicev)
- return
- }
- FindManyError:
- err = errs.FromError(lerr)
- }, func(e *errs.Error) { err = e })
- return
- }
- func (models *Mongo) Exists(options *Filter) (exists bool, err *errs.Error) {
- options.Fields = &bson.M{"_id": 1}
- if _, err = models.FindOne(options); err != nil {
- return
- }
- exists = true
- return
- }
- func (driver *Mongo) GetContext(options *Filter) (context.Context, *mongo.Collection) {
- var ctx context.Context
- if options.SessionContext != nil {
- ctx = options.SessionContext
- } else {
- ctx, _ = context.WithTimeout(context.Background(), 40*time.Second)
- }
- collection := driver.client.Database(options.DB).Collection(options.Collection)
- return ctx, collection
- }
- func (t *Mongo) exec(f *Filter, execAction execfn, errorAction func(*errs.Error)) {
- var (
- err *errs.Error
- errl error
- ctx context.Context
- // client *mongo.Client
- )
- defer func() {
- if err == nil {
- if errl == nil {
- return
- }
- err = errs.FromError(errl)
- }
- // spew.Dump(err)
- errorAction(err)
- }()
- if err = f.Check(); err != nil {
- return
- }
- switch f.Insertion {
- case InsertOne:
- if f.Entity == nil {
- errl = fmt.Errorf("Entity can't be nil")
- return
- }
- case InsertMany:
- if f.Entities == nil {
- errl = fmt.Errorf("Entities can't be nil")
- return
- }
- case Patch:
- if f.Patchs == nil {
- errl = fmt.Errorf("Patchs can't be nil")
- return
- }
- }
- // fmt.Println("passei do exec")
- if f.SessionContext != nil {
- ctx = f.SessionContext
- } else {
- ctx, _ = context.WithTimeout(context.Background(), time.Minute)
- }
- collection := t.client.Database(f.DB).Collection(f.Collection)
- execAction(ctx, collection)
- return
- }
- func DeletedPatch() *bson.A {
- return &bson.A{
- bson.M{
- "$set": bson.M{
- "deleted": true,
- "deletedIn": time.Now().Unix(),
- },
- },
- }
- }
- // func mergeQueries(filter *Filter) *bson.M {
- // if baseQeuryDocument != nil {
- // if filter.Query == nil {
- // filter.Query = baseQeuryDocument
- // } else {
- // filter.Query = &bson.M{
- // "$and": bson.A{baseQeuryDocument, filter.Query},
- // }
- // }
- // } else if filter.Query == nil {
- // filter.Query = &bson.M{}
- // }
- // }
- // func yieldIndexModel() mongo.IndexModel {
- // keys := bsonx.Doc{{Key: *key, Value: bsonx.Int32(int32(*value))}}
- // index := mongo.IndexModel{}
- // index.Keys = keys
- // if *unique {
- // index.Options = bsonx.Doc{{Key: "unique", Value: bsonx.Boolean(true)}}
- // }
- // return index
- // }
- // func (t *Mongo) GetClient() (*mgo.Session, *errs.Error) {
- // func (t *Mongo) GetClient(id string) (*mongo.Client, *errs.Error) {
- // var (
- // client *mongo.Client
- // found bool
- // )
- // if id == "" {
- // panic("Client id not defined!")
- // }
- // if client, found = t.Clients[id]; !found {
- // return nil, FullError(ERR_SERVICE_UNAVAILABLE, &errs.Detail{
- // Message: fmt.Sprintf("Client %s not exists!", id),
- // })
- // }
- // return client, nil
- // }
- // func errorcheck(err error) *errs.Error {
- // if err != nil {
- // return FullError(ERR_GENERAL, &errs.Detail{
- // Message: err.Error(),
- // })
- // }
- // return nil
- // }
- // func Paginate(collection *mongo.Collection, startValue primitive.ObjectID, nPerPage int64) ([]bson.D, *bson.Value, error) {
- // // Query range filter using the default indexed _id field.
- // filter := bson.VC.DocumentFromElements(
- // bson.EC.SubDocumentFromElements(
- // "_id",
- // bson.EC.ObjectID("$gt", startValue),
- // ),
- // )
- // var opts []findopt.Find
- // opts = append(opts, findopt.Sort(bson.NewDocument(bson.EC.Int32("_id", -1))))
- // opts = append(opts, findopt.Limit(nPerPage))
- // cursor, _ := collection.Find(context.Background(), filter, opts...)
- // var lastValue *bson.Value
- // var results []bson.Document
- // for cursor.Next(context.Background()) {
- // elem := bson.NewDocument()
- // err := cursor.Decode(elem)
- // if err != nil {
- // return results, lastValue, err
- // }
- // results = append(results, *elem)
- // lastValue = elem.Lookup("_id")
- // }
- // return results, lastValue, nil
- // }
- // func (t *Mongo) Patch(f *Filter) *errs.Error {
- // var (
- // entityinterface EntityInterface
- // col *mongo.Collection
- // entity interface{}
- // ok bool
- // session *mgo.Session
- // err error
- // x *bson.M
- // )
- // if session, err = t.GetClient(f.DB); err == nil {
- // if !f.Id.IsZero() {
- // f.Query = &bson.M{"_id": f.Id}
- // } else if f.Query == nil {
- // err = fmt.Errorf("Query not defined!")
- // goto ErrorPatch
- // }
- // defer session.Close()
- // if entityinterface, ok = f.Entity.(EntityInterface); ok {
- // entity = entityinterface.Update()
- // x = entity.(*bson.M)
- // // delete(*x, "$set")
- // } else {
- // entity = bson.M{"$set": f.Entity}
- // }
- // col = session.DB(f.DB).C(f.Collection)
- // // entity["$set"]
- // spew.Dump(entity)
- // _, err = col.Upsert(f.Query, entity)
- // }
- // ErrorPatch:
- // if err != nil {
- // return Error(ERR_PERSIST, err.Error())
- // }
- // return nil
- // }
- // func (t *Mongo) Upsert(f *Filter) *errs.Error {
- // var (
- // entityinterface EntityInterface
- // col *mongo.Collection
- // entity interface{}
- // ok bool
- // )
- // session, err := t.GetClient(f.DB)
- // if err == nil {
- // if !f.Id.IsZero() {
- // f.Query = &bson.M{"_id": f.Id}
- // } else if f.Query == nil {
- // err = fmt.Errorf("Query not defined!")
- // goto ErrorUp
- // }
- // defer session.Close()
- // col = session.DB(f.DB).C(f.Collection)
- // // update = bson.M{"$set": f.Entity}
- // // if data, ok = f.Entity.Push(); ok {
- // // update["$push"] = data
- // // }
- // // if data, ok = f.Entity.Pull(); ok {
- // // update["$pull"] = data
- // // }
- // // spew.Dump(f.Entity)
- // if entityinterface, ok = f.Entity.(EntityInterface); ok {
- // // fmt.Println("Implement interface")
- // entity = entityinterface.Update()
- // } else {
- // entity = f.Entity
- // }
- // // spew.Dump(entity)
- // // _, err = col.Upsert(f.Query, entity)
- // err = col.Update(f.Query, entity)
- // }
- // ErrorUp:
- // if err != nil {
- // return Error(ERR_PERSIST, err.Error())
- // }
- // return nil
- // }
- //-----------------------------------------------------------------------
- // func (t *Mongo) Find(f *Filter, one bool) (*errs.Error, int) {
- // var cursor string
- // pageToken := &f.PageToken
- // session, err := t.GetClient(f.DB)
- // if err == nil {
- // defer session.Close()
- // if f.Query == nil {
- // if one {
- // if !f.Id.IsZero() {
- // f.Query = &bson.M{"_id": f.Id}
- // } else {
- // return Error(ERR_INVALID_PARAM_VALUE, "Param id not valid."), 0
- // }
- // } else if f.Query == nil {
- // f.Query = &bson.M{}
- // }
- // }
- // query := minquery.New(session.DB(f.DB), f.Collection, f.Query)
- // // Se tem um token de paginacao
- // hasToken := pageToken.HasToken()
- // if hasToken {
- // fmt.Println("consultando com token", pageToken.Cursor)
- // query = query.Cursor(pageToken.Cursor)
- // }
- // // cursorFields := []string{"_id"}
- // cursorFields := []string{}
- // if len(f.Sort) > 0 {
- // query.Sort(f.Sort...)
- // for _, key := range f.Sort {
- // if !strings.Contains(key, ".") {
- // cursorFields = append(cursorFields, key)
- // }
- // }
- // } else {
- // cursorFields = append(cursorFields, "_id")
- // }
- // // Seleciona os campos se forem especificados
- // if f.Fields != nil {
- // query.Select(f.Fields)
- // }
- // // Determina o numero de itens a ser retornado
- // if f.MaxResults > 0 {
- // query.Limit(f.MaxResults)
- // }
- // if one {
- // err = query.One(f.Entity)
- // pageToken.Count = 1 // Filter one
- // } else {
- // // spew.Dump(f.Entity, cursorFields)
- // if cursor, err = query.All(f.Entity, cursorFields...); err != nil {
- // goto ErroFind
- // }
- // // Numero total de documentos
- // // Se tem um token de paginacao o valor total esta no token
- // // Caso contrario consulta no banco
- // if !hasToken {
- // c := session.DB(f.DB).C(f.Collection)
- // if pageToken.Count, err = c.Find(f.Query).Select(&bson.M{"_id": 1}).Count(); err != nil {
- // goto ErroFind
- // }
- // }
- // // if ()
- // fmt.Println("Cursor return", cursor, "-")
- // pageToken.NewCursor = cursor
- // }
- // }
- // ErroFind:
- // if err != nil {
- // return Error(ERR_PERSIST, err.Error()), 0
- // }
- // return nil, pageToken.Count
- // }
- // func (t *Mongo) FindOne(f *Filter) (*errs.Error, int) {
- // return t.Find(f, true)
- // }
- // func (t *Mongo) FindAll(f *Filter) (*errs.Error, int) {
- // return t.Find(f, false)
- // }
- // func (t *Mongo) Count(f *Filter) (*errs.Error, int) {
- // session, err := t.GetClient(f.DB)
- // if err == nil {
- // defer session.Close()
- // if f.Query == nil {
- // f.Query = &bson.M{}
- // }
- // query := session.DB(f.DB).C(f.Collection).Find(f.Query)
- // // Seleciona os campos se forem especificados
- // f.PageToken.Count, err = query.Select(&bson.M{"_id": 1}).Count()
- // }
- // if err != nil {
- // return Error(ERR_PERSIST, err.Error()), 0
- // }
- // return nil, f.PageToken.Count
- // }
- // func (t *Mongo) Update(f *Filter) *errs.Error {
- // var (
- // entityinterface EntityInterface
- // entity interface{}
- // )
- // one := false
- // ok := false
- // session, err := t.GetClient(f.DB)
- // if err == nil {
- // defer session.Close()
- // if !f.Id.IsZero() {
- // one = true
- // f.Query = &bson.M{"_id": f.Id}
- // } else if f.Query == nil {
- // err = fmt.Errorf("Query not defined!")
- // }
- // if entityinterface, ok = f.Entity.(EntityInterface); ok {
- // entity = entityinterface.Update()
- // } else {
- // entity = f.Entity
- // }
- // col := session.DB(f.DB).C(f.Collection)
- // if one {
- // err = col.Update(f.Query, entity)
- // } else {
- // _, err = col.UpdateAll(f.Query, entity)
- // }
- // }
- // if err != nil {
- // return Error(ERR_PERSIST, err.Error())
- // }
- // return nil
- // }
- //-----------------------------------------------------------------------
- // func (t *Mongo) Aggregation(f *Filter, q []bson.M, one bool) *errs.Error {
- // session, err := t.GetClient(f.DB)
- // if err == nil {
- // defer session.Close()
- // pipe := session.DB(f.DB).C(f.Collection).Pipe(q)
- // if one {
- // err = pipe.One(&f.Entity)
- // } else {
- // err = pipe.All(&f.Entity)
- // }
- // }
- // if err != nil {
- // return Error(ERR_PERSIST, err.Error())
- // }
- // return nil
- // }
- // all := true
- // session, err := t.GetClient(f.DB)
- // if err == nil {
- // defer session.Close()
- // if !f.Id.IsZero() {
- // all = false
- // f.Query = &bson.M{"_id": f.Id}
- // } else if f.Query == nil {
- // err = fmt.Errorf("Query not defined!")
- // }
- // C := session.DB(f.DB).C(f.Collection)
- // if all {
- // _, err = C.RemoveAll(f.Query)
- // } else {
- // err = C.Remove(f.Query)
- // }
- // }
- // if err != nil {
- // return Error(ERR_PERSIST, err.Error())
- // }
- // return nil
- // VERIFICAR A NECESSIDADE E CORRIGIR O ERRO
- // func (t *Mongo) GetArrayById(f *Filter, ids []primitive.ObjectID) error {
- // session, err := t.GetClient(f.DB)
- // if err != nil {
- // return err
- // }
- // defer session.Close()
- // q := bson.M{"_id": bson.M{"$in": ids}}
- // err = session.DB(f.DB).C(f.Collection).Find(q).One(f.Entity)
- // return err
- // }
- // func (t *Mongo) GetEmbedFromArray(f *Filter, embed string, id primitive.ObjectID, sid primitive.ObjectID) (errs error) {
- // session, err := t.GetClient(f.DB)
- // if err != nil {
- // return err
- // }
- // defer session.Close()
- // field := "$" + embed
- // match := bson.M{"_id": id}
- // match[embed] = bson.M{"$exists": true}
- // aggregations := []bson.M{
- // bson.M{"$unwind": field},
- // bson.M{"$match": match},
- // bson.M{"$replaceRoot": bson.M{"newRoot": field}},
- // bson.M{"$match": bson.M{"_id": sid}},
- // }
- // C := session.DB(f.DB).C(f.Collection)
- // return C.Pipe(aggregations).One(&f.Entity)
- // }
- // func (t *Mongo) UpdateEmbedFromArray(f *Filter, embed string, id primitive.ObjectID, sid primitive.ObjectID) error {
- // session, err := t.GetClient(f.DB)
- // if err != nil {
- // return err
- // }
- // defer session.Close()
- // col := session.DB(f.DB).C(f.Collection)
- // //Define o elemento que sera atualizado
- // element := bson.M{}
- // element[embed+".$"] = &f.Entity
- // //Define o seletor do elemento que sera atualizado
- // selector := bson.M{"_id": id}
- // selector[embed+"._id"] = sid
- // return col.Update(selector, bson.M{"$set": element})
- // }
- // func (t *Mongo) AddEmbedFromArray(f *Filter, embed string, id primitive.ObjectID) error {
- // session, errs := t.GetClient(f.DB)
- // if errs != nil {
- // return errs
- // }
- // defer session.Close()
- // col := session.DB(f.DB).C(f.Collection)
- // //Define o elemento que sera atualizado
- // element := bson.M{}
- // element[embed] = &f.Entity
- // //Define o seletor do elemento que sera atualizado
- // selector := bson.M{"_id": id}
- // return col.Update(selector, bson.M{"$push": element})
- // }
- // 0000000000000000000000000000000000000000000000000000000000000000
- // func (t *Mongo) NewTransaction(dbname string) (*Transaction, error) {
- // session, e := t.GetClient(dbname)
- // if e != nil {
- // return nil, e
- // }
- // collection := session.DB(dbname).C("txns")
- // return &Transaction{R: txn.NewRunner(collection)}, nil
- // }
- // Transactions
- // type Transaction struct {
- // R *txn.Runner
- // Ops []txn.Op
- // }
- // func (t *Transaction) Insert(collection string, id primitive.ObjectID, entity interface{}) *Transaction {
- // t.Ops = append(t.Ops, txn.Op{
- // C: collection,
- // Id: id,
- // Insert: entity,
- // })
- // return t
- // }
- // func (t *Transaction) Update(collection string, id primitive.ObjectID, assert *bson.M, entity interface{}) *Transaction {
- // op := txn.Op{
- // C: collection,
- // Id: id,
- // Update: entity,
- // }
- // if assert != nil {
- // op.Assert = assert
- // }
- // t.Ops = append(t.Ops, op)
- // return t
- // }
- // func (t *Transaction) Remove(collection string, id primitive.ObjectID) *Transaction {
- // t.Ops = append(t.Ops, txn.Op{
- // C: collection,
- // Id: id,
- // Remove: true,
- // })
- // return t
- // }
- // func (t *Transaction) Run() error {
- // return t.R.Run(t.Ops, primitive.NewObjectID(), nil)
- // }
- // func (t *Mongo) Find(f *Filter, one bool) (*errs.Error, int) {
- // session, err := t.GetClient()
- // if err == nil {
- // defer session.Close()
- // if !f.Id.IsZero() {
- // f.Query = &bson.M{"_id": f.Id}
- // } else if f.Query == nil {
- // f.Query = &bson.M{}
- // }
- // pageToken := &f.PageToken
- // aggregations := []*bson.M{}
- // // Se tem um token de paginacao
- // hasToken := pageToken.HasToken()
- // if hasToken {
- // fmt.Println("consultando com token", pageToken.CurrentID)
- // // {$or:
- // // [
- // // {$and :
- // // [
- // // {<sort key>:{$gte: <last sort key value of previous page>}},
- // // {"_id" : {$gt : <last result id of previous page>}}
- // // ]
- // // },
- // // ou
- // // {<sort key> :{$gt: <last sort key value>}}
- // // ]
- // // }.
- // // f.Query = &bson.M{"$and": []bson.M{
- // // *f.Query,
- // // bson.M{"_id": bson.M{"$gt": f.NextPageToken}},
- // // }}
- // f.Query = &bson.M{"_id": bson.M{
- // // "$gt": primitive.ObjectIDHex(pageToken.CurrentID),
- // "$gt": pageToken.CurrentID,
- // }}
- // } else {
- // }
- // // Adiciona o primeiro estagio da agregacao
- // aggregations = append(aggregations, &bson.M{"$match": f.Query})
- // if len(f.Sort) > 0 {
- // f.Sort = append(f.Sort, "_id")
- // aggregations = append(aggregations, MgoSortBson(f.Sort))
- // // query.Sort(f.Sort...)
- // }
- // // aggregations = append(aggregations, &bson.M{
- // // "$addFields": bson.M{"position":},
- // // })
- // // Seleciona os campos se forem especificados
- // if f.Fields != nil {
- // // aggregations = append(aggregations, bson.M{"$limit": f.MaxResults})
- // }
- // // countQuery := session.DB(f.DB).C(f.Collection).Pipe(aggregations)
- // // Determina o numero de itens a ser retornado
- // if f.MaxResults > 0 {
- // aggregations = append(aggregations, &bson.M{"$limit": f.MaxResults})
- // }
- // spew.Dump(aggregations)
- // // query := session.DB(f.DB).C(f.Collection).Find(f.Query)
- // collection := session.DB(f.DB).C(f.Collection)
- // query := collection.Pipe(aggregations)
- // if one {
- // err = query.One(f.Entity)
- // f.Count = 1 // Filter one
- // } else {
- // err = query.All(f.Entity)
- // // Numero total de documentos
- // // Se tem um token de paginacao o valor total esta no token
- // // Caso contrario consulta no banco
- // if hasToken {
- // f.Count = pageToken.Count
- // } else {
- // f.Count, _ = collection.Find(f.Query).Select(&bson.M{"_id": 1}).Count()
- // }
- // }
- // }
- // if err != nil {
- // return Error(ERR_PERSIST, err.Error()), 0
- // }
- // return nil, f.Count
- // }
- // v := reflect.ValueOf(f.Entity)
- // fmt.Println("UpdateCursorResponse")
- // // spew.Dump(f.PageToken)
- // // fmt.Println(v.Kind())
- // if v.Kind() == reflect.Ptr {
- // // Atualiza v para o elemento apontado
- // v = v.Elem()
- // if v.Kind() == reflect.Slice || v.Kind() == reflect.Array {
- // // Acessa o atributo ID da ultima entidade do array
- // field := v.Index(v.Len() - 1).Elem().FieldByName("ID")
- // // Converte o atributo ID para objectId
- // last := field.Interface().(primitive.ObjectID)
- // // Cria a consulta que verifica se existe outros registros alem do ultimo
- // filter := &Filter{
- // Collection: f.Collection,
- // Query: &bson.M{"_id": bson.M{
- // "$lt": last,
- // }},
- // }
- // // Se existirem elementos adiciona o nextTOKEN
- // if _, count := models.Count(filter); count > 0 {
- // // atualiza os valores do token
- // f.PageToken.StartID = last.Hex()
- // f.PageToken.Count = f.Count
- // resp.NextPageToken = f.PageToken.Encode()
- // }
- // }
- // }
- // func NewMongo() (*Mongo, error) {
- // // session, err := mgo.Dial(addrs)
- // // session, err := mgo.Dial("mongodb://localhost:27017")
- // session, err := mgo.Dial("mongodb://localhost:27017")
- // if err != nil {
- // return nil, err
- // }
- // session.SetMode(mgo.Monotonic, true)
- // // "addrs": "mongodb://localhost:27017",
- // // "user": "guest",
- // // "password": "welcome",
- // // "database": "financeiro"
- // m := &Mongo{
- // Session: session,
- // DataBase: "accounts",
- // Addrs: ,
- // // Config: cfg,
- // }
- // return m, err
- // }
|