package msq //管理整体的一个结构 //接受事件,更新 //接受事件,fetch import "errors" import "log" import "crypto/rand" import "encoding/hex" import "sync" func getId() string { var b [5]byte rand.Read(b[:]) return hex.EncodeToString(b[:]) } var ErrNoAction = errors.New("ErrNoAction") var ErrChainFull = errors.New("ErrChainFull") var ErrOutOfIndex = errors.New("ErrOutOfIndex") var ErrMultiMsgError = errors.New("ErrMultiMsgError") //处理基本的消息传递处理问题,具体逻辑放到之类里面去 //有一块公共的内存,提供访问就可以了。 type MsqServer struct { clients map[int64]*MsqClient chrecvg chan *Message server *ChanServer cachemsgs []*Message msgAction map[int]func(MsqServer *MsqServer, msg *Message) pendding map[string]*Message mu sync.Mutex } func NewMsqServer() *MsqServer { server := &MsqServer{} server.clients = make(map[int64]*MsqClient) server.server = NewChanServer() server.chrecvg = make(chan *Message, 102400) server.msgAction = make(map[int]func(MsqServer *MsqServer, msg *Message)) //注册事件回调函数 server.bindPrivateAction() server.bindPublicAction() server.echoAction() server.closeAction() return server } //为了方便用户通过clientId 查询client ,这里加了一个lock func (server *MsqServer) Bind(ty int, id int64, client *MsqClient) { server.mu.Lock() defer server.mu.Unlock() server.clients[id] = client server.server.Bind(ty, id, client) } var ErrClientNotFound = errors.New("ErrClientNotFound") //可以被多线程查询 func (server *MsqServer) GetClient(clientId int64) (*MsqClient, error) { server.mu.Lock() defer server.mu.Unlock() if client, ok := server.clients[clientId]; ok { return client, nil } return nil, ErrClientNotFound } //这个函数会产生等待,所以一般不会使用,MsqServer 要使用异步版本 func (server *MsqServer) SendMessage(clientId int64, msgType int, data interface{}) *Message { //准备 msg := NewMessage(msgType, clientId, data, 0) msg.Ch = make(chan *Message, 1) server.Send(msg) //接受 recvmsg := <-msg.Ch return recvmsg } func (server *MsqServer) SendMessageAsyn(clientId int64, msgType int, data interface{}) *Message { msg := NewMessage(msgType, clientId, data, 0) msg.Flag |= SendAsyn //发送 server.Send(msg) //接受 return msg } func (server *MsqServer) bindPrivateAction() { server.RegisterAction(MsgMsqBindPrivate, func(MsqServer *MsqServer, msg *Message) { msg2 := server.ProcessMultiStart(msg) data := msg2.GetData().(*MsqClient) server.Bind(Private, msg2.ClientId, data) server.ProcessMultiEnd(msg, msg2) }) } func (server *MsqServer) bindPublicAction() { server.RegisterAction(MsgMsqBindPublic, func(MsqServer *MsqServer, msg *Message) { msg2 := server.ProcessMultiStart(msg) data := msg2.GetData().(*MsqClient) server.Bind(Public, msg2.ClientId, data) server.ProcessMultiEnd(msg, msg2) }) } func (server *MsqServer) echoAction() { server.RegisterAction(MsgEcho, func(MsqServer *MsqServer, msg *Message) { msg2 := server.ProcessMultiStart(msg) server.ProcessMultiEnd(msg, msg2) }) } func (server *MsqServer) closeAction() { server.RegisterAction(MsgClose, func(mtf *MsqServer, msg *Message) { msg2 := server.ProcessMultiStart(msg) server.server.UnBind(Private, msg.ClientId) server.server.UnBind(Public, msg.ClientId) server.ProcessMultiEnd(msg, msg2) }) } func (server *MsqServer) ProcessMultiStart(msg *Message) *Message { if msg.Type == MsgMulti { msgs := msg.GetData().([]*Message) return msgs[msg.GetIndex()] } return msg } func (server *MsqServer) ProcessMultiPrev(msg *Message) *Message { if msg.Type == MsgMulti { msgs := msg.GetData().([]*Message) if msg.GetIndex() == 0 { return nil } return msgs[msg.GetIndex()-1] } return msg } func (server *MsqServer) ProcessMultiNext(msg *Message) *Message { if msg.Type == MsgMulti { msgs := msg.GetData().([]*Message) return msgs[msg.GetIndex()+1] } return msg } func (server *MsqServer) ProcessMultiEnd(msg *Message, msg2 *Message) { if msg.Type == MsgMulti { msg.IncIndex() msg.SetErr(msg2.GetErr()) server.sendToCache(msg) } } func (server *MsqServer) sendToCache(msg *Message) { server.mu.Lock() defer server.mu.Unlock() server.cachemsgs = append(server.cachemsgs, msg) server.SendAsyn(NewMessage(MsgEmpty, 0, nil, 0)) } func (server *MsqServer) sendtoQueue() { server.mu.Lock() defer server.mu.Unlock() for i := 0; i < len(server.cachemsgs); i++ { err := server.SendAsyn(server.cachemsgs[i]) if err != nil { if i > 0 { server.cachemsgs = server.cachemsgs[i:] } return } } server.cachemsgs = nil } func replyMessage(msg *Message) { select { case msg.Ch <- msg: //tracelog.Println("reply", msg.Id) default: log.Println("reply msg error", msg) } } func (server *MsqServer) processMsg(msg *Message) { //关闭MsqServer if msg.Type == MsgMulti { msgs, ok := msg.GetData().([]*Message) if !ok { msg.SetErr(ErrMultiMsgError) replyMessage(msg) //向客户端回复 log.Println("[MsqServer_ACTION_ERROR]", msg) return } if len(msgs) == int(msg.GetIndex()) || msg.GetErr() != nil { replyMessage(msg) return } if len(msgs) < int(msg.GetIndex()) { msg.SetErr(ErrOutOfIndex) replyMessage(msg) //向客户端回复 log.Println("[MsqServer_ACTION_ERROR]", msg) panic("ErrOutOfIndex") //属于系统异常 return } msg2 := msgs[msg.GetIndex()] action := server.msgAction[msg2.Type] if action == nil { msg.SetErr(ErrNoAction) replyMessage(msg) //向客户端回复 log.Println("[MsqServer_ACTION_ERROR]", msg) return } action(server, msg) if msg.GetErr() != nil { printmsg := *msg printmsg.SetData(nil) log.Println(msg.GetErr(), printmsg) replyMessage(msg) return } return } action := server.msgAction[msg.Type] if action == nil { msg.SetErr(ErrNoAction) replyMessage(msg) //向客户端回复 log.Println("[MsqServer_ACTION_ERROR]", msg) return } action(server, msg) if msg.GetErr() != nil { printmsg := *msg printmsg.SetData(nil) log.Println(msg.GetErr(), printmsg) } replyMessage(msg) //向客户端回复 return } //Start 中的所有函数必须是非阻塞的 func (server *MsqServer) Start() { //log.Println("server is ready") for { msg, ok := <-server.chrecvg if !ok || msg == nil { break } if msg.Type == MsgShutDown { server.Close() replyMessage(msg) break } server.sendtoQueue() if msg.Type == MsgEmpty { continue } server.processMsg(msg) } //log.Println("MsqServer is closed") } func (server *MsqServer) ProcessOne() error { for { select { case msg, ok := <-server.chrecvg: if !ok || msg == nil { return errors.New("ServerClosed") } if msg.Type == MsgShutDown { server.Close() replyMessage(msg) return errors.New("ServerClosed") } server.sendtoQueue() if msg.Type == MsgEmpty { continue } server.processMsg(msg) default: return nil } } } func (server *MsqServer) Close() error { server.chrecvg <- nil close(server.chrecvg) return nil } func (server *MsqServer) SendAsyn(msg *Message) error { if debug { msg.SendTime = getTime() } select { case server.chrecvg <- msg: default: return ErrChainFull } return nil } func (server *MsqServer) Send(msg *Message) { if debug { msg.SendTime = getTime() } server.chrecvg <- msg } func (server *MsqServer) RegisterAction(ty int, cb func(MsqServer *MsqServer, msg *Message)) { server.msgAction[ty] = cb }