package api import ( "fmt" "testing" "time" ) func TestObeservable(t *testing.T) { observable := Observable(func(observer *Subscriber) { for i := 0; i < 5; i++ { observer.Add(1) go func(number int) { fmt.Println("execute ", number) observer.Next(number) time.Sleep(200 * time.Millisecond) }(i) } observer.Wait() }) // subscription := observable. observable. Pipe( // Take(2), RxMap(func(value interface{}) (interface{}, error) { return value.(int) * 2, nil }), ). Subscribe( func(value ...interface{}) { fmt.Println("expeted ", value) }, func(err ...interface{}) { fmt.Println("error ", err) }, func(...interface{}) { fmt.Println("complete") // subscription.Done() }, ) // subscription.Wait() }