// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package tick // 本文件实现lmax数据源接口, 实时数据和历史数据的获取和保存 import ( "fmt" "log" "time" "tickserver/api/lmaxapi" "tickserver/api/lmaxapi/request" "tickserver/api/lmaxapi/response" ) // lmaxDS实现了dataSource接口, 并对lmax的历史数据和实时数据保存 type LmaxDS struct { *DSBase errcount int64 } func init() { drivers[Lmax] = newLmaxDS } func newLmaxDS(conf *DsConf) (DataSource, error) { return &LmaxDS{ DSBase: NewDsBase(conf), // lmax 自己下载历史数据, 所以参数db==nil }, nil } func (lds *LmaxDS) Name() string { return Lmax } func (lds *LmaxDS) Run() { if !lds.conf.Run { log.Println("LmaxDS.run config NOT run") return } log.Println("LmaxDS.run") // 此goroutine用来处理实时行情数据, 避免lmaxapi回调阻塞 eventCh := make(chan *response.OrderBookEvent, 4096) go func() { for { ev := <-eventCh lds.onMarket(ev) } }() fn := func() { for { // 登录产生新的Session ss, err := lds.login(lds.conf.User, lds.conf.PassWord, request.ProductType.CFD_LIVE, lds.conf.Url) if err != nil { time.Sleep(time.Second * 10) continue } ss.RegisterSessionDisconnected(func(s *lmaxapi.Session) { log.Println("session disconnected. STOP session!") s.Stop() }) lds.regStreamError(ss) lds.marketdata(ss, eventCh) // 实时行情 ss.LoadAllInstruments(func(value *response.Instrument) { lds.onInstrument(value) log.Println("subcribe", value.Id) ss.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), DefaultSubscribeCB) }) ss.Wait() ss.HeartbeatTimeout(5 * time.Second) ss.Start() // 有错误时跳出Start重启 log.Println("lmax session RESTART !!!") } } fn() } func (lds *LmaxDS) doErrCode(s *lmaxapi.Session, err error) { //1. 处理下面的事件 //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启 //1.2 session 过期,发生403错误,必须重新登录 //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1 // 这个时候调用 session.StreamClose() 重新启动stream operr, ok := err.(*lmaxapi.OpError) log.Println("operr:", operr) if !ok { return } //1.2 if operr.Code == 403 { log.Println("stop session") s.Stop() return } //1.1 and 1.3 //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session if operr.Op == "Stream" { lds.errcount++ log.Println("stop stream") if operr.Code == 0 { time.Sleep(1 * time.Second) } if lds.errcount == 3 { lds.errcount = 0 s.Stop() return } s.StopStream() // ss.Stop() return } } func (lds *LmaxDS) regStreamError(ss *lmaxapi.Session) { // 注册数据流失败处理函数 ss.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) { lds.doErrCode(s, err) }) } func (lds *LmaxDS) login(name, password, typ, url string) (*lmaxapi.Session, error) { log.Println("LmaxDS.login:", name) lmaxapi.SetBaseUrl(url) req := request.NewLoginRequest(name, password, typ) ss, err := lmaxapi.Login(req) if err != nil { s := err.Error() if len(s) > 32 { s = s[:32] } log.Println("lmaxapi.Login error:", s) return nil, err } log.Println(name, "login OK!") return ss, nil } func (lds *LmaxDS) marketdata(session *lmaxapi.Session, eventCh chan *response.OrderBookEvent) { log.Println("@@@:marketdata") // 注册请求实时行情数据 session.RegisterOrderBookEvent(func(s *lmaxapi.Session, ev *response.OrderBookEvent) { debugDelay("###:", fmt.Sprintf("lmax_%d", ev.InstrumentId), ev.Timestamp) lds.errcount = 0 select { case eventCh <- ev: default: } }) } func (lds *LmaxDS) onMarket(event *response.OrderBookEvent) { insId := event.InstrumentId mk := &Market{} mk.InsId = insId mk.Timestamp = event.Timestamp mk.Type = IntLmax if event.HasMarketClosePrice { mk.Close = event.MktClosePrice mk.Open = mk.Close } if event.HasLastTradedPrice { mk.LastPrice = event.LastTradedPrice } if event.HasDailyHighestTradedPrice { mk.High = event.DailyHighestTradedPrice } if event.HasDailyLowestTradedPrice { mk.Low = event.DailyLowestTradedPrice } if len(event.AskPrices) > 0 { asks := make([]PP, len(event.AskPrices)) for i, b := range event.AskPrices { asks[i][0] = b.Price asks[i][1] = b.Quantity } mk.Asks = asks } if len(event.BidPrices) > 0 { bids := make([]PP, len(event.BidPrices)) for i, b := range event.BidPrices { bids[i][0] = b.Price bids[i][1] = b.Quantity } mk.Bids = bids } lds.Save(mk) } func (lds *LmaxDS) onInstrument(value *response.Instrument) { ins := &Instrument{ Id: value.Id, Name: value.Name, ExId: Lmax, Type: Forex, PriceInc: value.PriceIncrement, Margin: value.MarginRate, StartTime: value.StartTime.Unix() * 1000, // ms } lds.insMap[ins.Id] = ins } func DefaultSubscribeCB(err error) { if err != nil { fmt.Println("Failed:", err) } }