hub.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package sse
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
  8. "github.com/gomodule/redigo/redis"
  9. context "github.com/kataras/iris/v12/context"
  10. )
  11. type Event struct {
  12. Timestamp int64 `json:"timestamp"`
  13. Payload string `json:"payload"`
  14. Kind string `json:"kind"`
  15. }
  16. type ChannelClient struct {
  17. Context context.Context
  18. Flusher http.Flusher
  19. }
  20. // type *event
  21. type Channel struct {
  22. ID string
  23. Clients map[*ChannelClient]bool
  24. }
  25. func (channel *Channel) RemoveClient(client *ChannelClient) {
  26. delete(channel.Clients, client)
  27. }
  28. func (channel *Channel) AddClient(client *ChannelClient) {
  29. channel.Clients[client] = true
  30. }
  31. func (channel *Channel) Emit(event *Event) {
  32. for client := range channel.Clients {
  33. if event.Kind != "" {
  34. client.Context.Writef("event: %s\n", event.Kind)
  35. }
  36. client.Context.Writef("data: %s\n\n", event.Payload)
  37. client.Flusher.Flush()
  38. }
  39. }
  40. type SSEHub struct {
  41. // New client connections
  42. newClients chan chan []byte
  43. // Closed client connections
  44. closingClients chan chan []byte
  45. // Client connections registry
  46. Channels map[string]*Channel
  47. ChannelCollection string
  48. RedisPool *redis.Pool
  49. }
  50. type SSEOptions struct {
  51. URI string
  52. Password string
  53. ChannelCollection string
  54. }
  55. func NewSSEHub(options *SSEOptions) *SSEHub {
  56. return &SSEHub{
  57. newClients: make(chan chan []byte),
  58. closingClients: make(chan chan []byte),
  59. Channels: make(map[string]*Channel),
  60. ChannelCollection: options.ChannelCollection,
  61. RedisPool: &redis.Pool{
  62. // Maximum number of idle connections in the pool.
  63. MaxIdle: 80,
  64. // max number of connections
  65. MaxActive: 12000,
  66. // Dial is an application supplied function for creating and
  67. // configuring a connection.
  68. Dial: func() (redis.Conn, error) {
  69. conn, err := redis.Dial("tcp", options.URI)
  70. if err != nil {
  71. panic(err.Error())
  72. }
  73. if options.Password != "" {
  74. if _, err := conn.Do("AUTH", options.Password); err != nil {
  75. conn.Close()
  76. return nil, err
  77. }
  78. }
  79. return conn, err
  80. },
  81. },
  82. }
  83. }
  84. func (hub *SSEHub) GetChannel(channelID string) *Channel {
  85. if _, exist := hub.Channels[channelID]; !exist {
  86. channel := &Channel{
  87. Clients: make(map[*ChannelClient]bool),
  88. ID: channelID,
  89. }
  90. hub.Channels[channelID] = channel
  91. }
  92. return hub.Channels[channelID]
  93. }
  94. func (hub *SSEHub) Dispatch(event *Event, channels ...string) {
  95. var (
  96. exists bool
  97. err error
  98. conn = hub.RedisPool.Get()
  99. )
  100. defer conn.Close()
  101. for _, channel := range channels {
  102. if exists, err = redis.Bool(conn.Do("HEXISTS", hub.ChannelCollection, channel)); err != nil || !exists {
  103. continue
  104. }
  105. eventBytes, _ := json.Marshal(event)
  106. conn.Do("RPUSH", channel, string(eventBytes))
  107. }
  108. }
  109. func (hub *SSEHub) UpgradeConnection(ctx context.Context, channelId string) (err *errs.Error) {
  110. var (
  111. conn = hub.RedisPool.Get()
  112. payload string
  113. count int
  114. // ok, closed bool
  115. ok bool
  116. redisErr error
  117. flusher http.Flusher
  118. )
  119. defer conn.Close()
  120. if flusher, ok = ctx.ResponseWriter().Flusher(); !ok {
  121. err = errs.HTTPVersionNotSupported().Details(&errs.Detail{
  122. Dominio: "",
  123. Reason: "Streaming unsupported",
  124. Location: "hook.beforePersist",
  125. LocationType: "",
  126. })
  127. return
  128. }
  129. // Each connection registers its own message channel with the Broker's connections registry.
  130. // messageChan := make(chan []byte)
  131. // Signal the broker that we have a new connection.
  132. // hub.newClients <- messageChan
  133. if _, redisErr = redis.Int(conn.Do("HSET", hub.ChannelCollection, channelId, true)); redisErr != nil {
  134. err = errs.Internal().Details(&errs.Detail{
  135. Dominio: "",
  136. Reason: "Fail on register channel",
  137. Location: "hook.beforePersist",
  138. LocationType: "",
  139. })
  140. return
  141. }
  142. client := &ChannelClient{
  143. Flusher: flusher,
  144. Context: ctx,
  145. }
  146. channel := hub.GetChannel(channelId)
  147. channel.AddClient(client)
  148. finalizeMain := make(chan bool)
  149. finalizePing := make(chan bool)
  150. finalized := false
  151. // Listen to connection close and when the entire request handler chain exits(this handler here) and un-register messageChan.
  152. ctx.OnClose(func() {
  153. defer func() {
  154. // recover from panic caused by writing to a closed channel
  155. recover()
  156. return
  157. }()
  158. if finalized {
  159. return
  160. }
  161. finalized = true
  162. // ctx.Application().Logger().Infof("notification.channel.disconect(%s)", channelId)
  163. // Remove this client from the map of connected clients
  164. // when this handler exits.
  165. redis.Int(conn.Do("HDEL", hub.ChannelCollection, channelId))
  166. // closesd = true
  167. finalizeMain <- finalized
  168. finalizePing <- finalized
  169. channel.RemoveClient(client)
  170. })
  171. // Set the headers related to event streaming, you can omit the "application/json" if you send plain text.
  172. // If you develop a go client, you must have: "Accept" : "application/json, text/event-stream" header as well.
  173. ctx.ContentType("text/event-stream")
  174. // ctx.Header("Access-Control-Allow-Origin", "*")
  175. ctx.Header("Cache-Control", "no-cache")
  176. ctx.Header("Connection", "keep-alive")
  177. ctx.ResponseWriter().Header().Set("Access-Control-Allow-Origin", "*")
  178. flusher.Flush()
  179. // ctx.Request().Response.Header.Set("Access-Control-Allow-Origin", "*")
  180. // Block waiting for messages broadcast on this connection's messageChan.
  181. // check notification in redis
  182. // go func() {
  183. // ctx.Application().Logger().Warnf("init.notification.loop %v", closesd)
  184. // Essa thread dispara um ping para manter a conexão ativa com o client
  185. go func() {
  186. ticker := time.NewTicker(3 * time.Second)
  187. defer ticker.Stop()
  188. for {
  189. select {
  190. case <-ticker.C:
  191. // ctx.Application().Logger().Warnf("init.notification.loop 2s")
  192. flusher.Flush()
  193. case <-finalizePing:
  194. // ctx.Application().Logger().Warnf("finalize init.notification.loop 2s")
  195. close(finalizePing)
  196. return
  197. }
  198. }
  199. }()
  200. ticker5Second := time.NewTicker(5 * time.Second)
  201. defer ticker5Second.Stop()
  202. reset:
  203. for {
  204. select {
  205. case <-ticker5Second.C:
  206. ctx.Application().Logger().Warnf("init.notification.loop 5s")
  207. if count, redisErr = redis.Int(conn.Do("LLEN", channelId)); count == 0 || redisErr != nil {
  208. continue reset
  209. }
  210. for ; count > 0; count-- {
  211. if payload, redisErr = redis.String(conn.Do("LPOP", channelId)); redisErr != nil {
  212. ctx.Application().Logger().Errorf("NotificationError:", redisErr)
  213. } else {
  214. event := &Event{}
  215. json.Unmarshal([]byte(payload), event)
  216. fmt.Println("emit event ", event)
  217. channel.Emit(event)
  218. break
  219. }
  220. }
  221. case <-finalizeMain:
  222. ctx.Application().Logger().Infof("notification.finalize.disconect(%s)", channelId)
  223. close(finalizeMain)
  224. return
  225. }
  226. }
  227. return
  228. }