package main import ( "base" "compress/gzip" "tickserver/api/lmaxapi" "tickserver/api/lmaxapi/main/common" "tickserver/api/lmaxapi/request" "tickserver/api/lmaxapi/response" // "encoding/binary" "encoding/json" "io" "log" "markinfo" "net/http" "strings" "sync" // "tick" "sort" "time" ) const ( fxddOffset = 0 lmaxOffset = 200 ) var instrumentMap map[int64]*response.Instrument var imu sync.Mutex var priceMap = make(map[int64]*response.OrderBookEvent) var pmu sync.Mutex type instrumentInfo struct { Unit float64 PriceIncrement float64 Symbol int Name string } type byPrice []response.PricePoint func (a byPrice) Len() int { return len(a) } func (a byPrice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byPrice) Less(i, j int) bool { return a[i].Price < a[j].Price } func setPrice(event *response.OrderBookEvent, offset int64) { pmu.Lock() defer pmu.Unlock() offset += int64(event.InstrumentId) priceMap[offset] = event } func checkOrder(event *response.OrderBookEvent) { for i, _ := range event.AskPrices { if i < len(event.AskPrices)-1 { if event.AskPrices[i].Price > event.AskPrices[i+1].Price { log.Fatal("XXXX") } } } for i, _ := range event.BidPrices { if i < len(event.BidPrices)-1 { if event.BidPrices[i].Price > event.BidPrices[i+1].Price { log.Fatal("ZZZZ") } } } } func getPrice5(ty string) (events []*response.OrderBookEvent) { pmu.Lock() defer pmu.Unlock() start := int64(0) if ty == "lmax" { start = lmaxOffset } else if ty == "fxdd" { start = fxddOffset } else { return nil } for k, evt := range priceMap { event := *evt // copy new if true || k >= start && k < start+200 { symbolId, err := markinfo.BookIdToSymbolId(int(event.InstrumentId)) if err != nil { continue } event.InstrumentId = int64(symbolId) // use symbolId sort.Sort(byPrice(event.AskPrices)) // sort asks sort.Sort(byPrice(event.BidPrices)) // sort bids // checkOrder(event) events = append(events, &event) } } return } func getPrice(ty string) (events []*base.TickGo) { pmu.Lock() defer pmu.Unlock() start := int64(0) if ty == "lmax" { start = lmaxOffset } else if ty == "fxdd" { start = fxddOffset } else { return nil } for k, event := range priceMap { if true || k >= start && k < start+200 { event2 := event.ToTickGo() if event2.Time == 0 { v, ok := instrumentMap[event.InstrumentId] if !ok { continue } if strings.Index(v.Name, "US Crude (Spot)") != -1 { event2 = event.TickGo(markinfo.OILUSD) } if event2.Time == 0 { continue } } events = append(events, (*base.TickGo)(event2)) } } return } func setInst(id int64, inst *response.Instrument) { imu.Lock() defer imu.Unlock() if id == 0 && inst == nil { instrumentMap = nil } if instrumentMap == nil { instrumentMap = make(map[int64]*response.Instrument) } instrumentMap[id] = inst } func getInst(id int64) *response.Instrument { imu.Lock() defer imu.Unlock() data, ok := instrumentMap[id] if ok { return data } return nil } func main() { go httpserver() go lmaxLoop() // go fxddLoop() // go btcLoop() select {} } func lmaxLoop() { for { marketdata() //session stop的情况下,休息一会 time.Sleep(1 * time.Second) } } /* func fxddLoop() { for { fxdddata() time.Sleep(time.Second) } } func btcLoop() { for { btcdata() time.Sleep(time.Second) } } func fxdddata() error { d, err := tick.NewDownload("real/mt", "115.236.165.20", "34567") //23346 if err != nil { log.Println(err) return err } d.SetOption(tick.ConnTimeOut, time.Second*10) d.SetOption(tick.ReadTimeOut, time.Second*10) d.SetOption(tick.WriteTimeOut, time.Second*10) _, err = d.Query() if err != nil { log.Println(err) return err } defer d.Close() for { var ti base.TickGo err := binary.Read(d, binary.LittleEndian, &ti) if err == tick.ErrSymbol { log.Println(err) continue } if err != nil { //打印错误 log.Println(err) return err } setPrice(&ti, int64(fxddOffset)) } } func btcdata() error { d, err := tick.NewDownload("real/mt", "115.236.165.20", "34560") //23346 if err != nil { log.Println(err) return err } d.SetOption(tick.ConnTimeOut, time.Second*10) d.SetOption(tick.ReadTimeOut, time.Second*10) d.SetOption(tick.WriteTimeOut, time.Second*10) _, err = d.Query() if err != nil { log.Println(err) return err } defer d.Close() for { var ti base.TickGo err := binary.Read(d, binary.LittleEndian, &ti) if err == tick.ErrSymbol { log.Println(err) continue } if err != nil { //打印错误 log.Println(err) return err } setPrice(&ti, int64(fxddOffset)) } } */ func marketdata() { log.Println("MarketDataClient2") logw, err := common.EnableLog("proto.log") if err != nil { log.Println("EnableLog Failed:", err) return } defer logw.Close() session, err := common.CreateSession() if err != nil { log.Println("Login Failed:", err) time.Sleep(10 * time.Second) return } session.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) { //1. 处理下面的事件 //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启 //1.2 session 过期,发生403错误,必须重新登录 //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1 // 这个时候调用 session.StreamClose() 重新启动stream operr, ok := err.(*lmaxapi.OpError) log.Println("operr:", operr) if !ok { return } //1.2 if operr.Code == 403 { log.Println("stop session") session.Stop() return } //1.1 and 1.3 //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session if operr.Op == "Stream" { log.Println("stop stream") if operr.Code == 0 { time.Sleep(1 * time.Second) } session.StopStream() return } }) session.RegisterOrderBookEvent(func(s *lmaxapi.Session, event *response.OrderBookEvent) { setPrice(event, int64(lmaxOffset)) }) log.Println("begin subscribe") session.LoadAllInstruments(func(value *response.Instrument) { setInst(value.Id, value) session.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), common.DefaultSubscribeCB) }) session.Wait() log.Println("end subscribe") //检查stream 是否 alive, 5s 请求heartbeat 5s内没有任何响应,就是stream已经损坏 session.HeartbeatTimeout(5 * time.Second) session.Start() } func httpserver() { s := &http.Server{ Addr: ":6061", ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, MaxHeaderBytes: 1 << 20, } http.Handle("/tickdata", MakeGzipHandler(http.HandlerFunc(tickdata))) http.Handle("/tickdata5", MakeGzipHandler(http.HandlerFunc(tickdata5))) http.Handle("/symbols", MakeGzipHandler(http.HandlerFunc(symbols))) log.Fatal(s.ListenAndServe()) } func tickdata(w http.ResponseWriter, r *http.Request) { //ty := r.FormValue("t") cb := r.FormValue("callback") str := "if (" + cb + ") " + cb + "(" price := getPrice("lmax") js, _ := json.Marshal(price) data := append([]byte(str), js...) data = append(data, []byte(")")...) w.Write(data) } func tickdata5(w http.ResponseWriter, r *http.Request) { //ty := r.FormValue("t") cb := r.FormValue("callback") str := "if (" + cb + ") " + cb + "(" price := getPrice5("lmax") js, _ := json.Marshal(price) data := append([]byte(str), js...) data = append(data, []byte(")")...) w.Write(data) } func MakeGzipHandler(fn http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { fn(w, r) return } w.Header().Set("Content-Encoding", "gzip") w.Header().Set("Content-Type", "text/javascript") gz := gzip.NewWriter(w) defer gz.Close() fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r) } } type gzipResponseWriter struct { io.Writer http.ResponseWriter } func (w gzipResponseWriter) Write(b []byte) (int, error) { return w.Writer.Write(b) } func getInsts() []instrumentInfo { imu.Lock() defer imu.Unlock() ret := make([]instrumentInfo, 0) var err error for _, v := range instrumentMap { info := instrumentInfo{} info.Symbol, err = markinfo.BookIdToSymbolId(int(v.Id)) if err != nil { //匹配一些特殊的货币对 if strings.Index(v.Name, "US Crude (Spot)") != -1 { info.Symbol = markinfo.OILUSD } else { continue } } info.Name, err = markinfo.SymbolName(info.Symbol) info.Unit = v.UnitPrice info.PriceIncrement = v.PriceIncrement ret = append(ret, info) } //添加两个固定的信息 ltc := instrumentInfo{10000.0, 1e-5, 65, "LTCUSD"} btc := instrumentInfo{10000.0, 1e-3, 66, "BTCUSD"} ret = append(ret, ltc) ret = append(ret, btc) return ret } //symbols func symbols(w http.ResponseWriter, r *http.Request) { cb := r.FormValue("callback") insts := getInsts() str := "if (" + cb + ") " + cb + "(" js, _ := json.Marshal(insts) data := append([]byte(str), js...) data = append(data, []byte(")")...) w.Write(data) }