123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- package msq
- import (
- "fmt"
- "log"
- "net/http"
- "testing"
- "time"
- )
- var _ = log.Println
- //测试一个服务器:
- //1. 有个数字不断的增加
- //2. 通过一个http服务器读出这个数字的值,不需要lock
- //这个测试服务器会部署到智能交易读取当前内存状态的函数里面去
- var gcount int
- const MsgTestCount = MsgCount + 1
- func TestNoLockServer(t *testing.T) {
- q := NewMsqServer()
- go func() {
- for {
- time.Sleep(time.Second)
- q.ProcessOne()
- }
- }()
- var MsgTestCount = MsgCount + 1
- q.RegisterAction(MsgTestCount, func(MsqServer *MsqServer, msg *Message) {
- msg2 := q.ProcessMultiStart(msg)
- log.Println("count = ", gcount)
- gcount++
- msg2.data = gcount
- q.ProcessMultiEnd(msg, msg2)
- })
- RegMessage(MsgTestCount, "MsgTestCount")
- client := NewMsqClient(q, time.Second, false, 1024)
- go client.Recv(nil)
- client.ConnectPrivate()
- http.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) {
- msg := client.SendMessage(MsgTestCount, "count")
- if msg.err != nil {
- t.Error(msg.err)
- return
- }
- fmt.Fprintf(w, "Hello, %d", msg.data.(int))
- })
- log.Fatal(http.ListenAndServe(":8080", nil))
- }
- func TestEchoMsq(t *testing.T) {
- q := NewMsqServer()
- go func() {
- for {
- time.Sleep(time.Second)
- q.ProcessOne()
- }
- }()
- client := NewMsqClient(q, time.Second, false, 1024)
- go client.Recv(nil)
- client.ConnectPrivate()
- msg := client.SendMessage(MsgEcho, "echo")
- if msg.err != nil {
- t.Error(msg.err)
- return
- }
- if msg.data.(string) != "echo" {
- t.Errorf("mtf echo error")
- return
- }
- err := client.Close()
- if err != nil {
- t.Error(err)
- return
- }
- q.Close()
- }
- func BenchmarkEchoMsq(b *testing.B) {
- q := NewMsqServer()
- go q.Start()
- client := NewMsqClient(q, time.Millisecond*10000, false, 1024)
- go client.Recv(nil)
- err := client.ConnectPrivate()
- if err != nil {
- b.Error(err)
- }
- //log.Println("ready")
- for i := 0; i < b.N; i++ {
- msg := client.SendMessageAsyn(MsgEcho, "echo")
- if msg.err != nil {
- b.Error(msg.err)
- return
- }
- }
- lastmsg := client.GetLastSyncMsg()
- <-lastmsg.Ch
- err = client.Close()
- if err != nil {
- b.Error(err)
- return
- }
- q.Close()
- }
- //ToDO: 提高multi的性能。使得在异步的条件下可以提高发送的速度
- func BenchmarkEchoMsgMulti(b *testing.B) {
- q := NewMsqServer()
- go q.Start()
- client := NewMsqClient(q, time.Millisecond*10000, false, 1024)
- go client.Recv(nil)
- err := client.ConnectPrivate()
- if err != nil {
- b.Error(err)
- }
- //log.Println("ready")
- msg1 := client.NewMessage(MsgEcho, "placeorder_log")
- msg2 := client.NewMessage(MsgEcho, "placeorder_matcher")
- for i := 0; i < b.N; i++ {
- msg1.data = "placeorder_log"
- msg2.data = "placeorder_matcher"
- msg := client.SendMessagesAsyn(msg1, msg2)
- if msg.err != nil {
- b.Error(msg.err)
- return
- }
- }
- lastmsg := client.GetLastSyncMsg()
- <-lastmsg.Ch
- err = client.Close()
- if err != nil {
- b.Error(err)
- return
- }
- q.Close()
- }
- func TestMsqMulti(t *testing.T) {
- q := NewMsqServer()
- go q.Start()
- client := NewMsqClient(q, time.Second, false, 1024)
- go client.Recv(nil)
- err := client.ConnectPrivate()
- if err != nil {
- t.Error(err)
- }
- //这个msg 实际上是由多个环节组成的,所以msg.Data 返回的是一个数组
- msg1 := client.NewMessage(MsgEcho, "placeorder_log")
- msg2 := client.NewMessage(MsgEcho, "placeorder_matcher")
- msg := client.SendMessages(msg1, msg2)
- //两组消息会分别执行,如果批量执行回保留消息的执行顺序
- //如果有消息执行错误,立马回返回。无错误的情况只有可能
- //是所有的消息执行都没有错误。
- if msg.err != nil {
- t.Error(msg.err)
- return
- }
- msgs := msg.data.([]*Message)
- for i := 0; i < len(msgs); i++ {
- if msgs[i].err != nil {
- t.Error(msg.err)
- return
- }
- }
- if msgs[0].data.(string) != "placeorder_log" {
- t.Error("msg placeorder_log error")
- }
- if msgs[1].data.(string) != "placeorder_matcher" {
- t.Error("msg placeorder_matcher error")
- }
- err = client.Close()
- if err != nil {
- t.Error(err)
- return
- }
- q.Close()
- }
- //处理某个操作可能比较费时间
- func TestMsqLog(t *testing.T) {
- q := NewMsqServer()
- go q.Start()
- defer q.Close()
- logclient, err := NewLogClient(q, 10000)
- if err != nil {
- t.Error(err)
- }
- q.RegisterAction(MsgLog, func(MsqServer *MsqServer, msg *Message) {
- logclient.Log(msg)
- })
- q.RegisterAction(MsgMatch, func(MsqServer *MsqServer, msg *Message) {
- msg1 := q.ProcessMultiPrev(msg)
- //log.Println("logId = ", msg1.Id)
- q.ProcessMultiEnd(msg, msg1)
- })
- client := NewMsqClient(q, time.Millisecond*100, false, 1024)
- go client.Recv(nil)
- err = client.ConnectPrivate()
- if err != nil {
- t.Error(err)
- }
- wait := make(chan int, 100)
- for i := 0; i < 1; i++ {
- go func() {
- msg1 := client.NewMessage(MsgLog, "placeorder_log")
- msg2 := client.NewMessage(MsgMatch, "placeorder_matcher")
- msg := client.SendMessages(msg1, msg2)
- if msg.err != nil {
- t.Error(msg.err)
- return
- }
- //msgs := msg.Data.([]*Message)
- //log.Println("logid =", msgs[0].Id, "index = ", msg.Index)
- wait <- 1
- }()
- }
- for i := 0; i < 1; i++ {
- <-wait
- }
- }
|