123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- package common
- import (
- "fmt"
- "log"
- "time"
- // api "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api"
- // "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs"
- // "github.com/davecgh/go-spew/spew"
- "github.com/gorilla/websocket"
- // bson "go.mongodb.org/mongo-driver/bson"
- // "go.mongodb.org/mongo-driver/bson/primitive"
- )
// visits is a package-level counter. Nothing in the visible part of this
// file reads or writes it.
var visits uint64

// newline and space are one-byte separators. Nothing in the visible part of
// this file references them.
var (
	newline = []byte("\n")
	space   = []byte(" ")
)
// Event is the message exchanged between websocket clients and the hub.
// Fields tagged `json:"-"` never appear on the wire.
type Event struct {
	// Kind selects the handler the hub dispatches the event to.
	Kind string `json:"kind"`
	// ThreadID is serialized as "thread" and omitted when empty.
	ThreadID string `json:"thread,omitempty"`
	// SectionID is serialized as "section" and omitted when empty.
	SectionID string `json:"section,omitempty"`
	// UserId is filled in server-side (ReadHandler sets it to the client's
	// Id) and is never read from or written to JSON.
	UserId string `json:"-"`
	// Payload is the event body, omitted from JSON when empty.
	Payload string `json:"payload,omitempty"`
	// Entity is an arbitrary in-process value attached to the event; it is
	// excluded from JSON.
	Entity interface{} `json:"-"`
	// Date is the timestamp carried by the event.
	Date time.Time `json:"date"`
}
// Websocket timing and size limits.
const (
	// writeWait is the time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// pongWait is the time allowed to read the next pong message from the
	// peer.
	pongWait = time.Minute

	// pingPeriod is the interval at which pings are sent to the peer. It is
	// 90% of pongWait because it must be less than pongWait.
	pingPeriod = pongWait * 9 / 10

	// maxMessageSize is the maximum message size allowed from the peer.
	maxMessageSize = 512
)
- type WsClient struct {
- Id string
- Hub *Hub
- // The websocket connection.
- Conn *websocket.Conn
- // Buffered channel of outbound messages.
- // Send chan []byte
- Send chan *Event
- }
- // Hub maintains the set of active Clients and Broadcasts messages to the
- // Clients.
- type Hub struct {
- // Registered Clients.
- Clients map[string]*WsClient
- // Inbound messages from the Clients.
- Broadcast chan *Event
- Inbound chan *Event
- EventHandlers map[string]func(*Hub, *Event)
- // Register requests from the WsClient.
- Register chan *WsClient
- // Unregister requests from WsClient.
- Unregister chan *WsClient
- }
- func NewHub() *Hub {
- return &Hub{
- Broadcast: make(chan *Event),
- Inbound: make(chan *Event),
- Register: make(chan *WsClient),
- Unregister: make(chan *WsClient),
- Clients: make(map[string]*WsClient),
- EventHandlers: map[string]func(*Hub, *Event){
- },
- }
- }
- // func (control *ThreadCtrl) RegisterClient(client *WsClient) {
- // for _, member := range *(control.Thread.Members) {
- // if member.Id == client.Id {
- // control.Clients[client.Id] = client
- // break
- // }
- // }
- // }
- // func tsyncHandler(hub *Hub, event *Event) {
- // // ctrl, err := hub.GetThread(event.ThreadID)
- // // if err != nil {
- // // fmt.Println(err)
- // // return
- // // }
- // // for _, cli := range ctrl.Clients {
- // // if cli.Id != event.UserId {
- // // cli.Send <- event
- // // }
- // // }
- // }
- func (hub *Hub) Run() {
- for {
- select {
- case client := <-hub.Register:
- // Se o cliente possui uma conexão. Encerra e seta o novo cliente
- // if c, found := hub.Clients[client.Id]; found {
- // hub.Unregister <- c
- // }
- // fmt.Println("register client ", client)
- hub.Clients[client.Id] = client
- case client := <-hub.Unregister:
- if _, ok := hub.Clients[client.Id]; ok {
- delete(hub.Clients, client.Id)
- close(client.Send)
- }
- case event := <-hub.Inbound:
- // fmt.Println("runing dispatch ", event)
- hub.dispatch(event)
- case event := <-hub.Broadcast:
- // spew.Dump(hub.Clients)
- for ID, client := range hub.Clients {
- // fmt.Println("broadcast to client")
- select {
- case client.Send <- event:
- default:
- delete(hub.Clients, ID)
- close(client.Send)
- }
- }
- }
- }
- }
- func (h *Hub) dispatch(event *Event) {
- fmt.Println("dispatch", event)
- if fn, found := h.EventHandlers[event.Kind]; found {
- fn(h, event)
- } else {
- //TODO register erro
- }
- }
- func (c *WsClient) Handle() {
- go c.WriteHandler()
- go c.ReadHandler()
- }
- func (c *WsClient) ReadHandler() {
- var (
- // payload []byte
- err error
- event *Event
- hub = c.Hub
- )
- defer func() {
- // fmt.Println("FINALIZANDO ReadHandler()")
- if err != nil {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
- log.Printf("error: %v", err)
- }
- }
- hub.Unregister <- c
- c.Conn.Close()
- }()
- // c.Conn.SetReadLimit(maxMessageSize)
- // c.Conn.SetReadDeadline(time.Now().Add(pongWait))
- // c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
- for {
- event = &Event{}
- if err = c.Conn.ReadJSON(event); err != nil {
- break
- }
- fmt.Println("read payload")
- // spew.Dump(event)
- event.UserId = c.Id
- hub.Inbound <- event
- }
- }
- func (c *WsClient) WriteHandler() {
- var (
- // ticker = time.NewTicker(pingPeriod)
- // message []byte
- err error
- )
- defer func() {
- if err != nil {
- c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
- }
- // fmt.Println("FINALIZANDO WriteHandler()")
- // ticker.Stop()
- c.Conn.Close()
- }()
- for {
- select {
- case event, ok := <-c.Send:
- // spew.Dump(event)
- fmt.Println("write message", !ok)
- // c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
- if !ok {
- err = fmt.Errorf("0000000000")
- return
- }
- if err = c.Conn.WriteJSON(event); err != nil {
- return
- }
- // case <-ticker.C:
- // // c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
- // if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
- // return
- // }
- // }
- }
- }
- }
|