query_client.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package tick
  2. import "time"
  3. //import "log"
  4. import "tickserver/framework/msq"
  5. import "database/sql"
  6. import "fmt"
  7. import "strings"
  8. import "errors"
  9. type QueryClient struct {
  10. client *msq.MsqClient
  11. }
  12. func NewQueryClient(server *msq.MsqServer) (*QueryClient, error) {
  13. this := &QueryClient{}
  14. this.client = msq.NewMsqClient(server, time.Second, true, 102400)
  15. go this.start()
  16. err := this.client.ConnectPrivate()
  17. if err != nil {
  18. //log.Println(err)
  19. return nil, err
  20. }
  21. return this, nil
  22. }
  23. func (this *QueryClient) start() {
  24. this.client.RecvMulti(func(msgs []*msq.Message) {
  25. this.process(msgs)
  26. })
  27. //log.Println("QueryClient start end")
  28. }
  29. func (this *QueryClient) process(msgs []*msq.Message) {
  30. msgs2 := make([]*msq.Message, len(msgs))
  31. server := this.client.GetMsq()
  32. for i := 0; i < len(msgs); i++ {
  33. msgs2[i] = server.ProcessMultiStart(msgs[i])
  34. }
  35. //保存到数据库
  36. err := this.process2(msgs2)
  37. if err != nil {
  38. for i := 0; i < len(msgs); i++ {
  39. msgs2[i].SetErr(err)
  40. }
  41. }
  42. //保存成功
  43. //重新发送到队列中去
  44. for i := 0; i < len(msgs); i++ {
  45. server.ProcessMultiEnd(msgs[i], msgs2[i])
  46. }
  47. }
  48. func (this *QueryClient) process2(msgs []*msq.Message) error {
  49. for i := 0; i < len(msgs); i++ {
  50. if msgs[i].Type == msq.MsgTKDown {
  51. err := this.download(msgs[i])
  52. if err != nil {
  53. return err
  54. }
  55. } else if msgs[i].Type == msq.MsgTKHis {
  56. err := this.history(msgs[i])
  57. if err != nil {
  58. return err
  59. }
  60. }
  61. }
  62. return nil
  63. }
  64. func (this *QueryClient) history(msg *msq.Message) error {
  65. req := msg.GetData().(*DownloadRequest)
  66. table := "tick_index"
  67. q := "select `begtime`, `endtime`, `path`, `ty`, `tickcount`, `totalcount` from " + table
  68. if req.End == 0 {
  69. req.End = 0x7FFFFFFF * 1000
  70. }
  71. q += fmt.Sprintf(" where `begtime` >= %d and `begtime` <= %d and `ty` = '%s'",
  72. req.Start, req.End, req.Type)
  73. q += "order by `id` desc"
  74. if req.Count == 0 {
  75. req.Count = 0x7FFFFFFF
  76. }
  77. limit := fmt.Sprintf(" limit %d, %d", req.Offset, req.Count)
  78. q += limit
  79. var rows *sql.Rows
  80. var err error
  81. rows, err = db.Query(q)
  82. if err != nil {
  83. msg.SetErr(err)
  84. return nil
  85. }
  86. defer rows.Close()
  87. var indexs []TickIndex
  88. for rows.Next() {
  89. var idx TickIndex
  90. if err := rows.Scan(&idx.Begtime, &idx.Endtime, &idx.Path, &idx.Ty, &idx.Tickcount, &idx.Totalcount); err != nil {
  91. msg.SetErr(err)
  92. return nil
  93. }
  94. //idx.Path = path.Base(idx.Path)
  95. idx.Path = strings.TrimPrefix(idx.Path, serverconf.DataDir)
  96. indexs = append(indexs, idx)
  97. }
  98. if err := rows.Err(); err != nil {
  99. msg.SetErr(err)
  100. return nil
  101. }
  102. msg.SetData(indexs)
  103. return nil
  104. }
  105. func (this *QueryClient) download(msg *msq.Message) error {
  106. req := msg.GetData().(*DownloadRequest)
  107. if req.End == 0 {
  108. req.End = 0x7FFFFFFF * 1000
  109. }
  110. //log.Println("beg", req)
  111. logsave, ok := tserver.logsaves[req.Type]
  112. if !ok {
  113. msg.SetErr(errors.New("no save writer"))
  114. return nil
  115. }
  116. ms := &MarketSave{}
  117. datas := logsave.GetData(req.Start/1000, req.End/1000, req.Offset, req.Count, ms)
  118. var ticks []Market
  119. for _, v := range datas {
  120. m := Market{
  121. InsId: v.(*MarketSave).InsId,
  122. Timestamp: v.(*MarketSave).Timestamp,
  123. Close: v.(*MarketSave).Close,
  124. Open: v.(*MarketSave).Open,
  125. High: v.(*MarketSave).High,
  126. Low: v.(*MarketSave).Low,
  127. AllVolume: v.(*MarketSave).AllVolume,
  128. AllAmount: v.(*MarketSave).AllAmount,
  129. LastPrice: v.(*MarketSave).LastPrice,
  130. LastVolume: v.(*MarketSave).LastVolume,
  131. //Bids: v.(*MarketSave).Bids,
  132. //Asks: v.(*MarketSave).Asks,
  133. Type: int32(v.(*MarketSave).Type),
  134. }
  135. var zero float64
  136. for i, bid := range v.(*MarketSave).Bids {
  137. if bid[0] == zero && bid[1] == zero {
  138. m.Bids = v.(*MarketSave).Bids[:i]
  139. break
  140. }
  141. }
  142. for i, ask := range v.(*MarketSave).Asks {
  143. if ask[0] == zero && ask[1] == zero {
  144. m.Asks = v.(*MarketSave).Asks[:i]
  145. break
  146. }
  147. }
  148. ticks = append(ticks, m)
  149. }
  150. //log.Println("end")
  151. msg.SetData(ticks)
  152. return nil
  153. }
  154. func (this *QueryClient) Query(msg *msq.Message) error {
  155. return this.client.Send(msg)
  156. }