// Package sse implements a small Server-Sent Events hub. Dispatch queues
// events in Redis lists (one list per channel) and UpgradeConnection streams
// the queued events to the HTTP clients connected to that channel.
package sse

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	"git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
	"github.com/gomodule/redigo/redis"
	context "github.com/kataras/iris/v12/context"
)

// pollInterval is how often a connected client re-checks its Redis queue even
// when no local wake-up signal was received.
const pollInterval = 5 * time.Second

var (
	// lineBreaks normalises CRLF and lone CR to LF so a payload can be split
	// into one "data:" line per source line.
	lineBreaks = strings.NewReplacer("\r\n", "\n", "\r", "\n")
	// stripBreaks removes line terminators from single-line fields.
	stripBreaks = strings.NewReplacer("\r", "", "\n", "")
)

// Event is the message stored in Redis (as JSON) and sent to clients.
type Event struct {
	Timestamp int64  `json:"timestamp"`
	Payload   string `json:"payload"`
	Kind      string `json:"kind"`
}

// ChannelClient is one open SSE HTTP connection.
type ChannelClient struct {
	// Context context.Context
	Flusher  http.Flusher
	Response http.ResponseWriter
	Request  *http.Request
}

// Channel groups the clients that are connected to the same channel ID.
type Channel struct {
	ID      string
	Clients map[*ChannelClient]bool

	// registry serialises "register in Redis + attach client" against
	// "detach client + unregister from Redis" so a leaving client cannot
	// unregister a channel another client is in the middle of joining.
	registry sync.Mutex
	// mu guards Clients and serialises writes to the clients' responses.
	mu sync.Mutex
	// wakeup carries a coalesced "queue has new events" signal. It may be nil
	// for a Channel that was not created by GetChannel; such channels are only
	// served by the periodic poll.
	wakeup chan struct{}
}

// RemoveClient detaches client from the channel.
func (channel *Channel) RemoveClient(client *ChannelClient) {
	channel.detach(client)
}

// detach removes client and reports how many clients remain attached.
func (channel *Channel) detach(client *ChannelClient) int {
	channel.mu.Lock()
	defer channel.mu.Unlock()

	delete(channel.Clients, client)
	return len(channel.Clients)
}

// AddClient attaches client to the channel.
func (channel *Channel) AddClient(client *ChannelClient) {
	channel.mu.Lock()
	defer channel.mu.Unlock()

	if channel.Clients == nil {
		channel.Clients = make(map[*ChannelClient]bool)
	}
	channel.Clients[client] = true
}

// Emit writes event to every client attached to the channel and flushes each
// response. A write error is printed and that client is skipped. A nil event
// is ignored.
func (channel *Channel) Emit(event *Event) {
	if event == nil {
		return
	}
	// The frame is identical for every client, so build it once.
	frame := formatEvent(event)

	channel.mu.Lock()
	defer channel.mu.Unlock()

	for client := range channel.Clients {
		if _, err := client.Response.Write(frame); err != nil {
			fmt.Printf("%+v\n", err)
			continue
		}
		client.Flusher.Flush()
	}
}

// formatEvent encodes event in the text/event-stream wire format: an optional
// "event:" line, one "data:" line per payload line, and a terminating blank
// line. Splitting the payload keeps embedded line breaks from ending the event
// early.
func formatEvent(event *Event) []byte {
	var b strings.Builder

	if event.Kind != "" {
		b.WriteString("event: ")
		b.WriteString(stripBreaks.Replace(event.Kind))
		b.WriteByte('\n')
	}
	for _, line := range strings.Split(lineBreaks.Replace(event.Payload), "\n") {
		b.WriteString("data: ")
		b.WriteString(line)
		b.WriteByte('\n')
	}
	b.WriteByte('\n')

	return []byte(b.String())
}

// SSEHub keeps the registry of local channels and the Redis pool used to
// queue and fetch events.
type SSEHub struct {
	// New client connections
	newClients chan chan []byte
	// Closed client connections
	closingClients chan chan []byte
	// consume is no longer used for wake-ups (they are delivered per channel);
	// the field is kept so the struct stays source-compatible.
	consume chan bool
	// Client connections registry
	Channels          map[string]*Channel
	ChannelCollection string
	RedisPool         *redis.Pool

	// mu guards Channels.
	mu sync.Mutex
}

// SSEOptions configures NewSSEHub.
type SSEOptions struct {
	URI               string
	Password          string
	ChannelCollection string
}

// NewSSEHub builds a hub whose Redis pool dials options.URI and, when
// options.Password is set, authenticates each new connection. A nil options
// is treated as the zero value.
func NewSSEHub(options *SSEOptions) *SSEHub {
	if options == nil {
		options = &SSEOptions{}
	}

	return &SSEHub{
		newClients:        make(chan chan []byte),
		closingClients:    make(chan chan []byte),
		consume:           make(chan bool),
		Channels:          make(map[string]*Channel),
		ChannelCollection: options.ChannelCollection,
		RedisPool: &redis.Pool{
			// Maximum number of idle connections in the pool.
			MaxIdle: 80,
			// max number of connections
			MaxActive: 12000,
			// Dial is an application supplied function for creating and
			// configuring a connection. Failures are returned (not panicked)
			// so they surface as errors on the pooled connection.
			Dial: func() (redis.Conn, error) {
				conn, err := redis.Dial("tcp", options.URI)
				if err != nil {
					return nil, err
				}
				if options.Password != "" {
					if _, err := conn.Do("AUTH", options.Password); err != nil {
						conn.Close()
						return nil, err
					}
				}
				return conn, nil
			},
		},
	}
}

// GetChannel returns the local channel for channelID, creating it on first
// use.
func (hub *SSEHub) GetChannel(channelID string) *Channel {
	hub.mu.Lock()
	defer hub.mu.Unlock()

	if hub.Channels == nil {
		hub.Channels = make(map[string]*Channel)
	}

	channel, exist := hub.Channels[channelID]
	if !exist {
		channel = &Channel{
			ID:      channelID,
			Clients: make(map[*ChannelClient]bool),
			wakeup:  make(chan struct{}, 1),
		}
		hub.Channels[channelID] = channel
	}
	return channel
}

// signal wakes one local handler serving channelID, if there is one. The send
// never blocks: an already pending signal is enough to trigger a drain.
func (hub *SSEHub) signal(channelID string) {
	hub.mu.Lock()
	channel := hub.Channels[channelID]
	hub.mu.Unlock()

	if channel == nil {
		return
	}
	select {
	case channel.wakeup <- struct{}{}:
	default:
	}
}

// Dispatch appends event to the Redis queue of every listed channel that is
// registered in the channel collection, then wakes the local handlers of those
// channels. Unregistered channels are skipped. It never blocks waiting for a
// consumer; errors are printed because the signature has no error result.
func (hub *SSEHub) Dispatch(event *Event, channels ...string) {
	eventBytes, err := json.Marshal(event)
	if err != nil {
		fmt.Printf("sse: marshal event: %+v\n", err)
		return
	}
	payload := string(eventBytes)

	conn := hub.RedisPool.Get()
	defer conn.Close()

	for _, channelID := range channels {
		exists, err := redis.Bool(conn.Do("HEXISTS", hub.ChannelCollection, channelID))
		if err != nil {
			fmt.Printf("sse: check channel %q: %+v\n", channelID, err)
			continue
		}
		if !exists {
			continue
		}
		if _, err := conn.Do("RPUSH", channelID, payload); err != nil {
			fmt.Printf("sse: queue event on %q: %+v\n", channelID, err)
			continue
		}
		hub.signal(channelID)
	}
}

// subscribe registers channelID in the Redis channel collection, commits the
// SSE response headers and attaches client to channel. If the registration
// fails nothing has been written to the response yet, so the caller can still
// answer with a regular error.
func (hub *SSEHub) subscribe(channel *Channel, channelID string, client *ChannelClient) error {
	channel.registry.Lock()
	defer channel.registry.Unlock()

	conn := hub.RedisPool.Get()
	defer conn.Close()

	if _, err := redis.Int(conn.Do("HSET", hub.ChannelCollection, channelID, true)); err != nil {
		return err
	}

	header := client.Response.Header()
	header.Set("Access-Control-Allow-Origin", "*")
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	client.Flusher.Flush()

	// Attach only after the headers are committed so Emit can never write
	// event data ahead of them.
	channel.AddClient(client)
	return nil
}

// unsubscribe detaches client from channel and, when it was the last local
// client, removes channelID from the Redis channel collection.
func (hub *SSEHub) unsubscribe(channel *Channel, channelID string, client *ChannelClient) error {
	channel.registry.Lock()
	defer channel.registry.Unlock()

	if channel.detach(client) > 0 {
		// Other clients still listen on this channel; keep it registered.
		return nil
	}

	conn := hub.RedisPool.Get()
	defer conn.Close()

	_, err := redis.Int(conn.Do("HDEL", hub.ChannelCollection, channelID))
	return err
}

// deliverPending pops the events currently queued for channelID and emits
// them to the channel's clients. It uses a short-lived pooled connection so a
// broken connection does not poison the whole stream.
func (hub *SSEHub) deliverPending(ctx context.Context, channel *Channel, channelID string) {
	logger := ctx.Application().Logger()

	conn := hub.RedisPool.Get()
	defer conn.Close()

	count, err := redis.Int(conn.Do("LLEN", channelID))
	if err != nil {
		logger.Errorf("NotificationError: %v", err)
		return
	}

	// Bound the loop by the length observed up front so a busy producer
	// cannot keep this handler draining forever.
	for ; count > 0; count-- {
		payload, err := redis.String(conn.Do("LPOP", channelID))
		if err == redis.ErrNil {
			// Another consumer emptied the queue first.
			return
		}
		if err != nil {
			logger.Errorf("NotificationError: %v", err)
			return
		}

		event := &Event{}
		if err := json.Unmarshal([]byte(payload), event); err != nil {
			logger.Errorf("NotificationError: %v", err)
			continue
		}
		channel.Emit(event)
	}
}

// UpgradeConnection turns the current request into an SSE stream for
// channelId and blocks until the request context is done. It returns an error
// only when streaming cannot start: the response writer does not support
// flushing, or the channel cannot be registered in Redis.
func (hub *SSEHub) UpgradeConnection(ctx context.Context, channelId string) (err *errs.Error) {
	var (
		response = ctx.ResponseWriter().Naive()
		request  = ctx.Request()
		logger   = ctx.Application().Logger()
	)

	flusher, ok := response.(http.Flusher)
	if !ok {
		err = errs.HTTPVersionNotSupported().Details(&errs.Detail{
			Dominio:      "",
			Reason:       "Streaming unsupported",
			Location:     "hook.beforePersist",
			LocationType: "",
		})
		return err
	}

	channel := hub.GetChannel(channelId)
	client := &ChannelClient{
		Flusher:  flusher,
		Response: response,
		Request:  request,
	}

	if redisErr := hub.subscribe(channel, channelId, client); redisErr != nil {
		logger.Errorf("notification.channel.register(%s): %v", channelId, redisErr)
		err = errs.Internal().Details(&errs.Detail{
			Dominio:      "",
			Reason:       "Fail on register channel",
			Location:     "hook.beforePersist",
			LocationType: "",
		})
		return err
	}
	// Detach before the handler returns: the response writer must not be
	// written to by Emit afterwards.
	defer func() {
		if redisErr := hub.unsubscribe(channel, channelId, client); redisErr != nil {
			logger.Errorf("notification.channel.disconect(%s): %v", channelId, redisErr)
		}
	}()

	poll := time.NewTicker(pollInterval)
	defer poll.Stop()

	// The request context replaces the deprecated http.CloseNotifier.
	done := request.Context().Done()

	for {
		select {
		case <-done:
			fmt.Println("SSE request end")
			return nil
		case <-poll.C:
			// Periodic fallback: covers events queued by other processes.
		case <-channel.wakeup:
			// Dispatch queued something for this channel in this process.
		}
		hub.deliverPending(ctx, channel, channelId)
	}
}

// Legacy implementation notes, kept commented out as in the original file:
//
// ctx.OnClose(func() {
// 	defer func() {
// 		// recover from panic caused by writing to a closed channel
// 		recover()
// 		return
// 	}()
// 	if finalized {
// 		return
// 	}
// 	finalized = true
// 	// ctx.Application().Logger().Infof("notification.channel.disconect(%s)", channelId)
// 	// Remove this client from the map of connected clients
// 	// when this handler exits.
// 	redis.Int(conn.Do("HDEL", hub.ChannelCollection, channelId))
// 	// closesd = true
// 	finalizeMain <- finalized
// 	finalizePing <- finalized
// 	channel.RemoveClient(client)
// })
// Set the headers related to event streaming, you can omit the "application/json" if you send plain text.
// If you develop a go client, you must have: "Accept" : "application/json, text/event-stream" header as well.
// header := ctx.ResponseWriter().Header()
// header.Set("Access-Control-Allow-Origin", "*")
// header.Set("Content-Type", "text/event-stream")
// header.Set("Cache-Control", "no-cache")
// header.Set("Connection", "keep-alive")
// ctx.ContentType("text/event-stream")
// ctx.Header("Cache-Control", "no-cache")
// ctx.Header("Connection", "keep-alive")
// ctx.Request().Response.Header.Set("Access-Control-Allow-Origin", "*")
// Block waiting for messages broadcast on this connection's messageChan.
// check notification in redis
// go func() {
// ctx.Application().Logger().Warnf("init.notification.loop %v", closesd)