package msq import ( "fmt" "log" "net/http" "testing" "time" ) var _ = log.Println //测试一个服务器: //1. 有个数字不断的增加 //2. 通过一个http服务器读出这个数字的值,不需要lock //这个测试服务器会部署到智能交易读取当前内存状态的函数里面去 var gcount int const MsgTestCount = MsgCount + 1 func TestNoLockServer(t *testing.T) { q := NewMsqServer() go func() { for { time.Sleep(time.Second) q.ProcessOne() } }() var MsgTestCount = MsgCount + 1 q.RegisterAction(MsgTestCount, func(MsqServer *MsqServer, msg *Message) { msg2 := q.ProcessMultiStart(msg) log.Println("count = ", gcount) gcount++ msg2.data = gcount q.ProcessMultiEnd(msg, msg2) }) RegMessage(MsgTestCount, "MsgTestCount") client := NewMsqClient(q, time.Second, false, 1024) go client.Recv(nil) client.ConnectPrivate() http.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) { msg := client.SendMessage(MsgTestCount, "count") if msg.err != nil { t.Error(msg.err) return } fmt.Fprintf(w, "Hello, %d", msg.data.(int)) }) log.Fatal(http.ListenAndServe(":8080", nil)) } func TestEchoMsq(t *testing.T) { q := NewMsqServer() go func() { for { time.Sleep(time.Second) q.ProcessOne() } }() client := NewMsqClient(q, time.Second, false, 1024) go client.Recv(nil) client.ConnectPrivate() msg := client.SendMessage(MsgEcho, "echo") if msg.err != nil { t.Error(msg.err) return } if msg.data.(string) != "echo" { t.Errorf("mtf echo error") return } err := client.Close() if err != nil { t.Error(err) return } q.Close() } func BenchmarkEchoMsq(b *testing.B) { q := NewMsqServer() go q.Start() client := NewMsqClient(q, time.Millisecond*10000, false, 1024) go client.Recv(nil) err := client.ConnectPrivate() if err != nil { b.Error(err) } //log.Println("ready") for i := 0; i < b.N; i++ { msg := client.SendMessageAsyn(MsgEcho, "echo") if msg.err != nil { b.Error(msg.err) return } } lastmsg := client.GetLastSyncMsg() <-lastmsg.Ch err = client.Close() if err != nil { b.Error(err) return } q.Close() } //ToDO: 提高multi的性能。使得在异步的条件下可以提高发送的速度 func BenchmarkEchoMsgMulti(b *testing.B) { q := NewMsqServer() go q.Start() client := NewMsqClient(q, time.Millisecond*10000, false, 1024) go client.Recv(nil) err := client.ConnectPrivate() if err != nil { b.Error(err) } //log.Println("ready") msg1 := client.NewMessage(MsgEcho, "placeorder_log") msg2 := client.NewMessage(MsgEcho, "placeorder_matcher") for i := 0; i < b.N; i++ { msg1.data = "placeorder_log" msg2.data = "placeorder_matcher" msg := client.SendMessagesAsyn(msg1, msg2) if msg.err != nil { b.Error(msg.err) return } } lastmsg := client.GetLastSyncMsg() <-lastmsg.Ch err = client.Close() if err != nil { b.Error(err) return } q.Close() } func TestMsqMulti(t *testing.T) { q := NewMsqServer() go q.Start() client := NewMsqClient(q, time.Second, false, 1024) go client.Recv(nil) err := client.ConnectPrivate() if err != nil { t.Error(err) } //这个msg 实际上是由多个环节组成的,所以msg.Data 返回的是一个数组 msg1 := client.NewMessage(MsgEcho, "placeorder_log") msg2 := client.NewMessage(MsgEcho, "placeorder_matcher") msg := client.SendMessages(msg1, msg2) //两组消息会分别执行,如果批量执行回保留消息的执行顺序 //如果有消息执行错误,立马回返回。无错误的情况只有可能 //是所有的消息执行都没有错误。 if msg.err != nil { t.Error(msg.err) return } msgs := msg.data.([]*Message) for i := 0; i < len(msgs); i++ { if msgs[i].err != nil { t.Error(msg.err) return } } if msgs[0].data.(string) != "placeorder_log" { t.Error("msg placeorder_log error") } if msgs[1].data.(string) != "placeorder_matcher" { t.Error("msg placeorder_matcher error") } err = client.Close() if err != nil { t.Error(err) return } q.Close() } //处理某个操作可能比较费时间 func TestMsqLog(t *testing.T) { q := NewMsqServer() go q.Start() defer q.Close() logclient, err := NewLogClient(q, 10000) if err != nil { t.Error(err) } q.RegisterAction(MsgLog, func(MsqServer *MsqServer, msg *Message) { logclient.Log(msg) }) q.RegisterAction(MsgMatch, func(MsqServer *MsqServer, msg *Message) { msg1 := q.ProcessMultiPrev(msg) //log.Println("logId = ", msg1.Id) q.ProcessMultiEnd(msg, msg1) }) client := NewMsqClient(q, time.Millisecond*100, false, 1024) go client.Recv(nil) err = client.ConnectPrivate() if err != nil { t.Error(err) } wait := make(chan int, 100) for i := 0; i < 1; i++ { go func() { msg1 := client.NewMessage(MsgLog, "placeorder_log") msg2 := client.NewMessage(MsgMatch, "placeorder_matcher") msg := client.SendMessages(msg1, msg2) if msg.err != nil { t.Error(msg.err) return } //msgs := msg.Data.([]*Message) //log.Println("logid =", msgs[0].Id, "index = ", msg.Index) wait <- 1 }() } for i := 0; i < 1; i++ { <-wait } }