123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package tick
- // 本文件实现lmax数据源接口, 实时数据和历史数据的获取和保存
- import (
- "fmt"
- "log"
- "time"
- "tickserver/api/lmaxapi"
- "tickserver/api/lmaxapi/request"
- "tickserver/api/lmaxapi/response"
- )
- // lmaxDS实现了dataSource接口, 并对lmax的历史数据和实时数据保存
- type LmaxDS struct {
- *DSBase
- errcount int64
- }
- func init() {
- drivers[Lmax] = newLmaxDS
- }
- func newLmaxDS(conf *DsConf) (DataSource, error) {
- return &LmaxDS{
- DSBase: NewDsBase(conf), // lmax 自己下载历史数据, 所以参数db==nil
- }, nil
- }
- func (lds *LmaxDS) Name() string {
- return Lmax
- }
- func (lds *LmaxDS) Run() {
- if !lds.conf.Run {
- log.Println("LmaxDS.run config NOT run")
- return
- }
- log.Println("LmaxDS.run")
- // 此goroutine用来处理实时行情数据, 避免lmaxapi回调阻塞
- eventCh := make(chan *response.OrderBookEvent, 4096)
- go func() {
- for {
- ev := <-eventCh
- lds.onMarket(ev)
- }
- }()
- fn := func() {
- for {
- // 登录产生新的Session
- ss, err := lds.login(lds.conf.User, lds.conf.PassWord, request.ProductType.CFD_LIVE, lds.conf.Url)
- if err != nil {
- time.Sleep(time.Second * 10)
- continue
- }
- ss.RegisterSessionDisconnected(func(s *lmaxapi.Session) {
- log.Println("session disconnected. STOP session!")
- s.Stop()
- })
- lds.regStreamError(ss)
- lds.marketdata(ss, eventCh) // 实时行情
- ss.LoadAllInstruments(func(value *response.Instrument) {
- lds.onInstrument(value)
- log.Println("subcribe", value.Id)
- ss.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), DefaultSubscribeCB)
- })
- ss.Wait()
- ss.HeartbeatTimeout(5 * time.Second)
- ss.Start()
- // 有错误时跳出Start重启
- log.Println("lmax session RESTART !!!")
- }
- }
- fn()
- }
- func (lds *LmaxDS) doErrCode(s *lmaxapi.Session, err error) {
- //1. 处理下面的事件
- //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启
- //1.2 session 过期,发生403错误,必须重新登录
- //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1
- // 这个时候调用 session.StreamClose() 重新启动stream
- operr, ok := err.(*lmaxapi.OpError)
- log.Println("operr:", operr)
- if !ok {
- return
- }
- //1.2
- if operr.Code == 403 {
- log.Println("stop session")
- s.Stop()
- return
- }
- //1.1 and 1.3
- //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session
- if operr.Op == "Stream" {
- lds.errcount++
- log.Println("stop stream")
- if operr.Code == 0 {
- time.Sleep(1 * time.Second)
- }
- if lds.errcount == 3 {
- lds.errcount = 0
- s.Stop()
- return
- }
- s.StopStream()
- // ss.Stop()
- return
- }
- }
- func (lds *LmaxDS) regStreamError(ss *lmaxapi.Session) {
- // 注册数据流失败处理函数
- ss.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) {
- lds.doErrCode(s, err)
- })
- }
- func (lds *LmaxDS) login(name, password, typ, url string) (*lmaxapi.Session, error) {
- log.Println("LmaxDS.login:", name)
- lmaxapi.SetBaseUrl(url)
- req := request.NewLoginRequest(name, password, typ)
- ss, err := lmaxapi.Login(req)
- if err != nil {
- s := err.Error()
- if len(s) > 32 {
- s = s[:32]
- }
- log.Println("lmaxapi.Login error:", s)
- return nil, err
- }
- log.Println(name, "login OK!")
- return ss, nil
- }
- func (lds *LmaxDS) marketdata(session *lmaxapi.Session, eventCh chan *response.OrderBookEvent) {
- log.Println("@@@:marketdata")
- // 注册请求实时行情数据
- session.RegisterOrderBookEvent(func(s *lmaxapi.Session, ev *response.OrderBookEvent) {
- debugDelay("###:", fmt.Sprintf("lmax_%d", ev.InstrumentId), ev.Timestamp)
- lds.errcount = 0
- select {
- case eventCh <- ev:
- default:
- }
- })
- }
- func (lds *LmaxDS) onMarket(event *response.OrderBookEvent) {
- insId := event.InstrumentId
- mk := &Market{}
- mk.InsId = insId
- mk.Timestamp = event.Timestamp
- mk.Type = IntLmax
- if event.HasMarketClosePrice {
- mk.Close = event.MktClosePrice
- mk.Open = mk.Close
- }
- if event.HasLastTradedPrice {
- mk.LastPrice = event.LastTradedPrice
- }
- if event.HasDailyHighestTradedPrice {
- mk.High = event.DailyHighestTradedPrice
- }
- if event.HasDailyLowestTradedPrice {
- mk.Low = event.DailyLowestTradedPrice
- }
- if len(event.AskPrices) > 0 {
- asks := make([]PP, len(event.AskPrices))
- for i, b := range event.AskPrices {
- asks[i][0] = b.Price
- asks[i][1] = b.Quantity
- }
- mk.Asks = asks
- }
- if len(event.BidPrices) > 0 {
- bids := make([]PP, len(event.BidPrices))
- for i, b := range event.BidPrices {
- bids[i][0] = b.Price
- bids[i][1] = b.Quantity
- }
- mk.Bids = bids
- }
- lds.Save(mk)
- }
- func (lds *LmaxDS) onInstrument(value *response.Instrument) {
- ins := &Instrument{
- Id: value.Id,
- Name: value.Name,
- ExId: Lmax,
- Type: Forex,
- PriceInc: value.PriceIncrement,
- Margin: value.MarginRate,
- StartTime: value.StartTime.Unix() * 1000, // ms
- }
- lds.insMap[ins.Id] = ins
- }
- func DefaultSubscribeCB(err error) {
- if err != nil {
- fmt.Println("Failed:", err)
- }
- }
|