|
- package main
- import (
- "base"
- "compress/gzip"
- "tickserver/api/lmaxapi"
- "tickserver/api/lmaxapi/main/common"
- "tickserver/api/lmaxapi/request"
- "tickserver/api/lmaxapi/response"
- // "encoding/binary"
- "encoding/json"
- "io"
- "log"
- "markinfo"
- "net/http"
- "strings"
- "sync"
- // "tick"
- "sort"
- "time"
- )
// fxddOffset is the key offset reserved for the FXDD feed. In this file it is
// only referenced by the source selection in getPrice/getPrice5 and by the
// commented-out FXDD/BTC loaders.
const fxddOffset = 0

// lmaxOffset is the key offset passed to setPrice for LMAX order book events;
// setPrice adds it to the instrument id to form the priceMap key.
const lmaxOffset = 200
- var instrumentMap map[int64]*response.Instrument
- var imu sync.Mutex
- var priceMap = make(map[int64]*response.OrderBookEvent)
- var pmu sync.Mutex
// instrumentInfo is the per-symbol record returned by getInsts and serialized
// to JSON by the symbols handler.
//
// The field order must not change: getInsts builds some entries with
// positional composite literals.
type instrumentInfo struct {
	// Unit is filled from Instrument.UnitPrice; PriceIncrement from
	// Instrument.PriceIncrement.
	Unit, PriceIncrement float64
	// Symbol is the markinfo symbol id.
	Symbol int
	// Name is the symbol name as returned by markinfo.SymbolName.
	Name string
}
- type byPrice []response.PricePoint
- func (a byPrice) Len() int { return len(a) }
- func (a byPrice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
- func (a byPrice) Less(i, j int) bool { return a[i].Price < a[j].Price }
- func setPrice(event *response.OrderBookEvent, offset int64) {
- pmu.Lock()
- defer pmu.Unlock()
- offset += int64(event.InstrumentId)
- priceMap[offset] = event
- }
- func checkOrder(event *response.OrderBookEvent) {
- for i, _ := range event.AskPrices {
- if i < len(event.AskPrices)-1 {
- if event.AskPrices[i].Price > event.AskPrices[i+1].Price {
- log.Fatal("XXXX")
- }
- }
- }
- for i, _ := range event.BidPrices {
- if i < len(event.BidPrices)-1 {
- if event.BidPrices[i].Price > event.BidPrices[i+1].Price {
- log.Fatal("ZZZZ")
- }
- }
- }
- }
- func getPrice5(ty string) (events []*response.OrderBookEvent) {
- pmu.Lock()
- defer pmu.Unlock()
- start := int64(0)
- if ty == "lmax" {
- start = lmaxOffset
- } else if ty == "fxdd" {
- start = fxddOffset
- } else {
- return nil
- }
- for k, evt := range priceMap {
- event := *evt // copy new
- if true || k >= start && k < start+200 {
- symbolId, err := markinfo.BookIdToSymbolId(int(event.InstrumentId))
- if err != nil {
- continue
- }
- event.InstrumentId = int64(symbolId) // use symbolId
- sort.Sort(byPrice(event.AskPrices)) // sort asks
- sort.Sort(byPrice(event.BidPrices)) // sort bids
- // checkOrder(event)
- events = append(events, &event)
- }
- }
- return
- }
- func getPrice(ty string) (events []*base.TickGo) {
- pmu.Lock()
- defer pmu.Unlock()
- start := int64(0)
- if ty == "lmax" {
- start = lmaxOffset
- } else if ty == "fxdd" {
- start = fxddOffset
- } else {
- return nil
- }
- for k, event := range priceMap {
- if true || k >= start && k < start+200 {
- event2 := event.ToTickGo()
- if event2.Time == 0 {
- v, ok := instrumentMap[event.InstrumentId]
- if !ok {
- continue
- }
- if strings.Index(v.Name, "US Crude (Spot)") != -1 {
- event2 = event.TickGo(markinfo.OILUSD)
- }
- if event2.Time == 0 {
- continue
- }
- }
- events = append(events, (*base.TickGo)(event2))
- }
- }
- return
- }
- func setInst(id int64, inst *response.Instrument) {
- imu.Lock()
- defer imu.Unlock()
- if id == 0 && inst == nil {
- instrumentMap = nil
- }
- if instrumentMap == nil {
- instrumentMap = make(map[int64]*response.Instrument)
- }
- instrumentMap[id] = inst
- }
- func getInst(id int64) *response.Instrument {
- imu.Lock()
- defer imu.Unlock()
- data, ok := instrumentMap[id]
- if ok {
- return data
- }
- return nil
- }
- func main() {
- go httpserver()
- go lmaxLoop()
- // go fxddLoop()
- // go btcLoop()
- select {}
- }
- func lmaxLoop() {
- for {
- marketdata()
- //session stop的情况下,休息一会
- time.Sleep(1 * time.Second)
- }
- }
- /*
- func fxddLoop() {
- for {
- fxdddata()
- time.Sleep(time.Second)
- }
- }
- func btcLoop() {
- for {
- btcdata()
- time.Sleep(time.Second)
- }
- }
- func fxdddata() error {
- d, err := tick.NewDownload("real/mt", "115.236.165.20", "34567") //23346
- if err != nil {
- log.Println(err)
- return err
- }
- d.SetOption(tick.ConnTimeOut, time.Second*10)
- d.SetOption(tick.ReadTimeOut, time.Second*10)
- d.SetOption(tick.WriteTimeOut, time.Second*10)
- _, err = d.Query()
- if err != nil {
- log.Println(err)
- return err
- }
- defer d.Close()
- for {
- var ti base.TickGo
- err := binary.Read(d, binary.LittleEndian, &ti)
- if err == tick.ErrSymbol {
- log.Println(err)
- continue
- }
- if err != nil {
- //打印错误
- log.Println(err)
- return err
- }
- setPrice(&ti, int64(fxddOffset))
- }
- }
- func btcdata() error {
- d, err := tick.NewDownload("real/mt", "115.236.165.20", "34560") //23346
- if err != nil {
- log.Println(err)
- return err
- }
- d.SetOption(tick.ConnTimeOut, time.Second*10)
- d.SetOption(tick.ReadTimeOut, time.Second*10)
- d.SetOption(tick.WriteTimeOut, time.Second*10)
- _, err = d.Query()
- if err != nil {
- log.Println(err)
- return err
- }
- defer d.Close()
- for {
- var ti base.TickGo
- err := binary.Read(d, binary.LittleEndian, &ti)
- if err == tick.ErrSymbol {
- log.Println(err)
- continue
- }
- if err != nil {
- //打印错误
- log.Println(err)
- return err
- }
- setPrice(&ti, int64(fxddOffset))
- }
- }
- */
- func marketdata() {
- log.Println("MarketDataClient2")
- logw, err := common.EnableLog("proto.log")
- if err != nil {
- log.Println("EnableLog Failed:", err)
- return
- }
- defer logw.Close()
- session, err := common.CreateSession()
- if err != nil {
- log.Println("Login Failed:", err)
- time.Sleep(10 * time.Second)
- return
- }
- session.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) {
- //1. 处理下面的事件
- //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启
- //1.2 session 过期,发生403错误,必须重新登录
- //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1
- // 这个时候调用 session.StreamClose() 重新启动stream
- operr, ok := err.(*lmaxapi.OpError)
- log.Println("operr:", operr)
- if !ok {
- return
- }
- //1.2
- if operr.Code == 403 {
- log.Println("stop session")
- session.Stop()
- return
- }
- //1.1 and 1.3
- //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session
- if operr.Op == "Stream" {
- log.Println("stop stream")
- if operr.Code == 0 {
- time.Sleep(1 * time.Second)
- }
- session.StopStream()
- return
- }
- })
- session.RegisterOrderBookEvent(func(s *lmaxapi.Session, event *response.OrderBookEvent) {
- setPrice(event, int64(lmaxOffset))
- })
- log.Println("begin subscribe")
- session.LoadAllInstruments(func(value *response.Instrument) {
- setInst(value.Id, value)
- session.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), common.DefaultSubscribeCB)
- })
- session.Wait()
- log.Println("end subscribe")
- //检查stream 是否 alive, 5s 请求heartbeat 5s内没有任何响应,就是stream已经损坏
- session.HeartbeatTimeout(5 * time.Second)
- session.Start()
- }
- func httpserver() {
- s := &http.Server{
- Addr: ":6061",
- ReadTimeout: 10 * time.Second,
- WriteTimeout: 10 * time.Second,
- MaxHeaderBytes: 1 << 20,
- }
- http.Handle("/tickdata", MakeGzipHandler(http.HandlerFunc(tickdata)))
- http.Handle("/tickdata5", MakeGzipHandler(http.HandlerFunc(tickdata5)))
- http.Handle("/symbols", MakeGzipHandler(http.HandlerFunc(symbols)))
- log.Fatal(s.ListenAndServe())
- }
- func tickdata(w http.ResponseWriter, r *http.Request) {
- //ty := r.FormValue("t")
- cb := r.FormValue("callback")
- str := "if (" + cb + ") " + cb + "("
- price := getPrice("lmax")
- js, _ := json.Marshal(price)
- data := append([]byte(str), js...)
- data = append(data, []byte(")")...)
- w.Write(data)
- }
- func tickdata5(w http.ResponseWriter, r *http.Request) {
- //ty := r.FormValue("t")
- cb := r.FormValue("callback")
- str := "if (" + cb + ") " + cb + "("
- price := getPrice5("lmax")
- js, _ := json.Marshal(price)
- data := append([]byte(str), js...)
- data = append(data, []byte(")")...)
- w.Write(data)
- }
// MakeGzipHandler wraps fn so that its response is gzip-compressed whenever
// the request's Accept-Encoding header mentions gzip; otherwise fn writes to
// the client directly.
//
// Content-Type is preset to text/javascript on both paths (it used to be set
// only when compressing, leaving uncompressed responses to content sniffing),
// and "Vary: Accept-Encoding" is added because the response body depends on
// that request header. fn may still override Content-Type before its first
// write.
func MakeGzipHandler(fn http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		hdr := w.Header()
		hdr.Set("Content-Type", "text/javascript")
		hdr.Add("Vary", "Accept-Encoding")

		if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
			fn(w, r)
			return
		}

		hdr.Set("Content-Encoding", "gzip")
		gz := gzip.NewWriter(w)
		// Close flushes the remaining compressed data. Its error is ignored
		// on purpose: by then the response is already committed and nothing
		// useful can be reported to the client.
		defer gz.Close()
		fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r)
	}
}

// gzipResponseWriter is an http.ResponseWriter whose body writes go through
// Writer (the gzip stream), while Header and WriteHeader are served by the
// embedded ResponseWriter.
type gzipResponseWriter struct {
	io.Writer
	http.ResponseWriter
}

// Write sends b to the compressing writer. It also resolves the ambiguity
// between the Write methods of the two embedded interfaces.
func (w gzipResponseWriter) Write(b []byte) (int, error) {
	return w.Writer.Write(b)
}
- func getInsts() []instrumentInfo {
- imu.Lock()
- defer imu.Unlock()
- ret := make([]instrumentInfo, 0)
- var err error
- for _, v := range instrumentMap {
- info := instrumentInfo{}
- info.Symbol, err = markinfo.BookIdToSymbolId(int(v.Id))
- if err != nil {
- //匹配一些特殊的货币对
- if strings.Index(v.Name, "US Crude (Spot)") != -1 {
- info.Symbol = markinfo.OILUSD
- } else {
- continue
- }
- }
- info.Name, err = markinfo.SymbolName(info.Symbol)
- info.Unit = v.UnitPrice
- info.PriceIncrement = v.PriceIncrement
- ret = append(ret, info)
- }
- //添加两个固定的信息
- ltc := instrumentInfo{10000.0, 1e-5, 65, "LTCUSD"}
- btc := instrumentInfo{10000.0, 1e-3, 66, "BTCUSD"}
- ret = append(ret, ltc)
- ret = append(ret, btc)
- return ret
- }
- //symbols
- func symbols(w http.ResponseWriter, r *http.Request) {
- cb := r.FormValue("callback")
- insts := getInsts()
- str := "if (" + cb + ") " + cb + "("
- js, _ := json.Marshal(insts)
- data := append([]byte(str), js...)
- data = append(data, []byte(")")...)
- w.Write(data)
- }
|