package msq

import (
	"errors"
	"io"
	"io/ioutil"
	"log"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"
)

// debug turns on the per-client send statistics goroutine and the slow-reply
// check in SendMessage2.
var debug = false

// tracelog receives trace output. It discards everything until EnableLog is
// called.
var tracelog = log.New(ioutil.Discard, "[trace]", log.Ltime)

// EnableLog redirects trace output to w.
func EnableLog(w io.Writer) {
	tracelog = log.New(w, "[trace]", log.Ldate|log.Lmicroseconds)
}

// DisableLog discards all further trace output.
func DisableLog() {
	tracelog = log.New(ioutil.Discard, "[trace]", log.Ldate|log.Lmicroseconds)
}

// MsqClient is one endpoint attached to an MsqServer. Requests go to the
// server through the SendMessage family; messages addressed to the client are
// queued on an internal buffered channel and consumed with Recv or RecvMulti.
type MsqClient struct {
	// The fields accessed through sync/atomic are kept at the start of the
	// struct so that they are 64-bit aligned on 32-bit platforms as well.
	refCount  int64 // reference count, see IncRef/DecRef/GetRef
	count     int64 // replies received by SendMessage2 (maintained in debug mode only)
	recvcount int64 // messages delivered by Recv

	msq         *MsqServer
	clientId    int64
	chrecv      chan *Message
	isFullClose bool
	timeout     time.Duration
	stoptick    chan int // stops countStat; nil unless debug was on at construction

	mu           sync.Mutex // guards lastmsg, caches and retryPending
	lastmsg      *Message   // last message handed to SendMessageAsyn2
	caches       []*Message // messages accepted by Send but not yet queued on chrecv
	retryPending bool       // true while a delayed flush of caches is scheduled
}

// ErrChanFull is returned by SendAsyn when the receive buffer has no room.
var ErrChanFull = errors.New("chan is full")

// NewMsqClient creates a client bound to msq whose receive channel buffers up
// to bufferCount messages.
//
// timeout is the threshold used by the debug-mode slow-reply check in
// SendMessage2. isFullClose is stored on the client; according to the original
// author's note, true makes sending fully asynchronous while false means a
// send may have to wait.
// NOTE(review): nothing in this file reads isFullClose — confirm where it is
// consumed.
func NewMsqClient(msq *MsqServer, timeout time.Duration, isFullClose bool, bufferCount int) *MsqClient {
	client := &MsqClient{
		msq:         msq,
		chrecv:      make(chan *Message, bufferCount),
		isFullClose: isFullClose,
		timeout:     timeout,
	}
	// The client's own address doubles as its default identifier.
	client.clientId = int64(uintptr(unsafe.Pointer(client)))
	if debug {
		// Only useful while debugging: report throughput so that slow
		// replies (timeouts) can be spotted.
		client.stoptick = make(chan int, 1)
		go client.countStat()
	}
	return client
}

// countStat logs, once per second, how many replies SendMessage2 received
// since the previous tick. It runs until a value arrives on stoptick.
func (c *MsqClient) countStat() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	var last int64
	for {
		select {
		case <-ticker.C:
			current := atomic.LoadInt64(&c.count)
			// Nothing is reported until a non-zero baseline has been seen.
			if last > 0 {
				tracelog.Println("send count:", current-last)
			}
			last = current
		case <-c.stoptick:
			return
		}
	}
}

// IncRef increments the reference count and returns the new value.
func (c *MsqClient) IncRef() int64 {
	return atomic.AddInt64(&c.refCount, 1)
}

// DecRef decrements the reference count and returns the new value.
func (c *MsqClient) DecRef() int64 {
	return atomic.AddInt64(&c.refCount, -1)
}

// GetRef returns the current reference count.
func (c *MsqClient) GetRef() int64 {
	return atomic.LoadInt64(&c.refCount)
}

// GetMsq returns the server this client sends to.
func (c *MsqClient) GetMsq() *MsqServer {
	return c.msq
}

// SetMsq replaces the server this client sends to.
func (c *MsqClient) SetMsq(msq *MsqServer) {
	c.msq = msq
}

// Recv delivers every message queued for the client to cb, one at a time, and
// returns once the receive channel has been closed. A nil cb drains the
// channel without processing; messages are counted either way.
func (c *MsqClient) Recv(cb func(msg *Message)) {
	for msg := range c.chrecv {
		atomic.AddInt64(&c.recvcount, 1)
		if cb != nil {
			cb(msg)
		}
	}
}

// GetRecvCount returns the number of messages delivered by Recv.
func (c *MsqClient) GetRecvCount() int64 {
	return atomic.LoadInt64(&c.recvcount)
}

// RecvMulti works like Recv but hands messages to cb in batches of up to 100.
// It returns once the receive channel has been closed.
//
// The slice passed to cb is reused for the next batch, so cb must not retain
// it after returning.
func (c *MsqClient) RecvMulti(cb func(msg []*Message)) {
	const batchSize = 100

	batch := make([]*Message, batchSize)
	for {
		n, err := c.read(batch)
		// Messages read before the channel closed are still delivered.
		if n > 0 && cb != nil {
			cb(batch[:n])
		}
		if err != nil {
			return
		}
	}
}

// read blocks until one message is available, then fills buf with as many
// further messages as can be taken without blocking. It returns the number of
// messages stored, together with an error once the channel is closed.
func (c *MsqClient) read(buf []*Message) (int, error) {
	first, ok := <-c.chrecv
	if !ok {
		return 0, errors.New("chan is closed")
	}
	buf[0] = first

	n := 1
	for n < len(buf) {
		select {
		case msg, ok := <-c.chrecv:
			if !ok {
				return n, errors.New("chan is closed")
			}
			buf[n] = msg
			n++
		default:
			// Nothing more is ready; return what has been collected.
			return n, nil
		}
	}
	return n, nil
}

// GetId returns the client identifier.
func (c *MsqClient) GetId() int64 {
	return c.clientId
}

// SetId overrides the client identifier.
func (c *MsqClient) SetId(id int64) {
	c.clientId = id
}

// Send queues data, which must be a *Message (any other non-nil type panics),
// for delivery to this client's receiver without ever blocking the caller.
// Messages that do not fit into the receive buffer are cached in order and
// retried in the background. A nil data only flushes the cache.
//
// Send always returns nil.
//
// NOTE(review): a retry that fires after Close has closed the receive channel
// sends on a closed channel and panics — confirm callers stop sending before
// closing.
func (c *MsqClient) Send(data interface{}) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if data != nil {
		c.caches = append(c.caches, data.(*Message))
	}
	c.flushCachesLocked()
	return nil
}

// flushCachesLocked moves cached messages into the receive channel until it is
// full. If messages remain, a single delayed retry is scheduled; keeping at
// most one retry pending stops a busy sender from piling up one retry
// goroutine per Send call. c.mu must be held by the caller.
func (c *MsqClient) flushCachesLocked() {
	sent := 0
	for sent < len(c.caches) {
		if err := c.SendAsyn(c.caches[sent]); err != nil {
			break
		}
		sent++
	}
	if sent == len(c.caches) {
		c.caches = nil
		return
	}

	c.caches = c.caches[sent:]
	if c.retryPending {
		return
	}
	c.retryPending = true
	go func() {
		time.Sleep(time.Millisecond)
		c.mu.Lock()
		defer c.mu.Unlock()
		c.retryPending = false
		c.flushCachesLocked()
	}()
}

// SendAsyn tries to queue data, which must be a *Message (any other type
// panics), on the receive channel without blocking. It returns ErrChanFull
// when the buffer has no room.
func (c *MsqClient) SendAsyn(data interface{}) error {
	msg := data.(*Message)
	select {
	case c.chrecv <- msg:
		return nil
	default:
		return ErrChanFull
	}
}

// Close sends MsgClose to the server and, if the server reports no error,
// closes the receive channel, which makes Recv and RecvMulti return. It fails
// without side effects while the reference count is non-zero.
func (c *MsqClient) Close() error {
	if c.GetRef() != 0 {
		return errors.New("ref count > 0")
	}
	if err := c.SendMessage(MsgClose, nil).GetErr(); err != nil {
		return err
	}
	close(c.chrecv)
	// stoptick only exists when the statistics goroutine was started; sending
	// on a nil channel would block forever.
	if c.stoptick != nil {
		c.stoptick <- 1
	}
	return nil
}

// ConnectPrivate sends a MsgMsqBindPrivate request carrying this client to the
// server and returns the error reported in the reply.
func (c *MsqClient) ConnectPrivate() error {
	_, err := c.sendMessage(MsgMsqBindPrivate, c)
	return err
}

// ConnectPublic sends a MsgMsqBindPublic request carrying this client to the
// server and returns the error reported in the reply.
func (c *MsqClient) ConnectPublic() error {
	_, err := c.sendMessage(MsgMsqBindPublic, c)
	return err
}

// sendMessage performs a synchronous request and unpacks the reply into its
// data and error.
func (c *MsqClient) sendMessage(msgType int, data interface{}) (interface{}, error) {
	reply := c.SendMessage(msgType, data)
	if err := reply.GetErr(); err != nil {
		return reply.GetData(), err
	}
	return reply.GetData(), nil
}

// NewMessage builds a message of msgType originating from this client and
// gives it a buffered reply channel. MsgLog messages additionally get the
// NeedLog flag.
func (c *MsqClient) NewMessage(msgType int, data interface{}) *Message {
	msg := NewMessage(msgType, c.GetId(), data, 0)
	if msgType == MsgLog {
		msg.Flag |= NeedLog
	}
	msg.Ch = make(chan *Message, 5)
	return msg
}

// SendMessage builds a message of msgType, sends it to the server and blocks
// until the reply arrives.
func (c *MsqClient) SendMessage(msgType int, data interface{}) *Message {
	return c.SendMessage2(c.NewMessage(msgType, data))
}

// SendMessage2 sends msg to the server and blocks until a reply arrives on
// msg.Ch. In debug mode it also counts the reply, stamps its receive time and
// logs replies that took longer than the client's timeout.
func (c *MsqClient) SendMessage2(msg *Message) *Message {
	c.msq.Send(msg)

	// Wait for the reply.
	reply := <-msg.Ch
	if debug {
		atomic.AddInt64(&c.count, 1)
		reply.RecvTime = getTime()
		if reply.RecvTime-reply.SendTime > int64(c.timeout) {
			log.Println("msg timeout", msg)
		}
	}
	return reply
}

// SendMessages sends msgs to the server as a single MsgMulti request and
// blocks until the reply arrives.
func (c *MsqClient) SendMessages(msgs ...*Message) *Message {
	return c.SendMessage(MsgMulti, msgs)
}

// SendMessagesAsyn sends msgs to the server as a single MsgMulti request
// without waiting for a reply.
func (c *MsqClient) SendMessagesAsyn(msgs ...*Message) *Message {
	return c.SendMessageAsyn(MsgMulti, msgs)
}

// SendMessageAsyn builds a message of msgType and sends it to the server
// without waiting for a reply. The message that was sent is returned.
func (c *MsqClient) SendMessageAsyn(msgType int, data interface{}) *Message {
	return c.SendMessageAsyn2(c.NewMessage(msgType, data))
}

// SendMessageAsyn2 flags msg as asynchronous, sends it to the server without
// waiting for a reply and remembers it as the client's last message.
func (c *MsqClient) SendMessageAsyn2(msg *Message) *Message {
	msg.Flag |= SendAsyn
	c.msq.Send(msg)

	c.mu.Lock()
	defer c.mu.Unlock()
	c.lastmsg = msg
	return msg
}

// GetLastSyncMsg returns the message most recently sent through
// SendMessageAsyn2, or nil if there has been none.
func (c *MsqClient) GetLastSyncMsg() *Message {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.lastmsg
}