package tick import "time" //import "log" import "tickserver/framework/msq" import "database/sql" import "fmt" import "strings" import "errors" type QueryClient struct { client *msq.MsqClient } func NewQueryClient(server *msq.MsqServer) (*QueryClient, error) { this := &QueryClient{} this.client = msq.NewMsqClient(server, time.Second, true, 102400) go this.start() err := this.client.ConnectPrivate() if err != nil { //log.Println(err) return nil, err } return this, nil } func (this *QueryClient) start() { this.client.RecvMulti(func(msgs []*msq.Message) { this.process(msgs) }) //log.Println("QueryClient start end") } func (this *QueryClient) process(msgs []*msq.Message) { msgs2 := make([]*msq.Message, len(msgs)) server := this.client.GetMsq() for i := 0; i < len(msgs); i++ { msgs2[i] = server.ProcessMultiStart(msgs[i]) } //保存到数据库 err := this.process2(msgs2) if err != nil { for i := 0; i < len(msgs); i++ { msgs2[i].SetErr(err) } } //保存成功 //重新发送到队列中去 for i := 0; i < len(msgs); i++ { server.ProcessMultiEnd(msgs[i], msgs2[i]) } } func (this *QueryClient) process2(msgs []*msq.Message) error { for i := 0; i < len(msgs); i++ { if msgs[i].Type == msq.MsgTKDown { err := this.download(msgs[i]) if err != nil { return err } } else if msgs[i].Type == msq.MsgTKHis { err := this.history(msgs[i]) if err != nil { return err } } } return nil } func (this *QueryClient) history(msg *msq.Message) error { req := msg.GetData().(*DownloadRequest) table := "tick_index" q := "select `begtime`, `endtime`, `path`, `ty`, `tickcount`, `totalcount` from " + table if req.End == 0 { req.End = 0x7FFFFFFF * 1000 } q += fmt.Sprintf(" where `begtime` >= %d and `begtime` <= %d and `ty` = '%s'", req.Start, req.End, req.Type) q += "order by `id` desc" if req.Count == 0 { req.Count = 0x7FFFFFFF } limit := fmt.Sprintf(" limit %d, %d", req.Offset, req.Count) q += limit var rows *sql.Rows var err error rows, err = db.Query(q) if err != nil { msg.SetErr(err) return nil } defer rows.Close() var indexs []TickIndex for rows.Next() { var idx TickIndex if err := rows.Scan(&idx.Begtime, &idx.Endtime, &idx.Path, &idx.Ty, &idx.Tickcount, &idx.Totalcount); err != nil { msg.SetErr(err) return nil } //idx.Path = path.Base(idx.Path) idx.Path = strings.TrimPrefix(idx.Path, serverconf.DataDir) indexs = append(indexs, idx) } if err := rows.Err(); err != nil { msg.SetErr(err) return nil } msg.SetData(indexs) return nil } func (this *QueryClient) download(msg *msq.Message) error { req := msg.GetData().(*DownloadRequest) if req.End == 0 { req.End = 0x7FFFFFFF * 1000 } //log.Println("beg", req) logsave, ok := tserver.logsaves[req.Type] if !ok { msg.SetErr(errors.New("no save writer")) return nil } ms := &MarketSave{} datas := logsave.GetData(req.Start/1000, req.End/1000, req.Offset, req.Count, ms) var ticks []Market for _, v := range datas { m := Market{ InsId: v.(*MarketSave).InsId, Timestamp: v.(*MarketSave).Timestamp, Close: v.(*MarketSave).Close, Open: v.(*MarketSave).Open, High: v.(*MarketSave).High, Low: v.(*MarketSave).Low, AllVolume: v.(*MarketSave).AllVolume, AllAmount: v.(*MarketSave).AllAmount, LastPrice: v.(*MarketSave).LastPrice, LastVolume: v.(*MarketSave).LastVolume, //Bids: v.(*MarketSave).Bids, //Asks: v.(*MarketSave).Asks, Type: int32(v.(*MarketSave).Type), } var zero float64 for i, bid := range v.(*MarketSave).Bids { if bid[0] == zero && bid[1] == zero { m.Bids = v.(*MarketSave).Bids[:i] break } } for i, ask := range v.(*MarketSave).Asks { if ask[0] == zero && ask[1] == zero { m.Asks = v.(*MarketSave).Asks[:i] break } } ticks = append(ticks, m) } //log.Println("end") msg.SetData(ticks) return nil } func (this *QueryClient) Query(msg *msq.Message) error { return this.client.Send(msg) }