123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- package sse
- import (
- "encoding/json"
- "fmt"
- "net/http"
- "time"
- "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
- "github.com/gomodule/redigo/redis"
- context "github.com/kataras/iris/v12/context"
- )
// Event is one message delivered to SSE clients. Dispatch JSON-encodes it
// onto a Redis list and UpgradeConnection decodes it again before handing it
// to Channel.Emit.
type Event struct {
	// Timestamp travels with the event; nothing in this file sets or reads it.
	Timestamp int64 `json:"timestamp"`

	// Payload is written to clients as the SSE "data" field.
	Payload string `json:"payload"`

	// Kind, when non-empty, is written to clients as the SSE "event" field.
	Kind string `json:"kind"`
}
// ChannelClient holds the HTTP plumbing for one connected SSE client.
type ChannelClient struct {
	// Context context.Context

	// Flusher is flushed by Channel.Emit after an event has been written.
	Flusher http.Flusher

	// Response is the writer events are written to.
	Response http.ResponseWriter

	// Request is the HTTP request that opened the stream.
	Request *http.Request
}
- type Channel struct {
- ID string
- Clients map[*ChannelClient]bool
- }
- func (channel *Channel) RemoveClient(client *ChannelClient) {
- delete(channel.Clients, client)
- }
- func (channel *Channel) AddClient(client *ChannelClient) {
- channel.Clients[client] = true
- }
- func (channel *Channel) Emit(event *Event) {
- var (
- err error
- )
- for client := range channel.Clients {
- if event.Kind != "" {
- if _, err = fmt.Fprintf(client.Response, "event: %s\n", event.Kind); err != nil {
- fmt.Println(fmt.Sprintf("%+v", err))
- continue
- }
- }
- if _, err = fmt.Fprintf(client.Response, "data: %s\n\n", event.Payload); err != nil {
- fmt.Println(fmt.Sprintf("%+v", err))
- continue
- }
- client.Flusher.Flush()
- }
- }
- type SSEHub struct {
- // New client connections
- newClients chan chan []byte
- // Closed client connections
- closingClients chan chan []byte
- consume chan bool
- // Client connections registry
- Channels map[string]*Channel
- ChannelCollection string
- RedisPool *redis.Pool
- }
// SSEOptions configures NewSSEHub.
type SSEOptions struct {
	// URI is the address passed to redis.Dial for the "tcp" network.
	URI string

	// Password, when non-empty, is sent with AUTH right after connecting.
	Password string

	// ChannelCollection is the name of the Redis hash used to register
	// channels.
	ChannelCollection string
}
- func NewSSEHub(options *SSEOptions) *SSEHub {
- return &SSEHub{
- newClients: make(chan chan []byte),
- closingClients: make(chan chan []byte),
- consume: make(chan bool),
- Channels: make(map[string]*Channel),
- ChannelCollection: options.ChannelCollection,
- RedisPool: &redis.Pool{
- // Maximum number of idle connections in the pool.
- MaxIdle: 80,
- // max number of connections
- MaxActive: 12000,
- // Dial is an application supplied function for creating and
- // configuring a connection.
- Dial: func() (redis.Conn, error) {
- conn, err := redis.Dial("tcp", options.URI)
- if err != nil {
- panic(err.Error())
- }
- if options.Password != "" {
- if _, err := conn.Do("AUTH", options.Password); err != nil {
- conn.Close()
- return nil, err
- }
- }
- return conn, err
- },
- },
- }
- }
- func (hub *SSEHub) GetChannel(channelID string) *Channel {
- if _, exist := hub.Channels[channelID]; !exist {
- channel := &Channel{
- Clients: make(map[*ChannelClient]bool),
- ID: channelID,
- }
- hub.Channels[channelID] = channel
- }
- return hub.Channels[channelID]
- }
- func (hub *SSEHub) Dispatch(event *Event, channels ...string) {
- var (
- exists bool
- err error
- conn = hub.RedisPool.Get()
- )
- defer conn.Close()
-
- eventBytes, _ := json.Marshal(event)
- for _, channel := range channels {
- if exists, err = redis.Bool(conn.Do("HEXISTS", hub.ChannelCollection, channel)); err != nil || !exists {
- continue
- }
- conn.Do("RPUSH", channel, string(eventBytes))
- }
-
- hub.consume <- true
- }
- func (hub *SSEHub) Close(channels ...string) {
- var (
- exists bool
- count int
- err error
- conn = hub.RedisPool.Get()
- )
- for _, channel := range channels {
- if exists, err = redis.Bool(conn.Do("HEXISTS", hub.ChannelCollection, channel)); err != nil || !exists {
- continue
- }
- go func(channelID string) {
- for {
- if count, err = redis.Int(conn.Do("LLEN", channelID)); count > 0 {
- continue
- }
- conn.Do("HDEL", hub.ChannelCollection, channelID)
- }
- }(channel)
- }
- }
- func (hub *SSEHub) UpgradeConnection(ctx context.Context, channelId string) (err *errs.Error) {
- var (
- conn = hub.RedisPool.Get()
- payload string
- count int
- ok bool
- consuming bool
- redisErr error
- flusher http.Flusher
- response = ctx.ResponseWriter().Naive()
- request = ctx.Request()
- )
- defer conn.Close()
- if flusher, ok = response.(http.Flusher); !ok {
- err = errs.HTTPVersionNotSupported().Details(&errs.Detail{
- Dominio: "",
- Reason: "Streaming unsupported",
- Location: "hook.beforePersist",
- LocationType: "",
- })
- return
- }
- if _, redisErr = redis.Int(conn.Do("HSET", hub.ChannelCollection, channelId, true)); redisErr != nil {
- err = errs.Internal().Details(&errs.Detail{
- Dominio: "",
- Reason: "Fail on register channel",
- Location: "hook.beforePersist",
- LocationType: "",
- })
- return
- }
- header := response.Header()
- header.Set("Access-Control-Allow-Origin", "*")
- header.Set("Content-Type", "text/event-stream")
- header.Set("Cache-Control", "no-cache")
- header.Set("Connection", "keep-alive")
- flusher.Flush()
- client := &ChannelClient{
- Flusher: flusher,
- Response: response,
- Request: request,
- }
- channel := hub.GetChannel(channelId)
- channel.AddClient(client)
- finalizeMain := make(chan bool)
- // finalizePing := make(chan bool)
- finalized := false
- // Listen to connection close and when the entire request handler chain exits(this handler here) and un-register messageChan.
- notify := response.(http.CloseNotifier).CloseNotify()
- go func() {
- <-notify
- fmt.Println("SSE request end")
- defer func() {
- // recover from panic caused by writing to a closed channel
- recover()
- return
- }()
- if finalized {
- return
- }
- finalized = true
- // ctx.Application().Logger().Infof("notification.channel.disconect(%s)", channelId)
- // Remove this client from the map of connected clients
- // when this handler exits.
- redis.Int(conn.Do("HDEL", hub.ChannelCollection, channelId))
- // closesd = true
- finalizeMain <- finalized
- // finalizePing <- finalized
- channel.RemoveClient(client)
- }()
- ticker5Second := time.NewTicker(5 * time.Second)
- defer ticker5Second.Stop()
- flusher.Flush()
-
- reset:
- for {
- select {
- case <-ticker5Second.C:
- go func () {
- if !consuming {
- hub.consume <- true
- }
- }()
- case <-hub.consume:
- // ctx.Application().Logger().Warnf("init.notification.loop 5s")
- if count, redisErr = redis.Int(conn.Do("LLEN", channelId)); count == 0 || redisErr != nil {
- continue reset
- }
-
- consuming = true
- for ; count > 0; count-- {
- if payload, redisErr = redis.String(conn.Do("LPOP", channelId)); redisErr != nil {
- ctx.Application().Logger().Errorf("NotificationError:", redisErr)
- } else {
- event := &Event{}
- json.Unmarshal([]byte(payload), event)
- channel.Emit(event)
- break
- }
- }
- consuming = false
- case <-finalizeMain:
-
- for {
- if !consuming {
- break
- }
- time.Sleep(time.Second * 2)
- }
- // ctx.Application().Logger().Infof("notification.finalize.disconect(%s)", channelId)
- close(finalizeMain)
- return
- }
- }
- }
- // ctx.OnClose(func() {
- // defer func() {
- // // recover from panic caused by writing to a closed channel
- // recover()
- // return
- // }()
- // if finalized {
- // return
- // }
- // finalized = true
- // // ctx.Application().Logger().Infof("notification.channel.disconect(%s)", channelId)
- // // Remove this client from the map of connected clients
- // // when this handler exits.
- // redis.Int(conn.Do("HDEL", hub.ChannelCollection, channelId))
- // // closesd = true
- // finalizeMain <- finalized
- // finalizePing <- finalized
- // channel.RemoveClient(client)
- // })
- // Set the headers related to event streaming, you can omit the "application/json" if you send plain text.
- // If you develop a go client, you must have: "Accept" : "application/json, text/event-stream" header as well.
- // header := ctx.ResponseWriter().Header()
- // header.Set("Access-Control-Allow-Origin", "*")
- // header.Set("Content-Type", "text/event-stream")
- // header.Set("Cache-Control", "no-cache")
- // header.Set("Connection", "keep-alive")
- // ctx.ContentType("text/event-stream")
- // ctx.Header("Cache-Control", "no-cache")
- // ctx.Header("Connection", "keep-alive")
- // ctx.Request().Response.Header.Set("Access-Control-Allow-Origin", "*")
- // Block waiting for messages broadcast on this connection's messageChan.
- // check notification in redis
- // go func() {
- // ctx.Application().Logger().Warnf("init.notification.loop %v", closesd)
|