mongo.go 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752
  1. package api
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/base64"
  6. "encoding/json"
  7. "fmt" // "html/template"
  8. "reflect"
  9. "regexp"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "text/template"
  14. "time"
  15. "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
  16. "github.com/davecgh/go-spew/spew"
  17. "github.com/jeremywohl/flatten"
  18. "github.com/kataras/iris"
  19. iriscontext "github.com/kataras/iris/v12/context"
  20. "go.mongodb.org/mongo-driver/bson"
  21. "go.mongodb.org/mongo-driver/bson/primitive"
  22. "go.mongodb.org/mongo-driver/mongo"
  23. "go.mongodb.org/mongo-driver/mongo/options"
  24. )
  25. type ParseQueryHandler = func(ctx iriscontext.Context) (err *errs.Error)
  26. const (
  27. DefaultAddrConnect = "mongodb://localhost:27017"
  28. InsertOne = iota + 1
  29. InsertMany
  30. Patch
  31. )
  32. var (
  33. QueriesMap = map[string]*template.Template{}
  34. ParseQueryHandlers = map[string]ParseQueryHandler{}
  35. EMPTYQUERY_REGEX = regexp.MustCompile(`{\s*}`)
  36. NextPageTokenMap = map[string]string{}
  37. )
  38. func GetSessionContext(ctx iriscontext.Context) mongo.SessionContext {
  39. context := ctx.Values().Get("$SessionContext")
  40. if context != nil {
  41. return context.(mongo.SessionContext)
  42. }
  43. return nil
  44. }
  45. func init() {
  46. errs.RegisterMapErrorFunction("MONGO_ERROS", func(source error) (err *errs.Error) {
  47. message := source.Error()
  48. switch {
  49. case strings.Contains(message, "E11000"):
  50. err = errs.AlreadyExists().Details(&errs.Detail{
  51. Reason: "DUPLICATED_ITEM",
  52. Message: message,
  53. })
  54. case strings.Contains(message, "cannot decode"):
  55. err = errs.DataCaps().Details(&errs.Detail{
  56. Message: message,
  57. })
  58. case strings.Contains(message, "no documents in result"):
  59. errs.NotFound().Details(&errs.Detail{
  60. Reason: "NOT_FOUND",
  61. Message: message,
  62. })
  63. }
  64. return
  65. })
  66. }
  67. type Mongo struct {
  68. // Sessions map[string]*mgo.Session
  69. // Session *mgo.Session
  70. // Uri string
  71. // Configs
  72. //DefaultDB string
  73. // User string `json:"user"`
  74. // Password string `json:"password"`
  75. // DataBase string `json:"database"`
  76. // Addrs string `json:"addrs"`
  77. FindManyNextPaginationMux sync.Mutex
  78. // NextPageTokenMap map[string]string
  79. Uniq int64
  80. subject *BehaviorSubjectStruct
  81. client *mongo.Client
  82. Credential *options.Credential
  83. Config string `json:"config"`
  84. Clients map[string]*mongo.Client
  85. }
  86. type execfn func(context.Context, *mongo.Collection)
  87. type EntityInterface interface {
  88. Patch() *bson.A
  89. }
  90. // Modelo de comunicação entre o controle da API e a camada de persistencia.
  91. type Filter struct {
  92. Id primitive.ObjectID
  93. UserId primitive.ObjectID
  94. // NextPageToken primitive.ObjectID
  95. // PageToken PageToken
  96. Fields *bson.M
  97. Query *bson.M
  98. Patchs *bson.A
  99. Sort *bson.M
  100. Context iriscontext.Context
  101. // Sort []string
  102. NextPageToken string
  103. Collection string
  104. QueryType string
  105. DB string
  106. Format string
  107. Insertion int
  108. ResultSizeEstimate int64
  109. CheckQuery bool
  110. IgnoreNoEntityFound bool
  111. MaxResults int
  112. Entity interface{}
  113. Entities interface{}
  114. SessionContext mongo.SessionContext
  115. // Aggregate interface{}
  116. Pipeline primitive.A
  117. Options interface{}
  118. // PatchOptions = patch.Patch()
  119. }
  120. func (f *Filter) Aggregate(ctx context.Context, collection *mongo.Collection, opts *options.AggregateOptions) (cursor *mongo.Cursor, err *errs.Error) {
  121. var errl error
  122. if opts == nil {
  123. opts = options.Aggregate().SetBatchSize(int32(f.MaxResults)).SetCollation(&options.Collation{
  124. Strength: 1,
  125. Locale: "en",
  126. })
  127. }
  128. if cursor, errl = collection.Aggregate(ctx, f.Pipeline, opts); errl != nil {
  129. err = errs.FromError(errl)
  130. }
  131. return
  132. }
  133. func (t *Filter) Check() (err *errs.Error) {
  134. if t.CheckQuery {
  135. if t.Query == nil {
  136. if !t.Id.IsZero() {
  137. t.Query = &bson.M{"_id": t.Id}
  138. } else {
  139. err = errs.InvalidArgument().Details(&errs.Detail{
  140. LocationType: "query",
  141. Location: "q",
  142. Message: "emptyValue",
  143. })
  144. return
  145. }
  146. }
  147. }
  148. if t.Query == nil {
  149. t.Query = &bson.M{}
  150. }
  151. return
  152. }
  153. func (mongo *Mongo) uniqID() int64 {
  154. mongo.Uniq++
  155. return mongo.Uniq
  156. }
  157. func (t *Mongo) Ready() *BehaviorSubjectStruct {
  158. if t.subject == nil {
  159. t.subject = BehaviorSubject()
  160. }
  161. return t.subject
  162. }
  163. func (t *Mongo) Init() (err error) {
  164. defer func() {
  165. spew.Dump(err)
  166. }()
  167. t.FindManyNextPaginationMux = sync.Mutex{}
  168. NextPageTokenMap = map[string]string{}
  169. fmt.Printf("Connection string '%s'", t.Config)
  170. clientOpts := options.Client().ApplyURI(t.Config)
  171. if t.Credential != nil {
  172. clientOpts.SetAuth(*t.Credential)
  173. }
  174. connectContext, _ := context.WithTimeout(context.Background(), 10*time.Second)
  175. // if t.client, err = mongo.NewClient(setOptions); err != nil {
  176. if t.client, err = mongo.Connect(
  177. connectContext,
  178. clientOpts,
  179. ); err != nil {
  180. return
  181. }
  182. // ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
  183. // defer cancel()
  184. // if err = t.client.Connect(ctx); err != nil {
  185. // return
  186. // }
  187. t.Ready().Next(true)
  188. return nil
  189. }
  190. func (t *Mongo) CommitTransaction(opts *options.SessionOptions) {
  191. }
  192. func (t *Mongo) StartTransaction(
  193. sessionOptions *options.SessionOptions,
  194. transactionOptions *options.TransactionOptions,
  195. ) (
  196. sessionContext mongo.SessionContext,
  197. err *errs.Error,
  198. ) {
  199. var (
  200. session mongo.Session
  201. localError error
  202. )
  203. defer func() {
  204. if localError != nil {
  205. err = errs.Internal().Details(&errs.Detail{
  206. Message: localError.Error(),
  207. })
  208. }
  209. }()
  210. if session, localError = t.client.StartSession(sessionOptions); localError == nil {
  211. ctx, _ := context.WithTimeout(context.Background(), 60*time.Second)
  212. mongo.WithSession(ctx, session, func(sctx mongo.SessionContext) error {
  213. if localError = session.StartTransaction(transactionOptions); localError == nil {
  214. sessionContext = sctx
  215. }
  216. return nil
  217. })
  218. }
  219. return
  220. }
  221. func (t *Mongo) Dispatch(f *Filter) {
  222. }
  223. func (t *Mongo) InsertOne(f *Filter) (res *mongo.InsertOneResult, err *errs.Error) {
  224. defer func() {
  225. createDebugEvent(f, "models.insert.one", func(event *DebugEvent) {
  226. event.Error = err
  227. })
  228. }()
  229. f.Insertion = InsertOne
  230. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  231. var lerr error
  232. if res, lerr = collection.InsertOne(ctx, f.Entity); lerr != nil {
  233. err = errs.FromError(lerr)
  234. }
  235. }, func(e *errs.Error) { err = e })
  236. return
  237. }
  238. func (t *Mongo) InsertMany(f *Filter) (res *mongo.InsertManyResult, err *errs.Error) {
  239. defer func() {
  240. createDebugEvent(f, "models.insert.many", func(event *DebugEvent) {
  241. event.Error = err
  242. })
  243. }()
  244. f.Insertion = InsertMany
  245. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  246. var (
  247. lerr error
  248. entities []interface{}
  249. value = reflect.ValueOf(f.Entities)
  250. )
  251. for index := 0; index < value.Len(); index++ {
  252. entities = append(entities, value.Index(index).Interface())
  253. }
  254. if res, lerr = collection.InsertMany(ctx, entities); lerr != nil {
  255. err = errs.FromError(lerr)
  256. }
  257. }, func(e *errs.Error) { err = e })
  258. return
  259. }
  260. //Remove os elementos da colecao, selecionados pela query
  261. func (t *Mongo) RemoveOne(f *Filter) (res *mongo.DeleteResult, err *errs.Error) {
  262. defer func() {
  263. createDebugEvent(f, "models.remove.one", func(event *DebugEvent) {
  264. event.Error = err
  265. })
  266. }()
  267. f.CheckQuery = true
  268. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  269. var lerr error
  270. fmt.Println("remove.one")
  271. spew.Dump(f.Query)
  272. if res, lerr = collection.DeleteOne(ctx, f.Query); lerr != nil {
  273. err = errs.FromError(lerr)
  274. } else if res.DeletedCount == 0 {
  275. err = errs.NotFound().Details(&errs.Detail{
  276. Message: "No entity has been deleted",
  277. })
  278. }
  279. }, func(e *errs.Error) { err = e })
  280. return
  281. }
  282. //Remove os elementos da colecao, selecionados pela query
  283. func (t *Mongo) RemoveMany(f *Filter) (res *mongo.DeleteResult, err *errs.Error) {
  284. var lerr error
  285. var deleteOptions *options.DeleteOptions
  286. defer func() {
  287. createDebugEvent(f, "models.remove.many", func(event *DebugEvent) {
  288. event.Error = err
  289. })
  290. }()
  291. f.CheckQuery = true
  292. if (f.Options != nil ) {
  293. deleteOptions = f.Options.(*options.DeleteOptions)
  294. }
  295. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  296. if res, lerr = collection.DeleteMany(
  297. ctx,
  298. f.Query,
  299. deleteOptions,
  300. ); lerr != nil {
  301. err = errs.FromError(lerr)
  302. }
  303. // else if res.DeletedCount == 0 {
  304. // err = errs.NotFound().Details(&errs.Detail{
  305. // Message: "No entity has been deleted",
  306. // })
  307. // }
  308. }, func(e *errs.Error) { err = e })
  309. return
  310. }
  311. func (t *Mongo) UpdateOne(f *Filter) (res *mongo.UpdateResult, err *errs.Error) {
  312. var lerr error
  313. defer func() {
  314. createDebugEvent(f, "models.update.one", func(event *DebugEvent) {
  315. event.Error = err
  316. })
  317. }()
  318. f.Insertion = InsertOne
  319. f.CheckQuery = true
  320. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  321. if res, lerr = collection.ReplaceOne(ctx, f.Query, f.Entity); lerr != nil {
  322. err = errs.FromError(lerr)
  323. } else if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  324. err = errs.NotFound().Details(&errs.Detail{
  325. Message: "No entity has been updated",
  326. })
  327. }
  328. }, func(e *errs.Error) { err = e })
  329. return
  330. }
  331. func (t *Mongo) UpdateMany(f *Filter) (res *mongo.UpdateResult, err *errs.Error) {
  332. var (
  333. lerr error
  334. entity interface{}
  335. entities []interface{}
  336. value = reflect.ValueOf(f.Entities)
  337. )
  338. defer func() {
  339. createDebugEvent(f, "models.update.many", func(event *DebugEvent) {
  340. event.Error = err
  341. })
  342. }()
  343. f.Insertion = InsertMany
  344. f.CheckQuery = true
  345. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  346. for index := 0; index < value.Len(); index++ {
  347. entity = value.Index(index).Interface()
  348. entities = append(entities, entity)
  349. }
  350. if res, lerr = collection.UpdateMany(ctx, f.Query, entities); lerr != nil {
  351. err = errs.FromError(lerr)
  352. } else if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  353. err = errs.NotFound().Details(&errs.Detail{
  354. Message: "No entity has been updated ",
  355. })
  356. }
  357. }, func(e *errs.Error) { err = e })
  358. return
  359. }
  360. func RegisterQuery(id, query string) {
  361. QueriesMap[id] = template.Must(template.New(id).Parse(query))
  362. }
  363. func BeforeParseQueryHandler(id string, fn ParseQueryHandler) {
  364. ParseQueryHandlers[id] = fn
  365. }
  366. func getQueryHandler(id string) ParseQueryHandler {
  367. return ParseQueryHandlers[id]
  368. }
  369. func ParseQuery(ctx iriscontext.Context, filter *Filter, basequery string, data map[string]interface{}) (err *errs.Error) {
  370. var (
  371. found bool
  372. templateErr error
  373. buf bytes.Buffer
  374. baseQueryTemplate *template.Template
  375. // baseQeuryDocument *bson.M
  376. // errbq *errs.Error
  377. nextPageToken string
  378. queryDecodedString []byte
  379. values = ctx.Values()
  380. userQueryString = Q(ctx, "q", "e30=") // default base64 encoded from "{}"
  381. includeInTrash, _ = strconv.ParseBool(Q(ctx, "includeInTrash", "false")) // default base64 encoded from "{}"
  382. )
  383. // Faz o parse da userQueryString enviada pelo usuario que deve ser um base64
  384. if queryDecodedString, templateErr = base64.StdEncoding.DecodeString(userQueryString); templateErr != nil {
  385. err = errs.InvalidArgument().Details(&errs.Detail{
  386. Message: "Query param isn't a base64 json",
  387. })
  388. return
  389. }
  390. if baseQueryTemplate, found = QueriesMap[basequery]; !found {
  391. err = errs.Internal().Details(&errs.Detail{
  392. Message: "Invalid query input",
  393. })
  394. return
  395. }
  396. // transclude the query to base query
  397. fmt.Println("##########################[", filter.NextPageToken, "]")
  398. // spew.Dump(t.NextPageTokenMap)
  399. if nextPageToken, found = NextPageTokenMap[filter.NextPageToken]; !found {
  400. nextPageToken = "{}"
  401. }
  402. data["_transclude_nextPageToken"] = nextPageToken
  403. data["_transclude_true"] = string(queryDecodedString)
  404. data["_deleted_"] = includeInTrash
  405. values.Set("eon.query.data", data)
  406. values.Set("eon.query.id", basequery)
  407. values.Set("eon.query.options", filter)
  408. if handler := getQueryHandler(basequery); handler != nil {
  409. if err = handler(ctx); err != nil {
  410. return
  411. }
  412. }
  413. if templateErr = baseQueryTemplate.Execute(&buf, data); templateErr != nil {
  414. err = errs.InvalidArgument().Details(&errs.Detail{
  415. Message: "Failed on interpolate data with template query",
  416. })
  417. return
  418. }
  419. x := buf.Bytes()
  420. fmt.Println("buf.Bytes()", string(x))
  421. if templateErr = bson.UnmarshalExtJSON(x, false, &filter.Query); templateErr != nil {
  422. err = errs.InvalidArgument().Details(&errs.Detail{
  423. Message: "Failed on interpolate data with template query",
  424. })
  425. return
  426. }
  427. return
  428. }
  429. func (this *Mongo) PatchOne(actionOptions *Filter) (res *mongo.UpdateResult, err *errs.Error) {
  430. uniq := this.uniqID()
  431. fmt.Println("mongo.patch.many", uniq)
  432. defer func() {
  433. createDebugEvent(actionOptions, "models.patch.many", func(event *DebugEvent) {
  434. event.Error = err
  435. })
  436. fmt.Println("mongo.patch.many.end", uniq)
  437. }()
  438. actionOptions.Insertion = Patch
  439. actionOptions.CheckQuery = true
  440. this.exec(actionOptions, func(ctx context.Context, collection *mongo.Collection) {
  441. var (
  442. lerr error
  443. // client *mongo.Client
  444. )
  445. for _, p := range *actionOptions.Patchs {
  446. if actionOptions.Options == nil {
  447. actionOptions.Options = options.Update()
  448. }
  449. if res, lerr = collection.UpdateOne(ctx, actionOptions.Query, p, actionOptions.Options.(*options.UpdateOptions)); lerr != nil {
  450. // ce := lerr.(mongo.CommandError)
  451. // fmt.Println("----------------------------",
  452. // ce.Message,
  453. // ce.Labels,
  454. // ce.Code,
  455. // ce.Name,
  456. // )
  457. // f.Context.Application().Logger().Errorf(lerr.Error())
  458. err = errs.FromError(lerr)
  459. return
  460. }
  461. if !actionOptions.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  462. err = errs.NotFound().Details(&errs.Detail{
  463. Message: "No entity has been updated",
  464. })
  465. return
  466. }
  467. }
  468. // if client, err = t.GetClient(f.DB); err != nil {
  469. // return
  470. // }
  471. // spew.Dump(f.Query)
  472. // spew.Dump(f.Patchs)
  473. // id := primitive.NewObjectID()
  474. // t.client.UseSessionWithOptions(
  475. // ctx,
  476. // options.Session().SetDefaultReadPreference(readpref.Primary()),
  477. // func(sctx mongo.SessionContext) error {
  478. // defer func() {
  479. // if err != nil {
  480. // sctx.AbortTransaction(sctx)
  481. // } else {
  482. // sctx.CommitTransaction(sctx)
  483. // }
  484. // sctx.EndSession(sctx)
  485. // }()
  486. // sctx.StartTransaction(options.Transaction().
  487. // SetReadConcern(readconcern.Snapshot()).
  488. // SetWriteConcern(writeconcern.New(writeconcern.WMajority())))
  489. // for _, p := range *f.Patchs {
  490. // if f.Options == nil {
  491. // f.Options = options.Update()
  492. // }
  493. // if res, lerr = collection.UpdateOne(ctx, f.Query, p, f.Options.(*options.UpdateOptions)); lerr != nil {
  494. // err = errs.FromError(lerr)
  495. // return lerr
  496. // }
  497. // if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  498. // err = errs.NotFound().Details(&errs.Detail{
  499. // Message: "No entity has been updated",
  500. // })
  501. // return fmt.Errorf("No entity has been updated")
  502. // }
  503. // }
  504. // return nil
  505. // },
  506. // )
  507. }, func(e *errs.Error) { err = e })
  508. return
  509. }
  510. func (this *Mongo) PatchMany(actionOptions *Filter) (res *mongo.UpdateResult, err *errs.Error) {
  511. uniq := this.uniqID()
  512. fmt.Println("mongo.patch.many", uniq)
  513. defer func() {
  514. createDebugEvent(actionOptions, "models.patch.many", func(event *DebugEvent) {
  515. event.Error = err
  516. })
  517. fmt.Println("mongo.patch.many.end", uniq)
  518. }()
  519. actionOptions.Insertion = Patch
  520. actionOptions.CheckQuery = true
  521. this.exec(actionOptions, func(ctx context.Context, collection *mongo.Collection) {
  522. var (
  523. lerr error
  524. // client *mongo.Client
  525. )
  526. for _, p := range *actionOptions.Patchs {
  527. if res, lerr = collection.UpdateMany(ctx, actionOptions.Query, p); lerr != nil {
  528. err = errs.FromError(lerr)
  529. return
  530. }
  531. if !actionOptions.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  532. err = errs.NotFound().Details(&errs.Detail{
  533. Message: "No entity has been updated",
  534. })
  535. err.CodeText = "noEntityUpdated"
  536. // fmt.Errorf("No entity has been updated")
  537. return
  538. }
  539. }
  540. // if client, err = t.GetClient(f.DB); err != nil {
  541. // return
  542. // }
  543. // o, _ := json.MarshalIndent(f.Query, "", " ")
  544. // fmt.Println("query", o)
  545. // o, _ = json.MarshalIndent(f.Patchs, "", " ")
  546. // fmt.Println("patch", o)
  547. // t.client.UseSessionWithOptions(
  548. // ctx,
  549. // options.Session().SetDefaultReadPreference(readpref.Primary()),
  550. // func(sctx mongo.SessionContext) error {
  551. // defer func() {
  552. // if err != nil {
  553. // sctx.AbortTransaction(sctx)
  554. // } else {
  555. // sctx.CommitTransaction(sctx)
  556. // }
  557. // sctx.EndSession(sctx)
  558. // }()
  559. // sctx.StartTransaction(
  560. // options.Transaction().
  561. // SetReadConcern(readconcern.Snapshot()).
  562. // SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
  563. // )
  564. // for _, p := range *f.Patchs {
  565. // if res, lerr = collection.UpdateMany(ctx, f.Query, p); lerr != nil {
  566. // err = errs.FromError(lerr)
  567. // return lerr
  568. // }
  569. // if !f.IgnoreNoEntityFound && (res.ModifiedCount+res.UpsertedCount) == 0 {
  570. // err = errs.NotFound().Details(&errs.Detail{
  571. // Message: "No entity has been updated",
  572. // })
  573. // err.CodeText = "noEntityUpdated"
  574. // return fmt.Errorf("No entity has been updated")
  575. // }
  576. // }
  577. // return nil
  578. // },
  579. // )
  580. }, func(e *errs.Error) { err = e })
  581. return
  582. }
  583. func (mongo *Mongo) CreateIndex(database, collectionString string, index mongo.IndexModel) *errs.Error {
  584. collection := mongo.client.Database(database).Collection(collectionString)
  585. opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
  586. if _, err := collection.Indexes().CreateOne(context.Background(), index, opts); err != nil {
  587. return errs.FromError(err)
  588. }
  589. return nil
  590. }
  591. func (mongo *Mongo) FindOneRx(options *Filter) *ObservableStruct {
  592. return Observable(func(observer *Subscriber) {
  593. if res, err := mongo.FindOne(options); err != nil {
  594. observer.Err(err)
  595. } else {
  596. observer.Next(res)
  597. }
  598. })
  599. }
  600. func (t *Mongo) FindOne(f *Filter) (res *mongo.SingleResult, err *errs.Error) {
  601. uniq := t.uniqID()
  602. fmt.Println("mongo.find.one", uniq)
  603. defer func() {
  604. createDebugEvent(f, "models.find.one", func(event *DebugEvent) {
  605. event.Error = err
  606. })
  607. fmt.Println("mongo.find.one.end", uniq, f.Collection)
  608. spew.Dump(f.Query)
  609. }()
  610. f.CheckQuery = (f.QueryType != "aggregate")
  611. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  612. var (
  613. lerr error
  614. cursor *mongo.Cursor
  615. )
  616. if f.QueryType == "aggregate" {
  617. if cursor, err = f.Aggregate(ctx, collection, nil); err != nil {
  618. return
  619. }
  620. if cursor.Next(ctx) {
  621. if lerr = cursor.Decode(f.Entity); lerr == nil {
  622. return
  623. }
  624. } else {
  625. lerr = fmt.Errorf("No document found")
  626. }
  627. } else {
  628. res = collection.FindOne(ctx, f.Query)
  629. if lerr = res.Err(); lerr == nil {
  630. if lerr = res.Decode(f.Entity); lerr == nil {
  631. return
  632. }
  633. }
  634. }
  635. err = errs.FromError(lerr)
  636. }, func(e *errs.Error) {
  637. err = e
  638. })
  639. return
  640. }
  641. func findIds(filter *Filter) (ids []primitive.ObjectID) {
  642. ids = []primitive.ObjectID{}
  643. // findIdsFilter = &Filter{}
  644. // cursor, err := executeFind(findIdsFilter)
  645. return
  646. }
  647. // if f.QueryType == "aggregate" {
  648. // cursor, lerr = f.Aggregate(ctx, collection, nil)
  649. // fmt.Println("-------------------------------------------------------------------------------after aggregate")
  650. // } else {
  651. // findOptions := options.Find()
  652. // findOptions.SetLimit(int64(f.MaxResults)).SetCollation(&options.Collation{
  653. // Locale: "en",
  654. // Strength: 1,
  655. // })
  656. // // fmt.Printf("sort of query '%s'\n", f.Format)
  657. // // spew.Dump(f.Sort)
  658. // // spew.Dump(f.Fields)
  659. // // spew.Dump(f.Query)
  660. // if f.Sort != nil {
  661. // findOptions.SetSort(f.Sort)
  662. // }
  663. // if f.Fields != nil {
  664. // findOptions.SetProjection(f.Fields)
  665. // }
  666. // cursor, lerr = collection.Find(ctx, f.Query, findOptions)
  667. // }
  668. func (t *Mongo) FindMany(f *Filter) (cursor *mongo.Cursor, err *errs.Error) {
  669. uniq := t.uniqID()
  670. fmt.Println("mongo.find.many", uniq)
  671. defer func() {
  672. createDebugEvent(f, "models.find.many", func(event *DebugEvent) {
  673. event.Error = err
  674. })
  675. fmt.Println("mongo.find.many.end", uniq, f.Collection)
  676. }()
  677. if f.Entities == nil {
  678. err = errs.Internal().Details(&errs.Detail{
  679. Message: "Entities can't be nil",
  680. })
  681. return
  682. }
  683. entitiesValue := reflect.ValueOf(f.Entities)
  684. if entitiesValue.Kind() != reflect.Ptr || entitiesValue.Elem().Kind() != reflect.Slice {
  685. err = errs.Internal().Details(&errs.Detail{
  686. Message: "Entities argument must be a slice address",
  687. })
  688. }
  689. t.exec(f, func(ctx context.Context, collection *mongo.Collection) {
  690. var (
  691. lerr error
  692. err *errs.Error
  693. elemp reflect.Value
  694. )
  695. if f.QueryType != "aggregate" {
  696. f.Pipeline = bson.A{}
  697. if f.Sort != nil && len(*f.Sort) > 0 {
  698. f.Pipeline = append(f.Pipeline, bson.M{"$sort": f.Sort})
  699. }
  700. f.Pipeline = append(f.Pipeline, bson.M{"$match": f.Query})
  701. if f.Fields != nil && len(*f.Fields) > 0 {
  702. f.Pipeline = append(f.Pipeline, bson.M{"$project": f.Fields})
  703. }
  704. if f.MaxResults > 0 {
  705. f.Pipeline = append(f.Pipeline, bson.M{"$limit": f.MaxResults})
  706. }
  707. f.QueryType = "aggregate"
  708. }
  709. wg := sync.WaitGroup{}
  710. wg.Add(2)
  711. go func() {
  712. var countError error
  713. defer wg.Done()
  714. if f.ResultSizeEstimate, countError = collection.CountDocuments(nil, f.Query); countError != nil {
  715. err = errs.FromError(countError)
  716. }
  717. }()
  718. go func() {
  719. defer wg.Done()
  720. if cursor, err = f.Aggregate(ctx, collection, nil); err != nil {
  721. return
  722. }
  723. defer cursor.Close(ctx)
  724. slicev := entitiesValue.Elem()
  725. slicev = slicev.Slice(0, slicev.Cap())
  726. typ := slicev.Type().Elem()
  727. for cursor.Next(ctx) {
  728. elemp = reflect.New(typ)
  729. if lerr = cursor.Decode(elemp.Interface()); lerr != nil {
  730. err = errs.FromError(lerr)
  731. return
  732. }
  733. slicev = reflect.Append(slicev, elemp.Elem())
  734. }
  735. if elemp.IsValid() {
  736. var (
  737. data = map[string]interface{}{}
  738. nextPageQuery = map[string]interface{}{}
  739. out []byte
  740. variablesJson string
  741. )
  742. if out, lerr = json.Marshal(elemp.Elem().Interface()); lerr != nil {
  743. err = errs.FromError(lerr)
  744. return
  745. }
  746. if variablesJson, lerr = flatten.FlattenString(string(out), ".", flatten.DotStyle); lerr != nil {
  747. err = errs.FromError(lerr)
  748. return
  749. }
  750. if lerr = json.Unmarshal([]byte(variablesJson), &data); err != nil {
  751. err = errs.FromError(lerr)
  752. return
  753. }
  754. orderOperator := map[int]string{
  755. 1: "$gt",
  756. -1: "$lt",
  757. }
  758. if f.Sort == nil {
  759. f.Sort = &bson.M{}
  760. }
  761. if (*f.Sort)["_id"] == nil {
  762. (*f.Sort)["_id"] = 1
  763. }
  764. for prop, order := range *f.Sort {
  765. value := data[fmt.Sprintf(".%s", prop)]
  766. if prop == "_id" {
  767. value = bson.M{"$oid": value}
  768. }
  769. nextPageQuery[prop] = bson.M{
  770. orderOperator[order.(int)]: value,
  771. }
  772. }
  773. if out, lerr = json.Marshal(nextPageQuery); lerr != nil {
  774. err = errs.FromError(lerr)
  775. return
  776. }
  777. f.NextPageToken = primitive.NewObjectID().Hex()
  778. t.FindManyNextPaginationMux.Lock()
  779. NextPageTokenMap[f.NextPageToken] = string(out)
  780. t.FindManyNextPaginationMux.Unlock()
  781. }
  782. entitiesValue.Elem().Set(slicev)
  783. }()
  784. wg.Wait()
  785. return
  786. },
  787. func(e *errs.Error) { err = e },
  788. )
  789. return
  790. }
  791. func (this *Mongo) Exists(actionOptions *Filter) (exists bool, err *errs.Error) {
  792. uniq := this.uniqID()
  793. fmt.Println("mongo.exists", uniq)
  794. defer func() {
  795. createDebugEvent(actionOptions, "models.exists", func(event *DebugEvent) {
  796. event.Error = err
  797. })
  798. fmt.Println("mongo.exists.end", uniq)
  799. }()
  800. actionOptions.Entity = map[string]interface{}{}
  801. actionOptions.Fields = &bson.M{"_id": 1}
  802. if _, err = this.FindOne(actionOptions); err != nil {
  803. return
  804. }
  805. exists = true
  806. return
  807. }
  808. func (driver *Mongo) GetContext(options *Filter) (context.Context, *mongo.Collection) {
  809. var ctx context.Context
  810. if options.SessionContext != nil {
  811. ctx = options.SessionContext
  812. } else {
  813. ctx, _ = context.WithTimeout(context.Background(), 40*time.Second)
  814. }
  815. collection := driver.client.Database(options.DB).Collection(options.Collection)
  816. return ctx, collection
  817. }
  818. func (t *Mongo) exec(f *Filter, execAction execfn, errorAction func(*errs.Error)) {
  819. var (
  820. err *errs.Error
  821. errl error
  822. ctx context.Context
  823. // client *mongo.Client
  824. )
  825. defer func() {
  826. if err == nil {
  827. if errl == nil {
  828. return
  829. }
  830. err = errs.FromError(errl)
  831. }
  832. // spew.Dump(err)
  833. errorAction(err)
  834. }()
  835. if err = f.Check(); err != nil {
  836. return
  837. }
  838. switch f.Insertion {
  839. case InsertOne:
  840. if f.Entity == nil {
  841. errl = fmt.Errorf("Entity can't be nil")
  842. return
  843. }
  844. case InsertMany:
  845. if f.Entities == nil {
  846. errl = fmt.Errorf("Entities can't be nil")
  847. return
  848. }
  849. case Patch:
  850. if f.Patchs == nil {
  851. errl = fmt.Errorf("Patchs can't be nil")
  852. return
  853. }
  854. }
  855. // fmt.Println("passei do exec")
  856. if f.SessionContext != nil {
  857. ctx = f.SessionContext
  858. } else {
  859. ctx, _ = context.WithTimeout(context.Background(), time.Minute)
  860. }
  861. fmt.Println("###############", t.client == nil)
  862. collection := t.client.Database(f.DB).Collection(f.Collection)
  863. execAction(ctx, collection)
  864. return
  865. }
  866. func createDebugEvent(options *Filter, eventType string, fn func(event *DebugEvent)) {
  867. // debug := options.Context.Values().Get("#debug")
  868. if options.Context != nil {
  869. debug, defined := options.Context.Values().Get("#debug").(*DebugTaks)
  870. if defined {
  871. event := debug.Event(eventType, "")
  872. event.Data = iris.Map{
  873. "database": options.DB,
  874. "collection": options.Collection,
  875. "query": options.Query,
  876. "aggregate": options.Pipeline,
  877. "sort": options.Sort,
  878. "fields": options.Fields,
  879. "maxResults": options.MaxResults,
  880. "patchs": options.Patchs,
  881. }
  882. fn(event)
  883. }
  884. }
  885. }
  886. func DeletedPatch() *bson.A {
  887. return &bson.A{
  888. bson.M{
  889. "$set": bson.M{
  890. "deleted": true,
  891. "deletedIn": time.Now().Unix(),
  892. },
  893. },
  894. }
  895. }
  896. // func mergeQueries(filter *Filter) *bson.M {
  897. // if baseQeuryDocument != nil {
  898. // if filter.Query == nil {
  899. // filter.Query = baseQeuryDocument
  900. // } else {
  901. // filter.Query = &bson.M{
  902. // "$and": bson.A{baseQeuryDocument, filter.Query},
  903. // }
  904. // }
  905. // } else if filter.Query == nil {
  906. // filter.Query = &bson.M{}
  907. // }
  908. // }
  909. // func yieldIndexModel() mongo.IndexModel {
  910. // keys := bsonx.Doc{{Key: *key, Value: bsonx.Int32(int32(*value))}}
  911. // index := mongo.IndexModel{}
  912. // index.Keys = keys
  913. // if *unique {
  914. // index.Options = bsonx.Doc{{Key: "unique", Value: bsonx.Boolean(true)}}
  915. // }
  916. // return index
  917. // }
  918. // func (t *Mongo) GetClient() (*mgo.Session, *errs.Error) {
  919. // func (t *Mongo) GetClient(id string) (*mongo.Client, *errs.Error) {
  920. // var (
  921. // client *mongo.Client
  922. // found bool
  923. // )
  924. // if id == "" {
  925. // panic("Client id not defined!")
  926. // }
  927. // if client, found = t.Clients[id]; !found {
  928. // return nil, FullError(ERR_SERVICE_UNAVAILABLE, &errs.Detail{
  929. // Message: fmt.Sprintf("Client %s not exists!", id),
  930. // })
  931. // }
  932. // return client, nil
  933. // }
  934. // func errorcheck(err error) *errs.Error {
  935. // if err != nil {
  936. // return FullError(ERR_GENERAL, &errs.Detail{
  937. // Message: err.Error(),
  938. // })
  939. // }
  940. // return nil
  941. // }
  942. // func Paginate(collection *mongo.Collection, startValue primitive.ObjectID, nPerPage int64) ([]bson.D, *bson.Value, error) {
  943. // // Query range filter using the default indexed _id field.
  944. // filter := bson.VC.DocumentFromElements(
  945. // bson.EC.SubDocumentFromElements(
  946. // "_id",
  947. // bson.EC.ObjectID("$gt", startValue),
  948. // ),
  949. // )
  950. // var opts []findopt.Find
  951. // opts = append(opts, findopt.Sort(bson.NewDocument(bson.EC.Int32("_id", -1))))
  952. // opts = append(opts, findopt.Limit(nPerPage))
  953. // cursor, _ := collection.Find(context.Background(), filter, opts...)
  954. // var lastValue *bson.Value
  955. // var results []bson.Document
  956. // for cursor.Next(context.Background()) {
  957. // elem := bson.NewDocument()
  958. // err := cursor.Decode(elem)
  959. // if err != nil {
  960. // return results, lastValue, err
  961. // }
  962. // results = append(results, *elem)
  963. // lastValue = elem.Lookup("_id")
  964. // }
  965. // return results, lastValue, nil
  966. // }
  967. // func (t *Mongo) Patch(f *Filter) *errs.Error {
  968. // var (
  969. // entityinterface EntityInterface
  970. // col *mongo.Collection
  971. // entity interface{}
  972. // ok bool
  973. // session *mgo.Session
  974. // err error
  975. // x *bson.M
  976. // )
  977. // if session, err = t.GetClient(f.DB); err == nil {
  978. // if !f.Id.IsZero() {
  979. // f.Query = &bson.M{"_id": f.Id}
  980. // } else if f.Query == nil {
  981. // err = fmt.Errorf("Query not defined!")
  982. // goto ErrorPatch
  983. // }
  984. // defer session.Close()
  985. // if entityinterface, ok = f.Entity.(EntityInterface); ok {
  986. // entity = entityinterface.Update()
  987. // x = entity.(*bson.M)
  988. // // delete(*x, "$set")
  989. // } else {
  990. // entity = bson.M{"$set": f.Entity}
  991. // }
  992. // col = session.DB(f.DB).C(f.Collection)
  993. // // entity["$set"]
  994. // spew.Dump(entity)
  995. // _, err = col.Upsert(f.Query, entity)
  996. // }
  997. // ErrorPatch:
  998. // if err != nil {
  999. // return Error(ERR_PERSIST, err.Error())
  1000. // }
  1001. // return nil
  1002. // }
  1003. // func (t *Mongo) Upsert(f *Filter) *errs.Error {
  1004. // var (
  1005. // entityinterface EntityInterface
  1006. // col *mongo.Collection
  1007. // entity interface{}
  1008. // ok bool
  1009. // )
  1010. // session, err := t.GetClient(f.DB)
  1011. // if err == nil {
  1012. // if !f.Id.IsZero() {
  1013. // f.Query = &bson.M{"_id": f.Id}
  1014. // } else if f.Query == nil {
  1015. // err = fmt.Errorf("Query not defined!")
  1016. // goto ErrorUp
  1017. // }
  1018. // defer session.Close()
  1019. // col = session.DB(f.DB).C(f.Collection)
  1020. // // update = bson.M{"$set": f.Entity}
  1021. // // if data, ok = f.Entity.Push(); ok {
  1022. // // update["$push"] = data
  1023. // // }
  1024. // // if data, ok = f.Entity.Pull(); ok {
  1025. // // update["$pull"] = data
  1026. // // }
  1027. // // spew.Dump(f.Entity)
  1028. // if entityinterface, ok = f.Entity.(EntityInterface); ok {
  1029. // // fmt.Println("Implement interface")
  1030. // entity = entityinterface.Update()
  1031. // } else {
  1032. // entity = f.Entity
  1033. // }
  1034. // // spew.Dump(entity)
  1035. // // _, err = col.Upsert(f.Query, entity)
  1036. // err = col.Update(f.Query, entity)
  1037. // }
  1038. // ErrorUp:
  1039. // if err != nil {
  1040. // return Error(ERR_PERSIST, err.Error())
  1041. // }
  1042. // return nil
  1043. // }
  1044. //-----------------------------------------------------------------------
  1045. // func (t *Mongo) Find(f *Filter, one bool) (*errs.Error, int) {
  1046. // var cursor string
  1047. // pageToken := &f.PageToken
  1048. // session, err := t.GetClient(f.DB)
  1049. // if err == nil {
  1050. // defer session.Close()
  1051. // if f.Query == nil {
  1052. // if one {
  1053. // if !f.Id.IsZero() {
  1054. // f.Query = &bson.M{"_id": f.Id}
  1055. // } else {
  1056. // return Error(ERR_INVALID_PARAM_VALUE, "Param id not valid."), 0
  1057. // }
  1058. // } else if f.Query == nil {
  1059. // f.Query = &bson.M{}
  1060. // }
  1061. // }
  1062. // query := minquery.New(session.DB(f.DB), f.Collection, f.Query)
  1063. // // Se tem um token de paginacao
  1064. // hasToken := pageToken.HasToken()
  1065. // if hasToken {
  1066. // fmt.Println("consultando com token", pageToken.Cursor)
  1067. // query = query.Cursor(pageToken.Cursor)
  1068. // }
  1069. // // cursorFields := []string{"_id"}
  1070. // cursorFields := []string{}
  1071. // if len(f.Sort) > 0 {
  1072. // query.Sort(f.Sort...)
  1073. // for _, key := range f.Sort {
  1074. // if !strings.Contains(key, ".") {
  1075. // cursorFields = append(cursorFields, key)
  1076. // }
  1077. // }
  1078. // } else {
  1079. // cursorFields = append(cursorFields, "_id")
  1080. // }
  1081. // // Seleciona os campos se forem especificados
  1082. // if f.Fields != nil {
  1083. // query.Select(f.Fields)
  1084. // }
  1085. // // Determina o numero de itens a ser retornado
  1086. // if f.MaxResults > 0 {
  1087. // query.Limit(f.MaxResults)
  1088. // }
  1089. // if one {
  1090. // err = query.One(f.Entity)
  1091. // pageToken.Count = 1 // Filter one
  1092. // } else {
  1093. // // spew.Dump(f.Entity, cursorFields)
  1094. // if cursor, err = query.All(f.Entity, cursorFields...); err != nil {
  1095. // goto ErroFind
  1096. // }
  1097. // // Numero total de documentos
  1098. // // Se tem um token de paginacao o valor total esta no token
  1099. // // Caso contrario consulta no banco
  1100. // if !hasToken {
  1101. // c := session.DB(f.DB).C(f.Collection)
  1102. // if pageToken.Count, err = c.Find(f.Query).Select(&bson.M{"_id": 1}).Count(); err != nil {
  1103. // goto ErroFind
  1104. // }
  1105. // }
  1106. // // if ()
  1107. // fmt.Println("Cursor return", cursor, "-")
  1108. // pageToken.NewCursor = cursor
  1109. // }
  1110. // }
  1111. // ErroFind:
  1112. // if err != nil {
  1113. // return Error(ERR_PERSIST, err.Error()), 0
  1114. // }
  1115. // return nil, pageToken.Count
  1116. // }
  1117. // func (t *Mongo) FindOne(f *Filter) (*errs.Error, int) {
  1118. // return t.Find(f, true)
  1119. // }
  1120. // func (t *Mongo) FindAll(f *Filter) (*errs.Error, int) {
  1121. // return t.Find(f, false)
  1122. // }
  1123. // func (t *Mongo) Count(f *Filter) (*errs.Error, int) {
  1124. // session, err := t.GetClient(f.DB)
  1125. // if err == nil {
  1126. // defer session.Close()
  1127. // if f.Query == nil {
  1128. // f.Query = &bson.M{}
  1129. // }
  1130. // query := session.DB(f.DB).C(f.Collection).Find(f.Query)
  1131. // // Seleciona os campos se forem especificados
  1132. // f.PageToken.Count, err = query.Select(&bson.M{"_id": 1}).Count()
  1133. // }
  1134. // if err != nil {
  1135. // return Error(ERR_PERSIST, err.Error()), 0
  1136. // }
  1137. // return nil, f.PageToken.Count
  1138. // }
  1139. // func (t *Mongo) Update(f *Filter) *errs.Error {
  1140. // var (
  1141. // entityinterface EntityInterface
  1142. // entity interface{}
  1143. // )
  1144. // one := false
  1145. // ok := false
  1146. // session, err := t.GetClient(f.DB)
  1147. // if err == nil {
  1148. // defer session.Close()
  1149. // if !f.Id.IsZero() {
  1150. // one = true
  1151. // f.Query = &bson.M{"_id": f.Id}
  1152. // } else if f.Query == nil {
  1153. // err = fmt.Errorf("Query not defined!")
  1154. // }
  1155. // if entityinterface, ok = f.Entity.(EntityInterface); ok {
  1156. // entity = entityinterface.Update()
  1157. // } else {
  1158. // entity = f.Entity
  1159. // }
  1160. // col := session.DB(f.DB).C(f.Collection)
  1161. // if one {
  1162. // err = col.Update(f.Query, entity)
  1163. // } else {
  1164. // _, err = col.UpdateAll(f.Query, entity)
  1165. // }
  1166. // }
  1167. // if err != nil {
  1168. // return Error(ERR_PERSIST, err.Error())
  1169. // }
  1170. // return nil
  1171. // }
  1172. //-----------------------------------------------------------------------
  1173. // func (t *Mongo) Aggregation(f *Filter, q []bson.M, one bool) *errs.Error {
  1174. // session, err := t.GetClient(f.DB)
  1175. // if err == nil {
  1176. // defer session.Close()
  1177. // pipe := session.DB(f.DB).C(f.Collection).Pipe(q)
  1178. // if one {
  1179. // err = pipe.One(&f.Entity)
  1180. // } else {
  1181. // err = pipe.All(&f.Entity)
  1182. // }
  1183. // }
  1184. // if err != nil {
  1185. // return Error(ERR_PERSIST, err.Error())
  1186. // }
  1187. // return nil
  1188. // }
  1189. // all := true
  1190. // session, err := t.GetClient(f.DB)
  1191. // if err == nil {
  1192. // defer session.Close()
  1193. // if !f.Id.IsZero() {
  1194. // all = false
  1195. // f.Query = &bson.M{"_id": f.Id}
  1196. // } else if f.Query == nil {
  1197. // err = fmt.Errorf("Query not defined!")
  1198. // }
  1199. // C := session.DB(f.DB).C(f.Collection)
  1200. // if all {
  1201. // _, err = C.RemoveAll(f.Query)
  1202. // } else {
  1203. // err = C.Remove(f.Query)
  1204. // }
  1205. // }
  1206. // if err != nil {
  1207. // return Error(ERR_PERSIST, err.Error())
  1208. // }
  1209. // return nil
  1210. // VERIFICAR A NECESSIDADE E CORRIGIR O ERRO
  1211. // func (t *Mongo) GetArrayById(f *Filter, ids []primitive.ObjectID) error {
  1212. // session, err := t.GetClient(f.DB)
  1213. // if err != nil {
  1214. // return err
  1215. // }
  1216. // defer session.Close()
  1217. // q := bson.M{"_id": bson.M{"$in": ids}}
  1218. // err = session.DB(f.DB).C(f.Collection).Find(q).One(f.Entity)
  1219. // return err
  1220. // }
  1221. // func (t *Mongo) GetEmbedFromArray(f *Filter, embed string, id primitive.ObjectID, sid primitive.ObjectID) (errs error) {
  1222. // session, err := t.GetClient(f.DB)
  1223. // if err != nil {
  1224. // return err
  1225. // }
  1226. // defer session.Close()
  1227. // field := "$" + embed
  1228. // match := bson.M{"_id": id}
  1229. // match[embed] = bson.M{"$exists": true}
  1230. // aggregations := []bson.M{
  1231. // bson.M{"$unwind": field},
  1232. // bson.M{"$match": match},
  1233. // bson.M{"$replaceRoot": bson.M{"newRoot": field}},
  1234. // bson.M{"$match": bson.M{"_id": sid}},
  1235. // }
  1236. // C := session.DB(f.DB).C(f.Collection)
  1237. // return C.Pipe(aggregations).One(&f.Entity)
  1238. // }
  1239. // func (t *Mongo) UpdateEmbedFromArray(f *Filter, embed string, id primitive.ObjectID, sid primitive.ObjectID) error {
  1240. // session, err := t.GetClient(f.DB)
  1241. // if err != nil {
  1242. // return err
  1243. // }
  1244. // defer session.Close()
  1245. // col := session.DB(f.DB).C(f.Collection)
  1246. // //Define o elemento que sera atualizado
  1247. // element := bson.M{}
  1248. // element[embed+".$"] = &f.Entity
  1249. // //Define o seletor do elemento que sera atualizado
  1250. // selector := bson.M{"_id": id}
  1251. // selector[embed+"._id"] = sid
  1252. // return col.Update(selector, bson.M{"$set": element})
  1253. // }
  1254. // func (t *Mongo) AddEmbedFromArray(f *Filter, embed string, id primitive.ObjectID) error {
  1255. // session, errs := t.GetClient(f.DB)
  1256. // if errs != nil {
  1257. // return errs
  1258. // }
  1259. // defer session.Close()
  1260. // col := session.DB(f.DB).C(f.Collection)
  1261. // //Define o elemento que sera atualizado
  1262. // element := bson.M{}
  1263. // element[embed] = &f.Entity
  1264. // //Define o seletor do elemento que sera atualizado
  1265. // selector := bson.M{"_id": id}
  1266. // return col.Update(selector, bson.M{"$push": element})
  1267. // }
  1268. // 0000000000000000000000000000000000000000000000000000000000000000
  1269. // func (t *Mongo) NewTransaction(dbname string) (*Transaction, error) {
  1270. // session, e := t.GetClient(dbname)
  1271. // if e != nil {
  1272. // return nil, e
  1273. // }
  1274. // collection := session.DB(dbname).C("txns")
  1275. // return &Transaction{R: txn.NewRunner(collection)}, nil
  1276. // }
  1277. // Transactions
  1278. // type Transaction struct {
  1279. // R *txn.Runner
  1280. // Ops []txn.Op
  1281. // }
  1282. // func (t *Transaction) Insert(collection string, id primitive.ObjectID, entity interface{}) *Transaction {
  1283. // t.Ops = append(t.Ops, txn.Op{
  1284. // C: collection,
  1285. // Id: id,
  1286. // Insert: entity,
  1287. // })
  1288. // return t
  1289. // }
  1290. // func (t *Transaction) Update(collection string, id primitive.ObjectID, assert *bson.M, entity interface{}) *Transaction {
  1291. // op := txn.Op{
  1292. // C: collection,
  1293. // Id: id,
  1294. // Update: entity,
  1295. // }
  1296. // if assert != nil {
  1297. // op.Assert = assert
  1298. // }
  1299. // t.Ops = append(t.Ops, op)
  1300. // return t
  1301. // }
  1302. // func (t *Transaction) Remove(collection string, id primitive.ObjectID) *Transaction {
  1303. // t.Ops = append(t.Ops, txn.Op{
  1304. // C: collection,
  1305. // Id: id,
  1306. // Remove: true,
  1307. // })
  1308. // return t
  1309. // }
  1310. // func (t *Transaction) Run() error {
  1311. // return t.R.Run(t.Ops, primitive.NewObjectID(), nil)
  1312. // }
  1313. // func (t *Mongo) Find(f *Filter, one bool) (*errs.Error, int) {
  1314. // session, err := t.GetClient()
  1315. // if err == nil {
  1316. // defer session.Close()
  1317. // if !f.Id.IsZero() {
  1318. // f.Query = &bson.M{"_id": f.Id}
  1319. // } else if f.Query == nil {
  1320. // f.Query = &bson.M{}
  1321. // }
  1322. // pageToken := &f.PageToken
  1323. // aggregations := []*bson.M{}
  1324. // // Se tem um token de paginacao
  1325. // hasToken := pageToken.HasToken()
  1326. // if hasToken {
  1327. // fmt.Println("consultando com token", pageToken.CurrentID)
  1328. // // {$or:
  1329. // // [
  1330. // // {$and :
  1331. // // [
  1332. // // {<sort key>:{$gte: <last sort key value of previous page>}},
  1333. // // {"_id" : {$gt : <last result id of previous page>}}
  1334. // // ]
  1335. // // },
  1336. // // ou
  1337. // // {<sort key> :{$gt: <last sort key value>}}
  1338. // // ]
  1339. // // }.
  1340. // // f.Query = &bson.M{"$and": []bson.M{
  1341. // // *f.Query,
  1342. // // bson.M{"_id": bson.M{"$gt": f.NextPageToken}},
  1343. // // }}
  1344. // f.Query = &bson.M{"_id": bson.M{
  1345. // // "$gt": primitive.ObjectIDHex(pageToken.CurrentID),
  1346. // "$gt": pageToken.CurrentID,
  1347. // }}
  1348. // } else {
  1349. // }
  1350. // // Adiciona o primeiro estagio da agregacao
  1351. // aggregations = append(aggregations, &bson.M{"$match": f.Query})
  1352. // if len(f.Sort) > 0 {
  1353. // f.Sort = append(f.Sort, "_id")
  1354. // aggregations = append(aggregations, MgoSortBson(f.Sort))
  1355. // // query.Sort(f.Sort...)
  1356. // }
  1357. // // aggregations = append(aggregations, &bson.M{
  1358. // // "$addFields": bson.M{"position":},
  1359. // // })
  1360. // // Seleciona os campos se forem especificados
  1361. // if f.Fields != nil {
  1362. // // aggregations = append(aggregations, bson.M{"$limit": f.MaxResults})
  1363. // }
  1364. // // countQuery := session.DB(f.DB).C(f.Collection).Pipe(aggregations)
  1365. // // Determina o numero de itens a ser retornado
  1366. // if f.MaxResults > 0 {
  1367. // aggregations = append(aggregations, &bson.M{"$limit": f.MaxResults})
  1368. // }
  1369. // spew.Dump(aggregations)
  1370. // // query := session.DB(f.DB).C(f.Collection).Find(f.Query)
  1371. // collection := session.DB(f.DB).C(f.Collection)
  1372. // query := collection.Pipe(aggregations)
  1373. // if one {
  1374. // err = query.One(f.Entity)
  1375. // f.Count = 1 // Filter one
  1376. // } else {
  1377. // err = query.All(f.Entity)
  1378. // // Numero total de documentos
  1379. // // Se tem um token de paginacao o valor total esta no token
  1380. // // Caso contrario consulta no banco
  1381. // if hasToken {
  1382. // f.Count = pageToken.Count
  1383. // } else {
  1384. // f.Count, _ = collection.Find(f.Query).Select(&bson.M{"_id": 1}).Count()
  1385. // }
  1386. // }
  1387. // }
  1388. // if err != nil {
  1389. // return Error(ERR_PERSIST, err.Error()), 0
  1390. // }
  1391. // return nil, f.Count
  1392. // }
  1393. // v := reflect.ValueOf(f.Entity)
  1394. // fmt.Println("UpdateCursorResponse")
  1395. // // spew.Dump(f.PageToken)
  1396. // // fmt.Println(v.Kind())
  1397. // if v.Kind() == reflect.Ptr {
  1398. // // Atualiza v para o elemento apontado
  1399. // v = v.Elem()
  1400. // if v.Kind() == reflect.Slice || v.Kind() == reflect.Array {
  1401. // // Acessa o atributo ID da ultima entidade do array
  1402. // field := v.Index(v.Len() - 1).Elem().FieldByName("ID")
  1403. // // Converte o atributo ID para objectId
  1404. // last := field.Interface().(primitive.ObjectID)
  1405. // // Cria a consulta que verifica se existe outros registros alem do ultimo
  1406. // filter := &Filter{
  1407. // Collection: f.Collection,
  1408. // Query: &bson.M{"_id": bson.M{
  1409. // "$lt": last,
  1410. // }},
  1411. // }
  1412. // // Se existirem elementos adiciona o nextTOKEN
  1413. // if _, count := models.Count(filter); count > 0 {
  1414. // // atualiza os valores do token
  1415. // f.PageToken.StartID = last.Hex()
  1416. // f.PageToken.Count = f.Count
  1417. // resp.NextPageToken = f.PageToken.Encode()
  1418. // }
  1419. // }
  1420. // }
  1421. // func NewMongo() (*Mongo, error) {
  1422. // // session, err := mgo.Dial(addrs)
  1423. // // session, err := mgo.Dial("mongodb://localhost:27017")
  1424. // session, err := mgo.Dial("mongodb://localhost:27017")
  1425. // if err != nil {
  1426. // return nil, err
  1427. // }
  1428. // session.SetMode(mgo.Monotonic, true)
  1429. // // "addrs": "mongodb://localhost:27017",
  1430. // // "user": "guest",
  1431. // // "password": "welcome",
  1432. // // "database": "financeiro"
  1433. // m := &Mongo{
  1434. // Session: session,
  1435. // DataBase: "accounts",
  1436. // Addrs: ,
  1437. // // Config: cfg,
  1438. // }
  1439. // return m, err
  1440. // }