123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- package msq
- import "unsafe"
- import "time"
- import "sync/atomic"
- import "log"
- import "errors"
- import "io/ioutil"
- import "io"
- import "sync"
- var debug = false
- var tracelog = log.New(ioutil.Discard, "[trace]", log.Ltime)
- func EnableLog(w io.Writer) {
- tracelog = log.New(w, "[trace]", log.Ldate|log.Lmicroseconds)
- }
- func DisableLog() {
- tracelog = log.New(ioutil.Discard, "[trace]", log.Ldate|log.Lmicroseconds)
- }
- type MsqClient struct {
- msq *MsqServer
- clientId int64
- chrecv chan *Message
- lastmsg *Message
- isFullClose bool
- timeout time.Duration
- refCount int64
- stoptick chan int
- count int64
- recvcount int64
- mu sync.Mutex
- caches []*Message
- }
- var ErrChanFull = errors.New("chan is full")
- //fullClose 这个参数很重要,如果设置为 true,那么send 函数完全异步,否则就是有可能
- //发生等待的情况。
- func NewMsqClient(msq *MsqServer, timeout time.Duration, isFullClose bool, bufferCount int) *MsqClient {
- client := &MsqClient{}
- client.msq = msq
- client.chrecv = make(chan *Message, bufferCount)
- client.isFullClose = isFullClose
- client.timeout = timeout
- client.clientId = int64(uintptr(unsafe.Pointer(client)))
- if debug {
- client.stoptick = make(chan int, 1)
- //在调试状态下可以打开,看看是否有timeout
- go client.countStat()
- }
- return client
- }
- func (this *MsqClient) countStat() {
- ticker := time.NewTicker(time.Second)
- lastcount := int64(0)
- for {
- select {
- case <-ticker.C:
- if lastcount > 0 {
- count := atomic.LoadInt64(&this.count) - lastcount
- tracelog.Println("send count:", count)
- }
- lastcount = atomic.LoadInt64(&this.count)
- case <-this.stoptick:
- return
- }
- }
- log.Println("stop count")
- }
- func (this *MsqClient) IncRef() int64 {
- return atomic.AddInt64(&this.refCount, 1)
- }
- func (this *MsqClient) DecRef() int64 {
- return atomic.AddInt64(&this.refCount, -1)
- }
- func (this *MsqClient) GetRef() int64 {
- return atomic.LoadInt64(&this.refCount)
- }
- func (this *MsqClient) GetMsq() *MsqServer {
- return this.msq
- }
- func (this *MsqClient) SetMsq(msq *MsqServer) {
- this.msq = msq
- }
- func (this *MsqClient) Recv(cb func(msg *Message)) {
- //log.Println("client is ready", this.GetId())
- for {
- msg, ok := <-this.chrecv
- if !ok { //chan is closed
- break
- }
- atomic.AddInt64(&this.recvcount, 1)
- if cb != nil {
- cb(msg)
- }
- }
- //log.Println("client is stop", this.GetId())
- }
- func (this *MsqClient) GetRecvCount() int64 {
- return atomic.LoadInt64(&this.recvcount)
- }
- func (this *MsqClient) RecvMulti(cb func(msg []*Message)) {
- //log.Println("client is ready, multi", this.GetId())
- msgs := make([]*Message, 100)
- for {
- n, err := this.read(msgs)
- if err != nil {
- if n > 0 {
- if cb != nil {
- cb(msgs[:n])
- }
- }
- break
- }
- if cb != nil {
- cb(msgs[:n])
- }
- }
- //log.Println("client is stop, multi", this.GetId())
- }
- func (s *MsqClient) read(buf []*Message) (int, error) {
- var i = 1
- var ok bool
- buf[0], ok = <-s.chrecv
- if !ok {
- return 0, errors.New("chan is closed")
- }
- for {
- if i == len(buf) {
- return i, nil
- }
- select {
- case data, ok := <-s.chrecv:
- if !ok {
- return i, errors.New("chan is closed")
- }
- buf[i] = data
- i++
- default:
- return i, nil
- }
- }
- panic("nerver reach")
- }
- func (this *MsqClient) GetId() int64 {
- return this.clientId
- }
- func (this *MsqClient) SetId(id int64) {
- this.clientId = id
- }
- func (this *MsqClient) Send(data interface{}) error {
- this.mu.Lock()
- defer this.mu.Unlock()
- if data != nil {
- msg := data.(*Message)
- this.caches = append(this.caches, msg)
- }
- for i := 0; i < len(this.caches); i++ {
- err := this.SendAsyn(this.caches[i])
- if err != nil {
- if i > 0 {
- this.caches = this.caches[i:]
- }
- go func () {
- time.Sleep(time.Millisecond)
- this.Send(nil)
- }()
- return nil;
- }
- }
- this.caches = nil
- return nil
- }
- func (this *MsqClient) SendAsyn(data interface{}) error {
- msg := data.(*Message)
- select {
- case this.chrecv <- msg:
- default:
- return ErrChanFull
- }
- return nil
- }
- func (this *MsqClient) Close() error {
- if this.GetRef() != 0 {
- return errors.New("ref count > 0")
- }
- msg := this.SendMessage(MsgClose, nil)
- err := msg.GetErr()
- if err != nil {
- return err
- }
- close(this.chrecv)
- this.stoptick <- 1
- //log.Println("msq client close success.", this.GetId())
- return nil
- }
- func (this *MsqClient) ConnectPrivate() error {
- _, err := this.sendMessage(MsgMsqBindPrivate, this)
- return err
- }
- func (this *MsqClient) ConnectPublic() error {
- _, err := this.sendMessage(MsgMsqBindPublic, this)
- return err
- }
- func (this *MsqClient) sendMessage(msgType int, data interface{}) (interface{}, error) {
- msg := this.SendMessage(msgType, data)
- if msg.GetErr() != nil {
- return msg.GetData(), msg.GetErr()
- }
- return msg.GetData(), nil
- }
- func (this *MsqClient) NewMessage(msgType int, data interface{}) *Message {
- //准备
- clientid := this.GetId()
- msg := NewMessage(msgType, clientid, data, 0)
- if msgType == MsgLog {
- msg.Flag |= NeedLog
- }
- msg.Ch = make(chan *Message, 5)
- return msg
- }
- func (this *MsqClient) SendMessage(msgType int, data interface{}) *Message {
- //准备
- msg := this.NewMessage(msgType, data)
- return this.SendMessage2(msg)
- }
- func (this *MsqClient) SendMessage2(msg *Message) *Message {
- this.msq.Send(msg)
- //接受
- recvmsg := <-msg.Ch
- if debug {
- atomic.AddInt64(&this.count, 1)
- recvmsg.RecvTime = getTime()
- if recvmsg.RecvTime-recvmsg.SendTime > int64(this.timeout) {
- log.Println("msg timeout", msg)
- }
- }
- return recvmsg
- }
- //messages
- func (this *MsqClient) SendMessages(msgs ...*Message) *Message {
- return this.SendMessage(MsgMulti, msgs)
- }
- func (this *MsqClient) SendMessagesAsyn(msgs ...*Message) *Message {
- return this.SendMessageAsyn(MsgMulti, msgs)
- }
- //asyn
- func (this *MsqClient) SendMessageAsyn(msgType int, data interface{}) *Message {
- //准备
- msg := this.NewMessage(msgType, data)
- return this.SendMessageAsyn2(msg)
- }
- func (this *MsqClient) SendMessageAsyn2(msg *Message) *Message {
- msg.Flag |= SendAsyn
- this.msq.Send(msg)
- this.mu.Lock()
- defer this.mu.Unlock()
- this.lastmsg = msg
- return msg
- }
- func (this *MsqClient) GetLastSyncMsg() *Message {
- this.mu.Lock()
- defer this.mu.Unlock()
- return this.lastmsg
- }
|