12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- package api
- import (
- "fmt"
- "testing"
- "time"
- )
- func TestObeservable(t *testing.T) {
- observable := Observable(func(observer *Subscriber) {
- for i := 0; i < 5; i++ {
- observer.Add(1)
- go func(number int) {
- fmt.Println("execute ", number)
- observer.Next(number)
- time.Sleep(200 * time.Millisecond)
- }(i)
- }
- observer.Wait()
- })
- // subscription := observable.
- observable.
- Pipe(
- // Take(2),
- RxMap(func(value interface{}) (interface{}, error) {
- return value.(int) * 2, nil
- }),
- ).
- Subscribe(
- func(value ...interface{}) {
- fmt.Println("expeted ", value)
- },
- func(err ...interface{}) {
- fmt.Println("error ", err)
- },
- func(...interface{}) {
- fmt.Println("complete")
- // subscription.Done()
- },
- )
- // subscription.Wait()
- }
|