observable.operators.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package api
  2. import (
  3. "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
  4. "github.com/davecgh/go-spew/spew"
  5. )
  6. // Operators
  7. func Take(total int) OperatorFunction {
  8. return func(source *ObservableStruct) *ObservableStruct {
  9. return Observable(func(observer *Subscriber) {
  10. var (
  11. count = 0
  12. subscription *Subscription
  13. )
  14. subscription = source.Subscribe(
  15. func(value ...interface{}) {
  16. if count == total {
  17. spew.Dump(subscription)
  18. subscription.Unsubscribe()
  19. return
  20. }
  21. count++
  22. observer.Next(First(value...))
  23. },
  24. observer.Err,
  25. observer.Complete,
  26. )
  27. })
  28. }
  29. }
  30. func RxMap(operator func(interface{}) (interface{}, error)) OperatorFunction {
  31. return func(source *ObservableStruct) *ObservableStruct {
  32. return Observable(func(observer *Subscriber) {
  33. source.Subscribe(
  34. func(value ...interface{}) {
  35. var (
  36. resp interface{}
  37. err error
  38. )
  39. defer rxErrorHandle(observer, err)
  40. if resp, err = operator(First(value...)); err == nil {
  41. observer.Next(resp)
  42. }
  43. },
  44. observer.Err,
  45. observer.Complete,
  46. )
  47. })
  48. }
  49. }
  50. func RxSwitchMap(operator func(interface{}) *ObservableStruct) OperatorFunction {
  51. return func(source *ObservableStruct) *ObservableStruct {
  52. return Observable(func(observer *Subscriber) {
  53. source.Subscribe(
  54. func(value ...interface{}) {
  55. defer rxErrorHandle(observer, nil)
  56. operator(First(value...)).Subscribe(
  57. observer.Next,
  58. observer.Err,
  59. observer.Complete,
  60. )
  61. },
  62. observer.Err,
  63. observer.Complete,
  64. )
  65. })
  66. // return operator()
  67. }
  68. }
  69. func RxCatchError(operator func(interface{}) *ObservableStruct) OperatorFunction {
  70. return func(source *ObservableStruct) *ObservableStruct {
  71. return Observable(func(observer *Subscriber) {
  72. source.Subscribe(
  73. observer.Next,
  74. func(erros ...interface{}) {
  75. var (
  76. ok bool
  77. err interface{}
  78. )
  79. if err, ok = First(erros).(*errs.Error); !ok {
  80. err = errs.FromError(err.(error))
  81. }
  82. operator(err).Subscribe(
  83. observer.Next,
  84. observer.Err,
  85. observer.Complete,
  86. )
  87. },
  88. observer.Complete,
  89. )
  90. })
  91. }
  92. }