chan_server.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package msq
  2. import "errors"
  3. import "fmt"
  4. import "log"
  5. type Sender interface {
  6. Send(interface{}) error
  7. Close() error
  8. }
  9. //具体的发送策略让客户端来做
  10. type ChanServer struct {
  11. private map[int64]Sender
  12. public map[int64]Sender
  13. }
  14. func NewChanServer() *ChanServer {
  15. server := &ChanServer{}
  16. server.private = make(map[int64]Sender)
  17. server.public = make(map[int64]Sender)
  18. return server
  19. }
  20. func (server *ChanServer) Bind(ty int, id int64, sender Sender) {
  21. if ty == Private {
  22. server.private[id] = sender
  23. }
  24. if ty == Public {
  25. server.public[id] = sender
  26. }
  27. }
  28. func (server *ChanServer) UnBind(ty int, id int64) {
  29. if ty == Private {
  30. delete(server.private, id)
  31. }
  32. if ty == Public {
  33. delete(server.public, id)
  34. }
  35. }
  36. func (server *ChanServer) Send(ty int, id int64, data interface{}) error {
  37. if ty == Private {
  38. return server.sendOne(server.private, ty, id, data)
  39. } else {
  40. return server.sendOne(server.public, ty, id, data)
  41. }
  42. return nil
  43. }
  44. func (server *ChanServer) sendOne(senders map[int64]Sender, ty int, id int64, data interface{}) error {
  45. if id == 0 {
  46. for newid, v := range senders {
  47. err := v.Send(data)
  48. if err != nil {
  49. server.UnBind(ty, newid)
  50. v.Close()
  51. }
  52. }
  53. } else {
  54. if sender, ok := senders[id]; ok {
  55. err := sender.Send(data)
  56. if err != nil {
  57. server.UnBind(ty, id)
  58. sender.Close()
  59. }
  60. } else {
  61. err := errors.New("sendOne error, id not exist" + fmt.Sprint(id))
  62. log.Println(err)
  63. return err
  64. }
  65. }
  66. return nil
  67. }
  68. //默认向所有的public的客户端发送
  69. func (server *ChanServer) SendPublic(msg *Message) error {
  70. return server.Send(Public, 0, msg)
  71. }
  72. func (server *ChanServer) SendPrivate(msg *Message) error {
  73. return server.Send(Private, msg.ClientId, msg)
  74. }