|
@@ -67,6 +67,7 @@ type SSEHub struct {
|
|
|
// Closed client connections
|
|
|
closingClients chan chan []byte
|
|
|
|
|
|
+ consume chan bool
|
|
|
// Client connections registry
|
|
|
Channels map[string]*Channel
|
|
|
|
|
@@ -86,6 +87,7 @@ func NewSSEHub(options *SSEOptions) *SSEHub {
|
|
|
return &SSEHub{
|
|
|
newClients: make(chan chan []byte),
|
|
|
closingClients: make(chan chan []byte),
|
|
|
+ consume: make(chan bool),
|
|
|
Channels: make(map[string]*Channel),
|
|
|
ChannelCollection: options.ChannelCollection,
|
|
|
RedisPool: &redis.Pool{
|
|
@@ -146,6 +148,7 @@ func (hub *SSEHub) Dispatch(event *Event, channels ...string) {
|
|
|
|
|
|
conn.Do("RPUSH", channel, string(eventBytes))
|
|
|
}
|
|
|
+ hub.consume <- true
|
|
|
}
|
|
|
|
|
|
func (hub *SSEHub) UpgradeConnection(ctx context.Context, channelId string) (err *errs.Error) {
|
|
@@ -155,6 +158,7 @@ func (hub *SSEHub) UpgradeConnection(ctx context.Context, channelId string) (err
|
|
|
payload string
|
|
|
count int
|
|
|
ok bool
|
|
|
+ consuming bool
|
|
|
redisErr error
|
|
|
flusher http.Flusher
|
|
|
response = ctx.ResponseWriter().Naive()
|
|
@@ -232,38 +236,26 @@ func (hub *SSEHub) UpgradeConnection(ctx context.Context, channelId string) (err
|
|
|
channel.RemoveClient(client)
|
|
|
}()
|
|
|
|
|
|
- // Essa thread dispara um ping para manter a conexão ativa com o client
|
|
|
- // go func() {
|
|
|
- // ticker := time.NewTicker(3 * time.Second)
|
|
|
- // defer ticker.Stop()
|
|
|
-
|
|
|
- // for {
|
|
|
- // select {
|
|
|
- // case <-ticker.C:
|
|
|
- // // ctx.Application().Logger().Warnf("init.notification.loop 2s")
|
|
|
- // // flusher.Flush()
|
|
|
-
|
|
|
- // case <-finalizePing:
|
|
|
- // // ctx.Application().Logger().Warnf("finalize init.notification.loop 2s")
|
|
|
- // close(finalizePing)
|
|
|
- // return
|
|
|
- // }
|
|
|
- // }
|
|
|
- // }()
|
|
|
-
|
|
|
ticker5Second := time.NewTicker(5 * time.Second)
|
|
|
defer ticker5Second.Stop()
|
|
|
|
|
|
flusher.Flush()
|
|
|
-
|
|
|
+
|
|
|
reset:
|
|
|
for {
|
|
|
select {
|
|
|
case <-ticker5Second.C:
|
|
|
- // ctx.Application().Logger().Warnf("init.notification.loop 5s")
|
|
|
+ if !consuming {
|
|
|
+ hub.consume <- true
|
|
|
+ }
|
|
|
+
|
|
|
+ case <-hub.consume:
|
|
|
+ ctx.Application().Logger().Warnf("init.notification.loop 5s")
|
|
|
if count, redisErr = redis.Int(conn.Do("LLEN", channelId)); count == 0 || redisErr != nil {
|
|
|
continue reset
|
|
|
}
|
|
|
+
|
|
|
+ consuming = true
|
|
|
|
|
|
for ; count > 0; count-- {
|
|
|
|
|
@@ -277,13 +269,21 @@ reset:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ consuming = false
|
|
|
+
|
|
|
case <-finalizeMain:
|
|
|
- ctx.Application().Logger().Infof("notification.finalize.disconect(%s)", channelId)
|
|
|
+
|
|
|
+ for {
|
|
|
+ if !consuming {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ time.Sleep(time.Second * 2)
|
|
|
+ }
|
|
|
+ // ctx.Application().Logger().Infof("notification.finalize.disconect(%s)", channelId)
|
|
|
close(finalizeMain)
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
- return
|
|
|
}
|
|
|
|
|
|
// ctx.OnClose(func() {
|