package tick import "time" //import "log" import "errors" import "tickserver/framework/msq" type LogClient struct { logclient *msq.MsqClient logId int64 } func NewLogClient(server *msq.MsqServer, logId int64) (*LogClient, error) { this := &LogClient{} this.logclient = msq.NewMsqClient(server, time.Second, true, 10240) go this.start() err := this.logclient.ConnectPrivate() if err != nil { //log.Println(err) return nil, err } this.logId = logId return this, nil } func (this *LogClient) SetLogId(logId int64) { this.logId = logId } func (this *LogClient) start() { //log.Println("LogClient start()") this.logclient.RecvMulti(func(msgs []*msq.Message) { this.log(msgs) }) //log.Println("LogClient start end") } //database id,time,insId,data func (this *LogClient) log(msgs []*msq.Message) { msgs2 := make([]*msq.Message, len(msgs)) server := this.logclient.GetMsq() for i := 0; i < len(msgs); i++ { msgs2[i] = server.ProcessMultiStart(msgs[i]) msgs2[i].RecvTime = nowTime() } retry := 2 for { retry-- if retry == 0 { break } err := this.log2(msgs2) if err != nil { //log.Println("log message err = ", err) time.Sleep(time.Second) continue } break } //保存成功 //重新发送到队列中去 for i := 0; i < len(msgs); i++ { server.ProcessMultiEnd(msgs[i], msgs2[i]) } } func (this *LogClient) log2(msgs []*msq.Message) error { for i := 0; i < len(msgs); i++ { base, ok := msgs[i].GetData().(*Market) if !ok { msgs[i].SetErr(errors.New("msg not impl IReqbase")) continue } var err error typ := DataTypeName(int(base.Type)) logsave, ok1 := tserver.logsaves[typ] if !ok1 { msgs[i].SetErr(errors.New("no save writer")) continue } m2 := Market2{ InsId: base.InsId, Timestamp: base.Timestamp, Close: base.Close, Open: base.Open, High: base.High, Low: base.Low, AllVolume: base.AllVolume, AllAmount: base.AllAmount, LastPrice: base.LastPrice, LastVolume: base.LastVolume, //Bids: base.Bids, //Asks: base.Asks, Type: int64(base.Type), } for i, v := range base.Bids { if i >= 10 { break } m2.Bids[i] = v } for i, v := range base.Asks { if i >= 10 { break } m2.Asks[i] = v } ms := &MarketSave{m2} err = logsave.Save(ms) if err != nil { msgs[i].SetErr(err) //log.Println("debugggggggg33333333", DataTypeName(int(base.Type)), m2.InsId, err) continue } } return nil } func (this *LogClient) Log(msg *msq.Message) error { return this.logclient.Send(msg) }