msq_test.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package msq
  2. import (
  3. "fmt"
  4. "log"
  5. "net/http"
  6. "testing"
  7. "time"
  8. )
  9. var _ = log.Println
  10. //测试一个服务器:
  11. //1. 有个数字不断的增加
  12. //2. 通过一个http服务器读出这个数字的值,不需要lock
  13. //这个测试服务器会部署到智能交易读取当前内存状态的函数里面去
  14. var gcount int
  15. const MsgTestCount = MsgCount + 1
  16. func TestNoLockServer(t *testing.T) {
  17. q := NewMsqServer()
  18. go func() {
  19. for {
  20. time.Sleep(time.Second)
  21. q.ProcessOne()
  22. }
  23. }()
  24. var MsgTestCount = MsgCount + 1
  25. q.RegisterAction(MsgTestCount, func(MsqServer *MsqServer, msg *Message) {
  26. msg2 := q.ProcessMultiStart(msg)
  27. log.Println("count = ", gcount)
  28. gcount++
  29. msg2.data = gcount
  30. q.ProcessMultiEnd(msg, msg2)
  31. })
  32. RegMessage(MsgTestCount, "MsgTestCount")
  33. client := NewMsqClient(q, time.Second, false, 1024)
  34. go client.Recv(nil)
  35. client.ConnectPrivate()
  36. http.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) {
  37. msg := client.SendMessage(MsgTestCount, "count")
  38. if msg.err != nil {
  39. t.Error(msg.err)
  40. return
  41. }
  42. fmt.Fprintf(w, "Hello, %d", msg.data.(int))
  43. })
  44. log.Fatal(http.ListenAndServe(":8080", nil))
  45. }
  46. func TestEchoMsq(t *testing.T) {
  47. q := NewMsqServer()
  48. go func() {
  49. for {
  50. time.Sleep(time.Second)
  51. q.ProcessOne()
  52. }
  53. }()
  54. client := NewMsqClient(q, time.Second, false, 1024)
  55. go client.Recv(nil)
  56. client.ConnectPrivate()
  57. msg := client.SendMessage(MsgEcho, "echo")
  58. if msg.err != nil {
  59. t.Error(msg.err)
  60. return
  61. }
  62. if msg.data.(string) != "echo" {
  63. t.Errorf("mtf echo error")
  64. return
  65. }
  66. err := client.Close()
  67. if err != nil {
  68. t.Error(err)
  69. return
  70. }
  71. q.Close()
  72. }
  73. func BenchmarkEchoMsq(b *testing.B) {
  74. q := NewMsqServer()
  75. go q.Start()
  76. client := NewMsqClient(q, time.Millisecond*10000, false, 1024)
  77. go client.Recv(nil)
  78. err := client.ConnectPrivate()
  79. if err != nil {
  80. b.Error(err)
  81. }
  82. //log.Println("ready")
  83. for i := 0; i < b.N; i++ {
  84. msg := client.SendMessageAsyn(MsgEcho, "echo")
  85. if msg.err != nil {
  86. b.Error(msg.err)
  87. return
  88. }
  89. }
  90. lastmsg := client.GetLastSyncMsg()
  91. <-lastmsg.Ch
  92. err = client.Close()
  93. if err != nil {
  94. b.Error(err)
  95. return
  96. }
  97. q.Close()
  98. }
  99. //ToDO: 提高multi的性能。使得在异步的条件下可以提高发送的速度
  100. func BenchmarkEchoMsgMulti(b *testing.B) {
  101. q := NewMsqServer()
  102. go q.Start()
  103. client := NewMsqClient(q, time.Millisecond*10000, false, 1024)
  104. go client.Recv(nil)
  105. err := client.ConnectPrivate()
  106. if err != nil {
  107. b.Error(err)
  108. }
  109. //log.Println("ready")
  110. msg1 := client.NewMessage(MsgEcho, "placeorder_log")
  111. msg2 := client.NewMessage(MsgEcho, "placeorder_matcher")
  112. for i := 0; i < b.N; i++ {
  113. msg1.data = "placeorder_log"
  114. msg2.data = "placeorder_matcher"
  115. msg := client.SendMessagesAsyn(msg1, msg2)
  116. if msg.err != nil {
  117. b.Error(msg.err)
  118. return
  119. }
  120. }
  121. lastmsg := client.GetLastSyncMsg()
  122. <-lastmsg.Ch
  123. err = client.Close()
  124. if err != nil {
  125. b.Error(err)
  126. return
  127. }
  128. q.Close()
  129. }
  130. func TestMsqMulti(t *testing.T) {
  131. q := NewMsqServer()
  132. go q.Start()
  133. client := NewMsqClient(q, time.Second, false, 1024)
  134. go client.Recv(nil)
  135. err := client.ConnectPrivate()
  136. if err != nil {
  137. t.Error(err)
  138. }
  139. //这个msg 实际上是由多个环节组成的,所以msg.Data 返回的是一个数组
  140. msg1 := client.NewMessage(MsgEcho, "placeorder_log")
  141. msg2 := client.NewMessage(MsgEcho, "placeorder_matcher")
  142. msg := client.SendMessages(msg1, msg2)
  143. //两组消息会分别执行,如果批量执行回保留消息的执行顺序
  144. //如果有消息执行错误,立马回返回。无错误的情况只有可能
  145. //是所有的消息执行都没有错误。
  146. if msg.err != nil {
  147. t.Error(msg.err)
  148. return
  149. }
  150. msgs := msg.data.([]*Message)
  151. for i := 0; i < len(msgs); i++ {
  152. if msgs[i].err != nil {
  153. t.Error(msg.err)
  154. return
  155. }
  156. }
  157. if msgs[0].data.(string) != "placeorder_log" {
  158. t.Error("msg placeorder_log error")
  159. }
  160. if msgs[1].data.(string) != "placeorder_matcher" {
  161. t.Error("msg placeorder_matcher error")
  162. }
  163. err = client.Close()
  164. if err != nil {
  165. t.Error(err)
  166. return
  167. }
  168. q.Close()
  169. }
  170. //处理某个操作可能比较费时间
  171. func TestMsqLog(t *testing.T) {
  172. q := NewMsqServer()
  173. go q.Start()
  174. defer q.Close()
  175. logclient, err := NewLogClient(q, 10000)
  176. if err != nil {
  177. t.Error(err)
  178. }
  179. q.RegisterAction(MsgLog, func(MsqServer *MsqServer, msg *Message) {
  180. logclient.Log(msg)
  181. })
  182. q.RegisterAction(MsgMatch, func(MsqServer *MsqServer, msg *Message) {
  183. msg1 := q.ProcessMultiPrev(msg)
  184. //log.Println("logId = ", msg1.Id)
  185. q.ProcessMultiEnd(msg, msg1)
  186. })
  187. client := NewMsqClient(q, time.Millisecond*100, false, 1024)
  188. go client.Recv(nil)
  189. err = client.ConnectPrivate()
  190. if err != nil {
  191. t.Error(err)
  192. }
  193. wait := make(chan int, 100)
  194. for i := 0; i < 1; i++ {
  195. go func() {
  196. msg1 := client.NewMessage(MsgLog, "placeorder_log")
  197. msg2 := client.NewMessage(MsgMatch, "placeorder_matcher")
  198. msg := client.SendMessages(msg1, msg2)
  199. if msg.err != nil {
  200. t.Error(msg.err)
  201. return
  202. }
  203. //msgs := msg.Data.([]*Message)
  204. //log.Println("logid =", msgs[0].Id, "index = ", msg.Index)
  205. wait <- 1
  206. }()
  207. }
  208. for i := 0; i < 1; i++ {
  209. <-wait
  210. }
  211. }