package common import ( "fmt" "log" "time" // api "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api" // "git.eugeniocarvalho.dev/eugeniucarvalho/apicodegen/api/errs" // "github.com/davecgh/go-spew/spew" "github.com/gorilla/websocket" // bson "go.mongodb.org/mongo-driver/bson" // "go.mongodb.org/mongo-driver/bson/primitive" ) var ( visits uint64 newline = []byte{'\n'} space = []byte{' '} ) type Event struct { Kind string `json:"kind"` ThreadID string `json:"thread,omitempty"` SectionID string `json:"section,omitempty"` UserId string `json:"-"` Payload string `json:"payload,omitempty"` Entity interface{} `json:"-"` Date time.Time `json:"date"` } const ( // Time allowed to write a message to the peer. writeWait = 10 * time.Second // Time allowed to read the next pong message from the peer. pongWait = 60 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 // Maximum message size allowed from peer. maxMessageSize = 512 ) type WsClient struct { Id string Hub *Hub // The websocket connection. Conn *websocket.Conn // Buffered channel of outbound messages. // Send chan []byte Send chan *Event } // Hub maintains the set of active Clients and Broadcasts messages to the // Clients. type Hub struct { // Registered Clients. Clients map[string]*WsClient // Inbound messages from the Clients. Broadcast chan *Event Inbound chan *Event EventHandlers map[string]func(*Hub, *Event) // Register requests from the WsClient. Register chan *WsClient // Unregister requests from WsClient. Unregister chan *WsClient } func NewHub() *Hub { return &Hub{ Broadcast: make(chan *Event), Inbound: make(chan *Event), Register: make(chan *WsClient), Unregister: make(chan *WsClient), Clients: make(map[string]*WsClient), EventHandlers: map[string]func(*Hub, *Event){ }, } } // func (control *ThreadCtrl) RegisterClient(client *WsClient) { // for _, member := range *(control.Thread.Members) { // if member.Id == client.Id { // control.Clients[client.Id] = client // break // } // } // } // func tsyncHandler(hub *Hub, event *Event) { // // ctrl, err := hub.GetThread(event.ThreadID) // // if err != nil { // // fmt.Println(err) // // return // // } // // for _, cli := range ctrl.Clients { // // if cli.Id != event.UserId { // // cli.Send <- event // // } // // } // } func (hub *Hub) Run() { for { select { case client := <-hub.Register: // Se o cliente possui uma conexão. Encerra e seta o novo cliente // if c, found := hub.Clients[client.Id]; found { // hub.Unregister <- c // } // fmt.Println("register client ", client) hub.Clients[client.Id] = client case client := <-hub.Unregister: if _, ok := hub.Clients[client.Id]; ok { delete(hub.Clients, client.Id) close(client.Send) } case event := <-hub.Inbound: // fmt.Println("runing dispatch ", event) hub.dispatch(event) case event := <-hub.Broadcast: // spew.Dump(hub.Clients) for ID, client := range hub.Clients { // fmt.Println("broadcast to client") select { case client.Send <- event: default: delete(hub.Clients, ID) close(client.Send) } } } } } func (h *Hub) dispatch(event *Event) { fmt.Println("dispatch", event) if fn, found := h.EventHandlers[event.Kind]; found { fn(h, event) } else { //TODO register erro } } func (c *WsClient) Handle() { go c.WriteHandler() go c.ReadHandler() } func (c *WsClient) ReadHandler() { var ( // payload []byte err error event *Event hub = c.Hub ) defer func() { // fmt.Println("FINALIZANDO ReadHandler()") if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("error: %v", err) } } hub.Unregister <- c c.Conn.Close() }() // c.Conn.SetReadLimit(maxMessageSize) // c.Conn.SetReadDeadline(time.Now().Add(pongWait)) // c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { event = &Event{} if err = c.Conn.ReadJSON(event); err != nil { break } fmt.Println("read payload") // spew.Dump(event) event.UserId = c.Id hub.Inbound <- event } } func (c *WsClient) WriteHandler() { var ( // ticker = time.NewTicker(pingPeriod) // message []byte err error ) defer func() { if err != nil { c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) } // fmt.Println("FINALIZANDO WriteHandler()") // ticker.Stop() c.Conn.Close() }() for { select { case event, ok := <-c.Send: // spew.Dump(event) fmt.Println("write message", !ok) // c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { err = fmt.Errorf("0000000000") return } if err = c.Conn.WriteJSON(event); err != nil { return } // case <-ticker.C: // // c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) // if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { // return // } // } } } }