ds_lmax.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. // 本文件实现lmax数据源接口, 实时数据和历史数据的获取和保存
  4. import (
  5. "fmt"
  6. "log"
  7. "time"
  8. "tickserver/api/lmaxapi"
  9. "tickserver/api/lmaxapi/request"
  10. "tickserver/api/lmaxapi/response"
  11. )
  12. // lmaxDS实现了dataSource接口, 并对lmax的历史数据和实时数据保存
  13. type LmaxDS struct {
  14. *DSBase
  15. errcount int64
  16. }
  17. func init() {
  18. drivers[Lmax] = newLmaxDS
  19. }
  20. func newLmaxDS(conf *DsConf) (DataSource, error) {
  21. return &LmaxDS{
  22. DSBase: NewDsBase(conf), // lmax 自己下载历史数据, 所以参数db==nil
  23. }, nil
  24. }
  25. func (lds *LmaxDS) Name() string {
  26. return Lmax
  27. }
  28. func (lds *LmaxDS) Run() {
  29. if !lds.conf.Run {
  30. log.Println("LmaxDS.run config NOT run")
  31. return
  32. }
  33. log.Println("LmaxDS.run")
  34. // 此goroutine用来处理实时行情数据, 避免lmaxapi回调阻塞
  35. eventCh := make(chan *response.OrderBookEvent, 4096)
  36. go func() {
  37. for {
  38. ev := <-eventCh
  39. lds.onMarket(ev)
  40. }
  41. }()
  42. fn := func() {
  43. for {
  44. // 登录产生新的Session
  45. ss, err := lds.login(lds.conf.User, lds.conf.PassWord, request.ProductType.CFD_LIVE, lds.conf.Url)
  46. if err != nil {
  47. time.Sleep(time.Second * 10)
  48. continue
  49. }
  50. ss.RegisterSessionDisconnected(func(s *lmaxapi.Session) {
  51. log.Println("session disconnected. STOP session!")
  52. s.Stop()
  53. })
  54. lds.regStreamError(ss)
  55. lds.marketdata(ss, eventCh) // 实时行情
  56. ss.LoadAllInstruments(func(value *response.Instrument) {
  57. lds.onInstrument(value)
  58. log.Println("subcribe", value.Id)
  59. ss.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), DefaultSubscribeCB)
  60. })
  61. ss.Wait()
  62. ss.HeartbeatTimeout(5 * time.Second)
  63. ss.Start()
  64. // 有错误时跳出Start重启
  65. log.Println("lmax session RESTART !!!")
  66. }
  67. }
  68. fn()
  69. }
  70. func (lds *LmaxDS) doErrCode(s *lmaxapi.Session, err error) {
  71. //1. 处理下面的事件
  72. //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启
  73. //1.2 session 过期,发生403错误,必须重新登录
  74. //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1
  75. // 这个时候调用 session.StreamClose() 重新启动stream
  76. operr, ok := err.(*lmaxapi.OpError)
  77. log.Println("operr:", operr)
  78. if !ok {
  79. return
  80. }
  81. //1.2
  82. if operr.Code == 403 {
  83. log.Println("stop session")
  84. s.Stop()
  85. return
  86. }
  87. //1.1 and 1.3
  88. //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session
  89. if operr.Op == "Stream" {
  90. lds.errcount++
  91. log.Println("stop stream")
  92. if operr.Code == 0 {
  93. time.Sleep(1 * time.Second)
  94. }
  95. if lds.errcount == 3 {
  96. lds.errcount = 0
  97. s.Stop()
  98. return
  99. }
  100. s.StopStream()
  101. // ss.Stop()
  102. return
  103. }
  104. }
  105. func (lds *LmaxDS) regStreamError(ss *lmaxapi.Session) {
  106. // 注册数据流失败处理函数
  107. ss.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) {
  108. lds.doErrCode(s, err)
  109. })
  110. }
  111. func (lds *LmaxDS) login(name, password, typ, url string) (*lmaxapi.Session, error) {
  112. log.Println("LmaxDS.login:", name)
  113. lmaxapi.SetBaseUrl(url)
  114. req := request.NewLoginRequest(name, password, typ)
  115. ss, err := lmaxapi.Login(req)
  116. if err != nil {
  117. s := err.Error()
  118. if len(s) > 32 {
  119. s = s[:32]
  120. }
  121. log.Println("lmaxapi.Login error:", s)
  122. return nil, err
  123. }
  124. log.Println(name, "login OK!")
  125. return ss, nil
  126. }
  127. func (lds *LmaxDS) marketdata(session *lmaxapi.Session, eventCh chan *response.OrderBookEvent) {
  128. log.Println("@@@:marketdata")
  129. // 注册请求实时行情数据
  130. session.RegisterOrderBookEvent(func(s *lmaxapi.Session, ev *response.OrderBookEvent) {
  131. debugDelay("###:", fmt.Sprintf("lmax_%d", ev.InstrumentId), ev.Timestamp)
  132. lds.errcount = 0
  133. select {
  134. case eventCh <- ev:
  135. default:
  136. }
  137. })
  138. }
  139. func (lds *LmaxDS) onMarket(event *response.OrderBookEvent) {
  140. insId := event.InstrumentId
  141. mk := &Market{}
  142. mk.InsId = insId
  143. mk.Timestamp = event.Timestamp
  144. mk.Type = IntLmax
  145. if event.HasMarketClosePrice {
  146. mk.Close = event.MktClosePrice
  147. mk.Open = mk.Close
  148. }
  149. if event.HasLastTradedPrice {
  150. mk.LastPrice = event.LastTradedPrice
  151. }
  152. if event.HasDailyHighestTradedPrice {
  153. mk.High = event.DailyHighestTradedPrice
  154. }
  155. if event.HasDailyLowestTradedPrice {
  156. mk.Low = event.DailyLowestTradedPrice
  157. }
  158. if len(event.AskPrices) > 0 {
  159. asks := make([]PP, len(event.AskPrices))
  160. for i, b := range event.AskPrices {
  161. asks[i][0] = b.Price
  162. asks[i][1] = b.Quantity
  163. }
  164. mk.Asks = asks
  165. }
  166. if len(event.BidPrices) > 0 {
  167. bids := make([]PP, len(event.BidPrices))
  168. for i, b := range event.BidPrices {
  169. bids[i][0] = b.Price
  170. bids[i][1] = b.Quantity
  171. }
  172. mk.Bids = bids
  173. }
  174. lds.Save(mk)
  175. }
  176. func (lds *LmaxDS) onInstrument(value *response.Instrument) {
  177. ins := &Instrument{
  178. Id: value.Id,
  179. Name: value.Name,
  180. ExId: Lmax,
  181. Type: Forex,
  182. PriceInc: value.PriceIncrement,
  183. Margin: value.MarginRate,
  184. StartTime: value.StartTime.Unix() * 1000, // ms
  185. }
  186. lds.insMap[ins.Id] = ins
  187. }
  188. func DefaultSubscribeCB(err error) {
  189. if err != nil {
  190. fmt.Println("Failed:", err)
  191. }
  192. }