package msq import "time" import "log" type LogClient struct { logclient *MsqClient logId int64 } func NewLogClient(server *MsqServer, logId int64) (*LogClient, error) { this := &LogClient{} this.logclient = NewMsqClient(server, time.Second, true, 10240) go this.start() err := this.logclient.ConnectPrivate() if err != nil { log.Println(err) return nil, err } this.logId = logId return this, nil } func (this *LogClient) start() { //log.Println("LogClient start()") this.logclient.RecvMulti(func (msgs []*Message) { this.log(msgs) }) log.Println("LogClient start end") } func (this *LogClient) log(msgs []*Message) { msgs2 := make([]*Message, len(msgs)) server := this.logclient.GetMsq() for i := 0; i < len(msgs); i++ { msgs2[i] = server.ProcessMultiStart(msgs[i]) this.logId++ msgs2[i].Id = this.logId } //保存到数据库 err := this.log2(msgs2) if err != nil { for i := 0; i < len(msgs); i++ { msgs2[i].SetErr(err) } } //保存成功 //重新发送到队列中去 for i := 0; i < len(msgs); i++ { server.ProcessMultiEnd(msgs[i], msgs2[i]) } } func (this *LogClient) log2(msgs []*Message) error { time.Sleep(time.Millisecond * 100) log.Println("log message:", len(msgs)) return nil } func (this *LogClient) Log(msg *Message) error { return this.logclient.Send(msg) }