123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- package sse
- import (
- "encoding/json"
- "fmt"
- "net/http"
- "time"
- "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
- "github.com/gomodule/redigo/redis"
- context "github.com/kataras/iris/v12/context"
- )
// Event is the unit of data that travels through the hub: it is serialized
// to JSON when queued and written to clients as a Server-Sent Events frame.
type Event struct {
	// Timestamp is carried through JSON untouched; nothing in this file
	// reads it. NOTE(review): unit (seconds vs. milliseconds) is not
	// established here — confirm with producers.
	Timestamp int64 `json:"timestamp"`
	// Payload is written verbatim after the "data: " field of the frame.
	Payload string `json:"payload"`
	// Kind, when non-empty, is written as the "event: " field of the frame.
	Kind string `json:"kind"`
}
- type ChannelClient struct {
- Context context.Context
- Flusher http.Flusher
- }
- // type *event
- type Channel struct {
- ID string
- Clients map[*ChannelClient]bool
- }
- func (channel *Channel) RemoveClient(client *ChannelClient) {
- delete(channel.Clients, client)
- }
- func (channel *Channel) AddClient(client *ChannelClient) {
- channel.Clients[client] = true
- }
- func (channel *Channel) Emit(event *Event) {
- for client := range channel.Clients {
- if event.Kind != "" {
- client.Context.Writef("event: %s\n", event.Kind)
- }
- client.Context.Writef("data: %s\n\n", event.Payload)
- client.Flusher.Flush()
- }
- }
- type SSEHub struct {
- // New client connections
- newClients chan chan []byte
- // Closed client connections
- closingClients chan chan []byte
- // Client connections registry
- Channels map[string]*Channel
- ChannelCollection string
- RedisPool *redis.Pool
- }
// SSEOptions carries the settings NewSSEHub needs to build a hub.
type SSEOptions struct {
	// URI is the address passed to the Redis TCP dialer.
	URI string
	// Password, when non-empty, is sent with AUTH on every new connection.
	Password string
	// ChannelCollection is copied to SSEHub.ChannelCollection.
	ChannelCollection string
}
- func NewSSEHub(options *SSEOptions) *SSEHub {
- return &SSEHub{
- newClients: make(chan chan []byte),
- closingClients: make(chan chan []byte),
- Channels: make(map[string]*Channel),
- ChannelCollection: options.ChannelCollection,
- RedisPool: &redis.Pool{
- // Maximum number of idle connections in the pool.
- MaxIdle: 80,
- // max number of connections
- MaxActive: 12000,
- // Dial is an application supplied function for creating and
- // configuring a connection.
- Dial: func() (redis.Conn, error) {
- conn, err := redis.Dial("tcp", options.URI)
- if err != nil {
- panic(err.Error())
- }
- if options.Password != "" {
- if _, err := conn.Do("AUTH", options.Password); err != nil {
- conn.Close()
- return nil, err
- }
- }
- return conn, err
- },
- },
- }
- }
- func (hub *SSEHub) GetChannel(channelID string) *Channel {
- if _, exist := hub.Channels[channelID]; !exist {
- channel := &Channel{
- Clients: make(map[*ChannelClient]bool),
- ID: channelID,
- }
- hub.Channels[channelID] = channel
- }
- return hub.Channels[channelID]
- }
- func (hub *SSEHub) Dispatch(event *Event, channels ...string) {
- var (
- exists bool
- err error
- conn = hub.RedisPool.Get()
- )
- defer conn.Close()
- for _, channel := range channels {
- if exists, err = redis.Bool(conn.Do("HEXISTS", hub.ChannelCollection, channel)); err != nil || !exists {
- continue
- }
- eventBytes, _ := json.Marshal(event)
- conn.Do("RPUSH", channel, string(eventBytes))
- }
- }
- func (hub *SSEHub) UpgradeConnection(ctx context.Context, channelId string) (err *errs.Error) {
- var (
- conn = hub.RedisPool.Get()
- payload string
- count int
- // ok, closed bool
- ok bool
- redisErr error
- flusher http.Flusher
- )
- defer conn.Close()
- if flusher, ok = ctx.ResponseWriter().Flusher(); !ok {
- err = errs.HTTPVersionNotSupported().Details(&errs.Detail{
- Dominio: "",
- Reason: "Streaming unsupported",
- Location: "hook.beforePersist",
- LocationType: "",
- })
- return
- }
- // Each connection registers its own message channel with the Broker's connections registry.
- // messageChan := make(chan []byte)
- // Signal the broker that we have a new connection.
- // hub.newClients <- messageChan
- if _, redisErr = redis.Int(conn.Do("HSET", hub.ChannelCollection, channelId, true)); redisErr != nil {
- err = errs.Internal().Details(&errs.Detail{
- Dominio: "",
- Reason: "Fail on register channel",
- Location: "hook.beforePersist",
- LocationType: "",
- })
- return
- }
- client := &ChannelClient{
- Flusher: flusher,
- Context: ctx,
- }
- channel := hub.GetChannel(channelId)
- channel.AddClient(client)
- finalizeMain := make(chan bool)
- finalizePing := make(chan bool)
- finalized := false
- // Listen to connection close and when the entire request handler chain exits(this handler here) and un-register messageChan.
- ctx.OnClose(func() {
- defer func() {
- // recover from panic caused by writing to a closed channel
- recover()
- return
- }()
- if finalized {
- return
- }
- finalized = true
- // ctx.Application().Logger().Infof("notification.channel.disconect(%s)", channelId)
- // Remove this client from the map of connected clients
- // when this handler exits.
- redis.Int(conn.Do("HDEL", hub.ChannelCollection, channelId))
- // closesd = true
- finalizeMain <- finalized
- finalizePing <- finalized
- channel.RemoveClient(client)
- })
- // Set the headers related to event streaming, you can omit the "application/json" if you send plain text.
- // If you develop a go client, you must have: "Accept" : "application/json, text/event-stream" header as well.
- ctx.ContentType("text/event-stream")
- // ctx.Header("Access-Control-Allow-Origin", "*")
- ctx.Header("Cache-Control", "no-cache")
- ctx.Header("Connection", "keep-alive")
- ctx.ResponseWriter().Header().Set("Access-Control-Allow-Origin", "*")
- flusher.Flush()
- // ctx.Request().Response.Header.Set("Access-Control-Allow-Origin", "*")
- // Block waiting for messages broadcast on this connection's messageChan.
- // check notification in redis
- // go func() {
- // ctx.Application().Logger().Warnf("init.notification.loop %v", closesd)
- // Essa thread dispara um ping para manter a conexão ativa com o client
- go func() {
- ticker := time.NewTicker(3 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- // ctx.Application().Logger().Warnf("init.notification.loop 2s")
- flusher.Flush()
- case <-finalizePing:
- // ctx.Application().Logger().Warnf("finalize init.notification.loop 2s")
- close(finalizePing)
- return
- }
- }
- }()
- ticker5Second := time.NewTicker(5 * time.Second)
- defer ticker5Second.Stop()
- reset:
- for {
- select {
- case <-ticker5Second.C:
- ctx.Application().Logger().Warnf("init.notification.loop 5s")
- if count, redisErr = redis.Int(conn.Do("LLEN", channelId)); count == 0 || redisErr != nil {
- continue reset
- }
- for ; count > 0; count-- {
- if payload, redisErr = redis.String(conn.Do("LPOP", channelId)); redisErr != nil {
- ctx.Application().Logger().Errorf("NotificationError:", redisErr)
- } else {
- event := &Event{}
- json.Unmarshal([]byte(payload), event)
- fmt.Println("emit event ", event)
- channel.Emit(event)
- break
- }
- }
- case <-finalizeMain:
- ctx.Application().Logger().Infof("notification.finalize.disconect(%s)", channelId)
- close(finalizeMain)
- return
- }
- }
- return
- }
|