package api import ( "fmt" "sync" "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs" ) var ( subscriptionControlID = 0 ) type OperatorFunction = func(source *ObservableStruct) *ObservableStruct type Subscription struct { ID int callbacks map[string]func(...interface{}) Unsubscribe func() } func (subscription *Subscription) execute(callbackID string, value interface{}) { // fmt.Println("execute", callbackID) // spew.Dump(value) if callback, found := subscription.callbacks[callbackID]; found { callback(value) } } func (subscription *Subscription) Next(value interface{}) { subscription.execute("next", value) } func (subscription *Subscription) Err(value interface{}) { subscription.execute("error", value) } func (subscription *Subscription) Complete(value interface{}) { subscription.execute("complete", value) } type Subscriber struct { subscriptions map[*Subscription]*Subscription callbacks map[string]interface{} wg sync.WaitGroup } func (subscriber *Subscriber) Wait() { subscriber.wg.Wait() } func (subscriber *Subscriber) Add(number int) { subscriber.wg.Add(number) } func (subscriber *Subscriber) Done() { defer func() { recover() }() subscriber.wg.Done() } func (subscriber *Subscriber) Next(value ...interface{}) { var send = First(value...) // spew.Dump("call next", value, send) // fmt.Println("----------------------------------") for _, subscription := range subscriber.subscriptions { subscription.Next(send) } } func (subscriber *Subscriber) Err(value ...interface{}) { var send = First(value...) // spew.Dump("call Err", value, send) // fmt.Println("----------------------------------") for _, subscription := range subscriber.subscriptions { subscription.Err(send) } } func (subscriber *Subscriber) Complete(value ...interface{}) { var send = First(value...) // spew.Dump("call Complete", value, send) // fmt.Println("----------------------------------") for _, subscription := range subscriber.subscriptions { subscription.Complete(send) } } var callbacksOrder = []string{"next", "error", "complete"} // func (subject *Subscriber) Subscribe(options SubscribeOptions) (subscription *Subscription) { func (subject *Subscriber) Subscribe(options ...func(...interface{})) (subscription *Subscription) { var ( callbackID string ) subscription = &Subscription{ callbacks: map[string]func(...interface{}){}, Unsubscribe: func() { fmt.Println("Unsubscribe...") delete(subject.subscriptions, subscription) }, } for index, callback := range options { callbackID = callbacksOrder[index] subscription.callbacks[callbackID] = func(callbackReference func(...interface{})) func(arg ...interface{}) { return func(arg ...interface{}) { defer subject.Done() // spew.Dump("closure callback ", callbacksOrder[index], arg, callback) if callbackReference != nil { callbackReference(arg...) } } }(callback) } // spew.Dump(subscription.callbacks) subject.subscriptions[subscription] = subscription return } func NewSubscriber() *Subscriber { subscriber := &Subscriber{ callbacks: map[string]interface{}{}, subscriptions: map[*Subscription]*Subscription{}, } return subscriber } type ObservableStruct struct { subscribe func(*Subscriber) } func Observable(subscribe func(observer *Subscriber)) *ObservableStruct { return &ObservableStruct{ subscribe: subscribe, } } func (this *ObservableStruct) Pipe(operators ...OperatorFunction) *ObservableStruct { observable := this for _, function := range operators { observable = function(observable) } return observable } func (observable *ObservableStruct) Subscribe(handles ...func(...interface{})) (subscription *Subscription) { subject := NewSubscriber() subscription = subject.Subscribe(handles...) observable.subscribe(subject) return } func rxErrorHandle(observer *Subscriber, err error) { if err != nil { observer.Err(err) } else if err := recover(); err != nil { observer.Err(err.(error)) } } func First(values ...interface{}) interface{} { if len(values) > 0 { return values[0] } return nil } func FirstError(values ...interface{}) *errs.Error { if len(values) > 0 { return values[0].(*errs.Error) } return nil }