package api import ( "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs" "github.com/davecgh/go-spew/spew" ) // Operators func Take(total int) OperatorFunction { return func(source *ObservableStruct) *ObservableStruct { return Observable(func(observer *Subscriber) { var ( count = 0 subscription *Subscription ) subscription = source.Subscribe( func(value ...interface{}) { if count == total { spew.Dump(subscription) subscription.Unsubscribe() return } count++ observer.Next(First(value...)) }, observer.Err, observer.Complete, ) }) } } func RxMap(operator func(interface{}) (interface{}, error)) OperatorFunction { return func(source *ObservableStruct) *ObservableStruct { return Observable(func(observer *Subscriber) { source.Subscribe( func(value ...interface{}) { var ( resp interface{} err error ) defer rxErrorHandle(observer, err) if resp, err = operator(First(value...)); err == nil { observer.Next(resp) } }, observer.Err, observer.Complete, ) }) } } func RxSwitchMap(operator func(interface{}) *ObservableStruct) OperatorFunction { return func(source *ObservableStruct) *ObservableStruct { return Observable(func(observer *Subscriber) { source.Subscribe( func(value ...interface{}) { defer rxErrorHandle(observer, nil) operator(First(value...)).Subscribe( observer.Next, observer.Err, observer.Complete, ) }, observer.Err, observer.Complete, ) }) // return operator() } } func RxCatchError(operator func(interface{}) *ObservableStruct) OperatorFunction { return func(source *ObservableStruct) *ObservableStruct { return Observable(func(observer *Subscriber) { source.Subscribe( observer.Next, func(erros ...interface{}) { var ( ok bool err interface{} ) if err, ok = First(erros).(*errs.Error); !ok { err = errs.FromError(err.(error)) } operator(err).Subscribe( observer.Next, observer.Err, observer.Complete, ) }, observer.Complete, ) }) } }