123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- package tick
- import "time"
- //import "log"
- import "tickserver/framework/msq"
- import "database/sql"
- import "fmt"
- import "strings"
- import "errors"
- type QueryClient struct {
- client *msq.MsqClient
- }
- func NewQueryClient(server *msq.MsqServer) (*QueryClient, error) {
- this := &QueryClient{}
- this.client = msq.NewMsqClient(server, time.Second, true, 102400)
- go this.start()
- err := this.client.ConnectPrivate()
- if err != nil {
- //log.Println(err)
- return nil, err
- }
- return this, nil
- }
- func (this *QueryClient) start() {
- this.client.RecvMulti(func(msgs []*msq.Message) {
- this.process(msgs)
- })
- //log.Println("QueryClient start end")
- }
- func (this *QueryClient) process(msgs []*msq.Message) {
- msgs2 := make([]*msq.Message, len(msgs))
- server := this.client.GetMsq()
- for i := 0; i < len(msgs); i++ {
- msgs2[i] = server.ProcessMultiStart(msgs[i])
- }
- //保存到数据库
- err := this.process2(msgs2)
- if err != nil {
- for i := 0; i < len(msgs); i++ {
- msgs2[i].SetErr(err)
- }
- }
- //保存成功
- //重新发送到队列中去
- for i := 0; i < len(msgs); i++ {
- server.ProcessMultiEnd(msgs[i], msgs2[i])
- }
- }
- func (this *QueryClient) process2(msgs []*msq.Message) error {
- for i := 0; i < len(msgs); i++ {
- if msgs[i].Type == msq.MsgTKDown {
- err := this.download(msgs[i])
- if err != nil {
- return err
- }
- } else if msgs[i].Type == msq.MsgTKHis {
- err := this.history(msgs[i])
- if err != nil {
- return err
- }
- }
- }
- return nil
- }
- func (this *QueryClient) history(msg *msq.Message) error {
- req := msg.GetData().(*DownloadRequest)
- table := "tick_index"
- q := "select `begtime`, `endtime`, `path`, `ty`, `tickcount`, `totalcount` from " + table
- if req.End == 0 {
- req.End = 0x7FFFFFFF * 1000
- }
- q += fmt.Sprintf(" where `begtime` >= %d and `begtime` <= %d and `ty` = '%s'",
- req.Start, req.End, req.Type)
- q += "order by `id` desc"
- if req.Count == 0 {
- req.Count = 0x7FFFFFFF
- }
- limit := fmt.Sprintf(" limit %d, %d", req.Offset, req.Count)
- q += limit
- var rows *sql.Rows
- var err error
- rows, err = db.Query(q)
- if err != nil {
- msg.SetErr(err)
- return nil
- }
- defer rows.Close()
- var indexs []TickIndex
- for rows.Next() {
- var idx TickIndex
- if err := rows.Scan(&idx.Begtime, &idx.Endtime, &idx.Path, &idx.Ty, &idx.Tickcount, &idx.Totalcount); err != nil {
- msg.SetErr(err)
- return nil
- }
- //idx.Path = path.Base(idx.Path)
- idx.Path = strings.TrimPrefix(idx.Path, serverconf.DataDir)
- indexs = append(indexs, idx)
- }
- if err := rows.Err(); err != nil {
- msg.SetErr(err)
- return nil
- }
- msg.SetData(indexs)
- return nil
- }
- func (this *QueryClient) download(msg *msq.Message) error {
- req := msg.GetData().(*DownloadRequest)
- if req.End == 0 {
- req.End = 0x7FFFFFFF * 1000
- }
- //log.Println("beg", req)
- logsave, ok := tserver.logsaves[req.Type]
- if !ok {
- msg.SetErr(errors.New("no save writer"))
- return nil
- }
- ms := &MarketSave{}
- datas := logsave.GetData(req.Start/1000, req.End/1000, req.Offset, req.Count, ms)
- var ticks []Market
- for _, v := range datas {
- m := Market{
- InsId: v.(*MarketSave).InsId,
- Timestamp: v.(*MarketSave).Timestamp,
- Close: v.(*MarketSave).Close,
- Open: v.(*MarketSave).Open,
- High: v.(*MarketSave).High,
- Low: v.(*MarketSave).Low,
- AllVolume: v.(*MarketSave).AllVolume,
- AllAmount: v.(*MarketSave).AllAmount,
- LastPrice: v.(*MarketSave).LastPrice,
- LastVolume: v.(*MarketSave).LastVolume,
- //Bids: v.(*MarketSave).Bids,
- //Asks: v.(*MarketSave).Asks,
- Type: int32(v.(*MarketSave).Type),
- }
- var zero float64
- for i, bid := range v.(*MarketSave).Bids {
- if bid[0] == zero && bid[1] == zero {
- m.Bids = v.(*MarketSave).Bids[:i]
- break
- }
- }
- for i, ask := range v.(*MarketSave).Asks {
- if ask[0] == zero && ask[1] == zero {
- m.Asks = v.(*MarketSave).Asks[:i]
- break
- }
- }
- ticks = append(ticks, m)
- }
- //log.Println("end")
- msg.SetData(ticks)
- return nil
- }
- func (this *QueryClient) Query(msg *msq.Message) error {
- return this.client.Send(msg)
- }
|