|
- package msq
- //管理整体的一个结构
- //接受事件,更新
- //接受事件,fetch
- import "errors"
- import "log"
- import "crypto/rand"
- import "encoding/hex"
- import "sync"
- func getId() string {
- var b [5]byte
- rand.Read(b[:])
- return hex.EncodeToString(b[:])
- }
- var ErrNoAction = errors.New("ErrNoAction")
- var ErrChainFull = errors.New("ErrChainFull")
- var ErrOutOfIndex = errors.New("ErrOutOfIndex")
- var ErrMultiMsgError = errors.New("ErrMultiMsgError")
- //处理基本的消息传递处理问题,具体逻辑放到之类里面去
- //有一块公共的内存,提供访问就可以了。
- type MsqServer struct {
- clients map[int64]*MsqClient
- chrecvg chan *Message
- server *ChanServer
- cachemsgs []*Message
- msgAction map[int]func(MsqServer *MsqServer, msg *Message)
- pendding map[string]*Message
- mu sync.Mutex
- }
- func NewMsqServer() *MsqServer {
- server := &MsqServer{}
- server.clients = make(map[int64]*MsqClient)
- server.server = NewChanServer()
- server.chrecvg = make(chan *Message, 102400)
- server.msgAction = make(map[int]func(MsqServer *MsqServer, msg *Message))
- //注册事件回调函数
- server.bindPrivateAction()
- server.bindPublicAction()
- server.echoAction()
- server.closeAction()
- return server
- }
- //为了方便用户通过clientId 查询client ,这里加了一个lock
- func (server *MsqServer) Bind(ty int, id int64, client *MsqClient) {
- server.mu.Lock()
- defer server.mu.Unlock()
- server.clients[id] = client
- server.server.Bind(ty, id, client)
- }
- var ErrClientNotFound = errors.New("ErrClientNotFound")
- //可以被多线程查询
- func (server *MsqServer) GetClient(clientId int64) (*MsqClient, error) {
- server.mu.Lock()
- defer server.mu.Unlock()
- if client, ok := server.clients[clientId]; ok {
- return client, nil
- }
- return nil, ErrClientNotFound
- }
- //这个函数会产生等待,所以一般不会使用,MsqServer 要使用异步版本
- func (server *MsqServer) SendMessage(clientId int64, msgType int, data interface{}) *Message {
- //准备
- msg := NewMessage(msgType, clientId, data, 0)
- msg.Ch = make(chan *Message, 1)
- server.Send(msg)
- //接受
- recvmsg := <-msg.Ch
- return recvmsg
- }
- func (server *MsqServer) SendMessageAsyn(clientId int64, msgType int, data interface{}) *Message {
- msg := NewMessage(msgType, clientId, data, 0)
- msg.Flag |= SendAsyn
- //发送
- server.Send(msg)
- //接受
- return msg
- }
- func (server *MsqServer) bindPrivateAction() {
- server.RegisterAction(MsgMsqBindPrivate, func(MsqServer *MsqServer, msg *Message) {
- msg2 := server.ProcessMultiStart(msg)
- data := msg2.GetData().(*MsqClient)
- server.Bind(Private, msg2.ClientId, data)
- server.ProcessMultiEnd(msg, msg2)
- })
- }
- func (server *MsqServer) bindPublicAction() {
- server.RegisterAction(MsgMsqBindPublic, func(MsqServer *MsqServer, msg *Message) {
- msg2 := server.ProcessMultiStart(msg)
- data := msg2.GetData().(*MsqClient)
- server.Bind(Public, msg2.ClientId, data)
- server.ProcessMultiEnd(msg, msg2)
- })
- }
- func (server *MsqServer) echoAction() {
- server.RegisterAction(MsgEcho, func(MsqServer *MsqServer, msg *Message) {
- msg2 := server.ProcessMultiStart(msg)
- server.ProcessMultiEnd(msg, msg2)
- })
- }
- func (server *MsqServer) closeAction() {
- server.RegisterAction(MsgClose, func(mtf *MsqServer, msg *Message) {
- msg2 := server.ProcessMultiStart(msg)
- server.server.UnBind(Private, msg.ClientId)
- server.server.UnBind(Public, msg.ClientId)
- server.ProcessMultiEnd(msg, msg2)
- })
- }
- func (server *MsqServer) ProcessMultiStart(msg *Message) *Message {
- if msg.Type == MsgMulti {
- msgs := msg.GetData().([]*Message)
- return msgs[msg.GetIndex()]
- }
- return msg
- }
- func (server *MsqServer) ProcessMultiPrev(msg *Message) *Message {
- if msg.Type == MsgMulti {
- msgs := msg.GetData().([]*Message)
- if msg.GetIndex() == 0 {
- return nil
- }
- return msgs[msg.GetIndex()-1]
- }
- return msg
- }
- func (server *MsqServer) ProcessMultiNext(msg *Message) *Message {
- if msg.Type == MsgMulti {
- msgs := msg.GetData().([]*Message)
- return msgs[msg.GetIndex()+1]
- }
- return msg
- }
- func (server *MsqServer) ProcessMultiEnd(msg *Message, msg2 *Message) {
- if msg.Type == MsgMulti {
- msg.IncIndex()
- msg.SetErr(msg2.GetErr())
- server.sendToCache(msg)
- }
- }
- func (server *MsqServer) sendToCache(msg *Message) {
- server.mu.Lock()
- defer server.mu.Unlock()
- server.cachemsgs = append(server.cachemsgs, msg)
- server.SendAsyn(NewMessage(MsgEmpty, 0, nil, 0))
- }
- func (server *MsqServer) sendtoQueue() {
- server.mu.Lock()
- defer server.mu.Unlock()
- for i := 0; i < len(server.cachemsgs); i++ {
- err := server.SendAsyn(server.cachemsgs[i])
- if err != nil {
- if i > 0 {
- server.cachemsgs = server.cachemsgs[i:]
- }
- return
- }
- }
- server.cachemsgs = nil
- }
- func replyMessage(msg *Message) {
- select {
- case msg.Ch <- msg:
- //tracelog.Println("reply", msg.Id)
- default:
- log.Println("reply msg error", msg)
- }
- }
- func (server *MsqServer) processMsg(msg *Message) {
- //关闭MsqServer
- if msg.Type == MsgMulti {
- msgs, ok := msg.GetData().([]*Message)
- if !ok {
- msg.SetErr(ErrMultiMsgError)
- replyMessage(msg) //向客户端回复
- log.Println("[MsqServer_ACTION_ERROR]", msg)
- return
- }
- if len(msgs) == int(msg.GetIndex()) || msg.GetErr() != nil {
- replyMessage(msg)
- return
- }
- if len(msgs) < int(msg.GetIndex()) {
- msg.SetErr(ErrOutOfIndex)
- replyMessage(msg) //向客户端回复
- log.Println("[MsqServer_ACTION_ERROR]", msg)
- panic("ErrOutOfIndex") //属于系统异常
- return
- }
- msg2 := msgs[msg.GetIndex()]
- action := server.msgAction[msg2.Type]
- if action == nil {
- msg.SetErr(ErrNoAction)
- replyMessage(msg) //向客户端回复
- log.Println("[MsqServer_ACTION_ERROR]", msg)
- return
- }
- action(server, msg)
- if msg.GetErr() != nil {
- printmsg := *msg
- printmsg.SetData(nil)
- log.Println(msg.GetErr(), printmsg)
- replyMessage(msg)
- return
- }
- return
- }
- action := server.msgAction[msg.Type]
- if action == nil {
- msg.SetErr(ErrNoAction)
- replyMessage(msg) //向客户端回复
- log.Println("[MsqServer_ACTION_ERROR]", msg)
- return
- }
- action(server, msg)
- if msg.GetErr() != nil {
- printmsg := *msg
- printmsg.SetData(nil)
- log.Println(msg.GetErr(), printmsg)
- }
- replyMessage(msg) //向客户端回复
- return
- }
- //Start 中的所有函数必须是非阻塞的
- func (server *MsqServer) Start() {
- //log.Println("server is ready")
- for {
- msg, ok := <-server.chrecvg
- if !ok || msg == nil {
- break
- }
- if msg.Type == MsgShutDown {
- server.Close()
- replyMessage(msg)
- break
- }
- server.sendtoQueue()
- if msg.Type == MsgEmpty {
- continue
- }
- server.processMsg(msg)
- }
- //log.Println("MsqServer is closed")
- }
- func (server *MsqServer) ProcessOne() error {
- for {
- select {
- case msg, ok := <-server.chrecvg:
- if !ok || msg == nil {
- return errors.New("ServerClosed")
- }
- if msg.Type == MsgShutDown {
- server.Close()
- replyMessage(msg)
- return errors.New("ServerClosed")
- }
- server.sendtoQueue()
- if msg.Type == MsgEmpty {
- continue
- }
- server.processMsg(msg)
- default:
- return nil
- }
- }
- }
- func (server *MsqServer) Close() error {
- server.chrecvg <- nil
- close(server.chrecvg)
- return nil
- }
- func (server *MsqServer) SendAsyn(msg *Message) error {
- if debug {
- msg.SendTime = getTime()
- }
- select {
- case server.chrecvg <- msg:
- default:
- return ErrChainFull
- }
- return nil
- }
- func (server *MsqServer) Send(msg *Message) {
- if debug {
- msg.SendTime = getTime()
- }
- server.chrecvg <- msg
- }
- func (server *MsqServer) RegisterAction(ty int, cb func(MsqServer *MsqServer, msg *Message)) {
- server.msgAction[ty] = cb
- }
|