observable_test.go 794 B

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package api
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. func TestObeservable(t *testing.T) {
  8. observable := Observable(func(observer *Subscriber) {
  9. for i := 0; i < 5; i++ {
  10. observer.Add(1)
  11. go func(number int) {
  12. fmt.Println("execute ", number)
  13. observer.Next(number)
  14. time.Sleep(200 * time.Millisecond)
  15. }(i)
  16. }
  17. observer.Wait()
  18. })
  19. // subscription := observable.
  20. observable.
  21. Pipe(
  22. // Take(2),
  23. RxMap(func(value interface{}) (interface{}, error) {
  24. return value.(int) * 2, nil
  25. }),
  26. ).
  27. Subscribe(
  28. func(value ...interface{}) {
  29. fmt.Println("expeted ", value)
  30. },
  31. func(err ...interface{}) {
  32. fmt.Println("error ", err)
  33. },
  34. func(...interface{}) {
  35. fmt.Println("complete")
  36. // subscription.Done()
  37. },
  38. )
  39. // subscription.Wait()
  40. }