hub.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package sse
  import (
  	"encoding/json"
  	"fmt"
  	"net/http"
  	"sync"
  	"time"

  	"git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
  	"github.com/davecgh/go-spew/spew"
  	"github.com/gomodule/redigo/redis"
  	context "github.com/kataras/iris/v12/context"
  )
  // Event is one server-sent event. It is JSON-encoded when queued in Redis
  // and decoded again before being written to the subscribed clients.
  type Event struct {
  	// Timestamp travels with the JSON encoding; nothing in this file sets
  	// or interprets it.
  	Timestamp int64 `json:"timestamp"`
  	// Payload is written verbatim after the "data: " prefix.
  	Payload string `json:"payload"`
  	// Kind, when non-empty, is written as an "event: " line before the data.
  	Kind string `json:"kind"`
  }
  17. type ChannelClient struct {
  18. Context context.Context
  19. Flusher http.Flusher
  20. }
  21. // type *event
  22. type Channel struct {
  23. ID string
  24. Clients map[*ChannelClient]bool
  25. }
  26. func (channel *Channel) RemoveClient(client *ChannelClient) {
  27. delete(channel.Clients, client)
  28. }
  29. func (channel *Channel) AddClient(client *ChannelClient) {
  30. channel.Clients[client] = true
  31. }
  32. func (channel *Channel) Emit(event *Event) {
  33. defer func () {
  34. if err := recover(); err != nil {
  35. fmt.Printf("//--------------------\n%s\n-----------------//", err)
  36. spew.Dump(err)
  37. }
  38. }()
  39. for client := range channel.Clients {
  40. if event.Kind != "" {
  41. client.Context.Writef("event: %s\n", event.Kind)
  42. }
  43. client.Context.Writef("data: %s\n\n", event.Payload)
  44. client.Flusher.Flush()
  45. }
  46. }
  47. type SSEHub struct {
  48. // New client connections
  49. newClients chan chan []byte
  50. // Closed client connections
  51. closingClients chan chan []byte
  52. // Client connections registry
  53. Channels map[string]*Channel
  54. ChannelCollection string
  55. RedisPool *redis.Pool
  56. }
  57. type SSEOptions struct {
  58. URI string
  59. Password string
  60. ChannelCollection string
  61. }
  62. func NewSSEHub(options *SSEOptions) *SSEHub {
  63. return &SSEHub{
  64. newClients: make(chan chan []byte),
  65. closingClients: make(chan chan []byte),
  66. Channels: make(map[string]*Channel),
  67. ChannelCollection: options.ChannelCollection,
  68. RedisPool: &redis.Pool{
  69. // Maximum number of idle connections in the pool.
  70. MaxIdle: 80,
  71. // max number of connections
  72. MaxActive: 12000,
  73. // Dial is an application supplied function for creating and
  74. // configuring a connection.
  75. Dial: func() (redis.Conn, error) {
  76. conn, err := redis.Dial("tcp", options.URI)
  77. if err != nil {
  78. panic(err.Error())
  79. }
  80. if options.Password != "" {
  81. if _, err := conn.Do("AUTH", options.Password); err != nil {
  82. conn.Close()
  83. return nil, err
  84. }
  85. }
  86. return conn, err
  87. },
  88. },
  89. }
  90. }
  91. func (hub *SSEHub) GetChannel(channelID string) *Channel {
  92. if _, exist := hub.Channels[channelID]; !exist {
  93. channel := &Channel{
  94. Clients: make(map[*ChannelClient]bool),
  95. ID: channelID,
  96. }
  97. hub.Channels[channelID] = channel
  98. }
  99. return hub.Channels[channelID]
  100. }
  101. func (hub *SSEHub) Dispatch(event *Event, channels ...string) {
  102. var (
  103. exists bool
  104. err error
  105. conn = hub.RedisPool.Get()
  106. )
  107. defer conn.Close()
  108. for _, channel := range channels {
  109. if exists, err = redis.Bool(conn.Do("HEXISTS", hub.ChannelCollection, channel)); err != nil || !exists {
  110. continue
  111. }
  112. eventBytes, _ := json.Marshal(event)
  113. conn.Do("RPUSH", channel, string(eventBytes))
  114. }
  115. }
  116. func (hub *SSEHub) UpgradeConnection(ctx context.Context, channelId string) (err *errs.Error) {
  117. var (
  118. conn = hub.RedisPool.Get()
  119. payload string
  120. count int
  121. // ok, closed bool
  122. ok bool
  123. redisErr error
  124. flusher http.Flusher
  125. )
  126. defer conn.Close()
  127. if flusher, ok = ctx.ResponseWriter().Flusher(); !ok {
  128. err = errs.HTTPVersionNotSupported().Details(&errs.Detail{
  129. Dominio: "",
  130. Reason: "Streaming unsupported",
  131. Location: "hook.beforePersist",
  132. LocationType: "",
  133. })
  134. return
  135. }
  136. // Each connection registers its own message channel with the Broker's connections registry.
  137. // messageChan := make(chan []byte)
  138. // Signal the broker that we have a new connection.
  139. // hub.newClients <- messageChan
  140. if _, redisErr = redis.Int(conn.Do("HSET", hub.ChannelCollection, channelId, true)); redisErr != nil {
  141. err = errs.Internal().Details(&errs.Detail{
  142. Dominio: "",
  143. Reason: "Fail on register channel",
  144. Location: "hook.beforePersist",
  145. LocationType: "",
  146. })
  147. return
  148. }
  149. client := &ChannelClient{
  150. Flusher: flusher,
  151. Context: ctx,
  152. }
  153. channel := hub.GetChannel(channelId)
  154. channel.AddClient(client)
  155. finalizeMain := make(chan bool)
  156. finalizePing := make(chan bool)
  157. finalized := false
  158. // Listen to connection close and when the entire request handler chain exits(this handler here) and un-register messageChan.
  159. ctx.OnClose(func() {
  160. defer func() {
  161. // recover from panic caused by writing to a closed channel
  162. recover()
  163. return
  164. }()
  165. if finalized {
  166. return
  167. }
  168. finalized = true
  169. // ctx.Application().Logger().Infof("notification.channel.disconect(%s)", channelId)
  170. // Remove this client from the map of connected clients
  171. // when this handler exits.
  172. redis.Int(conn.Do("HDEL", hub.ChannelCollection, channelId))
  173. // closesd = true
  174. finalizeMain <- finalized
  175. finalizePing <- finalized
  176. channel.RemoveClient(client)
  177. })
  178. // Set the headers related to event streaming, you can omit the "application/json" if you send plain text.
  179. // If you develop a go client, you must have: "Accept" : "application/json, text/event-stream" header as well.
  180. ctx.ContentType("text/event-stream")
  181. // ctx.Header("Access-Control-Allow-Origin", "*")
  182. ctx.Header("Cache-Control", "no-cache")
  183. ctx.Header("Connection", "keep-alive")
  184. ctx.ResponseWriter().Header().Set("Access-Control-Allow-Origin", "*")
  185. flusher.Flush()
  186. // ctx.Request().Response.Header.Set("Access-Control-Allow-Origin", "*")
  187. // Block waiting for messages broadcast on this connection's messageChan.
  188. // check notification in redis
  189. // go func() {
  190. // ctx.Application().Logger().Warnf("init.notification.loop %v", closesd)
  191. // Essa thread dispara um ping para manter a conexão ativa com o client
  192. go func() {
  193. ticker := time.NewTicker(3 * time.Second)
  194. defer ticker.Stop()
  195. for {
  196. select {
  197. case <-ticker.C:
  198. // ctx.Application().Logger().Warnf("init.notification.loop 2s")
  199. // flusher.Flush()
  200. case <-finalizePing:
  201. // ctx.Application().Logger().Warnf("finalize init.notification.loop 2s")
  202. close(finalizePing)
  203. return
  204. }
  205. }
  206. }()
  207. ticker5Second := time.NewTicker(5 * time.Second)
  208. defer ticker5Second.Stop()
  209. reset:
  210. for {
  211. select {
  212. case <-ticker5Second.C:
  213. ctx.Application().Logger().Warnf("init.notification.loop 5s")
  214. if count, redisErr = redis.Int(conn.Do("LLEN", channelId)); count == 0 || redisErr != nil {
  215. continue reset
  216. }
  217. for ; count > 0; count-- {
  218. if payload, redisErr = redis.String(conn.Do("LPOP", channelId)); redisErr != nil {
  219. ctx.Application().Logger().Errorf("NotificationError:", redisErr)
  220. } else {
  221. event := &Event{}
  222. json.Unmarshal([]byte(payload), event)
  223. fmt.Println("emit event ", event)
  224. channel.Emit(event)
  225. break
  226. }
  227. }
  228. case <-finalizeMain:
  229. ctx.Application().Logger().Infof("notification.finalize.disconect(%s)", channelId)
  230. close(finalizeMain)
  231. return
  232. }
  233. }
  234. return
  235. }