mongo.rx.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package api
  2. import (
  3. "fmt" // "html/template"
  4. "reflect"
  5. "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
  6. "go.mongodb.org/mongo-driver/mongo"
  7. "go.mongodb.org/mongo-driver/mongo/options"
  8. )
  9. func (mongo *Mongo) InsertOneRx(options *Filter) *ObservableStruct {
  10. return Observable(func(observer *Subscriber) {
  11. if res, err := mongo.InsertOne(options); err != nil {
  12. observer.Err(err)
  13. } else {
  14. observer.Next(res)
  15. }
  16. })
  17. }
  18. func (_mongo *Mongo) FindManyRx(params *Filter) *ObservableStruct {
  19. var (
  20. ctx, collection = _mongo.GetContext(params)
  21. cursor *mongo.Cursor
  22. err error
  23. )
  24. return Observable(func(observer *Subscriber) {
  25. switch {
  26. case params.Pipeline != nil:
  27. // opts := options.Aggregate().SetBatchSize(int32(params.MaxResults))
  28. opts := options.Aggregate()
  29. if cursor, err = collection.Aggregate(ctx, params.Pipeline, opts); err != nil {
  30. err = errs.FromError(err)
  31. return
  32. }
  33. case params.Query != nil:
  34. findOptions := options.Find()
  35. findOptions.SetLimit(int64(params.MaxResults))
  36. if params.Sort != nil {
  37. findOptions.SetSort(params.Sort)
  38. }
  39. if params.Fields != nil {
  40. findOptions.SetProjection(params.Fields)
  41. }
  42. cursor, err = collection.Find(ctx, params.Query, findOptions)
  43. fmt.Println("erro no query")
  44. default:
  45. err = errs.Internal().Details(&errs.Detail{
  46. Message: "FindMany requires a Pipeline or Query param.",
  47. })
  48. }
  49. if err == nil {
  50. observer.Next(cursor)
  51. } else {
  52. observer.Next(err)
  53. }
  54. }).Pipe(
  55. RxMap(func(value interface{}) (interface{}, error) {
  56. var (
  57. cursor, _ = value.(*mongo.Cursor)
  58. elemp reflect.Value
  59. err error
  60. )
  61. defer cursor.Close(ctx)
  62. if params.Entities == nil {
  63. return nil, fmt.Errorf("Entities can't be nil")
  64. }
  65. entitiesValue := reflect.ValueOf(params.Entities)
  66. if entitiesValue.Kind() != reflect.Ptr || entitiesValue.Elem().Kind() != reflect.Slice {
  67. return nil, fmt.Errorf("Entities argument must be a slice address")
  68. }
  69. sliceVector := entitiesValue.Elem()
  70. sliceVector = sliceVector.Slice(0, sliceVector.Cap())
  71. typ := sliceVector.Type().Elem()
  72. // LogError(0, cursor.Err().Error())
  73. for cursor.Next(ctx) {
  74. fmt.Println("interate")
  75. elemp = reflect.New(typ)
  76. if err = cursor.Decode(elemp.Interface()); err != nil {
  77. return nil, err
  78. }
  79. sliceVector = reflect.Append(sliceVector, elemp.Elem())
  80. }
  81. entitiesValue.Elem().Set(sliceVector)
  82. return entitiesValue.Elem(), nil
  83. }),
  84. )
  85. }