server.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. package msq
  2. //管理整体的一个结构
  3. //接受事件,更新
  4. //接受事件,fetch
  5. import "errors"
  6. import "log"
  7. import "crypto/rand"
  8. import "encoding/hex"
  9. import "sync"
  10. func getId() string {
  11. var b [5]byte
  12. rand.Read(b[:])
  13. return hex.EncodeToString(b[:])
  14. }
  15. var ErrNoAction = errors.New("ErrNoAction")
  16. var ErrChainFull = errors.New("ErrChainFull")
  17. var ErrOutOfIndex = errors.New("ErrOutOfIndex")
  18. var ErrMultiMsgError = errors.New("ErrMultiMsgError")
  19. //处理基本的消息传递处理问题,具体逻辑放到之类里面去
  20. //有一块公共的内存,提供访问就可以了。
  21. type MsqServer struct {
  22. clients map[int64]*MsqClient
  23. chrecvg chan *Message
  24. server *ChanServer
  25. cachemsgs []*Message
  26. msgAction map[int]func(MsqServer *MsqServer, msg *Message)
  27. pendding map[string]*Message
  28. mu sync.Mutex
  29. }
  30. func NewMsqServer() *MsqServer {
  31. server := &MsqServer{}
  32. server.clients = make(map[int64]*MsqClient)
  33. server.server = NewChanServer()
  34. server.chrecvg = make(chan *Message, 102400)
  35. server.msgAction = make(map[int]func(MsqServer *MsqServer, msg *Message))
  36. //注册事件回调函数
  37. server.bindPrivateAction()
  38. server.bindPublicAction()
  39. server.echoAction()
  40. server.closeAction()
  41. return server
  42. }
  43. //为了方便用户通过clientId 查询client ,这里加了一个lock
  44. func (server *MsqServer) Bind(ty int, id int64, client *MsqClient) {
  45. server.mu.Lock()
  46. defer server.mu.Unlock()
  47. server.clients[id] = client
  48. server.server.Bind(ty, id, client)
  49. }
  50. var ErrClientNotFound = errors.New("ErrClientNotFound")
  51. //可以被多线程查询
  52. func (server *MsqServer) GetClient(clientId int64) (*MsqClient, error) {
  53. server.mu.Lock()
  54. defer server.mu.Unlock()
  55. if client, ok := server.clients[clientId]; ok {
  56. return client, nil
  57. }
  58. return nil, ErrClientNotFound
  59. }
  60. //这个函数会产生等待,所以一般不会使用,MsqServer 要使用异步版本
  61. func (server *MsqServer) SendMessage(clientId int64, msgType int, data interface{}) *Message {
  62. //准备
  63. msg := NewMessage(msgType, clientId, data, 0)
  64. msg.Ch = make(chan *Message, 1)
  65. server.Send(msg)
  66. //接受
  67. recvmsg := <-msg.Ch
  68. return recvmsg
  69. }
  70. func (server *MsqServer) SendMessageAsyn(clientId int64, msgType int, data interface{}) *Message {
  71. msg := NewMessage(msgType, clientId, data, 0)
  72. msg.Flag |= SendAsyn
  73. //发送
  74. server.Send(msg)
  75. //接受
  76. return msg
  77. }
  78. func (server *MsqServer) bindPrivateAction() {
  79. server.RegisterAction(MsgMsqBindPrivate, func(MsqServer *MsqServer, msg *Message) {
  80. msg2 := server.ProcessMultiStart(msg)
  81. data := msg2.GetData().(*MsqClient)
  82. server.Bind(Private, msg2.ClientId, data)
  83. server.ProcessMultiEnd(msg, msg2)
  84. })
  85. }
  86. func (server *MsqServer) bindPublicAction() {
  87. server.RegisterAction(MsgMsqBindPublic, func(MsqServer *MsqServer, msg *Message) {
  88. msg2 := server.ProcessMultiStart(msg)
  89. data := msg2.GetData().(*MsqClient)
  90. server.Bind(Public, msg2.ClientId, data)
  91. server.ProcessMultiEnd(msg, msg2)
  92. })
  93. }
  94. func (server *MsqServer) echoAction() {
  95. server.RegisterAction(MsgEcho, func(MsqServer *MsqServer, msg *Message) {
  96. msg2 := server.ProcessMultiStart(msg)
  97. server.ProcessMultiEnd(msg, msg2)
  98. })
  99. }
  100. func (server *MsqServer) closeAction() {
  101. server.RegisterAction(MsgClose, func(mtf *MsqServer, msg *Message) {
  102. msg2 := server.ProcessMultiStart(msg)
  103. server.server.UnBind(Private, msg.ClientId)
  104. server.server.UnBind(Public, msg.ClientId)
  105. server.ProcessMultiEnd(msg, msg2)
  106. })
  107. }
  108. func (server *MsqServer) ProcessMultiStart(msg *Message) *Message {
  109. if msg.Type == MsgMulti {
  110. msgs := msg.GetData().([]*Message)
  111. return msgs[msg.GetIndex()]
  112. }
  113. return msg
  114. }
  115. func (server *MsqServer) ProcessMultiPrev(msg *Message) *Message {
  116. if msg.Type == MsgMulti {
  117. msgs := msg.GetData().([]*Message)
  118. if msg.GetIndex() == 0 {
  119. return nil
  120. }
  121. return msgs[msg.GetIndex()-1]
  122. }
  123. return msg
  124. }
  125. func (server *MsqServer) ProcessMultiNext(msg *Message) *Message {
  126. if msg.Type == MsgMulti {
  127. msgs := msg.GetData().([]*Message)
  128. return msgs[msg.GetIndex()+1]
  129. }
  130. return msg
  131. }
  132. func (server *MsqServer) ProcessMultiEnd(msg *Message, msg2 *Message) {
  133. if msg.Type == MsgMulti {
  134. msg.IncIndex()
  135. msg.SetErr(msg2.GetErr())
  136. server.sendToCache(msg)
  137. }
  138. }
  139. func (server *MsqServer) sendToCache(msg *Message) {
  140. server.mu.Lock()
  141. defer server.mu.Unlock()
  142. server.cachemsgs = append(server.cachemsgs, msg)
  143. server.SendAsyn(NewMessage(MsgEmpty, 0, nil, 0))
  144. }
  145. func (server *MsqServer) sendtoQueue() {
  146. server.mu.Lock()
  147. defer server.mu.Unlock()
  148. for i := 0; i < len(server.cachemsgs); i++ {
  149. err := server.SendAsyn(server.cachemsgs[i])
  150. if err != nil {
  151. if i > 0 {
  152. server.cachemsgs = server.cachemsgs[i:]
  153. }
  154. return
  155. }
  156. }
  157. server.cachemsgs = nil
  158. }
  159. func replyMessage(msg *Message) {
  160. select {
  161. case msg.Ch <- msg:
  162. //tracelog.Println("reply", msg.Id)
  163. default:
  164. log.Println("reply msg error", msg)
  165. }
  166. }
  167. func (server *MsqServer) processMsg(msg *Message) {
  168. //关闭MsqServer
  169. if msg.Type == MsgMulti {
  170. msgs, ok := msg.GetData().([]*Message)
  171. if !ok {
  172. msg.SetErr(ErrMultiMsgError)
  173. replyMessage(msg) //向客户端回复
  174. log.Println("[MsqServer_ACTION_ERROR]", msg)
  175. return
  176. }
  177. if len(msgs) == int(msg.GetIndex()) || msg.GetErr() != nil {
  178. replyMessage(msg)
  179. return
  180. }
  181. if len(msgs) < int(msg.GetIndex()) {
  182. msg.SetErr(ErrOutOfIndex)
  183. replyMessage(msg) //向客户端回复
  184. log.Println("[MsqServer_ACTION_ERROR]", msg)
  185. panic("ErrOutOfIndex") //属于系统异常
  186. return
  187. }
  188. msg2 := msgs[msg.GetIndex()]
  189. action := server.msgAction[msg2.Type]
  190. if action == nil {
  191. msg.SetErr(ErrNoAction)
  192. replyMessage(msg) //向客户端回复
  193. log.Println("[MsqServer_ACTION_ERROR]", msg)
  194. return
  195. }
  196. action(server, msg)
  197. if msg.GetErr() != nil {
  198. printmsg := *msg
  199. printmsg.SetData(nil)
  200. log.Println(msg.GetErr(), printmsg)
  201. replyMessage(msg)
  202. return
  203. }
  204. return
  205. }
  206. action := server.msgAction[msg.Type]
  207. if action == nil {
  208. msg.SetErr(ErrNoAction)
  209. replyMessage(msg) //向客户端回复
  210. log.Println("[MsqServer_ACTION_ERROR]", msg)
  211. return
  212. }
  213. action(server, msg)
  214. if msg.GetErr() != nil {
  215. printmsg := *msg
  216. printmsg.SetData(nil)
  217. log.Println(msg.GetErr(), printmsg)
  218. }
  219. replyMessage(msg) //向客户端回复
  220. return
  221. }
  222. //Start 中的所有函数必须是非阻塞的
  223. func (server *MsqServer) Start() {
  224. //log.Println("server is ready")
  225. for {
  226. msg, ok := <-server.chrecvg
  227. if !ok || msg == nil {
  228. break
  229. }
  230. if msg.Type == MsgShutDown {
  231. server.Close()
  232. replyMessage(msg)
  233. break
  234. }
  235. server.sendtoQueue()
  236. if msg.Type == MsgEmpty {
  237. continue
  238. }
  239. server.processMsg(msg)
  240. }
  241. //log.Println("MsqServer is closed")
  242. }
  243. func (server *MsqServer) ProcessOne() error {
  244. for {
  245. select {
  246. case msg, ok := <-server.chrecvg:
  247. if !ok || msg == nil {
  248. return errors.New("ServerClosed")
  249. }
  250. if msg.Type == MsgShutDown {
  251. server.Close()
  252. replyMessage(msg)
  253. return errors.New("ServerClosed")
  254. }
  255. server.sendtoQueue()
  256. if msg.Type == MsgEmpty {
  257. continue
  258. }
  259. server.processMsg(msg)
  260. default:
  261. return nil
  262. }
  263. }
  264. }
  265. func (server *MsqServer) Close() error {
  266. server.chrecvg <- nil
  267. close(server.chrecvg)
  268. return nil
  269. }
  270. func (server *MsqServer) SendAsyn(msg *Message) error {
  271. if debug {
  272. msg.SendTime = getTime()
  273. }
  274. select {
  275. case server.chrecvg <- msg:
  276. default:
  277. return ErrChainFull
  278. }
  279. return nil
  280. }
  281. func (server *MsqServer) Send(msg *Message) {
  282. if debug {
  283. msg.SendTime = getTime()
  284. }
  285. server.chrecvg <- msg
  286. }
  287. func (server *MsqServer) RegisterAction(ty int, cb func(MsqServer *MsqServer, msg *Message)) {
  288. server.msgAction[ty] = cb
  289. }