123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- package api
- import (
- "fmt"
- "sync"
- "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
- )
// subscriptionControlID is a package-level integer that starts at zero.
// NOTE(review): nothing in the visible part of this file reads or updates
// it; presumably it was meant to seed Subscription.ID — confirm before use.
var subscriptionControlID = 0
- type OperatorFunction = func(source *ObservableStruct) *ObservableStruct
// Subscription holds the callbacks registered by one subscriber, keyed by
// notification kind ("next", "error", "complete").
type Subscription struct {
	// ID is an integer identifier. It is not assigned anywhere in the
	// visible part of this file.
	ID int
	// callbacks maps a notification kind to the function invoked for it.
	callbacks map[string]func(...interface{})
	// Unsubscribe detaches the subscription from whatever created it.
	Unsubscribe func()
}

// execute invokes the callback registered under callbackID with value as its
// single argument. It does nothing when no such callback is registered.
func (s *Subscription) execute(callbackID string, value interface{}) {
	handler, registered := s.callbacks[callbackID]
	if !registered {
		return
	}
	handler(value)
}

// Next delivers value to the "next" callback, if one is registered.
func (s *Subscription) Next(value interface{}) {
	s.execute("next", value)
}

// Err delivers value to the "error" callback, if one is registered.
func (s *Subscription) Err(value interface{}) {
	s.execute("error", value)
}

// Complete delivers value to the "complete" callback, if one is registered.
func (s *Subscription) Complete(value interface{}) {
	s.execute("complete", value)
}
- type Subscriber struct {
- subscriptions map[*Subscription]*Subscription
- callbacks map[string]interface{}
- wg sync.WaitGroup
- }
- func (subscriber *Subscriber) Wait() {
- subscriber.wg.Wait()
- }
- func (subscriber *Subscriber) Add(number int) {
- subscriber.wg.Add(number)
- }
- func (subscriber *Subscriber) Done() {
- defer func() {
- recover()
- }()
- subscriber.wg.Done()
- }
- func (subscriber *Subscriber) Next(value ...interface{}) {
- var send = First(value...)
- // spew.Dump("call next", value, send)
- // fmt.Println("----------------------------------")
- for _, subscription := range subscriber.subscriptions {
- subscription.Next(send)
- }
- }
- func (subscriber *Subscriber) Err(value ...interface{}) {
- var send = First(value...)
- // spew.Dump("call Err", value, send)
- // fmt.Println("----------------------------------")
- for _, subscription := range subscriber.subscriptions {
- subscription.Err(send)
- }
- }
- func (subscriber *Subscriber) Complete(value ...interface{}) {
- var send = First(value...)
- // spew.Dump("call Complete", value, send)
- // fmt.Println("----------------------------------")
- for _, subscription := range subscriber.subscriptions {
- subscription.Complete(send)
- }
- }
// callbacksOrder maps the position of a callback passed to
// Subscriber.Subscribe onto the notification kind it handles.
var callbacksOrder = []string{
	"next",     // position 0
	"error",    // position 1
	"complete", // position 2
}
- // func (subject *Subscriber) Subscribe(options SubscribeOptions) (subscription *Subscription) {
- func (subject *Subscriber) Subscribe(options ...func(...interface{})) (subscription *Subscription) {
- var (
- callbackID string
- )
- subscription = &Subscription{
- callbacks: map[string]func(...interface{}){},
- Unsubscribe: func() {
- fmt.Println("Unsubscribe...")
- delete(subject.subscriptions, subscription)
- },
- }
- for index, callback := range options {
- callbackID = callbacksOrder[index]
- subscription.callbacks[callbackID] = func(callbackReference func(...interface{})) func(arg ...interface{}) {
- return func(arg ...interface{}) {
- defer subject.Done()
- // spew.Dump("closure callback ", callbacksOrder[index], arg, callback)
- if callbackReference != nil {
- callbackReference(arg...)
- }
- }
- }(callback)
- }
- // spew.Dump(subscription.callbacks)
- subject.subscriptions[subscription] = subscription
- return
- }
- func NewSubscriber() *Subscriber {
- subscriber := &Subscriber{
- callbacks: map[string]interface{}{},
- subscriptions: map[*Subscription]*Subscription{},
- }
- return subscriber
- }
- type ObservableStruct struct {
- subscribe func(*Subscriber)
- }
- func Observable(subscribe func(observer *Subscriber)) *ObservableStruct {
- return &ObservableStruct{
- subscribe: subscribe,
- }
- }
- func (this *ObservableStruct) Pipe(operators ...OperatorFunction) *ObservableStruct {
- observable := this
- for _, function := range operators {
- observable = function(observable)
- }
- return observable
- }
- func (observable *ObservableStruct) Subscribe(handles ...func(...interface{})) (subscription *Subscription) {
- subject := NewSubscriber()
- subscription = subject.Subscribe(handles...)
- observable.subscribe(subject)
- return
- }
- func rxErrorHandle(observer *Subscriber, err error) {
- if err != nil {
- observer.Err(err)
- } else if err := recover(); err != nil {
- observer.Err(err.(error))
- }
- }
// First returns the first of values, or nil when called without arguments.
func First(values ...interface{}) interface{} {
	if len(values) == 0 {
		return nil
	}
	return values[0]
}
- func FirstError(values ...interface{}) *errs.Error {
- if len(values) > 0 {
- return values[0].(*errs.Error)
- }
- return nil
- }
|