hub.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. package sse
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
  8. "github.com/gomodule/redigo/redis"
  9. context "github.com/kataras/iris/v12/context"
  10. )
  11. type Event struct {
  12. Timestamp int64 `json:"timestamp"`
  13. Payload string `json:"payload"`
  14. Kind string `json:"kind"`
  15. }
  16. type ChannelClient struct {
  17. // Context context.Context
  18. Flusher http.Flusher
  19. Response http.ResponseWriter
  20. Request *http.Request
  21. }
  22. type Channel struct {
  23. ID string
  24. Clients map[*ChannelClient]bool
  25. }
  26. func (channel *Channel) RemoveClient(client *ChannelClient) {
  27. delete(channel.Clients, client)
  28. }
  29. func (channel *Channel) AddClient(client *ChannelClient) {
  30. channel.Clients[client] = true
  31. }
  32. func (channel *Channel) Emit(event *Event) {
  33. var (
  34. err error
  35. )
  36. for client := range channel.Clients {
  37. if event.Kind != "" {
  38. if _, err = fmt.Fprintf(client.Response, "event: %s\n", event.Kind); err != nil {
  39. fmt.Println(fmt.Sprintf("%+v", err))
  40. continue
  41. }
  42. }
  43. if _, err = fmt.Fprintf(client.Response, "data: %s\n\n", event.Payload); err != nil {
  44. fmt.Println(fmt.Sprintf("%+v", err))
  45. continue
  46. }
  47. client.Flusher.Flush()
  48. }
  49. }
  50. type SSEHub struct {
  51. // New client connections
  52. newClients chan chan []byte
  53. // Closed client connections
  54. closingClients chan chan []byte
  55. consume chan bool
  56. // Client connections registry
  57. Channels map[string]*Channel
  58. ChannelCollection string
  59. RedisPool *redis.Pool
  60. }
  61. type SSEOptions struct {
  62. URI string
  63. Password string
  64. ChannelCollection string
  65. }
  66. func NewSSEHub(options *SSEOptions) *SSEHub {
  67. return &SSEHub{
  68. newClients: make(chan chan []byte),
  69. closingClients: make(chan chan []byte),
  70. consume: make(chan bool),
  71. Channels: make(map[string]*Channel),
  72. ChannelCollection: options.ChannelCollection,
  73. RedisPool: &redis.Pool{
  74. // Maximum number of idle connections in the pool.
  75. MaxIdle: 80,
  76. // max number of connections
  77. MaxActive: 12000,
  78. // Dial is an application supplied function for creating and
  79. // configuring a connection.
  80. Dial: func() (redis.Conn, error) {
  81. conn, err := redis.Dial("tcp", options.URI)
  82. if err != nil {
  83. panic(err.Error())
  84. }
  85. if options.Password != "" {
  86. if _, err := conn.Do("AUTH", options.Password); err != nil {
  87. conn.Close()
  88. return nil, err
  89. }
  90. }
  91. return conn, err
  92. },
  93. },
  94. }
  95. }
  96. func (hub *SSEHub) GetChannel(channelID string) *Channel {
  97. if _, exist := hub.Channels[channelID]; !exist {
  98. channel := &Channel{
  99. Clients: make(map[*ChannelClient]bool),
  100. ID: channelID,
  101. }
  102. hub.Channels[channelID] = channel
  103. }
  104. return hub.Channels[channelID]
  105. }
  106. func (hub *SSEHub) Dispatch(event *Event, channels ...string) {
  107. var (
  108. exists bool
  109. err error
  110. conn = hub.RedisPool.Get()
  111. )
  112. defer conn.Close()
  113. eventBytes, _ := json.Marshal(event)
  114. for _, channel := range channels {
  115. if exists, err = redis.Bool(conn.Do("HEXISTS", hub.ChannelCollection, channel)); err != nil || !exists {
  116. continue
  117. }
  118. conn.Do("RPUSH", channel, string(eventBytes))
  119. }
  120. hub.consume <- true
  121. }
  122. func (hub *SSEHub) Close(channels ...string) {
  123. var (
  124. exists bool
  125. count int
  126. err error
  127. conn = hub.RedisPool.Get()
  128. )
  129. for _, channel := range channels {
  130. if exists, err = redis.Bool(conn.Do("HEXISTS", hub.ChannelCollection, channel)); err != nil || !exists {
  131. continue
  132. }
  133. go func(channelID string) {
  134. for {
  135. if count, err = redis.Int(conn.Do("LLEN", channelID)); count > 0 {
  136. continue
  137. }
  138. conn.Do("HDEL", hub.ChannelCollection, channelID)
  139. }
  140. }(channel)
  141. }
  142. }
  143. func (hub *SSEHub) UpgradeConnection(ctx context.Context, channelId string) (err *errs.Error) {
  144. var (
  145. conn = hub.RedisPool.Get()
  146. payload string
  147. count int
  148. ok bool
  149. consuming bool
  150. redisErr error
  151. flusher http.Flusher
  152. response = ctx.ResponseWriter().Naive()
  153. request = ctx.Request()
  154. )
  155. defer conn.Close()
  156. if flusher, ok = response.(http.Flusher); !ok {
  157. err = errs.HTTPVersionNotSupported().Details(&errs.Detail{
  158. Dominio: "",
  159. Reason: "Streaming unsupported",
  160. Location: "hook.beforePersist",
  161. LocationType: "",
  162. })
  163. return
  164. }
  165. if _, redisErr = redis.Int(conn.Do("HSET", hub.ChannelCollection, channelId, true)); redisErr != nil {
  166. err = errs.Internal().Details(&errs.Detail{
  167. Dominio: "",
  168. Reason: "Fail on register channel",
  169. Location: "hook.beforePersist",
  170. LocationType: "",
  171. })
  172. return
  173. }
  174. header := response.Header()
  175. header.Set("Access-Control-Allow-Origin", "*")
  176. header.Set("Content-Type", "text/event-stream")
  177. header.Set("Cache-Control", "no-cache")
  178. header.Set("Connection", "keep-alive")
  179. flusher.Flush()
  180. client := &ChannelClient{
  181. Flusher: flusher,
  182. Response: response,
  183. Request: request,
  184. }
  185. channel := hub.GetChannel(channelId)
  186. channel.AddClient(client)
  187. finalizeMain := make(chan bool)
  188. // finalizePing := make(chan bool)
  189. finalized := false
  190. // Listen to connection close and when the entire request handler chain exits(this handler here) and un-register messageChan.
  191. notify := response.(http.CloseNotifier).CloseNotify()
  192. go func() {
  193. <-notify
  194. fmt.Println("SSE request end")
  195. defer func() {
  196. // recover from panic caused by writing to a closed channel
  197. recover()
  198. return
  199. }()
  200. if finalized {
  201. return
  202. }
  203. finalized = true
  204. // ctx.Application().Logger().Infof("notification.channel.disconect(%s)", channelId)
  205. // Remove this client from the map of connected clients
  206. // when this handler exits.
  207. redis.Int(conn.Do("HDEL", hub.ChannelCollection, channelId))
  208. // closesd = true
  209. finalizeMain <- finalized
  210. // finalizePing <- finalized
  211. channel.RemoveClient(client)
  212. }()
  213. ticker5Second := time.NewTicker(5 * time.Second)
  214. defer ticker5Second.Stop()
  215. flusher.Flush()
  216. reset:
  217. for {
  218. select {
  219. case <-ticker5Second.C:
  220. go func () {
  221. if !consuming {
  222. hub.consume <- true
  223. }
  224. }()
  225. case <-hub.consume:
  226. // ctx.Application().Logger().Warnf("init.notification.loop 5s")
  227. if count, redisErr = redis.Int(conn.Do("LLEN", channelId)); count == 0 || redisErr != nil {
  228. continue reset
  229. }
  230. consuming = true
  231. for ; count > 0; count-- {
  232. if payload, redisErr = redis.String(conn.Do("LPOP", channelId)); redisErr != nil {
  233. ctx.Application().Logger().Errorf("NotificationError:", redisErr)
  234. } else {
  235. event := &Event{}
  236. json.Unmarshal([]byte(payload), event)
  237. channel.Emit(event)
  238. break
  239. }
  240. }
  241. consuming = false
  242. case <-finalizeMain:
  243. for {
  244. if !consuming {
  245. break
  246. }
  247. time.Sleep(time.Second * 2)
  248. }
  249. // ctx.Application().Logger().Infof("notification.finalize.disconect(%s)", channelId)
  250. close(finalizeMain)
  251. return
  252. }
  253. }
  254. }
  255. // ctx.OnClose(func() {
  256. // defer func() {
  257. // // recover from panic caused by writing to a closed channel
  258. // recover()
  259. // return
  260. // }()
  261. // if finalized {
  262. // return
  263. // }
  264. // finalized = true
  265. // // ctx.Application().Logger().Infof("notification.channel.disconect(%s)", channelId)
  266. // // Remove this client from the map of connected clients
  267. // // when this handler exits.
  268. // redis.Int(conn.Do("HDEL", hub.ChannelCollection, channelId))
  269. // // closesd = true
  270. // finalizeMain <- finalized
  271. // finalizePing <- finalized
  272. // channel.RemoveClient(client)
  273. // })
  274. // Set the headers related to event streaming, you can omit the "application/json" if you send plain text.
  275. // If you develop a go client, you must have: "Accept" : "application/json, text/event-stream" header as well.
  276. // header := ctx.ResponseWriter().Header()
  277. // header.Set("Access-Control-Allow-Origin", "*")
  278. // header.Set("Content-Type", "text/event-stream")
  279. // header.Set("Cache-Control", "no-cache")
  280. // header.Set("Connection", "keep-alive")
  281. // ctx.ContentType("text/event-stream")
  282. // ctx.Header("Cache-Control", "no-cache")
  283. // ctx.Header("Connection", "keep-alive")
  284. // ctx.Request().Response.Header.Set("Access-Control-Allow-Origin", "*")
  285. // Block waiting for messages broadcast on this connection's messageChan.
  286. // check notification in redis
  287. // go func() {
  288. // ctx.Application().Logger().Warnf("init.notification.loop %v", closesd)