123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- package api
- import (
- "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
- "github.com/davecgh/go-spew/spew"
- )
- // Operators
- func Take(total int) OperatorFunction {
- return func(source *ObservableStruct) *ObservableStruct {
- return Observable(func(observer *Subscriber) {
- var (
- count = 0
- subscription *Subscription
- )
- subscription = source.Subscribe(
- func(value ...interface{}) {
- if count == total {
- spew.Dump(subscription)
- subscription.Unsubscribe()
- return
- }
- count++
- observer.Next(First(value...))
- },
- observer.Err,
- observer.Complete,
- )
- })
- }
- }
- func RxMap(operator func(interface{}) (interface{}, error)) OperatorFunction {
- return func(source *ObservableStruct) *ObservableStruct {
- return Observable(func(observer *Subscriber) {
- source.Subscribe(
- func(value ...interface{}) {
- var (
- resp interface{}
- err error
- )
- defer rxErrorHandle(observer, err)
- if resp, err = operator(First(value...)); err == nil {
- observer.Next(resp)
- }
- },
- observer.Err,
- observer.Complete,
- )
- })
- }
- }
- func RxSwitchMap(operator func(interface{}) *ObservableStruct) OperatorFunction {
- return func(source *ObservableStruct) *ObservableStruct {
- return Observable(func(observer *Subscriber) {
- source.Subscribe(
- func(value ...interface{}) {
- defer rxErrorHandle(observer, nil)
- operator(First(value...)).Subscribe(
- observer.Next,
- observer.Err,
- observer.Complete,
- )
- },
- observer.Err,
- observer.Complete,
- )
- })
- // return operator()
- }
- }
- func RxCatchError(operator func(interface{}) *ObservableStruct) OperatorFunction {
- return func(source *ObservableStruct) *ObservableStruct {
- return Observable(func(observer *Subscriber) {
- source.Subscribe(
- observer.Next,
- func(erros ...interface{}) {
- var (
- ok bool
- err interface{}
- )
- if err, ok = First(erros).(*errs.Error); !ok {
- err = errs.FromError(err.(error))
- }
- operator(err).Subscribe(
- observer.Next,
- observer.Err,
- observer.Complete,
- )
- },
- observer.Complete,
- )
- })
- }
- }
|