log_client.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package tick
  2. import "time"
  3. //import "log"
  4. import "errors"
  5. import "tickserver/framework/msq"
  6. type LogClient struct {
  7. logclient *msq.MsqClient
  8. logId int64
  9. }
  10. func NewLogClient(server *msq.MsqServer, logId int64) (*LogClient, error) {
  11. this := &LogClient{}
  12. this.logclient = msq.NewMsqClient(server, time.Second, true, 10240)
  13. go this.start()
  14. err := this.logclient.ConnectPrivate()
  15. if err != nil {
  16. //log.Println(err)
  17. return nil, err
  18. }
  19. this.logId = logId
  20. return this, nil
  21. }
  22. func (this *LogClient) SetLogId(logId int64) {
  23. this.logId = logId
  24. }
  25. func (this *LogClient) start() {
  26. //log.Println("LogClient start()")
  27. this.logclient.RecvMulti(func(msgs []*msq.Message) {
  28. this.log(msgs)
  29. })
  30. //log.Println("LogClient start end")
  31. }
  32. //database id,time,insId,data
  33. func (this *LogClient) log(msgs []*msq.Message) {
  34. msgs2 := make([]*msq.Message, len(msgs))
  35. server := this.logclient.GetMsq()
  36. for i := 0; i < len(msgs); i++ {
  37. msgs2[i] = server.ProcessMultiStart(msgs[i])
  38. msgs2[i].RecvTime = nowTime()
  39. }
  40. retry := 2
  41. for {
  42. retry--
  43. if retry == 0 {
  44. break
  45. }
  46. err := this.log2(msgs2)
  47. if err != nil {
  48. //log.Println("log message err = ", err)
  49. time.Sleep(time.Second)
  50. continue
  51. }
  52. break
  53. }
  54. //保存成功
  55. //重新发送到队列中去
  56. for i := 0; i < len(msgs); i++ {
  57. server.ProcessMultiEnd(msgs[i], msgs2[i])
  58. }
  59. }
  60. func (this *LogClient) log2(msgs []*msq.Message) error {
  61. for i := 0; i < len(msgs); i++ {
  62. base, ok := msgs[i].GetData().(*Market)
  63. if !ok {
  64. msgs[i].SetErr(errors.New("msg not impl IReqbase"))
  65. continue
  66. }
  67. var err error
  68. typ := DataTypeName(int(base.Type))
  69. logsave, ok1 := tserver.logsaves[typ]
  70. if !ok1 {
  71. msgs[i].SetErr(errors.New("no save writer"))
  72. continue
  73. }
  74. m2 := Market2{
  75. InsId: base.InsId,
  76. Timestamp: base.Timestamp,
  77. Close: base.Close,
  78. Open: base.Open,
  79. High: base.High,
  80. Low: base.Low,
  81. AllVolume: base.AllVolume,
  82. AllAmount: base.AllAmount,
  83. LastPrice: base.LastPrice,
  84. LastVolume: base.LastVolume,
  85. //Bids: base.Bids,
  86. //Asks: base.Asks,
  87. Type: int64(base.Type),
  88. }
  89. for i, v := range base.Bids {
  90. if i >= 10 {
  91. break
  92. }
  93. m2.Bids[i] = v
  94. }
  95. for i, v := range base.Asks {
  96. if i >= 10 {
  97. break
  98. }
  99. m2.Asks[i] = v
  100. }
  101. ms := &MarketSave{m2}
  102. err = logsave.Save(ms)
  103. if err != nil {
  104. msgs[i].SetErr(err)
  105. //log.Println("debugggggggg33333333", DataTypeName(int(base.Type)), m2.InsId, err)
  106. continue
  107. }
  108. }
  109. return nil
  110. }
  111. func (this *LogClient) Log(msg *msq.Message) error {
  112. return this.logclient.Send(msg)
  113. }