MarketDataClient3.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package main
  2. import (
  3. "compress/gzip"
  4. "encoding/json"
  5. "io"
  6. "tickserver/api/lmaxapi"
  7. "tickserver/api/lmaxapi/main/common"
  8. "tickserver/api/lmaxapi/request"
  9. "tickserver/api/lmaxapi/response"
  10. "log"
  11. "net/http"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. var instrumentMap map[int64]*response.Instrument
  17. var imu sync.Mutex
  18. var priceMap = make(map[int64]*response.OrderBookEvent)
  19. var pmu sync.Mutex
  20. func setPrice(event *response.OrderBookEvent) {
  21. pmu.Lock()
  22. defer pmu.Unlock()
  23. priceMap[event.InstrumentId] = event
  24. }
  25. func getPrice() (events []*response.OrderBookEvent) {
  26. pmu.Lock()
  27. defer pmu.Unlock()
  28. for _, item := range priceMap {
  29. events = append(events, item)
  30. }
  31. return
  32. }
  33. func setInst(id int64, inst *response.Instrument) {
  34. imu.Lock()
  35. defer imu.Unlock()
  36. if id == 0 && inst == nil {
  37. instrumentMap = nil
  38. }
  39. if instrumentMap == nil {
  40. instrumentMap = make(map[int64]*response.Instrument)
  41. }
  42. instrumentMap[id] = inst
  43. }
  44. func getInst(id int64) *response.Instrument {
  45. imu.Lock()
  46. defer imu.Unlock()
  47. data, ok := instrumentMap[id]
  48. if ok {
  49. return data
  50. }
  51. return nil
  52. }
  53. func main() {
  54. ch := make(chan *response.OrderBookEvent, 100)
  55. go func() {
  56. for {
  57. event := <-ch
  58. setPrice(event)
  59. log.Println("dt = ", time.Now().UnixNano() / 1e6 - event.Timestamp, time.Unix(event.Timestamp / 1000, (event.Timestamp % 1000) * 1e6), event.InstrumentId)
  60. }
  61. }()
  62. go httpserver()
  63. for {
  64. marketdata(ch)
  65. //session stop的情况下,休息一会
  66. time.Sleep(1 * time.Second)
  67. }
  68. }
  69. func marketdata(ch chan *response.OrderBookEvent) {
  70. log.Println("MarketDataClient2")
  71. logw, err := common.EnableLog("proto.log")
  72. if err != nil {
  73. log.Println("EnableLog Failed:", err)
  74. return
  75. }
  76. defer logw.Close()
  77. session, err := common.CreateSession()
  78. if err != nil {
  79. log.Println("Login Failed:", err)
  80. time.Sleep(10 * time.Second)
  81. return
  82. }
  83. failed := 0
  84. session.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) {
  85. //1. 处理下面的事件
  86. //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启
  87. //1.2 session 过期,发生403错误,必须重新登录
  88. //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1
  89. // 这个时候调用 session.StreamClose() 重新启动stream
  90. operr, ok := err.(*lmaxapi.OpError)
  91. log.Println("operr:", operr)
  92. if !ok {
  93. return
  94. }
  95. //1.2
  96. if operr.Code == 403 {
  97. log.Println("stop session")
  98. session.Stop()
  99. return
  100. }
  101. //1.1 and 1.3
  102. //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session
  103. if operr.Op == "Stream" {
  104. log.Println("stop stream")
  105. if operr.Code == 0 {
  106. time.Sleep(1 * time.Second)
  107. }
  108. failed++
  109. if (failed == 3) {
  110. session.Stop()
  111. return
  112. }
  113. session.StopStream()
  114. return
  115. }
  116. })
  117. session.RegisterOrderBookEvent(func(s *lmaxapi.Session, event *response.OrderBookEvent) {
  118. failed = 0
  119. select {
  120. case ch <- event:
  121. default:
  122. }
  123. })
  124. log.Println("begin subscribe")
  125. session.LoadAllInstruments(func(value *response.Instrument) {
  126. if (value.Id > 4020) {
  127. return
  128. }
  129. setInst(value.Id, value)
  130. session.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), common.DefaultSubscribeCB)
  131. })
  132. session.Wait()
  133. log.Println("end subscribe")
  134. //检查stream 是否 alive, 5s 请求heartbeat 5s内没有任何响应,就是stream已经损坏
  135. session.HeartbeatTimeout(5 * time.Second)
  136. session.Start()
  137. }
  138. func httpserver() {
  139. s := &http.Server{
  140. Addr: ":6060",
  141. ReadTimeout: 10 * time.Second,
  142. WriteTimeout: 10 * time.Second,
  143. MaxHeaderBytes: 1 << 20,
  144. }
  145. http.Handle("/tickdata", MakeGzipHandler(http.HandlerFunc(tickdata)))
  146. log.Fatal(s.ListenAndServe())
  147. }
  148. func tickdata(w http.ResponseWriter, r *http.Request) {
  149. //ty := r.FormValue("t")
  150. cb := r.FormValue("callback")
  151. price := getPrice()
  152. str := "if (" + cb + ") " + cb + "("
  153. js, _ := json.Marshal(price)
  154. data := append([]byte(str), js...)
  155. data = append(data, []byte(")")...)
  156. w.Write(data)
  157. }
  // MakeGzipHandler wraps fn so that its response body is gzip-compressed
  // whenever the request's Accept-Encoding header mentions "gzip". Requests
  // without it are handed to fn with the original ResponseWriter.
  //
  // Every response carries "Vary: Accept-Encoding", because the bytes sent
  // depend on that request header and caches must not mix the two variants.
  func MakeGzipHandler(fn http.HandlerFunc) http.HandlerFunc {
      return func(w http.ResponseWriter, r *http.Request) {
          // Add rather than Set so an existing Vary value is not clobbered.
          w.Header().Add("Vary", "Accept-Encoding")

          if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
              fn(w, r)
              return
          }

          w.Header().Set("Content-Encoding", "gzip")
          // Default type for the compressed path; without an explicit value
          // net/http would sniff the type from the compressed bytes. fn may
          // still override it before its first write.
          w.Header().Set("Content-Type", "text/javascript")

          gz := gzip.NewWriter(w)
          // Close flushes the remaining data and the gzip trailer; a failure
          // means the client received a truncated stream, so report it.
          defer func() {
              if err := gz.Close(); err != nil {
                  log.Println("gzip close failed:", err)
              }
          }()

          fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r)
      }
  }

  // gzipResponseWriter is an http.ResponseWriter whose body writes go to
  // Writer (the gzip stream) while headers and status still go to the
  // embedded ResponseWriter.
  type gzipResponseWriter struct {
      io.Writer
      http.ResponseWriter
  }

  // Write sends b through the gzip stream. Both embedded types declare Write,
  // so this explicit method is what resolves the otherwise ambiguous selector.
  func (w gzipResponseWriter) Write(b []byte) (int, error) {
      return w.Writer.Write(b)
  }