mongo.go 43 KB


  1. package api
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/base64"
  6. "encoding/json"
  7. "fmt" // "html/template"
  8. "reflect"
  9. "regexp"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "text/template"
  14. "time"
  15. "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
  16. "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/types"
  17. "github.com/davecgh/go-spew/spew"
  18. "github.com/jeremywohl/flatten"
  19. "github.com/kataras/iris"
  20. iriscontext "github.com/kataras/iris/v12/context"
  21. "go.mongodb.org/mongo-driver/bson"
  22. "go.mongodb.org/mongo-driver/bson/primitive"
  23. "go.mongodb.org/mongo-driver/mongo"
  24. "go.mongodb.org/mongo-driver/mongo/options"
  25. )
  26. type ParseQueryHandler = func(ctx iriscontext.Context) (err *errs.Error)
  27. const (
  28. DefaultAddrConnect = "mongodb://localhost:27017"
  29. InsertOne = iota + 1
  30. InsertMany
  31. Patch
  32. )
  33. var (
  34. QueriesMap = map[string]*template.Template{}
  35. ParseQueryHandlers = map[string]ParseQueryHandler{}
  36. EMPTYQUERY_REGEX = regexp.MustCompile(`{\s*}`)
  37. NextPageTokenMap = map[string]string{}
  38. )
  39. func GetSessionContext(ctx iriscontext.Context) mongo.SessionContext {
  40. context := ctx.Values().Get("$SessionContext")
  41. if context != nil {
  42. return context.(mongo.SessionContext)
  43. }
  44. return nil
  45. }
  46. func init() {
  47. errs.RegisterMapErrorFunction("MONGO_ERROS", func(source error) (err *errs.Error) {
  48. message := source.Error()
  49. switch {
  50. case strings.Contains(message, "E11000"):
  51. err = errs.AlreadyExists().Details(&errs.Detail{
  52. Reason: "DUPLICATED_ITEM",
  53. Message: message,
  54. })
  55. case strings.Contains(message, "cannot decode"):
  56. err = errs.DataCaps().Details(&errs.Detail{
  57. Message: message,
  58. })
  59. case strings.Contains(message, "no documents in result"):
  60. errs.NotFound().Details(&errs.Detail{
  61. Reason: "NOT_FOUND",
  62. Message: message,
  63. })
  64. }
  65. return
  66. })
  67. }
  68. type Mongo struct {
  69. // Sessions map[string]*mgo.Session
  70. // Session *mgo.Session
  71. // Uri string
  72. // Configs
  73. //DefaultDB string
  74. // User string `json:"user"`
  75. // Password string `json:"password"`
  76. // DataBase string `json:"database"`
  77. // Addrs string `json:"addrs"`
  78. FindManyNextPaginationMux sync.Mutex
  79. // NextPageTokenMap map[string]string
  80. Uniq int64
  81. subject *BehaviorSubjectStruct
  82. client *mongo.Client
  83. Credential *options.Credential
  84. Config string `json:"config"`
  85. Clients map[string]*mongo.Client
  86. }
  87. type execfn func(context.Context, *mongo.Collection)
  88. type EntityInterface interface {
  89. Patch() *bson.A
  90. }
  91. // Modelo de comunicação entre o controle da API e a camada de persistencia.
  92. type Filter struct {
  93. Id primitive.ObjectID
  94. UserId primitive.ObjectID
  95. // NextPageToken primitive.ObjectID
  96. // PageToken PageToken
  97. Fields *bson.M
  98. Query *bson.M
  99. Patchs *bson.A
  100. Sort *bson.M
  101. Context iriscontext.Context
  102. // Sort []string
  103. NextPageToken string
  104. Collection string
  105. QueryType string
  106. DB string
  107. Format string
  108. Insertion int
  109. ResultSizeEstimate int64
  110. CheckQuery bool
  111. IgnoreNoEntityFound bool
  112. MaxResults int
  113. Entity interface{}
  114. Entities interface{}
  115. SessionContext mongo.SessionContext
  116. // Aggregate interface{}
  117. Pipeline primitive.A
  118. Options interface{}
  119. // PatchOptions = patch.Patch()
  120. }
  121. func (f *Filter) Aggregate(ctx context.Context, collection *mongo.Collection, opts *options.AggregateOptions) (cursor *mongo.Cursor, err *errs.Error) {
  122. var errl error
  123. if opts == nil {
  124. opts = options.Aggregate().SetBatchSize(int32(f.MaxResults)).SetCollation(&options.Collation{
  125. Strength: 1,
  126. Locale: "en",
  127. })
  128. }
  129. if cursor, errl = collection.Aggregate(ctx, f.Pipeline, opts); errl != nil {
  130. err = errs.FromError(errl)
  131. }
  132. return
  133. }
  134. func (t *Filter) Check() (err *errs.Error) {
  135. if t.CheckQuery {
  136. if t.Query == nil {
  137. if !t.Id.IsZero() {
  138. t.Query = &bson.M{"_id": t.Id}
  139. } else {
  140. err = errs.InvalidArgument().Details(&errs.Detail{
  141. LocationType: "query",
  142. Location: "q",
  143. Message: "emptyValue",
  144. })
  145. return
  146. }
  147. }
  148. }
  149. if t.Query == nil {
  150. t.Query = &bson.M{}
  151. }
  152. return
  153. }
  154. func (mongo *Mongo) uniqID() int64 {
  155. mongo.Uniq++
  156. return mongo.Uniq
  157. }
  158. func (t *Mongo) Ready() *BehaviorSubjectStruct {
  159. if t.subject == nil {
  160. t.subject = BehaviorSubject()
  161. }
  162. return t.subject
  163. }
  164. func (t *Mongo) Init() (err error) {
  165. defer func() {
  166. spew.Dump(err)
  167. }()
  168. t.FindManyNextPaginationMux = sync.Mutex{}
  169. NextPageTokenMap = map[string]string{}
  170. fmt.Printf("Connection string '%s'", t.Config)
  171. clientOpts := options.Client().ApplyURI(t.Config)
  172. if t.Credential != nil {
  173. clientOpts.SetAuth(*t.Credential)
  174. }
  175. connectContext, _ := context.WithTimeout(context.Background(), 10*time.Second)
  176. // if t.client, err = mongo.NewClient(setOptions); err != nil {
  177. if t.client, err = mongo.Connect(
  178. connectContext,
  179. clientOpts,
  180. ); err != nil {
  181. return
  182. }
  183. // ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
  184. // defer cancel()
  185. // if err = t.client.Connect(ctx); err != nil {
  186. // return
  187. // }
  188. t.Ready().Next(true)
  189. return nil
  190. }
  191. func (t *Mongo) CommitTransaction(opts *options.SessionOptions) {
  192. }
  193. func (t *Mongo) StartTransaction(
  194. sessionOptions *options.SessionOptions,
  195. transactionOptions *options.TransactionOptions,
  196. ) (
  197. sessionContext mongo.SessionContext,
  198. err *errs.Error,
  199. ) {
  200. var (
  201. session mongo.Session
  202. localError error
  203. )
  204. defer func() {
  205. if localError != nil {
  206. err = errs.Internal().Details(&errs.Detail{
  207. Message: localError.Error(),
  208. })
  209. }
  210. }()
  211. if session, localError = t.client.StartSession(sessionOptions); localError == nil {
  212. ctx, _ := context.WithTimeout(context.Background(), 60*time.Second)
  213. mongo.WithSession(ctx, session, func(sctx mongo.SessionContext) error {
  214. if localError = session.StartTransaction(transactionOptions); localError == nil {
  215. sessionContext = sctx
  216. }
  217. return nil
  218. })
  219. }
  220. return
  221. }
  222. func (t *Mongo) Dispatch(f *Filter) {
  223. }
  224. func (t *Mongo) InsertOne(f *Filter) (res *mongo.InsertOneResult, err *errs.Error) {
  225. defer func() {
  226. createDebugEvent(f, "models.insert.one", func(event *DebugEvent) {
  227. event.Error = err
  228. })
  229. }()
  230. f.Insertion = InsertOne
  231. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  232. var lerr error
  233. if res, lerr = collection.InsertOne(ctx, f.Entity); lerr != nil {
  234. err = errs.FromError(lerr)
  235. }
  236. }, func(e *errs.Error) { err = e })
  237. return
  238. }
  239. func (t *Mongo) InsertMany(f *Filter) (res *mongo.InsertManyResult, err *errs.Error) {
  240. defer func() {
  241. createDebugEvent(f, "models.insert.many", func(event *DebugEvent) {
  242. event.Error = err
  243. })
  244. }()
  245. f.Insertion = InsertMany
  246. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  247. var (
  248. lerr error
  249. entities []interface{}
  250. value = reflect.ValueOf(f.Entities)
  251. )
  252. for index := 0; index < value.Len(); index++ {
  253. entities = append(entities, value.Index(index).Interface())
  254. }
  255. if res, lerr = collection.InsertMany(ctx, entities); lerr != nil {
  256. err = errs.FromError(lerr)
  257. }
  258. }, func(e *errs.Error) { err = e })
  259. return
  260. }
  261. //Remove os elementos da colecao, selecionados pela query
  262. func (t *Mongo) RemoveOne(f *Filter) (res *mongo.DeleteResult, err *errs.Error) {
  263. defer func() {
  264. createDebugEvent(f, "models.remove.one", func(event *DebugEvent) {
  265. event.Error = err
  266. })
  267. }()
  268. f.CheckQuery = true
  269. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  270. var lerr error
  271. fmt.Println("remove.one")
  272. spew.Dump(f.Query)
  273. if res, lerr = collection.DeleteOne(ctx, f.Query); lerr != nil {
  274. err = errs.FromError(lerr)
  275. } else if res.DeletedCount == 0 {
  276. err = errs.NotFound().Details(&errs.Detail{
  277. Message: "No entity has been deleted",
  278. })
  279. }
  280. }, func(e *errs.Error) { err = e })
  281. return
  282. }
  283. //Remove os elementos da colecao, selecionados pela query
  284. func (t *Mongo) RemoveMany(f *Filter) (res *mongo.DeleteResult, err *errs.Error) {
  285. var lerr error
  286. var deleteOptions *options.DeleteOptions
  287. defer func() {
  288. createDebugEvent(f, "models.remove.many", func(event *DebugEvent) {
  289. event.Error = err
  290. })
  291. }()
  292. f.CheckQuery = true
  293. if f.Options != nil {
  294. deleteOptions = f.Options.(*options.DeleteOptions)
  295. }
  296. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  297. if res, lerr = collection.DeleteMany(
  298. ctx,
  299. f.Query,
  300. deleteOptions,
  301. ); lerr != nil {
  302. err = errs.FromError(lerr)
  303. }
  304. // else if res.DeletedCount == 0 {
  305. // err = errs.NotFound().Details(&errs.Detail{
  306. // Message: "No entity has been deleted",
  307. // })
  308. // }
  309. }, func(e *errs.Error) { err = e })
  310. return
  311. }
  312. func (t *Mongo) UpdateOne(f *Filter) (res *mongo.UpdateResult, err *errs.Error) {
  313. var lerr error
  314. defer func() {
  315. createDebugEvent(f, "models.update.one", func(event *DebugEvent) {
  316. event.Error = err
  317. })
  318. }()
  319. f.Insertion = InsertOne
  320. f.CheckQuery = true
  321. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  322. if res, lerr = collection.ReplaceOne(ctx, f.Query, f.Entity); lerr != nil {
  323. err = errs.FromError(lerr)
  324. } else if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  325. err = errs.NotFound().Details(&errs.Detail{
  326. Message: "No entity has been updated",
  327. })
  328. }
  329. }, func(e *errs.Error) { err = e })
  330. return
  331. }
  332. func (t *Mongo) UpdateMany(f *Filter) (res *mongo.UpdateResult, err *errs.Error) {
  333. var (
  334. lerr error
  335. entity interface{}
  336. entities []interface{}
  337. value = reflect.ValueOf(f.Entities)
  338. )
  339. defer func() {
  340. createDebugEvent(f, "models.update.many", func(event *DebugEvent) {
  341. event.Error = err
  342. })
  343. }()
  344. f.Insertion = InsertMany
  345. f.CheckQuery = true
  346. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  347. for index := 0; index < value.Len(); index++ {
  348. entity = value.Index(index).Interface()
  349. entities = append(entities, entity)
  350. }
  351. if res, lerr = collection.UpdateMany(ctx, f.Query, entities); lerr != nil {
  352. err = errs.FromError(lerr)
  353. } else if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  354. err = errs.NotFound().Details(&errs.Detail{
  355. Message: "No entity has been updated ",
  356. })
  357. }
  358. }, func(e *errs.Error) { err = e })
  359. return
  360. }
  361. func RegisterQuery(id, query string) {
  362. QueriesMap[id] = template.Must(template.New(id).Parse(query))
  363. }
  364. func BeforeParseQueryHandler(id string, fn ParseQueryHandler) {
  365. ParseQueryHandlers[id] = fn
  366. }
  367. func getQueryHandler(id string) ParseQueryHandler {
  368. return ParseQueryHandlers[id]
  369. }
  370. func ParseQuery(ctx iriscontext.Context, filter *Filter, basequery string, data map[string]interface{}) (err *errs.Error) {
  371. var (
  372. found bool
  373. templateErr error
  374. buf bytes.Buffer
  375. baseQueryTemplate *template.Template
  376. // baseQeuryDocument *bson.M
  377. // errbq *errs.Error
  378. nextPageToken string
  379. queryDecodedString []byte
  380. values = ctx.Values()
  381. userQueryString = Q(ctx, "q", "e30=") // default base64 encoded from "{}"
  382. includeInTrash, _ = strconv.ParseBool(Q(ctx, "includeInTrash", "false")) // default base64 encoded from "{}"
  383. )
  384. // Faz o parse da userQueryString enviada pelo usuario que deve ser um base64
  385. if queryDecodedString, templateErr = base64.StdEncoding.DecodeString(userQueryString); templateErr != nil {
  386. err = errs.InvalidArgument().Details(&errs.Detail{
  387. Message: "Query param isn't a base64 json",
  388. })
  389. return
  390. }
  391. if baseQueryTemplate, found = QueriesMap[basequery]; !found {
  392. err = errs.Internal().Details(&errs.Detail{
  393. Message: "Invalid query input",
  394. })
  395. return
  396. }
  397. // transclude the query to base query
  398. fmt.Println("##########################[", filter.NextPageToken, "]")
  399. // spew.Dump(t.NextPageTokenMap)
  400. if nextPageToken, found = NextPageTokenMap[filter.NextPageToken]; !found {
  401. nextPageToken = "{}"
  402. }
  403. data["_transclude_nextPageToken"] = nextPageToken
  404. data["_transclude_true"] = string(queryDecodedString)
  405. data["_deleted_"] = includeInTrash
  406. values.Set("eon.query.data", data)
  407. values.Set("eon.query.id", basequery)
  408. values.Set("eon.query.options", filter)
  409. if handler := getQueryHandler(basequery); handler != nil {
  410. if err = handler(ctx); err != nil {
  411. return
  412. }
  413. }
  414. if templateErr = baseQueryTemplate.Execute(&buf, data); templateErr != nil {
  415. err = errs.InvalidArgument().Details(&errs.Detail{
  416. Message: "Failed on interpolate data with template query",
  417. })
  418. return
  419. }
  420. x := buf.Bytes()
  421. fmt.Println("buf.Bytes()", string(x))
  422. if templateErr = bson.UnmarshalExtJSON(x, false, &filter.Query); templateErr != nil {
  423. err = errs.InvalidArgument().Details(&errs.Detail{
  424. Message: "Failed on interpolate data with template query",
  425. })
  426. return
  427. }
  428. return
  429. }
  430. func (this *Mongo) PatchOne(actionOptions *Filter) (res *mongo.UpdateResult, err *errs.Error) {
  431. uniq := this.uniqID()
  432. fmt.Println("mongo.patch.many", uniq)
  433. defer func() {
  434. createDebugEvent(actionOptions, "models.patch.many", func(event *DebugEvent) {
  435. event.Error = err
  436. })
  437. fmt.Println("mongo.patch.many.end", uniq)
  438. }()
  439. actionOptions.Insertion = Patch
  440. actionOptions.CheckQuery = true
  441. this.exec(actionOptions, func(ctx context.Context, collection *mongo.Collection) {
  442. var (
  443. lerr error
  444. // client *mongo.Client
  445. )
  446. for _, p := range *actionOptions.Patchs {
  447. if actionOptions.Options == nil {
  448. actionOptions.Options = options.Update()
  449. }
  450. if res, lerr = collection.UpdateOne(ctx, actionOptions.Query, p, actionOptions.Options.(*options.UpdateOptions)); lerr != nil {
  451. // ce := lerr.(mongo.CommandError)
  452. // fmt.Println("----------------------------",
  453. // ce.Message,
  454. // ce.Labels,
  455. // ce.Code,
  456. // ce.Name,
  457. // )
  458. // f.Context.Application().Logger().Errorf(lerr.Error())
  459. err = errs.FromError(lerr)
  460. return
  461. }
  462. if !actionOptions.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  463. err = errs.NotFound().Details(&errs.Detail{
  464. Message: "No entity has been updated",
  465. })
  466. return
  467. }
  468. }
  469. // if client, err = t.GetClient(f.DB); err != nil {
  470. // return
  471. // }
  472. // spew.Dump(f.Query)
  473. // spew.Dump(f.Patchs)
  474. // id := primitive.NewObjectID()
  475. // t.client.UseSessionWithOptions(
  476. // ctx,
  477. // options.Session().SetDefaultReadPreference(readpref.Primary()),
  478. // func(sctx mongo.SessionContext) error {
  479. // defer func() {
  480. // if err != nil {
  481. // sctx.AbortTransaction(sctx)
  482. // } else {
  483. // sctx.CommitTransaction(sctx)
  484. // }
  485. // sctx.EndSession(sctx)
  486. // }()
  487. // sctx.StartTransaction(options.Transaction().
  488. // SetReadConcern(readconcern.Snapshot()).
  489. // SetWriteConcern(writeconcern.New(writeconcern.WMajority())))
  490. // for _, p := range *f.Patchs {
  491. // if f.Options == nil {
  492. // f.Options = options.Update()
  493. // }
  494. // if res, lerr = collection.UpdateOne(ctx, f.Query, p, f.Options.(*options.UpdateOptions)); lerr != nil {
  495. // err = errs.FromError(lerr)
  496. // return lerr
  497. // }
  498. // if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  499. // err = errs.NotFound().Details(&errs.Detail{
  500. // Message: "No entity has been updated",
  501. // })
  502. // return fmt.Errorf("No entity has been updated")
  503. // }
  504. // }
  505. // return nil
  506. // },
  507. // )
  508. }, func(e *errs.Error) { err = e })
  509. return
  510. }
  511. func (this *Mongo) PatchMany(actionOptions *Filter) (res *mongo.UpdateResult, err *errs.Error) {
  512. uniq := this.uniqID()
  513. fmt.Println("mongo.patch.many", uniq)
  514. defer func() {
  515. createDebugEvent(actionOptions, "models.patch.many", func(event *DebugEvent) {
  516. event.Error = err
  517. })
  518. fmt.Println("mongo.patch.many.end", uniq)
  519. }()
  520. actionOptions.Insertion = Patch
  521. actionOptions.CheckQuery = true
  522. this.exec(actionOptions, func(ctx context.Context, collection *mongo.Collection) {
  523. var (
  524. lerr error
  525. // client *mongo.Client
  526. )
  527. for _, p := range *actionOptions.Patchs {
  528. if res, lerr = collection.UpdateMany(ctx, actionOptions.Query, p); lerr != nil {
  529. err = errs.FromError(lerr)
  530. return
  531. }
  532. if !actionOptions.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  533. err = errs.NotFound().Details(&errs.Detail{
  534. Message: "No entity has been updated",
  535. })
  536. err.CodeText = "noEntityUpdated"
  537. // fmt.Errorf("No entity has been updated")
  538. return
  539. }
  540. }
  541. // if client, err = t.GetClient(f.DB); err != nil {
  542. // return
  543. // }
  544. // o, _ := json.MarshalIndent(f.Query, "", " ")
  545. // fmt.Println("query", o)
  546. // o, _ = json.MarshalIndent(f.Patchs, "", " ")
  547. // fmt.Println("patch", o)
  548. // t.client.UseSessionWithOptions(
  549. // ctx,
  550. // options.Session().SetDefaultReadPreference(readpref.Primary()),
  551. // func(sctx mongo.SessionContext) error {
  552. // defer func() {
  553. // if err != nil {
  554. // sctx.AbortTransaction(sctx)
  555. // } else {
  556. // sctx.CommitTransaction(sctx)
  557. // }
  558. // sctx.EndSession(sctx)
  559. // }()
  560. // sctx.StartTransaction(
  561. // options.Transaction().
  562. // SetReadConcern(readconcern.Snapshot()).
  563. // SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
  564. // )
  565. // for _, p := range *f.Patchs {
  566. // if res, lerr = collection.UpdateMany(ctx, f.Query, p); lerr != nil {
  567. // err = errs.FromError(lerr)
  568. // return lerr
  569. // }
  570. // if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  571. // err = errs.NotFound().Details(&errs.Detail{
  572. // Message: "No entity has been updated",
  573. // })
  574. // err.CodeText = "noEntityUpdated"
  575. // return fmt.Errorf("No entity has been updated")
  576. // }
  577. // }
  578. // return nil
  579. // },
  580. // )
  581. }, func(e *errs.Error) { err = e })
  582. return
  583. }
  584. func (mongo *Mongo) CreateIndex(database, collectionString string, index mongo.IndexModel) *errs.Error {
  585. collection := mongo.client.Database(database).Collection(collectionString)
  586. opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
  587. if _, err := collection.Indexes().CreateOne(context.Background(), index, opts); err != nil {
  588. return errs.FromError(err)
  589. }
  590. return nil
  591. }
  592. func (mongo *Mongo) FindOneRx(options *Filter) *ObservableStruct {
  593. return Observable(func(observer *Subscriber) {
  594. if res, err := mongo.FindOne(options); err != nil {
  595. observer.Err(err)
  596. } else {
  597. observer.Next(res)
  598. }
  599. })
  600. }
  601. func (t *Mongo) FindOne(f *Filter) (res *mongo.SingleResult, err *errs.Error) {
  602. uniq := t.uniqID()
  603. fmt.Println("mongo.find.one", uniq)
  604. defer func() {
  605. createDebugEvent(f, "models.find.one", func(event *DebugEvent) {
  606. event.Error = err
  607. })
  608. fmt.Println("mongo.find.one.end", uniq, f.Collection)
  609. spew.Dump(f.Query)
  610. }()
  611. f.CheckQuery = (f.QueryType != "aggregate")
  612. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  613. var (
  614. lerr error
  615. cursor *mongo.Cursor
  616. )
  617. if f.QueryType == "aggregate" {
  618. if cursor, err = f.Aggregate(ctx, collection, nil); err != nil {
  619. return
  620. }
  621. if cursor.Next(ctx) {
  622. if lerr = cursor.Decode(f.Entity); lerr == nil {
  623. return
  624. }
  625. } else {
  626. lerr = fmt.Errorf("No document found")
  627. }
  628. } else {
  629. res = collection.FindOne(ctx, f.Query)
  630. if lerr = res.Err(); lerr == nil {
  631. if lerr = res.Decode(f.Entity); lerr == nil {
  632. return
  633. }
  634. }
  635. }
  636. err = errs.FromError(lerr)
  637. }, func(e *errs.Error) {
  638. err = e
  639. })
  640. return
  641. }
  642. func findIds(filter *Filter) (ids []primitive.ObjectID) {
  643. ids = []primitive.ObjectID{}
  644. // findIdsFilter = &Filter{}
  645. // cursor, err := executeFind(findIdsFilter)
  646. return
  647. }
  648. // if f.QueryType == "aggregate" {
  649. // cursor, lerr = f.Aggregate(ctx, collection, nil)
  650. // fmt.Println("-------------------------------------------------------------------------------after aggregate")
  651. // } else {
  652. // findOptions := options.Find()
  653. // findOptions.SetLimit(int64(f.MaxResults)).SetCollation(&options.Collation{
  654. // Locale: "en",
  655. // Strength: 1,
  656. // })
  657. // // fmt.Printf("sort of query '%s'\n", f.Format)
  658. // // spew.Dump(f.Sort)
  659. // // spew.Dump(f.Fields)
  660. // // spew.Dump(f.Query)
  661. // if f.Sort != nil {
  662. // findOptions.SetSort(f.Sort)
  663. // }
  664. // if f.Fields != nil {
  665. // findOptions.SetProjection(f.Fields)
  666. // }
  667. // cursor, lerr = collection.Find(ctx, f.Query, findOptions)
  668. // }
  669. func (t *Mongo) FindMany(f *Filter) (cursor *mongo.Cursor, err *errs.Error) {
  670. uniq := t.uniqID()
  671. fmt.Println("mongo.find.many", uniq)
  672. defer func() {
  673. createDebugEvent(f, "models.find.many", func(event *DebugEvent) {
  674. event.Error = err
  675. })
  676. fmt.Println("mongo.find.many.end", uniq, f.Collection)
  677. }()
  678. if f.Entities == nil {
  679. err = errs.Internal().Details(&errs.Detail{
  680. Message: "Entities can't be nil",
  681. })
  682. return
  683. }
  684. entitiesValue := reflect.ValueOf(f.Entities)
  685. if entitiesValue.Kind() != reflect.Ptr || entitiesValue.Elem().Kind() != reflect.Slice {
  686. err = errs.Internal().Details(&errs.Detail{
  687. Message: "Entities argument must be a slice address",
  688. })
  689. }
  690. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  691. var (
  692. lerr error
  693. err *errs.Error
  694. elemp reflect.Value
  695. )
  696. if f.QueryType != "aggregate" {
  697. f.Pipeline = bson.A{}
  698. if f.Sort != nil && len(*f.Sort) > 0 {
  699. f.Pipeline = append(f.Pipeline, bson.M{"$sort": f.Sort})
  700. }
  701. f.Pipeline = append(f.Pipeline, bson.M{"$match": f.Query})
  702. if f.Fields != nil && len(*f.Fields) > 0 {
  703. f.Pipeline = append(f.Pipeline, bson.M{"$project": f.Fields})
  704. }
  705. if f.MaxResults > 0 {
  706. f.Pipeline = append(f.Pipeline, bson.M{"$limit": f.MaxResults})
  707. }
  708. f.QueryType = "aggregate"
  709. }
  710. wg := sync.WaitGroup{}
  711. wg.Add(2)
  712. out, _ := json.Marshal(f.Query)
  713. fmt.Println("))))))))))))))))))))))", string(out))
  714. go func() {
  715. var countError error
  716. defer wg.Done()
  717. if f.ResultSizeEstimate, countError = collection.CountDocuments(nil, f.Query); countError != nil {
  718. err = errs.FromError(countError)
  719. }
  720. }()
  721. go func() {
  722. defer wg.Done()
  723. if cursor, err = f.Aggregate(ctx, collection, nil); err != nil {
  724. return
  725. }
  726. defer cursor.Close(ctx)
  727. slicev := entitiesValue.Elem()
  728. slicev = slicev.Slice(0, slicev.Cap())
  729. typ := slicev.Type().Elem()
  730. for cursor.Next(ctx) {
  731. elemp = reflect.New(typ)
  732. if lerr = cursor.Decode(elemp.Interface()); lerr != nil {
  733. err = errs.FromError(lerr)
  734. return
  735. }
  736. slicev = reflect.Append(slicev, elemp.Elem())
  737. }
  738. if elemp.IsValid() {
  739. var (
  740. data = map[string]interface{}{}
  741. nextPageQuery = map[string]interface{}{}
  742. out []byte
  743. variablesJson string
  744. )
  745. if out, lerr = json.Marshal(elemp.Elem().Interface()); lerr != nil {
  746. err = errs.FromError(lerr)
  747. return
  748. }
  749. if variablesJson, lerr = flatten.FlattenString(string(out), ".", flatten.DotStyle); lerr != nil {
  750. err = errs.FromError(lerr)
  751. return
  752. }
  753. if lerr = json.Unmarshal([]byte(variablesJson), &data); err != nil {
  754. err = errs.FromError(lerr)
  755. return
  756. }
  757. if f.Sort != nil {
  758. orderOperator := map[int]string{
  759. 1: "$gte",
  760. -1: "$lte",
  761. }
  762. var firstPropOrder *int
  763. for prop, order := range *f.Sort {
  764. value := data[fmt.Sprintf(".%s", prop)]
  765. if prop == "_id" {
  766. value = bson.M{"$oid": value}
  767. }
  768. nextPageQuery[prop] = bson.M{
  769. orderOperator[order.(int)]: value,
  770. }
  771. if firstPropOrder == nil {
  772. firstPropOrder = types.Int(order.(int))
  773. }
  774. }
  775. if (*f.Sort)["_id"] == nil {
  776. if firstPropOrder == nil {
  777. firstPropOrder = types.Int(1)
  778. }
  779. orderOperator = map[int]string{
  780. 1: "$gt",
  781. -1: "$lt",
  782. }
  783. nextPageQuery["_id"] = bson.M{
  784. orderOperator[*firstPropOrder]: bson.M{"$oid": data["._id"]},
  785. }
  786. }
  787. }
  788. if out, lerr = json.Marshal(nextPageQuery); lerr != nil {
  789. err = errs.FromError(lerr)
  790. return
  791. }
  792. f.NextPageToken = primitive.NewObjectID().Hex()
  793. t.FindManyNextPaginationMux.Lock()
  794. NextPageTokenMap[f.NextPageToken] = string(out)
  795. // fmt.Println("=========================================\n")
  796. // fmt.Println("string(out):", string(out))
  797. // fmt.Println("=========================================\n\n")
  798. t.FindManyNextPaginationMux.Unlock()
  799. }
  800. entitiesValue.Elem().Set(slicev)
  801. }()
  802. wg.Wait()
  803. return
  804. },
  805. func(e *errs.Error) { err = e },
  806. )
  807. return
  808. }
  809. func (this *Mongo) Exists(actionOptions *Filter) (exists bool, err *errs.Error) {
  810. uniq := this.uniqID()
  811. fmt.Println("mongo.exists", uniq)
  812. defer func() {
  813. createDebugEvent(actionOptions, "models.exists", func(event *DebugEvent) {
  814. event.Error = err
  815. })
  816. fmt.Println("mongo.exists.end", uniq)
  817. }()
  818. actionOptions.Entity = map[string]interface{}{}
  819. actionOptions.Fields = &bson.M{"_id": 1}
  820. if _, err = this.FindOne(actionOptions); err != nil {
  821. return
  822. }
  823. exists = true
  824. return
  825. }
  826. func (driver *Mongo) GetContext(options *Filter) (context.Context, *mongo.Collection) {
  827. var ctx context.Context
  828. if options.SessionContext != nil {
  829. ctx = options.SessionContext
  830. } else {
  831. ctx, _ = context.WithTimeout(context.Background(), 40*time.Second)
  832. }
  833. collection := driver.client.Database(options.DB).Collection(options.Collection)
  834. return ctx, collection
  835. }
  836. func (t *Mongo) exec(f *Filter, execAction execfn, errorAction func(*errs.Error)) {
  837. var (
  838. err *errs.Error
  839. errl error
  840. ctx context.Context
  841. // client *mongo.Client
  842. )
  843. defer func() {
  844. if err == nil {
  845. if errl == nil {
  846. return
  847. }
  848. err = errs.FromError(errl)
  849. }
  850. // spew.Dump(err)
  851. errorAction(err)
  852. }()
  853. if err = f.Check(); err != nil {
  854. return
  855. }
  856. switch f.Insertion {
  857. case InsertOne:
  858. if f.Entity == nil {
  859. errl = fmt.Errorf("Entity can't be nil")
  860. return
  861. }
  862. case InsertMany:
  863. if f.Entities == nil {
  864. errl = fmt.Errorf("Entities can't be nil")
  865. return
  866. }
  867. case Patch:
  868. if f.Patchs == nil {
  869. errl = fmt.Errorf("Patchs can't be nil")
  870. return
  871. }
  872. }
  873. // fmt.Println("passei do exec")
  874. if f.SessionContext != nil {
  875. ctx = f.SessionContext
  876. } else {
  877. ctx, _ = context.WithTimeout(context.Background(), time.Minute)
  878. }
  879. fmt.Println("###############", t.client == nil)
  880. collection := t.client.Database(f.DB).Collection(f.Collection)
  881. execAction(ctx, collection)
  882. return
  883. }
  884. func (this *Mongo) CreateCollection(createOptions *Filter) (exists bool, err *errs.Error) {
  885. // createCmd := bson.D{
  886. // {"create", createOptions.Collection},
  887. // }
  888. // sess := this.client.StartSession()
  889. // sess.
  890. return
  891. // if sess != nil {
  892. // err := mongo.WithSession(mtest.Background, sess, func(sc mongo.SessionContext) error {
  893. // return mt.DB.RunCommand(sc, createCmd).Err()
  894. // })
  895. // return err
  896. // }
  897. // return mt.DB.RunCommand(mtest.Background, createCmd).Err()
  898. // }
  899. }
  900. func createDebugEvent(options *Filter, eventType string, fn func(event *DebugEvent)) {
  901. // debug := options.Context.Values().Get("#debug")
  902. if options.Context != nil {
  903. debug, defined := options.Context.Values().Get("#debug").(*DebugTaks)
  904. if defined {
  905. event := debug.Event(eventType, "")
  906. event.Data = iris.Map{
  907. "database": options.DB,
  908. "collection": options.Collection,
  909. "query": options.Query,
  910. "aggregate": options.Pipeline,
  911. "sort": options.Sort,
  912. "fields": options.Fields,
  913. "maxResults": options.MaxResults,
  914. "patchs": options.Patchs,
  915. }
  916. fn(event)
  917. }
  918. }
  919. }
  920. func DeletedPatch() *bson.A {
  921. return &bson.A{
  922. bson.M{
  923. "$set": bson.M{
  924. "deleted": true,
  925. "deletedIn": time.Now().Unix(),
  926. },
  927. },
  928. }
  929. }
  930. // func mergeQueries(filter *Filter) *bson.M {
  931. // if baseQeuryDocument != nil {
  932. // if filter.Query == nil {
  933. // filter.Query = baseQeuryDocument
  934. // } else {
  935. // filter.Query = &bson.M{
  936. // "$and": bson.A{baseQeuryDocument, filter.Query},
  937. // }
  938. // }
  939. // } else if filter.Query == nil {
  940. // filter.Query = &bson.M{}
  941. // }
  942. // }
  943. // func yieldIndexModel() mongo.IndexModel {
  944. // keys := bsonx.Doc{{Key: *key, Value: bsonx.Int32(int32(*value))}}
  945. // index := mongo.IndexModel{}
  946. // index.Keys = keys
  947. // if *unique {
  948. // index.Options = bsonx.Doc{{Key: "unique", Value: bsonx.Boolean(true)}}
  949. // }
  950. // return index
  951. // }
  952. // func (t *Mongo) GetClient() (*mgo.Session, *errs.Error) {
  953. // func (t *Mongo) GetClient(id string) (*mongo.Client, *errs.Error) {
  954. // var (
  955. // client *mongo.Client
  956. // found bool
  957. // )
  958. // if id == "" {
  959. // panic("Client id not defined!")
  960. // }
  961. // if client, found = t.Clients[id]; !found {
  962. // return nil, FullError(ERR_SERVICE_UNAVAILABLE, &errs.Detail{
  963. // Message: fmt.Sprintf("Client %s not exists!", id),
  964. // })
  965. // }
  966. // return client, nil
  967. // }
  968. // func errorcheck(err error) *errs.Error {
  969. // if err != nil {
  970. // return FullError(ERR_GENERAL, &errs.Detail{
  971. // Message: err.Error(),
  972. // })
  973. // }
  974. // return nil
  975. // }
  976. // func Paginate(collection *mongo.Collection, startValue primitive.ObjectID, nPerPage int64) ([]bson.D, *bson.Value, error) {
  977. // // Query range filter using the default indexed _id field.
  978. // filter := bson.VC.DocumentFromElements(
  979. // bson.EC.SubDocumentFromElements(
  980. // "_id",
  981. // bson.EC.ObjectID("$gt", startValue),
  982. // ),
  983. // )
  984. // var opts []findopt.Find
  985. // opts = append(opts, findopt.Sort(bson.NewDocument(bson.EC.Int32("_id", -1))))
  986. // opts = append(opts, findopt.Limit(nPerPage))
  987. // cursor, _ := collection.Find(context.Background(), filter, opts...)
  988. // var lastValue *bson.Value
  989. // var results []bson.Document
  990. // for cursor.Next(context.Background()) {
  991. // elem := bson.NewDocument()
  992. // err := cursor.Decode(elem)
  993. // if err != nil {
  994. // return results, lastValue, err
  995. // }
  996. // results = append(results, *elem)
  997. // lastValue = elem.Lookup("_id")
  998. // }
  999. // return results, lastValue, nil
  1000. // }
  1001. // func (t *Mongo) Patch(f *Filter) *errs.Error {
  1002. // var (
  1003. // entityinterface EntityInterface
  1004. // col *mongo.Collection
  1005. // entity interface{}
  1006. // ok bool
  1007. // session *mgo.Session
  1008. // err error
  1009. // x *bson.M
  1010. // )
  1011. // if session, err = t.GetClient(f.DB); err == nil {
  1012. // if !f.Id.IsZero() {
  1013. // f.Query = &bson.M{"_id": f.Id}
  1014. // } else if f.Query == nil {
  1015. // err = fmt.Errorf("Query not defined!")
  1016. // goto ErrorPatch
  1017. // }
  1018. // defer session.Close()
  1019. // if entityinterface, ok = f.Entity.(EntityInterface); ok {
  1020. // entity = entityinterface.Update()
  1021. // x = entity.(*bson.M)
  1022. // // delete(*x, "$set")
  1023. // } else {
  1024. // entity = bson.M{"$set": f.Entity}
  1025. // }
  1026. // col = session.DB(f.DB).C(f.Collection)
  1027. // // entity["$set"]
  1028. // spew.Dump(entity)
  1029. // _, err = col.Upsert(f.Query, entity)
  1030. // }
  1031. // ErrorPatch:
  1032. // if err != nil {
  1033. // return Error(ERR_PERSIST, err.Error())
  1034. // }
  1035. // return nil
  1036. // }
  1037. // func (t *Mongo) Upsert(f *Filter) *errs.Error {
  1038. // var (
  1039. // entityinterface EntityInterface
  1040. // col *mongo.Collection
  1041. // entity interface{}
  1042. // ok bool
  1043. // )
  1044. // session, err := t.GetClient(f.DB)
  1045. // if err == nil {
  1046. // if !f.Id.IsZero() {
  1047. // f.Query = &bson.M{"_id": f.Id}
  1048. // } else if f.Query == nil {
  1049. // err = fmt.Errorf("Query not defined!")
  1050. // goto ErrorUp
  1051. // }
  1052. // defer session.Close()
  1053. // col = session.DB(f.DB).C(f.Collection)
  1054. // // update = bson.M{"$set": f.Entity}
  1055. // // if data, ok = f.Entity.Push(); ok {
  1056. // // update["$push"] = data
  1057. // // }
  1058. // // if data, ok = f.Entity.Pull(); ok {
  1059. // // update["$pull"] = data
  1060. // // }
  1061. // // spew.Dump(f.Entity)
  1062. // if entityinterface, ok = f.Entity.(EntityInterface); ok {
  1063. // // fmt.Println("Implement interface")
  1064. // entity = entityinterface.Update()
  1065. // } else {
  1066. // entity = f.Entity
  1067. // }
  1068. // // spew.Dump(entity)
  1069. // // _, err = col.Upsert(f.Query, entity)
  1070. // err = col.Update(f.Query, entity)
  1071. // }
  1072. // ErrorUp:
  1073. // if err != nil {
  1074. // return Error(ERR_PERSIST, err.Error())
  1075. // }
  1076. // return nil
  1077. // }
  1078. //-----------------------------------------------------------------------
  1079. // func (t *Mongo) Find(f *Filter, one bool) (*errs.Error, int) {
  1080. // var cursor string
  1081. // pageToken := &f.PageToken
  1082. // session, err := t.GetClient(f.DB)
  1083. // if err == nil {
  1084. // defer session.Close()
  1085. // if f.Query == nil {
  1086. // if one {
  1087. // if !f.Id.IsZero() {
  1088. // f.Query = &bson.M{"_id": f.Id}
  1089. // } else {
  1090. // return Error(ERR_INVALID_PARAM_VALUE, "Param id not valid."), 0
  1091. // }
  1092. // } else if f.Query == nil {
  1093. // f.Query = &bson.M{}
  1094. // }
  1095. // }
  1096. // query := minquery.New(session.DB(f.DB), f.Collection, f.Query)
  1097. // // Se tem um token de paginacao
  1098. // hasToken := pageToken.HasToken()
  1099. // if hasToken {
  1100. // fmt.Println("consultando com token", pageToken.Cursor)
  1101. // query = query.Cursor(pageToken.Cursor)
  1102. // }
  1103. // // cursorFields := []string{"_id"}
  1104. // cursorFields := []string{}
  1105. // if len(f.Sort) > 0 {
  1106. // query.Sort(f.Sort...)
  1107. // for _, key := range f.Sort {
  1108. // if !strings.Contains(key, ".") {
  1109. // cursorFields = append(cursorFields, key)
  1110. // }
  1111. // }
  1112. // } else {
  1113. // cursorFields = append(cursorFields, "_id")
  1114. // }
  1115. // // Seleciona os campos se forem especificados
  1116. // if f.Fields != nil {
  1117. // query.Select(f.Fields)
  1118. // }
  1119. // // Determina o numero de itens a ser retornado
  1120. // if f.MaxResults > 0 {
  1121. // query.Limit(f.MaxResults)
  1122. // }
  1123. // if one {
  1124. // err = query.One(f.Entity)
  1125. // pageToken.Count = 1 // Filter one
  1126. // } else {
  1127. // // spew.Dump(f.Entity, cursorFields)
  1128. // if cursor, err = query.All(f.Entity, cursorFields...); err != nil {
  1129. // goto ErroFind
  1130. // }
  1131. // // Numero total de documentos
  1132. // // Se tem um token de paginacao o valor total esta no token
  1133. // // Caso contrario consulta no banco
  1134. // if !hasToken {
  1135. // c := session.DB(f.DB).C(f.Collection)
  1136. // if pageToken.Count, err = c.Find(f.Query).Select(&bson.M{"_id": 1}).Count(); err != nil {
  1137. // goto ErroFind
  1138. // }
  1139. // }
  1140. // // if ()
  1141. // fmt.Println("Cursor return", cursor, "-")
  1142. // pageToken.NewCursor = cursor
  1143. // }
  1144. // }
  1145. // ErroFind:
  1146. // if err != nil {
  1147. // return Error(ERR_PERSIST, err.Error()), 0
  1148. // }
  1149. // return nil, pageToken.Count
  1150. // }
  1151. // func (t *Mongo) FindOne(f *Filter) (*errs.Error, int) {
  1152. // return t.Find(f, true)
  1153. // }
  1154. // func (t *Mongo) FindAll(f *Filter) (*errs.Error, int) {
  1155. // return t.Find(f, false)
  1156. // }
  1157. // func (t *Mongo) Count(f *Filter) (*errs.Error, int) {
  1158. // session, err := t.GetClient(f.DB)
  1159. // if err == nil {
  1160. // defer session.Close()
  1161. // if f.Query == nil {
  1162. // f.Query = &bson.M{}
  1163. // }
  1164. // query := session.DB(f.DB).C(f.Collection).Find(f.Query)
  1165. // // Seleciona os campos se forem especificados
  1166. // f.PageToken.Count, err = query.Select(&bson.M{"_id": 1}).Count()
  1167. // }
  1168. // if err != nil {
  1169. // return Error(ERR_PERSIST, err.Error()), 0
  1170. // }
  1171. // return nil, f.PageToken.Count
  1172. // }
  1173. // func (t *Mongo) Update(f *Filter) *errs.Error {
  1174. // var (
  1175. // entityinterface EntityInterface
  1176. // entity interface{}
  1177. // )
  1178. // one := false
  1179. // ok := false
  1180. // session, err := t.GetClient(f.DB)
  1181. // if err == nil {
  1182. // defer session.Close()
  1183. // if !f.Id.IsZero() {
  1184. // one = true
  1185. // f.Query = &bson.M{"_id": f.Id}
  1186. // } else if f.Query == nil {
  1187. // err = fmt.Errorf("Query not defined!")
  1188. // }
  1189. // if entityinterface, ok = f.Entity.(EntityInterface); ok {
  1190. // entity = entityinterface.Update()
  1191. // } else {
  1192. // entity = f.Entity
  1193. // }
  1194. // col := session.DB(f.DB).C(f.Collection)
  1195. // if one {
  1196. // err = col.Update(f.Query, entity)
  1197. // } else {
  1198. // _, err = col.UpdateAll(f.Query, entity)
  1199. // }
  1200. // }
  1201. // if err != nil {
  1202. // return Error(ERR_PERSIST, err.Error())
  1203. // }
  1204. // return nil
  1205. // }
  1206. //-----------------------------------------------------------------------
  1207. // func (t *Mongo) Aggregation(f *Filter, q []bson.M, one bool) *errs.Error {
  1208. // session, err := t.GetClient(f.DB)
  1209. // if err == nil {
  1210. // defer session.Close()
  1211. // pipe := session.DB(f.DB).C(f.Collection).Pipe(q)
  1212. // if one {
  1213. // err = pipe.One(&f.Entity)
  1214. // } else {
  1215. // err = pipe.All(&f.Entity)
  1216. // }
  1217. // }
  1218. // if err != nil {
  1219. // return Error(ERR_PERSIST, err.Error())
  1220. // }
  1221. // return nil
  1222. // }
  1223. // all := true
  1224. // session, err := t.GetClient(f.DB)
  1225. // if err == nil {
  1226. // defer session.Close()
  1227. // if !f.Id.IsZero() {
  1228. // all = false
  1229. // f.Query = &bson.M{"_id": f.Id}
  1230. // } else if f.Query == nil {
  1231. // err = fmt.Errorf("Query not defined!")
  1232. // }
  1233. // C := session.DB(f.DB).C(f.Collection)
  1234. // if all {
  1235. // _, err = C.RemoveAll(f.Query)
  1236. // } else {
  1237. // err = C.Remove(f.Query)
  1238. // }
  1239. // }
  1240. // if err != nil {
  1241. // return Error(ERR_PERSIST, err.Error())
  1242. // }
  1243. // return nil
  1244. // VERIFICAR A NECESSIDADE E CORRIGIR O ERRO
  1245. // func (t *Mongo) GetArrayById(f *Filter, ids []primitive.ObjectID) error {
  1246. // session, err := t.GetClient(f.DB)
  1247. // if err != nil {
  1248. // return err
  1249. // }
  1250. // defer session.Close()
  1251. // q := bson.M{"_id": bson.M{"$in": ids}}
  1252. // err = session.DB(f.DB).C(f.Collection).Find(q).One(f.Entity)
  1253. // return err
  1254. // }
  1255. // func (t *Mongo) GetEmbedFromArray(f *Filter, embed string, id primitive.ObjectID, sid primitive.ObjectID) (errs error) {
  1256. // session, err := t.GetClient(f.DB)
  1257. // if err != nil {
  1258. // return err
  1259. // }
  1260. // defer session.Close()
  1261. // field := "$" + embed
  1262. // match := bson.M{"_id": id}
  1263. // match[embed] = bson.M{"$exists": true}
  1264. // aggregations := []bson.M{
  1265. // bson.M{"$unwind": field},
  1266. // bson.M{"$match": match},
  1267. // bson.M{"$replaceRoot": bson.M{"newRoot": field}},
  1268. // bson.M{"$match": bson.M{"_id": sid}},
  1269. // }
  1270. // C := session.DB(f.DB).C(f.Collection)
  1271. // return C.Pipe(aggregations).One(&f.Entity)
  1272. // }
  1273. // func (t *Mongo) UpdateEmbedFromArray(f *Filter, embed string, id primitive.ObjectID, sid primitive.ObjectID) error {
  1274. // session, err := t.GetClient(f.DB)
  1275. // if err != nil {
  1276. // return err
  1277. // }
  1278. // defer session.Close()
  1279. // col := session.DB(f.DB).C(f.Collection)
  1280. // //Define o elemento que sera atualizado
  1281. // element := bson.M{}
  1282. // element[embed+".$"] = &f.Entity
  1283. // //Define o seletor do elemento que sera atualizado
  1284. // selector := bson.M{"_id": id}
  1285. // selector[embed+"._id"] = sid
  1286. // return col.Update(selector, bson.M{"$set": element})
  1287. // }
  1288. // func (t *Mongo) AddEmbedFromArray(f *Filter, embed string, id primitive.ObjectID) error {
  1289. // session, errs := t.GetClient(f.DB)
  1290. // if errs != nil {
  1291. // return errs
  1292. // }
  1293. // defer session.Close()
  1294. // col := session.DB(f.DB).C(f.Collection)
  1295. // //Define o elemento que sera atualizado
  1296. // element := bson.M{}
  1297. // element[embed] = &f.Entity
  1298. // //Define o seletor do elemento que sera atualizado
  1299. // selector := bson.M{"_id": id}
  1300. // return col.Update(selector, bson.M{"$push": element})
  1301. // }
  1302. // 0000000000000000000000000000000000000000000000000000000000000000
  1303. // func (t *Mongo) NewTransaction(dbname string) (*Transaction, error) {
  1304. // session, e := t.GetClient(dbname)
  1305. // if e != nil {
  1306. // return nil, e
  1307. // }
  1308. // collection := session.DB(dbname).C("txns")
  1309. // return &Transaction{R: txn.NewRunner(collection)}, nil
  1310. // }
  1311. // Transactions
  1312. // type Transaction struct {
  1313. // R *txn.Runner
  1314. // Ops []txn.Op
  1315. // }
  1316. // func (t *Transaction) Insert(collection string, id primitive.ObjectID, entity interface{}) *Transaction {
  1317. // t.Ops = append(t.Ops, txn.Op{
  1318. // C: collection,
  1319. // Id: id,
  1320. // Insert: entity,
  1321. // })
  1322. // return t
  1323. // }
  1324. // func (t *Transaction) Update(collection string, id primitive.ObjectID, assert *bson.M, entity interface{}) *Transaction {
  1325. // op := txn.Op{
  1326. // C: collection,
  1327. // Id: id,
  1328. // Update: entity,
  1329. // }
  1330. // if assert != nil {
  1331. // op.Assert = assert
  1332. // }
  1333. // t.Ops = append(t.Ops, op)
  1334. // return t
  1335. // }
  1336. // func (t *Transaction) Remove(collection string, id primitive.ObjectID) *Transaction {
  1337. // t.Ops = append(t.Ops, txn.Op{
  1338. // C: collection,
  1339. // Id: id,
  1340. // Remove: true,
  1341. // })
  1342. // return t
  1343. // }
  1344. // func (t *Transaction) Run() error {
  1345. // return t.R.Run(t.Ops, primitive.NewObjectID(), nil)
  1346. // }
  1347. // func (t *Mongo) Find(f *Filter, one bool) (*errs.Error, int) {
  1348. // session, err := t.GetClient()
  1349. // if err == nil {
  1350. // defer session.Close()
  1351. // if !f.Id.IsZero() {
  1352. // f.Query = &bson.M{"_id": f.Id}
  1353. // } else if f.Query == nil {
  1354. // f.Query = &bson.M{}
  1355. // }
  1356. // pageToken := &f.PageToken
  1357. // aggregations := []*bson.M{}
  1358. // // Se tem um token de paginacao
  1359. // hasToken := pageToken.HasToken()
  1360. // if hasToken {
  1361. // fmt.Println("consultando com token", pageToken.CurrentID)
  1362. // // {$or:
  1363. // // [
  1364. // // {$and :
  1365. // // [
  1366. // // {<sort key>:{$gte: <last sort key value of previous page>}},
  1367. // // {"_id" : {$gt : <last result id of previous page>}}
  1368. // // ]
  1369. // // },
  1370. // // ou
  1371. // // {<sort key> :{$gt: <last sort key value>}}
  1372. // // ]
  1373. // // }.
  1374. // // f.Query = &bson.M{"$and": []bson.M{
  1375. // // *f.Query,
  1376. // // bson.M{"_id": bson.M{"$gt": f.NextPageToken}},
  1377. // // }}
  1378. // f.Query = &bson.M{"_id": bson.M{
  1379. // // "$gt": primitive.ObjectIDHex(pageToken.CurrentID),
  1380. // "$gt": pageToken.CurrentID,
  1381. // }}
  1382. // } else {
  1383. // }
  1384. // // Adiciona o primeiro estagio da agregacao
  1385. // aggregations = append(aggregations, &bson.M{"$match": f.Query})
  1386. // if len(f.Sort) > 0 {
  1387. // f.Sort = append(f.Sort, "_id")
  1388. // aggregations = append(aggregations, MgoSortBson(f.Sort))
  1389. // // query.Sort(f.Sort...)
  1390. // }
  1391. // // aggregations = append(aggregations, &bson.M{
  1392. // // "$addFields": bson.M{"position":},
  1393. // // })
  1394. // // Seleciona os campos se forem especificados
  1395. // if f.Fields != nil {
  1396. // // aggregations = append(aggregations, bson.M{"$limit": f.MaxResults})
  1397. // }
  1398. // // countQuery := session.DB(f.DB).C(f.Collection).Pipe(aggregations)
  1399. // // Determina o numero de itens a ser retornado
  1400. // if f.MaxResults > 0 {
  1401. // aggregations = append(aggregations, &bson.M{"$limit": f.MaxResults})
  1402. // }
  1403. // spew.Dump(aggregations)
  1404. // // query := session.DB(f.DB).C(f.Collection).Find(f.Query)
  1405. // collection := session.DB(f.DB).C(f.Collection)
  1406. // query := collection.Pipe(aggregations)
  1407. // if one {
  1408. // err = query.One(f.Entity)
  1409. // f.Count = 1 // Filter one
  1410. // } else {
  1411. // err = query.All(f.Entity)
  1412. // // Numero total de documentos
  1413. // // Se tem um token de paginacao o valor total esta no token
  1414. // // Caso contrario consulta no banco
  1415. // if hasToken {
  1416. // f.Count = pageToken.Count
  1417. // } else {
  1418. // f.Count, _ = collection.Find(f.Query).Select(&bson.M{"_id": 1}).Count()
  1419. // }
  1420. // }
  1421. // }
  1422. // if err != nil {
  1423. // return Error(ERR_PERSIST, err.Error()), 0
  1424. // }
  1425. // return nil, f.Count
  1426. // }
  1427. // v := reflect.ValueOf(f.Entity)
  1428. // fmt.Println("UpdateCursorResponse")
  1429. // // spew.Dump(f.PageToken)
  1430. // // fmt.Println(v.Kind())
  1431. // if v.Kind() == reflect.Ptr {
  1432. // // Atualiza v para o elemento apontado
  1433. // v = v.Elem()
  1434. // if v.Kind() == reflect.Slice || v.Kind() == reflect.Array {
  1435. // // Acessa o atributo ID da ultima entidade do array
  1436. // field := v.Index(v.Len() - 1).Elem().FieldByName("ID")
  1437. // // Converte o atributo ID para objectId
  1438. // last := field.Interface().(primitive.ObjectID)
  1439. // // Cria a consulta que verifica se existe outros registros alem do ultimo
  1440. // filter := &Filter{
  1441. // Collection: f.Collection,
  1442. // Query: &bson.M{"_id": bson.M{
  1443. // "$lt": last,
  1444. // }},
  1445. // }
  1446. // // Se existirem elementos adiciona o nextTOKEN
  1447. // if _, count := models.Count(filter); count > 0 {
  1448. // // atualiza os valores do token
  1449. // f.PageToken.StartID = last.Hex()
  1450. // f.PageToken.Count = f.Count
  1451. // resp.NextPageToken = f.PageToken.Encode()
  1452. // }
  1453. // }
  1454. // }
  1455. // func NewMongo() (*Mongo, error) {
  1456. // // session, err := mgo.Dial(addrs)
  1457. // // session, err := mgo.Dial("mongodb://localhost:27017")
  1458. // session, err := mgo.Dial("mongodb://localhost:27017")
  1459. // if err != nil {
  1460. // return nil, err
  1461. // }
  1462. // session.SetMode(mgo.Monotonic, true)
  1463. // // "addrs": "mongodb://localhost:27017",
  1464. // // "user": "guest",
  1465. // // "password": "welcome",
  1466. // // "database": "financeiro"
  1467. // m := &Mongo{
  1468. // Session: session,
  1469. // DataBase: "accounts",
  1470. // Addrs: ,
  1471. // // Config: cfg,
  1472. // }
  1473. // return m, err
  1474. // }