// Package sse implements a small Server-Sent Events hub. Events are queued in
// Redis lists (one list per channel ID) by Dispatch and delivered to connected
// HTTP clients by UpgradeConnection, which polls those lists.
package sse

import (
	"encoding/json"
	"net/http"
	"sync"
	"time"

	"git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
	"github.com/gomodule/redigo/redis"
	context "github.com/kataras/iris/v12/context"
)

const (
	// pingInterval is how often an open stream is flushed to keep it alive.
	pingInterval = 3 * time.Second
	// pollInterval is how often an open stream checks Redis for queued events.
	pollInterval = 5 * time.Second
)

// Event is the JSON document stored in Redis and written to SSE clients.
type Event struct {
	Timestamp int64  `json:"timestamp"`
	Payload   string `json:"payload"`
	Kind      string `json:"kind"`
}

// ChannelClient is one open SSE response: the request context that is written
// to and the flusher used to push the written bytes to the peer.
type ChannelClient struct {
	Context context.Context
	Flusher http.Flusher
}

// Channel groups the clients currently attached to one channel ID.
//
// A Channel must not be copied after first use because it embeds mutexes.
type Channel struct {
	ID      string
	Clients map[*ChannelClient]bool

	// mu guards Clients and serialises every write/flush on attached clients.
	mu sync.Mutex
	// regMu serialises the "change membership + update the Redis registry"
	// sequences performed by SSEHub.join and SSEHub.leave. It is always taken
	// before mu, never after.
	regMu sync.Mutex
}

// RemoveClient detaches client from the channel. Removing an unknown client is
// a no-op.
func (channel *Channel) RemoveClient(client *ChannelClient) {
	channel.dropClient(client)
}

// AddClient attaches client to the channel.
func (channel *Channel) AddClient(client *ChannelClient) {
	channel.mu.Lock()
	defer channel.mu.Unlock()

	// Tolerate a Channel that was built without going through GetChannel.
	if channel.Clients == nil {
		channel.Clients = make(map[*ChannelClient]bool)
	}
	channel.Clients[client] = true
}

// Emit writes event to every attached client as an SSE frame: an optional
// "event:" line (only when Kind is set) followed by a "data:" line, then
// flushes that client. A nil event is ignored.
func (channel *Channel) Emit(event *Event) {
	if event == nil {
		return
	}

	channel.mu.Lock()
	defer channel.mu.Unlock()

	for client := range channel.Clients {
		if event.Kind != "" {
			client.Context.Writef("event: %s\n", event.Kind)
		}
		client.Context.Writef("data: %s\n\n", event.Payload)
		client.Flusher.Flush()
	}
}

// dropClient detaches client and reports whether the channel is now empty.
func (channel *Channel) dropClient(client *ChannelClient) (empty bool) {
	channel.mu.Lock()
	defer channel.mu.Unlock()

	delete(channel.Clients, client)
	return len(channel.Clients) == 0
}

// locked runs fn while holding the channel's write lock, so fn cannot
// interleave with an Emit issued from another goroutine.
func (channel *Channel) locked(fn func()) {
	channel.mu.Lock()
	defer channel.mu.Unlock()
	fn()
}

// SSEHub owns the in-process channel registry and the Redis pool used to queue
// and fetch events.
type SSEHub struct {
	// New client connections
	newClients chan chan []byte
	// Closed client connections
	closingClients chan chan []byte
	// Client connections registry
	Channels map[string]*Channel
	// ChannelCollection is the Redis hash in which active channel IDs are
	// registered.
	ChannelCollection string
	RedisPool         *redis.Pool

	// mu guards Channels.
	mu sync.Mutex
}

// SSEOptions configures NewSSEHub.
type SSEOptions struct {
	// URI is the address passed to redis.Dial with network "tcp".
	URI string
	// Password, when non-empty, is sent with AUTH right after dialing.
	Password string
	// ChannelCollection is copied to SSEHub.ChannelCollection.
	ChannelCollection string
}

// NewSSEHub builds a hub backed by a Redis pool configured from options.
// A nil options value is treated as an empty SSEOptions.
func NewSSEHub(options *SSEOptions) *SSEHub {
	if options == nil {
		options = &SSEOptions{}
	}

	return &SSEHub{
		newClients:        make(chan chan []byte),
		closingClients:    make(chan chan []byte),
		Channels:          make(map[string]*Channel),
		ChannelCollection: options.ChannelCollection,
		RedisPool: &redis.Pool{
			// Maximum number of idle connections in the pool.
			MaxIdle: 80,
			// Maximum number of connections.
			MaxActive: 12000,
			// Dial creates and authenticates a connection. Failures are
			// returned to the pool (and surface on the first command issued on
			// the connection) instead of panicking the whole process.
			Dial: func() (redis.Conn, error) {
				conn, err := redis.Dial("tcp", options.URI)
				if err != nil {
					return nil, err
				}
				if options.Password != "" {
					if _, err := conn.Do("AUTH", options.Password); err != nil {
						conn.Close()
						return nil, err
					}
				}
				return conn, nil
			},
		},
	}
}

// GetChannel returns the channel registered under channelID, creating it on
// first use.
func (hub *SSEHub) GetChannel(channelID string) *Channel {
	hub.mu.Lock()
	defer hub.mu.Unlock()

	// Tolerate a hub that was built without going through NewSSEHub.
	if hub.Channels == nil {
		hub.Channels = make(map[string]*Channel)
	}

	channel, exist := hub.Channels[channelID]
	if !exist {
		channel = &Channel{
			Clients: make(map[*ChannelClient]bool),
			ID:      channelID,
		}
		hub.Channels[channelID] = channel
	}
	return channel
}

// Dispatch appends event, JSON-encoded, to the Redis list of every channel in
// channels that is currently registered in the hub's ChannelCollection hash.
// Channels that are not registered, or whose lookup fails, are skipped.
func (hub *SSEHub) Dispatch(event *Event, channels ...string) {
	if len(channels) == 0 {
		return
	}

	// The encoded form is identical for every channel, so encode once.
	eventBytes, err := json.Marshal(event)
	if err != nil {
		return
	}

	conn := hub.RedisPool.Get()
	defer conn.Close()

	for _, channel := range channels {
		exists, err := redis.Bool(conn.Do("HEXISTS", hub.ChannelCollection, channel))
		if err != nil || !exists {
			continue
		}
		// Best effort: Dispatch has no error return, so a failed push is
		// dropped rather than aborting delivery to the remaining channels.
		_, _ = conn.Do("RPUSH", channel, eventBytes)
	}
}

// UpgradeConnection turns the current request into an SSE stream attached to
// channelId and blocks until the request is closed.
//
// It returns a non-nil error only before streaming starts: when the response
// writer cannot flush, or when the channel cannot be registered in Redis.
// While streaming it flushes the response every pingInterval and looks for
// queued events every pollInterval.
func (hub *SSEHub) UpgradeConnection(ctx context.Context, channelId string) (err *errs.Error) {
	flusher, ok := ctx.ResponseWriter().Flusher()
	if !ok {
		return errs.HTTPVersionNotSupported().Details(&errs.Detail{
			Dominio:      "",
			Reason:       "Streaming unsupported",
			Location:     "hook.beforePersist/caracterizationForm",
			LocationType: "",
		})
	}

	client := &ChannelClient{
		Flusher: flusher,
		Context: ctx,
	}

	channel, joinErr := hub.join(channelId, client)
	if joinErr != nil {
		return errs.Internal().Details(&errs.Detail{
			Dominio:      "",
			Reason:       "Fail on register channel",
			Location:     "hook.beforePersist/caracterizationForm",
			LocationType: "",
		})
	}
	// Detach in the handler goroutine, before the handler returns, so no other
	// goroutine can Emit to this request's context once the handler is done.
	defer hub.leave(channel, channelId, client)

	// done is closed exactly once when the request is closed. The callback may
	// be invoked more than once (the original code guarded against that with a
	// flag), hence the sync.Once.
	done := make(chan struct{})
	var closeOnce sync.Once
	ctx.OnClose(func() {
		closeOnce.Do(func() { close(done) })
	})

	// Set the headers related to event streaming and push them to the client.
	// This runs under the channel lock because the client is already attached
	// and another stream on the same channel may Emit to it at any moment.
	channel.locked(func() {
		ctx.ContentType("text/event-stream")
		ctx.Header("Cache-Control", "no-cache")
		ctx.Header("Connection", "keep-alive")
		ctx.ResponseWriter().Header().Set("Access-Control-Allow-Origin", "*")
		flusher.Flush()
	})

	pingTicker := time.NewTicker(pingInterval)
	defer pingTicker.Stop()
	pollTicker := time.NewTicker(pollInterval)
	defer pollTicker.Stop()

	for {
		select {
		case <-done:
			return nil
		case <-pingTicker.C:
			// Periodic flush that keeps the connection with the client alive.
			channel.locked(flusher.Flush)
		case <-pollTicker.C:
			hub.deliverNext(channelId, channel)
		}
	}
}

// join registers channelID in the Redis registry and attaches client to the
// in-process channel. Nothing is attached when the registration fails.
func (hub *SSEHub) join(channelID string, client *ChannelClient) (*Channel, error) {
	channel := hub.GetChannel(channelID)

	// Held across the Redis call so a concurrent leave on the same channel
	// cannot unregister it between our HSET and AddClient.
	channel.regMu.Lock()
	defer channel.regMu.Unlock()

	conn := hub.RedisPool.Get()
	defer conn.Close()

	if _, err := redis.Int(conn.Do("HSET", hub.ChannelCollection, channelID, true)); err != nil {
		return nil, err
	}
	channel.AddClient(client)
	return channel, nil
}

// leave detaches client from channel and, when it was the last client in this
// process, removes channelID from the Redis registry.
func (hub *SSEHub) leave(channel *Channel, channelID string, client *ChannelClient) {
	channel.regMu.Lock()
	defer channel.regMu.Unlock()

	if !channel.dropClient(client) {
		// Other streams are still attached; keep the channel registered so
		// Dispatch keeps queuing events for them.
		return
	}

	conn := hub.RedisPool.Get()
	defer conn.Close()

	// Best effort: the stream is already closing and there is no caller to
	// report a failure to.
	_, _ = conn.Do("HDEL", hub.ChannelCollection, channelID)
}

// deliverNext pops at most one queued event for channelID from Redis and emits
// it on channel.
//
// A pooled connection is taken per call rather than held for the lifetime of
// the stream, so a broken connection only costs one poll.
//
// NOTE(review): the original loop also stopped after the first delivered
// event, so a backlog drains at one event per poll. That is preserved here;
// confirm whether draining the whole list per poll was intended.
func (hub *SSEHub) deliverNext(channelID string, channel *Channel) {
	conn := hub.RedisPool.Get()
	defer conn.Close()

	count, err := redis.Int(conn.Do("LLEN", channelID))
	if err != nil {
		return
	}

	for ; count > 0; count-- {
		payload, err := redis.String(conn.Do("LPOP", channelID))
		if err != nil {
			// Either the list was emptied by another consumer or the
			// connection failed; retrying within this poll cannot succeed.
			return
		}

		event := &Event{}
		if err := json.Unmarshal([]byte(payload), event); err != nil {
			// Skip a malformed entry instead of emitting an empty event.
			continue
		}

		channel.Emit(event)
		return
	}
}