MarketDataClient2.go 9.0 KB


  1. package main
  2. import (
  3. "base"
  4. "compress/gzip"
  5. "tickserver/api/lmaxapi"
  6. "tickserver/api/lmaxapi/main/common"
  7. "tickserver/api/lmaxapi/request"
  8. "tickserver/api/lmaxapi/response"
  9. // "encoding/binary"
  10. "encoding/json"
  11. "io"
  12. "log"
  13. "markinfo"
  14. "net/http"
  15. "strings"
  16. "sync"
  17. // "tick"
  18. "sort"
  19. "time"
  20. )
  21. const (
  22. fxddOffset = 0
  23. lmaxOffset = 200
  24. )
  25. var instrumentMap map[int64]*response.Instrument
  26. var imu sync.Mutex
  27. var priceMap = make(map[int64]*response.OrderBookEvent)
  28. var pmu sync.Mutex
  29. type instrumentInfo struct {
  30. Unit float64
  31. PriceIncrement float64
  32. Symbol int
  33. Name string
  34. }
  35. type byPrice []response.PricePoint
  36. func (a byPrice) Len() int { return len(a) }
  37. func (a byPrice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  38. func (a byPrice) Less(i, j int) bool { return a[i].Price < a[j].Price }
  39. func setPrice(event *response.OrderBookEvent, offset int64) {
  40. pmu.Lock()
  41. defer pmu.Unlock()
  42. offset += int64(event.InstrumentId)
  43. priceMap[offset] = event
  44. }
  45. func checkOrder(event *response.OrderBookEvent) {
  46. for i, _ := range event.AskPrices {
  47. if i < len(event.AskPrices)-1 {
  48. if event.AskPrices[i].Price > event.AskPrices[i+1].Price {
  49. log.Fatal("XXXX")
  50. }
  51. }
  52. }
  53. for i, _ := range event.BidPrices {
  54. if i < len(event.BidPrices)-1 {
  55. if event.BidPrices[i].Price > event.BidPrices[i+1].Price {
  56. log.Fatal("ZZZZ")
  57. }
  58. }
  59. }
  60. }
  61. func getPrice5(ty string) (events []*response.OrderBookEvent) {
  62. pmu.Lock()
  63. defer pmu.Unlock()
  64. start := int64(0)
  65. if ty == "lmax" {
  66. start = lmaxOffset
  67. } else if ty == "fxdd" {
  68. start = fxddOffset
  69. } else {
  70. return nil
  71. }
  72. for k, evt := range priceMap {
  73. event := *evt // copy new
  74. if true || k >= start && k < start+200 {
  75. symbolId, err := markinfo.BookIdToSymbolId(int(event.InstrumentId))
  76. if err != nil {
  77. continue
  78. }
  79. event.InstrumentId = int64(symbolId) // use symbolId
  80. sort.Sort(byPrice(event.AskPrices)) // sort asks
  81. sort.Sort(byPrice(event.BidPrices)) // sort bids
  82. // checkOrder(event)
  83. events = append(events, &event)
  84. }
  85. }
  86. return
  87. }
  88. func getPrice(ty string) (events []*base.TickGo) {
  89. pmu.Lock()
  90. defer pmu.Unlock()
  91. start := int64(0)
  92. if ty == "lmax" {
  93. start = lmaxOffset
  94. } else if ty == "fxdd" {
  95. start = fxddOffset
  96. } else {
  97. return nil
  98. }
  99. for k, event := range priceMap {
  100. if true || k >= start && k < start+200 {
  101. event2 := event.ToTickGo()
  102. if event2.Time == 0 {
  103. v, ok := instrumentMap[event.InstrumentId]
  104. if !ok {
  105. continue
  106. }
  107. if strings.Index(v.Name, "US Crude (Spot)") != -1 {
  108. event2 = event.TickGo(markinfo.OILUSD)
  109. }
  110. if event2.Time == 0 {
  111. continue
  112. }
  113. }
  114. events = append(events, (*base.TickGo)(event2))
  115. }
  116. }
  117. return
  118. }
  119. func setInst(id int64, inst *response.Instrument) {
  120. imu.Lock()
  121. defer imu.Unlock()
  122. if id == 0 && inst == nil {
  123. instrumentMap = nil
  124. }
  125. if instrumentMap == nil {
  126. instrumentMap = make(map[int64]*response.Instrument)
  127. }
  128. instrumentMap[id] = inst
  129. }
  130. func getInst(id int64) *response.Instrument {
  131. imu.Lock()
  132. defer imu.Unlock()
  133. data, ok := instrumentMap[id]
  134. if ok {
  135. return data
  136. }
  137. return nil
  138. }
  139. func main() {
  140. go httpserver()
  141. go lmaxLoop()
  142. // go fxddLoop()
  143. // go btcLoop()
  144. select {}
  145. }
  146. func lmaxLoop() {
  147. for {
  148. marketdata()
  149. //session stop的情况下,休息一会
  150. time.Sleep(1 * time.Second)
  151. }
  152. }
  153. /*
  154. func fxddLoop() {
  155. for {
  156. fxdddata()
  157. time.Sleep(time.Second)
  158. }
  159. }
  160. func btcLoop() {
  161. for {
  162. btcdata()
  163. time.Sleep(time.Second)
  164. }
  165. }
  166. func fxdddata() error {
  167. d, err := tick.NewDownload("real/mt", "115.236.165.20", "34567") //23346
  168. if err != nil {
  169. log.Println(err)
  170. return err
  171. }
  172. d.SetOption(tick.ConnTimeOut, time.Second*10)
  173. d.SetOption(tick.ReadTimeOut, time.Second*10)
  174. d.SetOption(tick.WriteTimeOut, time.Second*10)
  175. _, err = d.Query()
  176. if err != nil {
  177. log.Println(err)
  178. return err
  179. }
  180. defer d.Close()
  181. for {
  182. var ti base.TickGo
  183. err := binary.Read(d, binary.LittleEndian, &ti)
  184. if err == tick.ErrSymbol {
  185. log.Println(err)
  186. continue
  187. }
  188. if err != nil {
  189. //打印错误
  190. log.Println(err)
  191. return err
  192. }
  193. setPrice(&ti, int64(fxddOffset))
  194. }
  195. }
  196. func btcdata() error {
  197. d, err := tick.NewDownload("real/mt", "115.236.165.20", "34560") //23346
  198. if err != nil {
  199. log.Println(err)
  200. return err
  201. }
  202. d.SetOption(tick.ConnTimeOut, time.Second*10)
  203. d.SetOption(tick.ReadTimeOut, time.Second*10)
  204. d.SetOption(tick.WriteTimeOut, time.Second*10)
  205. _, err = d.Query()
  206. if err != nil {
  207. log.Println(err)
  208. return err
  209. }
  210. defer d.Close()
  211. for {
  212. var ti base.TickGo
  213. err := binary.Read(d, binary.LittleEndian, &ti)
  214. if err == tick.ErrSymbol {
  215. log.Println(err)
  216. continue
  217. }
  218. if err != nil {
  219. //打印错误
  220. log.Println(err)
  221. return err
  222. }
  223. setPrice(&ti, int64(fxddOffset))
  224. }
  225. }
  226. */
  227. func marketdata() {
  228. log.Println("MarketDataClient2")
  229. logw, err := common.EnableLog("proto.log")
  230. if err != nil {
  231. log.Println("EnableLog Failed:", err)
  232. return
  233. }
  234. defer logw.Close()
  235. session, err := common.CreateSession()
  236. if err != nil {
  237. log.Println("Login Failed:", err)
  238. time.Sleep(10 * time.Second)
  239. return
  240. }
  241. session.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) {
  242. //1. 处理下面的事件
  243. //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启
  244. //1.2 session 过期,发生403错误,必须重新登录
  245. //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1
  246. // 这个时候调用 session.StreamClose() 重新启动stream
  247. operr, ok := err.(*lmaxapi.OpError)
  248. log.Println("operr:", operr)
  249. if !ok {
  250. return
  251. }
  252. //1.2
  253. if operr.Code == 403 {
  254. log.Println("stop session")
  255. session.Stop()
  256. return
  257. }
  258. //1.1 and 1.3
  259. //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session
  260. if operr.Op == "Stream" {
  261. log.Println("stop stream")
  262. if operr.Code == 0 {
  263. time.Sleep(1 * time.Second)
  264. }
  265. session.StopStream()
  266. return
  267. }
  268. })
  269. session.RegisterOrderBookEvent(func(s *lmaxapi.Session, event *response.OrderBookEvent) {
  270. setPrice(event, int64(lmaxOffset))
  271. })
  272. log.Println("begin subscribe")
  273. session.LoadAllInstruments(func(value *response.Instrument) {
  274. setInst(value.Id, value)
  275. session.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), common.DefaultSubscribeCB)
  276. })
  277. session.Wait()
  278. log.Println("end subscribe")
  279. //检查stream 是否 alive, 5s 请求heartbeat 5s内没有任何响应,就是stream已经损坏
  280. session.HeartbeatTimeout(5 * time.Second)
  281. session.Start()
  282. }
  283. func httpserver() {
  284. s := &http.Server{
  285. Addr: ":6061",
  286. ReadTimeout: 10 * time.Second,
  287. WriteTimeout: 10 * time.Second,
  288. MaxHeaderBytes: 1 << 20,
  289. }
  290. http.Handle("/tickdata", MakeGzipHandler(http.HandlerFunc(tickdata)))
  291. http.Handle("/tickdata5", MakeGzipHandler(http.HandlerFunc(tickdata5)))
  292. http.Handle("/symbols", MakeGzipHandler(http.HandlerFunc(symbols)))
  293. log.Fatal(s.ListenAndServe())
  294. }
  295. func tickdata(w http.ResponseWriter, r *http.Request) {
  296. //ty := r.FormValue("t")
  297. cb := r.FormValue("callback")
  298. str := "if (" + cb + ") " + cb + "("
  299. price := getPrice("lmax")
  300. js, _ := json.Marshal(price)
  301. data := append([]byte(str), js...)
  302. data = append(data, []byte(")")...)
  303. w.Write(data)
  304. }
  305. func tickdata5(w http.ResponseWriter, r *http.Request) {
  306. //ty := r.FormValue("t")
  307. cb := r.FormValue("callback")
  308. str := "if (" + cb + ") " + cb + "("
  309. price := getPrice5("lmax")
  310. js, _ := json.Marshal(price)
  311. data := append([]byte(str), js...)
  312. data = append(data, []byte(")")...)
  313. w.Write(data)
  314. }
  315. func MakeGzipHandler(fn http.HandlerFunc) http.HandlerFunc {
  316. return func(w http.ResponseWriter, r *http.Request) {
  317. if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  318. fn(w, r)
  319. return
  320. }
  321. w.Header().Set("Content-Encoding", "gzip")
  322. w.Header().Set("Content-Type", "text/javascript")
  323. gz := gzip.NewWriter(w)
  324. defer gz.Close()
  325. fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r)
  326. }
  327. }
  328. type gzipResponseWriter struct {
  329. io.Writer
  330. http.ResponseWriter
  331. }
  332. func (w gzipResponseWriter) Write(b []byte) (int, error) {
  333. return w.Writer.Write(b)
  334. }
  335. func getInsts() []instrumentInfo {
  336. imu.Lock()
  337. defer imu.Unlock()
  338. ret := make([]instrumentInfo, 0)
  339. var err error
  340. for _, v := range instrumentMap {
  341. info := instrumentInfo{}
  342. info.Symbol, err = markinfo.BookIdToSymbolId(int(v.Id))
  343. if err != nil {
  344. //匹配一些特殊的货币对
  345. if strings.Index(v.Name, "US Crude (Spot)") != -1 {
  346. info.Symbol = markinfo.OILUSD
  347. } else {
  348. continue
  349. }
  350. }
  351. info.Name, err = markinfo.SymbolName(info.Symbol)
  352. info.Unit = v.UnitPrice
  353. info.PriceIncrement = v.PriceIncrement
  354. ret = append(ret, info)
  355. }
  356. //添加两个固定的信息
  357. ltc := instrumentInfo{10000.0, 1e-5, 65, "LTCUSD"}
  358. btc := instrumentInfo{10000.0, 1e-3, 66, "BTCUSD"}
  359. ret = append(ret, ltc)
  360. ret = append(ret, btc)
  361. return ret
  362. }
  363. //symbols
  364. func symbols(w http.ResponseWriter, r *http.Request) {
  365. cb := r.FormValue("callback")
  366. insts := getInsts()
  367. str := "if (" + cb + ") " + cb + "("
  368. js, _ := json.Marshal(insts)
  369. data := append([]byte(str), js...)
  370. data = append(data, []byte(")")...)
  371. w.Write(data)
  372. }