// Source file: observable.go

  1. package api
  2. import (
  3. "fmt"
  4. "sync"
  5. "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
  6. )
  7. var (
  8. subscriptionControlID = 0
  9. )
// OperatorFunction takes a source observable and returns an observable.
// Pipe applies a sequence of these functions, feeding each result into the
// next one.
type OperatorFunction = func(source *ObservableStruct) *ObservableStruct
// Subscription holds the handlers registered by one subscriber together with
// the function used to detach them again.
type Subscription struct {
	// ID is an identifier for the subscription.
	// NOTE(review): no code in the visible part of this file assigns it.
	ID int
	// callbacks maps an event name ("next", "error" or "complete") to the
	// handler that is run for that event.
	callbacks map[string]func(...interface{})
	// Unsubscribe detaches this subscription from the Subscriber that
	// created it.
	Unsubscribe func()
}
  16. func (subscription *Subscription) execute(callbackID string, value interface{}) {
  17. // fmt.Println("execute", callbackID)
  18. // spew.Dump(value)
  19. if callback, found := subscription.callbacks[callbackID]; found {
  20. callback(value)
  21. }
  22. }
// Next delivers value to the handler registered under "next", if there is one.
func (subscription *Subscription) Next(value interface{}) {
	subscription.execute("next", value)
}
// Err delivers value to the handler registered under "error", if there is one.
func (subscription *Subscription) Err(value interface{}) {
	subscription.execute("error", value)
}
// Complete delivers value to the handler registered under "complete", if
// there is one.
func (subscription *Subscription) Complete(value interface{}) {
	subscription.execute("complete", value)
}
// Subscriber forwards Next, Err and Complete notifications to every
// subscription registered on it.
type Subscriber struct {
	// subscriptions is the set of registered subscriptions, keyed by the
	// subscription pointer itself.
	subscriptions map[*Subscription]*Subscription
	// callbacks is initialised by NewSubscriber.
	// NOTE(review): it is not read anywhere in the visible part of this file.
	callbacks map[string]interface{}
	// wg is the WaitGroup behind the Wait, Add and Done methods.
	wg sync.WaitGroup
}
// Wait blocks until the subscriber's internal WaitGroup counter is zero.
func (subscriber *Subscriber) Wait() {
	subscriber.wg.Wait()
}
// Add adds number to the subscriber's internal WaitGroup counter.
func (subscriber *Subscriber) Add(number int) {
	subscriber.wg.Add(number)
}
  43. func (subscriber *Subscriber) Done() {
  44. defer func() {
  45. recover()
  46. }()
  47. subscriber.wg.Done()
  48. }
  49. func (subscriber *Subscriber) Next(value ...interface{}) {
  50. var send = First(value...)
  51. // spew.Dump("call next", value, send)
  52. // fmt.Println("----------------------------------")
  53. for _, subscription := range subscriber.subscriptions {
  54. subscription.Next(send)
  55. }
  56. }
  57. func (subscriber *Subscriber) Err(value ...interface{}) {
  58. var send = First(value...)
  59. // spew.Dump("call Err", value, send)
  60. // fmt.Println("----------------------------------")
  61. for _, subscription := range subscriber.subscriptions {
  62. subscription.Err(send)
  63. }
  64. }
  65. func (subscriber *Subscriber) Complete(value ...interface{}) {
  66. var send = First(value...)
  67. // spew.Dump("call Complete", value, send)
  68. // fmt.Println("----------------------------------")
  69. for _, subscription := range subscriber.subscriptions {
  70. subscription.Complete(send)
  71. }
  72. }
  73. var callbacksOrder = []string{"next", "error", "complete"}
  74. // func (subject *Subscriber) Subscribe(options SubscribeOptions) (subscription *Subscription) {
  75. func (subject *Subscriber) Subscribe(options ...func(...interface{})) (subscription *Subscription) {
  76. var (
  77. callbackID string
  78. )
  79. subscription = &Subscription{
  80. callbacks: map[string]func(...interface{}){},
  81. Unsubscribe: func() {
  82. fmt.Println("Unsubscribe...")
  83. delete(subject.subscriptions, subscription)
  84. },
  85. }
  86. for index, callback := range options {
  87. callbackID = callbacksOrder[index]
  88. subscription.callbacks[callbackID] = func(callbackReference func(...interface{})) func(arg ...interface{}) {
  89. return func(arg ...interface{}) {
  90. defer subject.Done()
  91. // spew.Dump("closure callback ", callbacksOrder[index], arg, callback)
  92. if callbackReference != nil {
  93. callbackReference(arg...)
  94. }
  95. }
  96. }(callback)
  97. }
  98. // spew.Dump(subscription.callbacks)
  99. subject.subscriptions[subscription] = subscription
  100. return
  101. }
  102. func NewSubscriber() *Subscriber {
  103. subscriber := &Subscriber{
  104. callbacks: map[string]interface{}{},
  105. subscriptions: map[*Subscription]*Subscription{},
  106. }
  107. return subscriber
  108. }
// ObservableStruct wraps the function that is run for each call to Subscribe.
type ObservableStruct struct {
	// subscribe is invoked with a freshly created Subscriber every time
	// Subscribe is called on the observable.
	subscribe func(*Subscriber)
}
  112. func Observable(subscribe func(observer *Subscriber)) *ObservableStruct {
  113. return &ObservableStruct{
  114. subscribe: subscribe,
  115. }
  116. }
  117. func (this *ObservableStruct) Pipe(operators ...OperatorFunction) *ObservableStruct {
  118. observable := this
  119. for _, function := range operators {
  120. observable = function(observable)
  121. }
  122. return observable
  123. }
  124. func (observable *ObservableStruct) Subscribe(handles ...func(...interface{})) (subscription *Subscription) {
  125. subject := NewSubscriber()
  126. subscription = subject.Subscribe(handles...)
  127. observable.subscribe(subject)
  128. return
  129. }
  130. func rxErrorHandle(observer *Subscriber, err error) {
  131. if err != nil {
  132. observer.Err(err)
  133. } else if err := recover(); err != nil {
  134. observer.Err(err.(error))
  135. }
  136. }
  137. func First(values ...interface{}) interface{} {
  138. if len(values) > 0 {
  139. return values[0]
  140. }
  141. return nil
  142. }
  143. func FirstError(values ...interface{}) *errs.Error {
  144. if len(values) > 0 {
  145. return values[0].(*errs.Error)
  146. }
  147. return nil
  148. }