client.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. package msq
  2. import "unsafe"
  3. import "time"
  4. import "sync/atomic"
  5. import "log"
  6. import "errors"
  7. import "io/ioutil"
  8. import "io"
  9. import "sync"
  // debug gates the optional statistics path: when true, NewMsqClient starts
  // the countStat goroutine and SendMessage2 counts round trips and logs
  // replies that took longer than the client's timeout.
  var debug = false

  // tracelog is the package trace logger. It discards everything until
  // EnableLog is called. The *log.Logger itself is never replaced, so
  // goroutines that are logging concurrently never observe a torn pointer.
  var tracelog = log.New(ioutil.Discard, "[trace]", log.Ltime)

  // EnableLog directs trace output to w, stamping each line with the date and
  // a microsecond-resolution time. A nil w is treated as "discard" rather than
  // producing a logger that panics on its first write.
  func EnableLog(w io.Writer) {
  	if w == nil {
  		w = ioutil.Discard
  	}
  	// SetOutput/SetFlags are synchronised inside log.Logger; reassigning the
  	// package variable would race with goroutines calling tracelog.Println.
  	tracelog.SetOutput(w)
  	tracelog.SetFlags(log.Ldate | log.Lmicroseconds)
  }

  // DisableLog silences trace output again. The flags are kept identical to
  // the ones EnableLog installs.
  func DisableLog() {
  	tracelog.SetOutput(ioutil.Discard)
  	tracelog.SetFlags(log.Ldate | log.Lmicroseconds)
  }
  18. type MsqClient struct {
  19. msq *MsqServer
  20. clientId int64
  21. chrecv chan *Message
  22. lastmsg *Message
  23. isFullClose bool
  24. timeout time.Duration
  25. refCount int64
  26. stoptick chan int
  27. count int64
  28. recvcount int64
  29. mu sync.Mutex
  30. caches []*Message
  31. }
  // ErrChanFull is the error SendAsyn returns when the client's receive
  // channel has no free buffer slot.
  var (
  	ErrChanFull = errors.New("chan is full")
  )
  33. //fullClose 这个参数很重要,如果设置为 true,那么send 函数完全异步,否则就是有可能
  34. //发生等待的情况。
  35. func NewMsqClient(msq *MsqServer, timeout time.Duration, isFullClose bool, bufferCount int) *MsqClient {
  36. client := &MsqClient{}
  37. client.msq = msq
  38. client.chrecv = make(chan *Message, bufferCount)
  39. client.isFullClose = isFullClose
  40. client.timeout = timeout
  41. client.clientId = int64(uintptr(unsafe.Pointer(client)))
  42. if debug {
  43. client.stoptick = make(chan int, 1)
  44. //在调试状态下可以打开,看看是否有timeout
  45. go client.countStat()
  46. }
  47. return client
  48. }
  49. func (this *MsqClient) countStat() {
  50. ticker := time.NewTicker(time.Second)
  51. lastcount := int64(0)
  52. for {
  53. select {
  54. case <-ticker.C:
  55. if lastcount > 0 {
  56. count := atomic.LoadInt64(&this.count) - lastcount
  57. tracelog.Println("send count:", count)
  58. }
  59. lastcount = atomic.LoadInt64(&this.count)
  60. case <-this.stoptick:
  61. return
  62. }
  63. }
  64. log.Println("stop count")
  65. }
  66. func (this *MsqClient) IncRef() int64 {
  67. return atomic.AddInt64(&this.refCount, 1)
  68. }
  69. func (this *MsqClient) DecRef() int64 {
  70. return atomic.AddInt64(&this.refCount, -1)
  71. }
  72. func (this *MsqClient) GetRef() int64 {
  73. return atomic.LoadInt64(&this.refCount)
  74. }
  75. func (this *MsqClient) GetMsq() *MsqServer {
  76. return this.msq
  77. }
  78. func (this *MsqClient) SetMsq(msq *MsqServer) {
  79. this.msq = msq
  80. }
  81. func (this *MsqClient) Recv(cb func(msg *Message)) {
  82. //log.Println("client is ready", this.GetId())
  83. for {
  84. msg, ok := <-this.chrecv
  85. if !ok { //chan is closed
  86. break
  87. }
  88. atomic.AddInt64(&this.recvcount, 1)
  89. if cb != nil {
  90. cb(msg)
  91. }
  92. }
  93. //log.Println("client is stop", this.GetId())
  94. }
  95. func (this *MsqClient) GetRecvCount() int64 {
  96. return atomic.LoadInt64(&this.recvcount)
  97. }
  98. func (this *MsqClient) RecvMulti(cb func(msg []*Message)) {
  99. //log.Println("client is ready, multi", this.GetId())
  100. msgs := make([]*Message, 100)
  101. for {
  102. n, err := this.read(msgs)
  103. if err != nil {
  104. if n > 0 {
  105. if cb != nil {
  106. cb(msgs[:n])
  107. }
  108. }
  109. break
  110. }
  111. if cb != nil {
  112. cb(msgs[:n])
  113. }
  114. }
  115. //log.Println("client is stop, multi", this.GetId())
  116. }
  117. func (s *MsqClient) read(buf []*Message) (int, error) {
  118. var i = 1
  119. var ok bool
  120. buf[0], ok = <-s.chrecv
  121. if !ok {
  122. return 0, errors.New("chan is closed")
  123. }
  124. for {
  125. if i == len(buf) {
  126. return i, nil
  127. }
  128. select {
  129. case data, ok := <-s.chrecv:
  130. if !ok {
  131. return i, errors.New("chan is closed")
  132. }
  133. buf[i] = data
  134. i++
  135. default:
  136. return i, nil
  137. }
  138. }
  139. panic("nerver reach")
  140. }
  141. func (this *MsqClient) GetId() int64 {
  142. return this.clientId
  143. }
  144. func (this *MsqClient) SetId(id int64) {
  145. this.clientId = id
  146. }
  147. func (this *MsqClient) Send(data interface{}) error {
  148. this.mu.Lock()
  149. defer this.mu.Unlock()
  150. if data != nil {
  151. msg := data.(*Message)
  152. this.caches = append(this.caches, msg)
  153. }
  154. for i := 0; i < len(this.caches); i++ {
  155. err := this.SendAsyn(this.caches[i])
  156. if err != nil {
  157. if i > 0 {
  158. this.caches = this.caches[i:]
  159. }
  160. go func () {
  161. time.Sleep(time.Millisecond)
  162. this.Send(nil)
  163. }()
  164. return nil;
  165. }
  166. }
  167. this.caches = nil
  168. return nil
  169. }
  170. func (this *MsqClient) SendAsyn(data interface{}) error {
  171. msg := data.(*Message)
  172. select {
  173. case this.chrecv <- msg:
  174. default:
  175. return ErrChanFull
  176. }
  177. return nil
  178. }
  179. func (this *MsqClient) Close() error {
  180. if this.GetRef() != 0 {
  181. return errors.New("ref count > 0")
  182. }
  183. msg := this.SendMessage(MsgClose, nil)
  184. err := msg.GetErr()
  185. if err != nil {
  186. return err
  187. }
  188. close(this.chrecv)
  189. this.stoptick <- 1
  190. //log.Println("msq client close success.", this.GetId())
  191. return nil
  192. }
  193. func (this *MsqClient) ConnectPrivate() error {
  194. _, err := this.sendMessage(MsgMsqBindPrivate, this)
  195. return err
  196. }
  197. func (this *MsqClient) ConnectPublic() error {
  198. _, err := this.sendMessage(MsgMsqBindPublic, this)
  199. return err
  200. }
  201. func (this *MsqClient) sendMessage(msgType int, data interface{}) (interface{}, error) {
  202. msg := this.SendMessage(msgType, data)
  203. if msg.GetErr() != nil {
  204. return msg.GetData(), msg.GetErr()
  205. }
  206. return msg.GetData(), nil
  207. }
  208. func (this *MsqClient) NewMessage(msgType int, data interface{}) *Message {
  209. //准备
  210. clientid := this.GetId()
  211. msg := NewMessage(msgType, clientid, data, 0)
  212. if msgType == MsgLog {
  213. msg.Flag |= NeedLog
  214. }
  215. msg.Ch = make(chan *Message, 5)
  216. return msg
  217. }
  218. func (this *MsqClient) SendMessage(msgType int, data interface{}) *Message {
  219. //准备
  220. msg := this.NewMessage(msgType, data)
  221. return this.SendMessage2(msg)
  222. }
  223. func (this *MsqClient) SendMessage2(msg *Message) *Message {
  224. this.msq.Send(msg)
  225. //接受
  226. recvmsg := <-msg.Ch
  227. if debug {
  228. atomic.AddInt64(&this.count, 1)
  229. recvmsg.RecvTime = getTime()
  230. if recvmsg.RecvTime-recvmsg.SendTime > int64(this.timeout) {
  231. log.Println("msg timeout", msg)
  232. }
  233. }
  234. return recvmsg
  235. }
  236. //messages
  237. func (this *MsqClient) SendMessages(msgs ...*Message) *Message {
  238. return this.SendMessage(MsgMulti, msgs)
  239. }
  240. func (this *MsqClient) SendMessagesAsyn(msgs ...*Message) *Message {
  241. return this.SendMessageAsyn(MsgMulti, msgs)
  242. }
  243. //asyn
  244. func (this *MsqClient) SendMessageAsyn(msgType int, data interface{}) *Message {
  245. //准备
  246. msg := this.NewMessage(msgType, data)
  247. return this.SendMessageAsyn2(msg)
  248. }
  249. func (this *MsqClient) SendMessageAsyn2(msg *Message) *Message {
  250. msg.Flag |= SendAsyn
  251. this.msq.Send(msg)
  252. this.mu.Lock()
  253. defer this.mu.Unlock()
  254. this.lastmsg = msg
  255. return msg
  256. }
  257. func (this *MsqClient) GetLastSyncMsg() *Message {
  258. this.mu.Lock()
  259. defer this.mu.Unlock()
  260. return this.lastmsg
  261. }